Materialized Windowed Aggregations

Machine learning teams often build and maintain separate pre-aggregation pipelines that relay data into a feature store as standalone features, to reduce the latency of computing expensive features.

With Chalk, teams don’t need to spin up separate pipelines. Developers can materialize aggregations to improve performance, without compromising on flexibility, with just a single line of code:

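from chalk.features import DataFrame, features, _
from chalk.streams import Windowed, windowed

# Assumes a Transactions feature class with an `amount` feature is defined elsewhere.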
@features
class User:
    id: int
    transactions: DataFrame[Transactions]
    total_transaction_amount: Windowed[int] = windowed(
        "1d",
        "7d",
        "30d",
        materialization={"bucket_duration": "1d"}, # materialize your features in a single line
        expression=_.transactions[_.amount].sum(),
    )

Why are materialized aggregations useful?

Windowed features are typically computed from either raw data or pre-aggregated data. Raw data is the ground truth, but aggregating it can be slow when you request long time windows or large volumes of data. Some systems improve performance by serving features from pre-aggregated batch data, which reduces latency at the cost of not reflecting the freshest data.

Chalk balances accuracy and performance by combining both approaches: it aggregates historical data while continuously updating your features as new data arrives. Chalk automatically rolls new data into the appropriate buckets and reconstructs the aggregations. Buckets that become stale, i.e. no longer relevant to any of your active time windows, are automatically cleaned up.
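The rollup and cleanup behavior described above can be pictured with a small in-memory sketch. This is not Chalk's implementation: BucketedSum and its methods are hypothetical names, and the sketch assumes integer Unix timestamps in seconds and a sum aggregation.

```python
from collections import defaultdict


class BucketedSum:
    """Hypothetical sketch: per-bucket running sums keyed by bucket start time."""

    def __init__(self, bucket_seconds: int, max_window_seconds: int) -> None:
        self.bucket_seconds = bucket_seconds
        # The longest active window determines which buckets are still relevant.
        self.max_window_seconds = max_window_seconds
        self.buckets = defaultdict(float)

    def bucket_start(self, ts: int) -> int:
        # Floor the timestamp to a multiple of the bucket size (epoch-aligned).
        return ts - ts % self.bucket_seconds

    def add(self, ts: int, amount: float) -> None:
        # Roll a new event into the bucket that contains its timestamp.
        self.buckets[self.bucket_start(ts)] += amount

    def evict_stale(self, now: int) -> None:
        # A bucket is stale once it ends at or before the start of the longest window.
        oldest_needed = now - self.max_window_seconds
        for start in list(self.buckets):
            if start + self.bucket_seconds <= oldest_needed:
                del self.buckets[start]


DAY = 86_400
agg = BucketedSum(bucket_seconds=DAY, max_window_seconds=30 * DAY)
agg.add(ts=10 * DAY + 5, amount=20.0)
agg.add(ts=10 * DAY + 100, amount=5.0)  # lands in the same bucket as the previous event
agg.add(ts=50 * DAY, amount=7.0)
agg.evict_stale(now=50 * DAY + 1)  # the day-10 bucket is now outside every window
```

After eviction only the day-50 bucket remains, because the day-10 bucket ends more than 30 days before now.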

Chalk aligns buckets to the Unix epoch (ignoring leap seconds). When serving windowed queries, Chalk uses every bucket that overlaps the requested time window, even partially.
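A minimal sketch of epoch alignment and overlap selection, assuming integer Unix timestamps in seconds and a half-open window [start, end). The function name overlapping_buckets is hypothetical and is only meant to show which buckets would be included, not how Chalk computes them.

```python
def overlapping_buckets(window_start: int, window_end: int, bucket_seconds: int) -> list[int]:
    """Start times of all epoch-aligned buckets that overlap [window_start, window_end)."""
    # Buckets begin at multiples of bucket_seconds, counted from the Unix epoch.
    start = window_start - window_start % bucket_seconds
    starts = []
    while start < window_end:
        starts.append(start)
        start += bucket_seconds
    return starts


DAY = 86_400
HOUR = 3_600
now = 100 * DAY + 6 * HOUR
buckets = overlapping_buckets(now - 7 * DAY, now, DAY)
# The first bucket starts 6 hours before the requested 7-day window,
# so its earlier data is included when the window is served.
print(len(buckets), (now - 7 * DAY) - buckets[0])  # prints: 8 21600
```

With "1d" buckets, a 7-day window that does not start on a day boundary touches 8 buckets, and the served data can reach up to one bucket duration past the requested window start.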

[Figure: Materialized Windowed Aggregations]

How do I use materialized aggregations with Chalk?

You can materialize a feature aggregation in Chalk by supplying the materialization parameter to the windowed function of the feature you want to aggregate. In the example above, we specify a bucket_duration of "1d".

The materialization parameter takes a dictionary that follows the MaterializationWindowConfig schema:

bucket_duration: The Duration of each bucket in the window, e.g. "1h".
bucket_durations: A dictionary that maps bucket durations to specific window intervals, e.g. if you want "10m" buckets for your "1d" window and "3d" buckets for your "30d" window. We recommend explicitly mapping bucket durations to windows when your bucket duration and window intervals differ widely, e.g. a ten-minute bucket duration with a one-year window. Any window interval not included in the "bucket_durations" dictionary uses the "bucket_duration" value by default.
backfill_schedule: The schedule on which to automatically backfill the aggregation, e.g. "* * * * *" or "1h". See CronTab for more details.
continuous_buffer_duration: The minimum period of time for which to sample data directly via online query, rather than from the backfilled aggregations.
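The fallback rule for bucket_durations can be sketched in plain Python. parse_duration and bucket_for_window are hypothetical helpers, not part of Chalk; the sketch assumes the keys of bucket_durations are bucket durations and the values are the window (or list of windows) that use them, following the description above, and it only understands simple durations such as "10m" or "30d".

```python
import re

_SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3_600, "d": 86_400, "w": 604_800}


def parse_duration(text: str) -> int:
    """Convert a simple duration string such as "10m" or "30d" into seconds."""
    match = re.fullmatch(r"(\d+)([smhdw])", text)
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return int(value) * _SECONDS_PER_UNIT[unit]


def bucket_for_window(window: str, config: dict) -> str:
    """Choose the bucket duration for one window interval of a materialization config."""
    target = parse_duration(window)
    for bucket, windows in config.get("bucket_durations", {}).items():
        # Assumption: each value is a single window or a list of windows.
        if isinstance(windows, str):
            windows = [windows]
        if target in {parse_duration(w) for w in windows}:
            return bucket
    # Windows not listed in bucket_durations fall back to bucket_duration.
    return config["bucket_duration"]


config = {
    "bucket_duration": "1d",
    "bucket_durations": {"10m": ["1d"], "3d": ["30d"]},
}
for window in ("1d", "7d", "30d"):
    print(window, "->", bucket_for_window(window, config))
# prints "1d -> 10m", then "7d -> 1d", then "30d -> 3d"
```

Comparing parsed durations rather than raw strings means a "24h" window would match a "1d" entry as well.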

Backfilling data and managing aggregation

Chalk provides multiple ways to backfill and update your windowed aggregations.

You can control how your aggregations are backfilled by:

  1. Supplying a cron expression (CronTab) to the backfill_schedule keyword parameter.
  2. Supplying a Duration to continuous_buffer_duration.
  3. Triggering a backfill job using the Chalk CLI with chalk aggregate backfill.

Cron Schedule Expression

from chalk.features import DataFrame, features, _
from chalk.streams import Windowed, windowed

@features
class User:
    id: int
    transactions: DataFrame[Transactions]
    total_transaction_amount: Windowed[int] = windowed(
        "10d",
        "90d",
        materialization={
            "bucket_duration": "1d",
            "backfill_schedule": "0 0 * * *",
            "continuous_buffer_duration": "36h",
        },
        expression=_.transactions[_.amount].sum(),
    )

In the example above, we pass a cron expression that runs daily at midnight ("0 0 * * *") to the backfill_schedule parameter of the materialization config. With backfill_schedule set, the total_transaction_amount feature will be backfilled every day.
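To see why "0 0 * * *" means daily at midnight, here is a minimal matcher for the five cron fields (minute, hour, day of month, month, day of week). cron_matches is a hypothetical helper for illustration only: it supports just "*" and single numbers, and it does not reflect how Chalk evaluates CronTab values or which time zone it uses.

```python
from datetime import datetime


def cron_matches(expr: str, when: datetime) -> bool:
    """Check whether `when` matches a five-field cron expression.

    Only "*" and single integers are supported; ranges, lists, steps, and the
    special rule for restricting both day fields at once are deliberately ignored.
    """
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected: minute hour day-of-month month day-of-week")
    # Cron numbers weekdays 0-6 from Sunday; isoweekday() is 1 (Monday) to 7 (Sunday).
    actual = (when.minute, when.hour, when.day, when.month, when.isoweekday() % 7)
    return all(field == "*" or int(field) == value for field, value in zip(fields, actual))


print(cron_matches("0 0 * * *", datetime(2024, 3, 1, 0, 0)))   # True: midnight
print(cron_matches("0 0 * * *", datetime(2024, 3, 1, 12, 0)))  # False: noon
```

Because the day, month, and weekday fields are all "*", only the minute and hour fields constrain the schedule, so it fires once per day.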

Continuous Buffer

In addition to backfilling your aggregation with the recurring batch job, you can integrate fresh data into your windowed aggregation with continuous backfills. Continuous backfills are configured by supplying a Duration to the continuous_buffer_duration parameter, e.g. "36h". Chalk computes data within your continuous_buffer_duration directly from your online resolvers.

Note: Chalk includes all the data contained in overlapping buckets. If the time delta between the end of your window interval and the boundary of the last overlapping bucket exceeds what your use case can tolerate, we suggest shortening the bucket duration.

If continuous_buffer_duration is not set, then Chalk will only serve data from the backfilled aggregations.

[Figure: no continuous buffer duration]

If continuous_buffer_duration is set and its value is less than the duration between the end of the last bucket and the current time, Chalk will run online resolvers to compute the data for this time window.

[Figure: 12h continuous buffer duration]

If the continuous_buffer_duration is longer than the duration between the end of the last bucket and the current time, Chalk will run online resolvers to compute the data from the start of the most recently filled bucket to now.

[Figure: 20h continuous buffer duration]

Triggering a backfill through the Chalk CLI

Use chalk aggregate backfill to trigger a backfill aggregation for a windowed feature. This command is useful if you change your feature’s time windows or bucket_duration values.

To view existing aggregations, use chalk aggregate list.

 Series  Namespace    Group                Agg     Bucket  Retention  Aggregation  Dependent Features
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
 1       transaction  user_id merchant_id  amount  1d      30d        sum          user.txn_sum_by_merchant merchant.txn_sum_by_user
 1       transaction  user_id merchant_id  amount  1d      30d        count        user.txn_count_by_merchant
 2       transaction  user_id              amount  1d      30d        sum          user.txn_sum

The Series column shows the unique ID of the timeseries data underlying our aggregation system. Each unique combination of the Namespace, Group (see group_by_windowed), and Agg (the aggregated feature) columns represents a separate timeseries. When possible, Chalk uses the same timeseries data to serve multiple features.

The Bucket column shows the current bucket size, and the Retention column shows the maximum time window of any feature that depends on the given timeseries. The Dependent Features column lists the features served by that timeseries.
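The series and retention rules above can be modeled in a few lines of Python. This is an illustration only: WindowedFeature, Series, and build_series are hypothetical names, windows are assumed to be whole days, and the real output comes from chalk aggregate list, not from this code. The example rebuilds the series from the table above; the aggregation function (sum or count) is not part of the key, which is why series 1 appears on two rows.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class WindowedFeature:
    name: str               # fully qualified feature name, e.g. "user.txn_sum"
    namespace: str          # namespace of the aggregated data, e.g. "transaction"
    group: tuple[str, ...]  # group-by keys, e.g. ("user_id",)
    agg: str                # the aggregated feature, e.g. "amount"
    max_window_days: int    # longest window this feature requests


@dataclass
class Series:
    retention_days: int = 0
    features: list[str] = field(default_factory=list)


def build_series(features: list[WindowedFeature]) -> dict[tuple, Series]:
    """Group windowed features into timeseries keyed by (namespace, group, agg)."""
    series: dict[tuple, Series] = {}
    for f in features:
        # Assumption: the order of the group-by keys does not matter.
        key = (f.namespace, tuple(sorted(f.group)), f.agg)
        entry = series.setdefault(key, Series())
        # Retention is the longest window of any feature sharing this series.
        entry.retention_days = max(entry.retention_days, f.max_window_days)
        entry.features.append(f.name)
    return series


pair = ("user_id", "merchant_id")
rows = build_series([
    WindowedFeature("user.txn_sum_by_merchant", "transaction", pair, "amount", 30),
    WindowedFeature("merchant.txn_sum_by_user", "transaction", pair, "amount", 30),
    WindowedFeature("user.txn_count_by_merchant", "transaction", pair, "amount", 30),
    WindowedFeature("user.txn_sum", "transaction", ("user_id",), "amount", 30),
])
for key, s in rows.items():
    print(key, s.retention_days, s.features)
```

Sharing one timeseries across features keeps storage proportional to the number of distinct (Namespace, Group, Agg) combinations rather than the number of features.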