You can export data from Chalk to other data warehouses or services using sinks.
Sinks are declared with the decorator
@sink, and take several keyword arguments,
similar to resolvers.
Chalk has integrations that cover several common patterns for exporting data. However, if there’s a data source destination that Chalk doesn’t support, you can write a sink to put data anywhere you like:
from chalk.features import sink @sink def send_to_service( uid: User.uuid, title: User.account.title, full_name: User.full_name ): my_microservice.update_account_name(uid, title, full_name)
As with resolvers, you can specify the arguments to your function using the type signature. In the body of your sink function, you can call arbitrary functions with the input values.
You can also use custom sinks to handle many events in bulk. Simply request the data you want as a dataframe with the features you’d like to have:
@sink( buffer_size=100, debounce="100ms", max_delay="1s" ) def fn(data: DataFrame[User.uuid, User.account.title, User.full_name]): ...
Chalk has several keyword arguments to control the execution of the sink.
You can specify how many updates to buffer using the
In this example, Chalk will wait at least 100ms for more events to hit the buffer
before calling the sink function. However, using the
you can specify that the function will be called no more than
1s after an
event hit the buffer.
Chalk has a tight integration with SQLAlchemy for simple data export. You can use this integration for any of the supported sql-like integrations.
By default, Chalk creates a new row in your table for every run of the function.
To specify the integration, pass the sql source via the
integration keyword argument.
pg_db = PostgreSQLSource() @sink(integration=pg_db) def user_data_export( uid: User.uuid, account_title: User.bank_account.title, user_name: User.identity.full_name, ) -> UserSQL: return UserSQL( user_id=uid, account_title=account_title, user_name=user_name, )
Sometimes, you’ll want to run upserts on the rows in your exported table
rather than adding new rows for each run of the function.
To do so, add the flag
@sink(integration=pg_db, upsert=True) def aggregate_user_transactions( uid: User.uuid, successful_txns: User.transfers[ Transfer.status = "successful", after(days_ago=60) ], ) -> TransactionStats: return TransactionStats( user_id=uid, successful_count_60=successful_txns.count(), successful_vol_60=successful_txns[Transaction.amount].sum(), )
In this example, we show a common pattern of updating a windowed value.