Chalk home page
Docs
SDK
CLI
  1. Streaming
  2. Streaming with Materialized Aggregations

Windowed features represent aggregated data pertaining to discrete time buckets. Using materialized aggregations in concert with streams, you can update aggregations on the fly as new streaming data arrives.

Materialized aggregations are a powerful tool to compute aggregations over time windows, combining representations of historical data with incoming real-time data to produce accurate and fresh results quickly. If you have a stream resolver that outputs a feature that is related to a feature that uses a materialized aggregation (more on this later), the materialized aggregations will be updated as new messages arrive.

Setting up a materialized aggregation

Setting up features for materialized aggregation with streaming is almost identical to the method described in the materialized_aggregations docs. Just remember to set a max staleness and set etl_offline_to_online=True on the feature class, as with all other streaming features.

from chalk import features, has_many, DataFrame, FeatureTime, _
from chalk.streams import Windowed, windowed

@features(etl_offline_to_online=True)
class Transaction:
    id: str
    user_id: str
    amount: float
    ts = FeatureTime

@features(max_staleness="infinity", etl_offline_to_online=True)
class User:
    id: str
    name: str
    transactions: DataFrame[Transaction] = has_many(lambda: User.id == Transaction.user_id, max_staleness="30d")
    sum_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].sum(),
        materialization=True,
        default=0.0,
    )
    mean_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].mean(),
        materialization=True,
        default=0.0,
    )

Now, set up your streaming resolvers referencing these features.

from pydantic import BaseModel

from chalk import features, has_many, DataFrame, Features, FeatureTime, _
from chalk.streams import Windowed, windowed
from chalk.streams import stream


class UserMsg(BaseModel):
    id: str
    name: str

class TransactionMsg(BaseModel):
    id: str
    user_id: str
    amount: float

@stream(source=user_source)
def user_stream_source(
    value: UserMsg,
) -> Features[User.id, User.name]:
    return User(id=value.id, name=value.name)

@stream(source=transaction_source)
def transaction_stream_source(
    value: TransactionMsg,
) -> Features[Transaction.id, Transaction.user_id, Transaction.amount]:
    return Transaction(id=value.id, user_id=value.user_id, amount=value.amount)

That’s it! Now, when messages are received on the streams, Chalk will infer whether the output features of the stream resolver are relevant to materialized aggregations, and if so, will update and store the data in the existing materialized aggregations.

There are two separate streams here (one for users and one for transactions), but only one of the stream resolvers will update materialized aggregations.

  • The resolver user_stream_source returns User.id and User.name, and neither of these features are related to the materialized aggregation features User.sum_transaction_amount and User.mean_transaction_amount. This means that this resolver will simply update the online store with the new values for User.name and User.id, and continue.
  • The resolver transaction_stream_source returns Transaction.id, Transaction.user_id, and Transaction.amount. These features are related to the materialized aggregation features: both User.sum_transaction_amount and User.mean_transaction_amount reference _.amount in their expressions, which corresponds to Transaction.amount. In addition, the feature Transaction.user_id is necessary to compute the join between User and Transaction. Thus, this resolver will update the materialized aggregations with the new values for User.sum_transaction_amount and User.mean_transaction_amount as these transaction messages are received.

To opt out of updating materialized aggregations, set updates_materialized_aggregations=False on the stream resolver.