Chalk home page
Docs
API
CLI
  1. Streaming
  2. Sinks

You can export data from Chalk to other data warehouses or services using sinks.

Sinks are declared with the decorator @sink, and take several keyword arguments, similar to resolvers.

Custom sinks

Chalk has integrations that cover several common patterns for exporting data. However, if there’s a data source destination that Chalk doesn’t support, you can write a sink to put data anywhere you like:

from chalk.features import sink

@sink
def send_to_service(
    uid: User.uuid,
    title: User.account.title,
    full_name: User.full_name
):
    my_microservice.update_account_name(uid, title, full_name)

As with resolvers, you can specify the arguments to your function using the type signature. In the body of your sink function, you can call arbitrary functions with the input values.

Batch custom sinks

You can also use custom sinks to handle many events in bulk. Simply request the data you want as a dataframe with the features you’d like to have:

@sink(
    buffer_size=100,
    debounce="100ms",
    max_delay="1s"
)
def fn(data: DataFrame[User.uuid, User.account.title, User.full_name]):
    ...

Chalk has several keyword arguments to control the execution of the sink. You can specify how many updates to buffer using the buffer_size argument. In this example, Chalk will wait at least 100ms for more events to hit the buffer before calling the sink function. However, using the max_delay argument, you can specify that the function will be called no more than 1s after an event hit the buffer.

SQLAlchemy integration

Chalk has a tight integration with SQLAlchemy for simple data export. You can use this integration for any of the supported sql-like integrations.

Inserts

By default, Chalk creates a new row in your table for every run of the function. To specify the integration, pass the sql source via the integration keyword argument.

pg_db = PostgreSQLSource()

@sink(integration=pg_db)
def user_data_export(
    uid: User.uuid,
    account_title: User.bank_account.title,
    user_name: User.identity.full_name,
) -> UserSQL:
    return UserSQL(
        user_id=uid,
        account_title=account_title,
        user_name=user_name,
    )

Upserts

Sometimes, you’ll want to run upserts on the rows in your exported table rather than adding new rows for each run of the function. To do so, add the flag upsert:

@sink(integration=pg_db, upsert=True)
def aggregate_user_transactions(
    uid: User.uuid,
    successful_txns: User.transfers[
        Transfer.status = "successful",
        after(days_ago=60)
    ],
) -> TransactionStats:
    return TransactionStats(
        user_id=uid,
        successful_count_60=successful_txns.count(),
        successful_vol_60=successful_txns[Transaction.amount].sum(),
    )

In this example, we show a common pattern of updating a windowed value.