Feature Engine
Computing aggregate functions on streams.
Windowed features represent aggregated data pertaining to discrete time buckets. Using materialized aggregations in concert with streams, you can update aggregations on the fly as new streaming data arrives.
Materialized aggregations are a powerful tool for computing aggregations over time windows, combining representations of historical data with incoming real-time data to produce accurate, fresh results quickly. If a stream resolver outputs a feature that a materialized aggregation depends on (more on this later), that aggregation is updated as new messages arrive.
Setting up features for materialized aggregation with streaming is almost identical to the method described in the materialized_aggregations docs:
```python
from datetime import datetime

from chalk import features, DataFrame, _
from chalk.streams import Windowed, windowed


@features
class Transaction:
    id: str
    user_id: "User.id"
    amount: float
    ts: datetime


@features
class User:
    id: str
    name: str
    transactions: DataFrame[Transaction]
    sum_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].sum(),
        materialization=True,
        default=0.0,
    )
    mean_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].mean(),
        materialization=True,
        default=0.0,
    )
```

Now, set up your streaming resolvers referencing these features using a native streaming resolver:
```python
from pydantic import BaseModel

from chalk.features import _
from chalk.features.resolver import make_stream_resolver
from chalk.streams import KafkaSource


class TransactionMsg(BaseModel):
    id: str
    user_id: str
    amount: float


process_transaction_topic = make_stream_resolver(
    name="process_transaction_topic",
    source=KafkaSource(name="transactions"),
    message_type=TransactionMsg,
    output_features={
        Transaction.id: _.id,
        Transaction.user_id: _.user_id,
        Transaction.amount: _.amount,
    },
)
```

That's it! When a message arrives on the stream, Chalk identifies which materialized aggregates the resolver's output features can update, and applies the new row to those aggregates' bucket state.
For the resolver above, each Kafka message is folded into the buckets of both aggregates (sum_transaction_amount and mean_transaction_amount) for the matching User.id.
A few notes on what counts as "relevant":
- An aggregate whose aggregate_on column is a different version of the column the resolver actually writes is skipped (see Per-version isolation below).
- Set updates_materialized_aggregations=False on the stream resolver to skip materialized-aggregation updates for that resolver entirely.
Per-version isolation
When a versioned scalar backs versioned aggregates, a stream resolver's writes only flow into the aggregates sourced from the version that resolver actually writes. An aggregate whose aggregate_on column is a different version of the freshly written column is skipped for that batch.
An example of per-version isolation:
```python
from pydantic import BaseModel

from chalk import DataFrame, FeatureTime, Primary, feature, features
from chalk.features import _
from chalk.features.resolver import make_stream_resolver
from chalk.streams import KafkaSource, Windowed, windowed


@features
class Transaction:
    id: Primary[str]
    user_id: "User.id"
    amount: float = feature(
        versions={1: feature(), 2: feature()},
        default_version=1,
    )
    ts: FeatureTime


@features
class User:
    id: Primary[str]
    transactions: DataFrame[Transaction]
    total_spend: "Windowed[float]" = feature(
        versions={
            1: windowed(
                "30d",
                expression=_.transactions[_.amount].sum(),
                materialization=True,
                default=0.0,
            ),
            2: windowed(
                "30d",
                expression=_.transactions[_.amount @ 2].sum(),
                materialization=True,
                default=0.0,
            ),
        },
        default_version=1,
    )


class TransactionMsg(BaseModel):
    id: str
    user_id: str
    amount: float


# Kafka source, matching the earlier example.
transaction_source = KafkaSource(name="transactions")

# Stream resolver writes amount @ 2 (version 2 of the amount column).
make_stream_resolver(
    name="process_transaction_topic",
    source=transaction_source,
    message_type=TransactionMsg,
    output_features={
        Transaction.id: _.id,
        Transaction.user_id: _.user_id,
        Transaction.amount @ 2: _.amount,  # noqa: F722
    },
)
```

For each message, only User.total_spend["30d"] @ 2 is updated. User.total_spend["30d"] (v1, sourced from _.amount) is left untouched, even though the new Transaction row exists at the same primary key: its v1 column wasn't part of this resolver's output.
The isolation only kicks in when the source feature (the column referenced inside the aggregation expression) is itself versioned. If the underlying scalar is unversioned, every version of an aggregate that references it updates normally on every write.
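As a rough mental model (an illustrative sketch, not Chalk's implementation; the names Aggregate and should_update are invented here), the routing rule amounts to: an aggregate is updated only when the freshly written column matches the aggregate's source column, and, if that source is versioned, the versions match too:

```python
# Rough mental model of per-version update routing. Not Chalk's
# implementation: Aggregate and should_update are invented names.
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Aggregate:
    name: str
    source_column: str
    source_version: Optional[int]  # None: the source column is unversioned


def should_update(agg: Aggregate, written_column: str, written_version: Optional[int]) -> bool:
    """An aggregate is touched only when the freshly written column
    (and, if the source is versioned, its version) matches."""
    if agg.source_column != written_column:
        return False
    if agg.source_version is None:
        # Unversioned source: every write to the column updates the aggregate.
        return True
    return agg.source_version == written_version


total_spend_v1 = Aggregate("total_spend@1", "amount", 1)
total_spend_v2 = Aggregate("total_spend@2", "amount", 2)

# The resolver above writes amount @ 2:
assert not should_update(total_spend_v1, "amount", 2)  # v1 is skipped
assert should_update(total_spend_v2, "amount", 2)      # v2 is updated
```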
As with materialized aggregations backfilled from a non-streaming data source, when the Chalk streaming server receives a message on a stream and the stream resolver feeds a materialized aggregation, the relevant data is used to update the correct bucket for that aggregation. For each bucket, Chalk stores only the minimum amount of data required to compute the aggregation: for a sum aggregation, just the sum of all values in the bucket. When a new message arrives, the bucket matching the message's timestamp is updated with the new value. Because this data is stored online, you can access the values through both online and offline queries.
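The per-bucket state described above can be sketched in plain Python (an illustrative model, not Chalk's storage format; the one-day bucket width is an assumption): a sum aggregation needs one number per bucket, while a mean needs a running sum and count so the mean can be recovered exactly:

```python
# Illustrative sketch of minimal per-bucket aggregate state. Not
# Chalk's actual storage format; bucket width is an assumption.
from collections import defaultdict

BUCKET_SECONDS = 24 * 60 * 60  # one bucket per day (assumed width)


def bucket_index(ts: float) -> int:
    # Map an event timestamp (epoch seconds) to its bucket.
    return int(ts // BUCKET_SECONDS)


# sum: one float per bucket is enough.
sum_state: dict = defaultdict(float)
# mean: store (sum, count) per bucket so the mean stays exact.
mean_state: dict = defaultdict(lambda: (0.0, 0))


def observe(ts: float, amount: float) -> None:
    """Fold one streamed value into the bucket matching its timestamp."""
    b = bucket_index(ts)
    sum_state[b] += amount
    s, n = mean_state[b]
    mean_state[b] = (s + amount, n + 1)


def window_sum(now: float, days: int) -> float:
    # Serve a windowed sum by combining the buckets inside the window.
    lo = bucket_index(now - days * BUCKET_SECONDS)
    return sum(v for b, v in sum_state.items() if lo < b <= bucket_index(now))


observe(ts=86_400.0, amount=10.0)  # lands in bucket 1
observe(ts=90_000.0, amount=5.0)   # same day, same bucket
observe(ts=200_000.0, amount=2.0)  # a later bucket
assert sum_state[1] == 15.0
assert mean_state[1] == (15.0, 2)
assert window_sum(200_000.0, 30) == 17.0
```

Note that a mean cannot be maintained from the means alone; storing the (sum, count) pair is the minimum state that lets buckets be merged and updated incrementally.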