Feature Engine
Cache and materialize feature aggregations
Machine learning teams often build and maintain separate pre-aggregation pipelines that relay data as standalone features into a feature store to reduce the latency of computing expensive features.
With Chalk, developers don’t need to spin up separate pipelines to materialize aggregations. This can be done with just a single line of code without compromising on flexibility:
from chalk.features import features, Vector, _
from chalk import DataFrame, Windowed, windowed
from datetime import datetime

@features
class Transaction:
    id: int
    amount: float
    category: int
    user_id: "User.id"
    timestamp: datetime
    embedding: Vector[384]

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    total_transaction_amount: Windowed[float] = windowed(
        "1d", "7d", "30d",
        materialization={"bucket_duration": "1d"},
        expression=_.transactions[
            _.amount,
            _.timestamp <= _.chalk_now,
            _.timestamp > _.chalk_window
        ].sum(),
    )
    most_active_categories: Windowed[list[int]] = windowed(
        "30d", "60d", "all",
        materialization={"bucket_duration": "1h"},
        # The approximate top 10 categories that the user has
        # spent at in each of the windowed buckets.
        expression=_.transactions[
            _.category,
            _.timestamp <= _.chalk_now,
            _.timestamp > _.chalk_window
        ].approx_top_k(k=10)
    )
    avg_embedding: Windowed[Vector[384]] = windowed(
        "30d", "60d", "all",
        materialization={"bucket_duration": "1h"},
        # Vector mean will be computed as the element-wise
        # mean for each component of the vector
        expression=_.transactions[
            _.embedding,
            _.timestamp <= _.chalk_now,
            _.timestamp > _.chalk_window
        ].mean(),
        default=[0.0] * 384
    )

Windowed features are typically computed using either raw data or pre-aggregated data. Raw data is the ground truth, but aggregating it can be slow if you request long time windows or large volumes of data. Some systems improve performance by serving features from pre-aggregated batch data, mitigating high latency with the trade-off of not having access to the freshest data.
Chalk balances accuracy and performance by combining both approaches. Chalk aggregates historical data into buckets in the online store and continuously updates those buckets as new data arrives. Chalk automatically rolls new data into the appropriate buckets and reconstructs the aggregations. Buckets that become stale, i.e. no longer relevant to any of your active time windows, are automatically cleaned up. For each bucket, Chalk stores the minimal required partial aggregate state, and at query time Chalk computes the requested aggregation by merging the partial aggregate states for every bucket that overlaps the requested time window.
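As a rough illustration of the bucketing idea described above, the sketch below keeps a (sum, count) partial state per bucket and merges the states of the overlapping buckets at query time. It is plain Python rather than Chalk code: BUCKET_SECONDS, add_event, and windowed_mean are hypothetical names, and the state Chalk actually stores for each aggregation is not specified here.

```python
from collections import defaultdict

BUCKET_SECONDS = 86_400  # hypothetical 1-day buckets aligned on the Unix epoch

# bucket index -> [sum, count]: a partial aggregate state that is enough for a mean
buckets = defaultdict(lambda: [0.0, 0])


def add_event(ts: int, amount: float) -> None:
    """Roll a new event into the bucket that contains its timestamp."""
    state = buckets[ts // BUCKET_SECONDS]
    state[0] += amount
    state[1] += 1


def windowed_mean(now: int, window_seconds: int):
    """Merge the states of every bucket overlapping (now - window, now]."""
    first = (now - window_seconds) // BUCKET_SECONDS
    last = now // BUCKET_SECONDS
    states = [buckets[b] for b in range(first, last + 1) if b in buckets]
    total = sum(s[0] for s in states)
    count = sum(s[1] for s in states)
    return total / count if count else None


add_event(100, 10.0)                     # lands in bucket 0
add_event(BUCKET_SECONDS + 5, 20.0)      # lands in bucket 1
add_event(3 * BUCKET_SECONDS + 1, 30.0)  # lands in bucket 3

now = 3 * BUCKET_SECONDS + 10
print(windowed_mean(now, 2 * BUCKET_SECONDS))  # 25.0: buckets 1 and 3 are merged
```

Note that the event in bucket 1 is a few seconds older than the requested two-day window, yet it is still counted because its bucket overlaps the window. That is the accuracy cost of serving from buckets instead of raw events.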
Chalk aligns buckets on Unix epoch (ignoring leap seconds) by default. However, if you’d like to customize
the bucket starts, you can use the bucket_start field on the MaterializationWindowConfig in your definition.
When serving windowed queries, Chalk uses all buckets containing any overlap with the requested time window.
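To make the alignment rule concrete, here is a minimal sketch of how epoch-aligned bucket boundaries and the set of overlapping buckets could be computed. The helper names and the origin argument (a stand-in for a customized bucket start) are assumptions for illustration, not part of the Chalk API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def bucket_start(ts: datetime, bucket: timedelta, origin: datetime = EPOCH) -> datetime:
    """Floor ts to the start of its bucket, counting whole buckets from origin."""
    return origin + ((ts - origin) // bucket) * bucket


def overlapping_buckets(now: datetime, window: timedelta, bucket: timedelta,
                        origin: datetime = EPOCH) -> list:
    """Start times of every bucket that overlaps the window (now - window, now]."""
    starts = []
    current = bucket_start(now - window, bucket, origin)
    while current <= now:
        starts.append(current)
        current += bucket
    return starts


now = datetime(2024, 1, 10, 6, 0, tzinfo=timezone.utc)
for start in overlapping_buckets(now, timedelta(days=1), timedelta(days=1)):
    print(start.isoformat())
```

With one-day buckets and a one-day window requested at 06:00 UTC, two buckets overlap the window, so the served aggregate reaches back to midnight of the previous day rather than exactly 24 hours.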

Chalk supports a number of aggregations out-of-the-box.
Chalk builtins are very performant as they’re optimized and run natively in C++ and Rust.
Aggregations automatically skip null or None values.
The following table lists the supported aggregation functions:
| Function | Notes |
|---|---|
| sum | Support for vector and scalar feature aggregations. |
| min | |
| max | |
| min_by_n | Returns a list of the values from the n rows with the smallest by values. |
| max_by_n | Returns a list of the values from the n rows with the largest by values. |
| mean | Feature type must be Vector, float, or float \| None. None values are skipped, meaning they are not included in the mean calculation. |
| count | |
| std | Standard deviation. Requires at least 2 values. |
| var | Variance. Same requirements as std. |
| approx_count_distinct | An approximation of the cardinality of non-null data. Uses Apache DataSketches CPC. |
| approx_top_k | An approximation of the most common values in a field. Takes a required k parameter, such as approx_top_k(k=5) for the five approximately most common values. Up to k values will be returned, sometimes fewer. For a weighted count of the most common values, you can use the parameter by, and set return_total_weight to True or False, which will return the top k values based on the by parameter, optionally with the total weight as well. |
| approx_percentile | An approximation of the value a given percentage of your data falls below. Accepts a quantile parameter between 0 and 1, such as approx_percentile(quantile=.5). |
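
Aggregations such as var and std can only be served from buckets if each bucket's state can be merged with its neighbours. The sketch below shows one standard way to do that (Welford updates plus the pairwise merge of count, mean, and sum of squared deviations). It assumes sample variance, which matches the "at least 2 values" requirement, but the VarState class is hypothetical and is not taken from Chalk's implementation.

```python
import math
import statistics
from dataclasses import dataclass


@dataclass
class VarState:
    """Mergeable partial state: count, mean, and sum of squared deviations."""
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0

    def add(self, x) -> None:
        """Welford update for one value; None values are skipped."""
        if x is None:
            return
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def merge(self, other: "VarState") -> "VarState":
        """Combine two bucket states without revisiting the raw values."""
        n = self.n + other.n
        if n == 0:
            return VarState()
        delta = other.mean - self.mean
        mean = self.mean + delta * other.n / n
        m2 = self.m2 + other.m2 + delta * delta * self.n * other.n / n
        return VarState(n, mean, m2)

    def variance(self):
        """Sample variance, or None when fewer than 2 values were seen."""
        return self.m2 / (self.n - 1) if self.n >= 2 else None


bucket_a, bucket_b = VarState(), VarState()
for x in [1.0, 2.0, None, 4.0]:
    bucket_a.add(x)
for x in [8.0, 16.0]:
    bucket_b.add(x)

merged = bucket_a.merge(bucket_b)
exact = statistics.variance([1.0, 2.0, 4.0, 8.0, 16.0])
print(math.isclose(merged.variance(), exact))  # True
```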
These aggregations can be applied to DataFrame features that represent a has-many join relationship between two feature classes. These joins are defined using a join key, which can either be implicit, as in the example below, or explicit through the has_many function.
from chalk.features import features, _
from chalk import DataFrame, Windowed, windowed
from datetime import datetime

@features
class Transaction:
    id: int
    amount: float
    timestamp: datetime
    user_id: "User.id"

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    total_transaction_amount: Windowed[float] = windowed(
        "1d", "7d", "30d",
        # materialize your features in a single line
        materialization={"bucket_duration": "1d"},
        expression=_.transactions[
            _.amount,
            _.timestamp <= _.chalk_now,
            _.timestamp > _.chalk_window
        ].sum(),
    )
Materialized aggregations can also be applied to DataFrame features that are defined using a composite join key.
from chalk.features import features, has_many, _
from chalk import DataFrame, Windowed, windowed
from datetime import datetime

@features
class Transaction:
    id: int
    bank: str
    user_id: str
    timestamp: datetime
    amount: float

@features
class User:
    id: int
    bank: str
    transactions: DataFrame[Transaction] = has_many(
        lambda: (Transaction.user_id == User.id) & (Transaction.bank == User.bank)
    )
    total_transaction_amount: Windowed[float] = windowed(
        "1d", "7d", "30d",
        # materialize your features in a single line
        materialization={"bucket_duration": "1d"},
        expression=_.transactions[
            _.amount,
            _.timestamp <= _.chalk_now,
            _.timestamp > _.chalk_window
        ].sum(),
    )
    top_5_bank_amounts: Windowed[list[str]] = windowed(
        "7d", "14d",
        materialization={"bucket_duration": "1d"},
        expression=_.transactions[
            _.bank,
            _.timestamp <= _.chalk_now,
            _.timestamp > _.chalk_window
        ].max_by_n(_.amount, 5),
    )
    lowest_5_bank_amounts: Windowed[list[str]] = windowed(
        "7d", "14d",
        materialization={"bucket_duration": "1d"},
        expression=_.transactions[
            _.bank,
            _.timestamp <= _.chalk_now,
            _.timestamp > _.chalk_window
        ].min_by_n(_.amount, 5),
    )
The approx_count_distinct aggregation provides an efficient way to estimate the number of unique values in your data using the Compressed Probabilistic Counting (CPC) sketch algorithm.
Computing exact distinct counts for large datasets can be memory-intensive and slow, especially for materialized aggregations where you need to track uniqueness across many time buckets. The CPC sketch algorithm provides:
Users can materialize a feature aggregation in Chalk by supplying the materialization
parameter of the windowed function on the feature you want to aggregate.
In the example above, we specify a bucket_duration of "1d".
The materialization parameter takes in a dictionary that adheres to the schema outlined by MaterializationWindowConfig:
- bucket_duration (Duration)
- bucket_durations (Mapping[Duration, Sequence[Duration] | Duration]): for example, "10m" bucket durations for your "1d" window interval and "3d" bucket durations for your "30d" window interval. We recommend explicitly mapping bucket durations to each window if your window intervals have a wide discrepancy, e.g. a ten minute bucket duration with a one year window interval. Any window intervals not explicitly included in the "bucket_durations" dictionary will use your supplied "bucket_duration" by default.
- backfill_schedule (CronTab | None): for example, "* * * * *" or "1h". See CronTab for more details.
- continuous_buffer_duration (Duration | None)
- backfill_tags (list[list[str]] | None): for example, [[], ["source:a"], ["source:b"]] will create three separate scheduled backfill jobs: one with no tags, one tagged source:a, and one tagged source:b. See Running backfills with tags for more details.

When you query for a specific window_interval, Chalk aggregates over all buckets that intersect with the window interval.
If you want to use one bucket size for all of your window intervals, you can just use the bucket_duration parameter. If your window intervals have a wide discrepancy (say, 1d and 365d), you can use the bucket_durations parameter to explicitly configure creation of differently-sized buckets tailored for each window interval.
For example:
materialization={
    "bucket_duration": "1d",
    "bucket_durations": {
        "1d": ["7d", "30d"],
        "5d": ["60d", "90d", "180d"],
    },
}

Selecting the right bucket duration is a tradeoff between accuracy and storage. Shorter bucket durations will yield more accurate results, but will require more storage.
To optimize for accuracy given a storage constraint, you can compute the approximate storage costs by computing the number of buckets needed for your materialized aggregation, and then computing the storage per bucket.
Buckets of the same size will be reused across all relevant window interval calculations, but each configured bucket duration creates a separate set of buckets.
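The bucket count part of that estimate can be sketched in a few lines. The helper below is hypothetical: it assumes simple single-unit duration strings such as "10m" or "30d", assumes that same-sized buckets are shared so only the longest window per bucket size matters, and adds one bucket per size for the partially overlapping bucket at the window edge. Treat the result as an approximate upper bound per primary key, not as Chalk's exact retention behavior.

```python
import math

UNIT_SECONDS = {"s": 1, "m": 60, "h": 3_600, "d": 86_400, "w": 604_800}


def to_seconds(duration: str) -> int:
    """Parse a simple duration string such as "10m" or "30d" (one unit only)."""
    return int(duration[:-1]) * UNIT_SECONDS[duration[-1]]


def buckets_per_key(windows, materialization) -> dict:
    """Estimate the number of buckets one primary key needs, per bucket duration."""
    default = materialization["bucket_duration"]
    # Invert {bucket duration: window(s)} into {window: bucket duration}.
    by_window = {}
    for bucket, assigned in materialization.get("bucket_durations", {}).items():
        for window in ([assigned] if isinstance(assigned, str) else assigned):
            by_window[window] = bucket
    # Same-sized buckets are reused, so track the longest window per bucket size.
    longest = {}
    for window in windows:
        bucket = by_window.get(window, default)
        longest[bucket] = max(longest.get(bucket, 0), to_seconds(window))
    # The extra bucket accounts for partial overlap at the edge of the window.
    return {b: math.ceil(w / to_seconds(b)) + 1 for b, w in longest.items()}


config = {
    "bucket_duration": "1d",
    "bucket_durations": {
        "1d": ["7d", "30d"],
        "5d": ["60d", "90d", "180d"],
    },
}
counts = buckets_per_key(["7d", "30d", "60d", "90d", "180d"], config)
print(counts)  # {'1d': 31, '5d': 37}
```

Multiplying the total bucket count by your estimated bytes per bucket and by the number of primary keys gives a first-order storage figure to compare against your budget.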
The storage used per bucket is the sum of the size of the online storage key representation for the primary key, as well as the size of the bucket state.
Size of the bucket state depends on the type of aggregation:
For more details on determining the tradeoff between accuracy and storage space, please reach out to the Chalk team.
Chalk provides multiple ways to backfill and update your windowed aggregations.
You can control how your aggregations are backfilled by:
- Setting the backfill_schedule keyword parameter.
- Setting continuous_buffer_duration.
- Running chalk aggregate backfill.

from chalk.features import features, _
from chalk import DataFrame, Windowed, windowed
from datetime import datetime

@features
class Transaction:
    id: int
    amount: float
    timestamp: datetime
    user_id: "User.id"

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    # amount is a float, so the windowed sum is typed as float
    total_transaction_amount: Windowed[float] = windowed(
        "10d", "90d",
        materialization={
            "bucket_duration": "1d",
            "backfill_schedule": "0 0 * * *",
            "continuous_buffer_duration": "36h",
        },
        expression=_.transactions[
            _.amount,
            _.timestamp <= _.chalk_now,
            _.timestamp > _.chalk_window
        ].sum(),
    )

In the example above, we supply a cron expression that’s set to run daily at midnight ("0 0 * * *") to the backfill_schedule parameter of the materialization config.
Now that backfill_schedule has been set, the total_transaction_amount feature will be backfilled every day.
In addition to backfilling your aggregation with the recurring batch job, you can also keep recent windows fresh with live lookups through a continuous buffer duration.
Configure this by supplying a Duration to continuous_buffer_duration, for example "36h".
Chalk will compute data within your continuous_buffer_duration directly from your online resolvers.
Note: Chalk includes all the data contained in the bucket partially overlapping with the continuous buffer duration. If the time delta between the end of your window interval and the boundary of the last overlapping bucket exceeds the constraints of your use case, we suggest shortening the bucket duration.
If continuous_buffer_duration is not set, then Chalk will only serve data from the backfilled aggregations.

If continuous_buffer_duration is set, and its value is less than the duration between the end of the last bucket
and the current time, Chalk will run online resolvers to compute the data for this time window.

If the continuous_buffer_duration is longer than the duration between the end of the last bucket and the current time,
Chalk will run online resolvers to compute the data from the start of the most recently filled bucket to now.

Use chalk aggregate backfill to trigger a backfill aggregation for a windowed feature.
This command is useful if you change your feature’s time windows or bucket_duration values.
To view existing aggregations, use chalk aggregate list.
To write pre-aggregated tiles to offline storage (S3) instead of the online store, pass --store-offline.
This is required before you can read tiles in offline queries using use_tile_store.
--store-offline requires both --lower-bound and --upper-bound as ISO 8601 instant strings.
chalk aggregate backfill \
    --feature user.total_transaction_amount \
    --store-offline \
    --lower-bound 2025-01-01T00:00:00Z \
    --upper-bound 2026-04-14T00:00:00Z

Example output of chalk aggregate list:

Series  Namespace    Group                Agg     Bucket  Retention  Aggregation  Dependent Features
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
1       transaction  user_id merchant_id  amount  1d      30d        sum          user.txn_sum_by_merchant merchant.txn_sum_by_user
1       transaction  user_id merchant_id  amount  1d      30d        count        user.txn_count_by_merchant
2       transaction  user_id              amount  1d      30d        sum          user.txn_sum
The Series column shows the unique ID of the timeseries data underlying the aggregation.
Each unique combination of the Namespace,
Group (see group_by_windowed), and Agg (the aggregated feature) columns represents a separate timeseries.
When possible, Chalk will use the same timeseries data to serve multiple features.
Other useful information here is the Bucket column, which shows the current bucket size.
The Retention column shows the maximum time window of any feature that depends on the given timeseries.
The Dependent Features column lists the features that are served by the given timeseries.
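The relationship between features and series in the listing above can be mimicked by grouping on the (Namespace, Group, Agg) triple. This is only an illustrative sketch built from the rows shown in the listing; the definitions list and the grouping code are assumptions, not how Chalk assigns series IDs internally.

```python
from collections import defaultdict

# (namespace, group keys, aggregated feature, aggregation, dependent feature)
definitions = [
    ("transaction", ("user_id", "merchant_id"), "amount", "sum", "user.txn_sum_by_merchant"),
    ("transaction", ("user_id", "merchant_id"), "amount", "sum", "merchant.txn_sum_by_user"),
    ("transaction", ("user_id", "merchant_id"), "amount", "count", "user.txn_count_by_merchant"),
    ("transaction", ("user_id",), "amount", "sum", "user.txn_sum"),
]

# Features that share namespace, group, and aggregated feature share one series,
# even when they apply different aggregations (sum vs. count).
series = defaultdict(list)
for namespace, group, agg, aggregation, feature in definitions:
    series[(namespace, group, agg)].append((aggregation, feature))

for series_id, (key, served) in enumerate(series.items(), start=1):
    print(series_id, key, [feature for _, feature in served])
```

Four feature definitions collapse into two series here, which is why the sum and count rows in the listing carry the same Series value.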
You can use backfill_tags to run multiple scheduled backfills for the same feature with different resolver tags.
This is useful when you have multiple resolvers from different sources that get data for different primary keys
and want each tagged resolver to independently populate buckets.
from chalk.features import features, _
from chalk import DataFrame, Windowed, windowed
from chalk.streams import MaterializationWindowConfig
from datetime import datetime

@features
class Transaction:
    id: int
    amount: float
    timestamp: datetime
    user_id: "User.id"

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    # amount is a float, so the windowed sum is typed as float
    total_transaction_amount: Windowed[float] = windowed(
        "10d", "90d",
        materialization=MaterializationWindowConfig(
            bucket_duration="1d",
            backfill_schedule="0 0 * * *",
            backfill_tags=[[], ["source:a"], ["source:b"]],
        ),
        expression=_.transactions[
            _.amount,
            _.timestamp <= _.chalk_now,
            _.timestamp > _.chalk_window
        ].sum(),
    )

In this example, Chalk will create three separate scheduled backfill jobs: one with no tags, one with source:a,
and one with source:b. Each run passes its tag set to the resolver as query tags, allowing you to select
different resolvers per run. Resolver tags on their own do not change which backfill or query runs; you still need
to pass the matching tags when you trigger a query or backfill manually. See Query Basics,
the Chalk Python SDK query reference, and chalk aggregate backfill.
If multiple backfills resolve data for overlapping primary keys and time buckets, later writes will overwrite earlier ones. Disjoint sets of primary keys are expected: if your resolvers may return data for the same primary key under different tags, the results are not additive.
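The overwrite behavior can be pictured with a toy key-value store. The store dictionary and run_backfill helper below are hypothetical stand-ins for the online store and a tagged backfill run, used only to show why overlapping keys are not summed.

```python
# Toy bucket store keyed by (primary key, bucket start); values are bucket sums.
store = {}


def run_backfill(rows) -> None:
    """Write each (primary key, bucket, value) row, replacing any existing value."""
    for primary_key, bucket, value in rows:
        store[(primary_key, bucket)] = value


# Disjoint primary keys: each tagged run fills its own users.
run_backfill([(1, "2025-01-01", 40.0)])  # e.g. the run tagged source:a
run_backfill([(2, "2025-01-01", 15.0)])  # e.g. the run tagged source:b

# Overlapping primary key: the later run replaces the earlier value.
run_backfill([(1, "2025-01-01", 5.0)])
print(store[(1, "2025-01-01")])  # 5.0, not 45.0
```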
For information on updating materialized aggregations with streaming data, see the example in the streaming documentation.
By default, offline queries that request materialized windowed aggregations run resolvers (preferring offline resolvers over online resolvers) to recompute the aggregation expression from raw events. Two planner options let you improve accuracy and performance:
- align_offline_chalk_window_with_materialization: rounds each input time’s lower bound down to the nearest bucket boundary, perfectly simulating the rounding behavior of the online store.
- use_materialized_offline_query: bins input times to bucket boundaries and merges all complete buckets that intersect the window interval, rather than re-scanning raw events.

When tiles have been backfilled to offline storage with --store-offline (see Triggering a backfill through the Chalk CLI), you can instruct the planner to read directly from those pre-aggregated S3 tiles by also setting use_tile_store: True:
from chalk.client import ChalkClient, ResourceRequests
from datetime import datetime

client = ChalkClient()
result = client.offline_query(
    input={
        "user.id": [1, 2, 3]
    },
    output=["user.total_transaction_amount"],
    run_asynchronously=True,
    recompute_features=False,
    planner_options={"use_tile_store": True, "use_materialized_offline_query": True},
    input_times=[datetime(2026, 4, 14, 0, 0)],
    resources=ResourceRequests(cpu=4, memory="16Gi"),
)

This reads from pre-aggregated data in cloud storage, instead of recomputing aggregates from raw events, which can be dramatically faster for large historical aggregations. See Training Sets with Materialized Aggregations for a detailed discussion of trade-offs and use cases.
Tiles must have been backfilled to cover the requested input_times before using use_tile_store. If tiles are missing for part of the range, the offline query will error.
To keep offline tiles fresh on a recurring basis (for example, computing a new tile every day), use
ScheduledAggregateBackfill. This is the recommended approach for automating offline tile backfills, as
the backfill_schedule field on MaterializationWindowConfig targets the online store only.
from chalk import ScheduledAggregateBackfill, AggregateBackfillTarget
from datetime import datetime

ScheduledAggregateBackfill(
    name="daily_total_transaction_amount_offline_backfill",
    features=["user.total_transaction_amount"],
    schedule="0 1 * * *",  # daily at 1 AM UTC
    target=AggregateBackfillTarget.OFFLINE,
    lower_bound=datetime(2025, 1, 1),
)

- name (str): identifies this backfill among the ScheduledAggregateBackfill instances in your project.
- features (Collection[FeatureReference])
- schedule (CronTab | Duration)
- target (AggregateBackfillTarget): AggregateBackfillTarget.OFFLINE writes pre-aggregated tiles to offline storage. AggregateBackfillTarget.ONLINE writes aggregate state to the online store.
- lower_bound (datetime | None)
- upper_bound (datetime | None)
- query_tags (Collection[str] | None)
- resource_group (str | None)

ScheduledAggregateBackfill with AggregateBackfillTarget.OFFLINE is critical for operationalizing offline tiling: it ensures a fresh tile is computed each day so that training set queries can always read from pre-aggregated data without rescanning raw events.