Streaming
Exporting data from Chalk to your data warehouse.
You can export data from Chalk to other data warehouses or services using sinks.
Sinks are declared with the decorator @sink
, and take several keyword arguments,
similar to resolvers.
Chalk has integrations that cover several common patterns for exporting data. However, if there’s a data source destination that Chalk doesn’t support, you can write a sink to put data anywhere you like:
from chalk.features import sink
@sink
def send_to_service(
uid: User.uuid,
title: User.account.title,
full_name: User.full_name
):
my_microservice.update_account_name(uid, title, full_name)
As with resolvers, you can specify the arguments to your function using the type signature. In the body of your sink function, you can call arbitrary functions with the input values.
You can also use custom sinks to handle many events in bulk. Simply request the data you want as a DataFrame with the features you’d like to have:
@sink(
buffer_size=100,
debounce="100ms",
max_delay="1s"
)
def fn(data: DataFrame[User.uuid, User.account.title, User.full_name]):
...
Chalk has several keyword arguments to control the execution of the sink.
You can specify how many updates to buffer using the buffer_size
argument.
In this example, Chalk will wait at least 100ms for more events to hit the buffer
before calling the sink function. However, using the max_delay
argument,
you can specify that the function will be called no more than 1s
after an
event hit the buffer.
Chalk has a tight integration with SQLAlchemy for simple data export. You can use this integration for any of the supported SQL-like integrations.
By default, Chalk creates a new row in your table for every run of the function.
To specify the integration, pass the SQL source via the integration
keyword argument.
pg_db = PostgreSQLSource()
@sink(integration=pg_db)
def user_data_export(
uid: User.uuid,
account_title: User.bank_account.title,
user_name: User.identity.full_name,
) -> UserSQL:
return UserSQL(
user_id=uid,
account_title=account_title,
user_name=user_name,
)
Sometimes, you’ll want to run upserts on the rows in your exported table
rather than adding new rows for each run of the function.
To do so, add the flag upsert
:
@sink(integration=pg_db, upsert=True)
def aggregate_user_transactions(
uid: User.uuid,
successful_txns: User.transfers[
Transfer.status = "successful",
after(days_ago=60)
],
) -> TransactionStats:
return TransactionStats(
user_id=uid,
successful_count_60=successful_txns.count(),
successful_vol_60=successful_txns[Transaction.amount].sum(),
)
In this example, we show a common pattern of updating a windowed value.