Cache and materialize feature aggregations
Machine learning teams often build and maintain separate pre-aggregation pipelines that relay data—as standalone features—into a feature store to reduce the latency of computing expensive features.
With Chalk, teams don’t need to spin up separate pipelines. Developers can materialize aggregations to improve performance, without compromising on flexibility, with just a single line of code:
from chalk.features import DataFrame, _, features
from chalk.streams import Windowed, windowed

@features
class User:
    id: int
    transactions: DataFrame[Transactions]
    total_transaction_amount: Windowed[int] = windowed(
        "1d",
        "7d",
        "30d",
        materialization={"bucket_duration": "1d"},  # materialize your features in a single line
        expression=_.transactions[_.amount].sum(),
    )
Windowed features are typically computed using either raw data or pre-aggregated data. Raw data is the ground truth, but aggregating it can be slow if you request long time windows or large volumes of data. Some systems improve performance by serving features from pre-aggregated batch data. Pre-aggregated data mitigates high latency with the trade-off of not having access to the freshest data.
Chalk balances accuracy and performance by combining both approaches. Chalk aggregates historical data while continuously updating your features as new data arrives. Chalk automatically rolls new data into the appropriate buckets and reconstructs the aggregations. Buckets that become stale, i.e. no longer relevant to any of your active time windows, are automatically cleaned up.
Chalk aligns buckets on Unix epoch (ignoring leap seconds). When serving windowed queries, Chalk uses all buckets containing any overlap with the requested time window.
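To make the bucket selection concrete, here is a minimal sketch of the epoch-aligned bucketing arithmetic (an illustration of the scheme described above, not Chalk's internals): bucket boundaries fall on fixed multiples of the bucket duration since the Unix epoch, and a query selects every bucket that overlaps the requested window.

import time

BUCKET_SECONDS = 24 * 60 * 60  # a "1d" bucket_duration, in seconds

def bucket_start(ts: int) -> int:
    # Floor a Unix timestamp to the start of its epoch-aligned bucket.
    return ts - (ts % BUCKET_SECONDS)

def overlapping_buckets(window_start: int, window_end: int) -> list[int]:
    # Start times of every bucket that has any overlap with
    # [window_start, window_end).
    return list(range(bucket_start(window_start), window_end, BUCKET_SECONDS))

now = int(time.time())
# A "7d" window typically touches 8 buckets: the oldest bucket only
# partially overlaps the window, but it is still used when serving.
print(len(overlapping_buckets(now - 7 * BUCKET_SECONDS, now)))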
Users can materialize a feature aggregation in Chalk by supplying the materialization parameter of the windowed function on the feature you want to aggregate. In the example above, we specify a bucket_duration of "1d".

The materialization parameter takes in a dictionary that adheres to the schema outlined by MaterializationWindowConfig:
"10m"
bucket durations for your"1d"
window interval and"3d"
bucket durations for your "30d"
window interval. We recommend explicitly mapping bucket durations to each window if your window intervals have a wide discrepancy e.g. ten minute bucket duration with a one year window interval. Any window intervals not explicitly included in the "bucket_durations"
dictionary will use your supplied "bucket_duration"
by default."* * * * *"
or "1h"
Chalk provides multiple ways to backfill and update your windowed aggregations.
You can control how your aggregations are backfilled by:
- setting a recurring batch job with the backfill_schedule keyword parameter,
- integrating fresh data via online resolvers with continuous_buffer_duration, and
- manually triggering a backfill with the CLI command chalk aggregate backfill.

@features
class User:
    id: int
    transactions: DataFrame[Transactions]
    total_transaction_amount: Windowed[int] = windowed(
        "10d",
        "90d",
        materialization={
            "bucket_duration": "1d",
            "backfill_schedule": "0 0 * * *",
            "continuous_buffer_duration": "36h",
        },
        expression=_.transactions[_.amount].sum(),
    )
In the example above, we supply a cron expression that’s set to run daily at midnight ("0 0 * * *") to the backfill_schedule parameter of the materialization config. Now that backfill_schedule has been set, the total_transaction_amount feature will be backfilled every day.
In addition to backfilling your aggregation with the recurring batch job, you can also integrate fresh data into your windowed aggregation with continuous backfills. Continuous backfills can be configured by supplying a Duration to the continuous_buffer_duration parameter, e.g. "36h".
Chalk will compute data within your continuous_buffer_duration directly from your online resolvers. If continuous_buffer_duration is not set, then Chalk will only serve data from the backfilled aggregations.
If continuous_buffer_duration is set, and its value is less than the duration between the end of the last bucket and the current time, Chalk will run online resolvers to compute the data for this time window. If the continuous_buffer_duration is longer than the duration between the end of the last bucket and the current time, Chalk will run online resolvers to compute the data from the start of the most recently filled bucket to now.
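Putting those three cases together, the serving behavior resembles this hypothetical helper (a sketch for intuition only; the function and its signature are not Chalk's API):

from datetime import datetime, timedelta

def online_recompute_start(
    last_bucket_end: datetime,
    bucket_duration: timedelta,
    continuous_buffer: timedelta | None,
    now: datetime,
) -> datetime | None:
    # No buffer: serve entirely from the backfilled aggregation.
    if continuous_buffer is None:
        return None
    gap = now - last_bucket_end
    if continuous_buffer < gap:
        # Buffer shorter than the unbucketed tail: online resolvers
        # compute just the data between the last bucket's end and now.
        return last_bucket_end
    # Buffer reaches back into the most recently filled bucket: online
    # resolvers recompute from that bucket's start to now.
    return last_bucket_end - bucket_duration

# e.g. 1d buckets, a 36h buffer, and a last bucket that closed 10h ago:
start = online_recompute_start(
    last_bucket_end=datetime(2024, 1, 10, 0, 0),
    bucket_duration=timedelta(days=1),
    continuous_buffer=timedelta(hours=36),
    now=datetime(2024, 1, 10, 10, 0),
)  # -> 2024-01-09 00:00, the start of the most recently filled bucket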
Use chalk aggregate backfill to trigger a backfill aggregation for a windowed feature. This command is useful if you change your feature’s time windows or bucket_duration values.
To view existing aggregations, use chalk aggregate list.
Series  Namespace    Group                Agg     Bucket  Retention  Aggregation  Dependent Features
─────────────────────────────────────────────────────────────────────────────────────────────────────
1       transaction  user_id merchant_id  amount  1d      30d        sum          user.txn_sum_by_merchant merchant.txn_sum_by_user
1       transaction  user_id merchant_id  amount  1d      30d        count        user.txn_count_by_merchant
2       transaction  user_id              amount  1d      30d        sum          user.txn_sum
The Series column shows the unique ID of the timeseries data underlying our aggregation system. Each unique combination of the Namespace, Group (see group_by_windowed), and Agg (the aggregated feature) columns represents a separate timeseries. When possible, Chalk will use the same timeseries data to serve multiple features.

Other useful information here is the Bucket column, which shows the current bucket size. The Retention column shows the maximum time window of any feature that depends on the given timeseries. The Dependent Features column lists the features that are served by the given timeseries.