Chalk home page
Docs
SDK
CLI
  1. Streaming
  2. Streaming with Materialized Aggregations

Windowed features represent aggregated data pertaining to discrete time buckets. Using materialized aggregations in concert with streams, you can update aggregations on the fly as new streaming data arrives.

Materialized aggregations are a powerful tool to compute aggregations over time windows, combining representations of historical data with incoming real-time data to produce accurate and fresh results quickly. If you have a stream resolver that outputs a feature that is related to a feature that uses a materialized aggregation (more on this later), the materialized aggregations will be updated as new messages arrive.


Setting up a materialized aggregation

Setting up features for materialized aggregation with streaming is almost identical to the method described in the materialized_aggregations docs.

from datetime import datetime
from chalk import features, has_many, DataFrame, _
from chalk.streams import Windowed, windowed

@features
class Transaction:
    id: str
    user_id: 'User.id'
    amount: float
    ts: datetime

@features
class User:
    id: str
    name: str
    transactions: DataFrame[Transaction]
    sum_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].sum(),
        materialization=True,
        default=0.0,
    )
    mean_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].mean(),
        materialization=True,
        default=0.0,
    )

Now, set up your streaming resolvers referencing these features.

from pydantic import BaseModel
from chalk import features, has_many, DataFrame, Features, _
from chalk.streams import stream, KafkaSource

class TransactionMsg(BaseModel):
    id: str
    user_id: str
    amount: float

@stream(source=KafkaSource(name='...'))
def process_transaction_topic(
    value: TransactionMsg,
) -> Features[Transaction.id, Transaction.user_id, Transaction.amount]:
    return Transaction(
        id=value.id,
        user_id=value.user_id,
        amount=value.amount,
    )

That’s it! Now, when messages are received on the streams, Chalk will infer whether the output features of the stream resolver are relevant to materialized aggregations, and if so, will update and store the data in the existing materialized aggregations.

The resolver process_transaction_topic returns Transaction.id, Transaction.user_id, and Transaction.amount. These features are impact the materialized aggregate features User.sum_transaction_amount and User.mean_transaction_amount. Both require two features on transaction: Transaction.user_id (referenced through the join from _.transactions) and Transaction.amount (referenced by _.amount in the aggregation expression). When a streaming messages comes through, the resolver process_transaction_topic will update the materialized aggregations with the new values for User.sum_transaction_amount and User.mean_transaction_amount.

To opt out of updating materialized aggregations, set updates_materialized_aggregations=False on the stream resolver.