# Streaming with Materialized Aggregations
source: https://docs.chalk.ai/docs/windowed-streaming

## Computing aggregate functions on streams

Windowed features represent aggregated data pertaining to discrete time buckets.
Using materialized aggregations in concert with streams,
you can update aggregations on the fly as new streaming data arrives.

Materialized aggregations compute aggregations over time windows by combining
stored representations of historical data with incoming real-time data,
producing accurate and fresh results quickly.
When a stream resolver outputs a feature that a materialized aggregation depends on
(the matching rules are described later on this page), that aggregation is updated
as new messages arrive.

### Setting up a materialized aggregation

Defining features for materialized aggregations with streaming is almost identical to the approach described in the
materialized aggregations docs.

```
from datetime import datetime
from chalk import features, DataFrame, _
from chalk.streams import Windowed, windowed

@features
class Transaction:
    id: str
    user_id: 'User.id'
    amount: float
    ts: datetime

@features
class User:
    id: str
    name: str
    transactions: DataFrame[Transaction]
    sum_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].sum(),
        materialization=True,
        default=0.0,
    )
    mean_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].mean(),
        materialization=True,
        default=0.0,
    )
```

Next, define a stream resolver whose output features feed these aggregations:

```
from pydantic import BaseModel
from chalk.features import _
from chalk.features.resolver import make_stream_resolver
from chalk.streams import KafkaSource

class TransactionMsg(BaseModel):
    id: str
    user_id: str
    amount: float

process_transaction_topic = make_stream_resolver(
    name="process_transaction_topic",
    source=KafkaSource(name="transactions"),
    message_type=TransactionMsg,
    output_features={
        Transaction.id: _.id,
        Transaction.user_id: _.user_id,
        Transaction.amount: _.amount,
    },
)
```

That's it! When a message arrives on the stream, Chalk identifies which materialized
aggregates the resolver's output features can update, and applies the new row to those
aggregates' bucket state.

For the resolver above:

- Transaction.user_id drives the _.transactions traversal from User, so the new row lands in the right user's bucket.
- Transaction.amount feeds both User.sum_transaction_amount and User.mean_transaction_amount.

Each Kafka message is therefore folded into the buckets of both aggregates for the
matching User.id.
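
To make the fold concrete, here is a minimal pure-Python sketch of the behavior described above. It is not Chalk's implementation: BucketState, fold_message, and the dictionary store are hypothetical names invented for illustration, a single state object backs both aggregates for brevity, and time bucketing is left out so the focus stays on routing by user id.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class BucketState:
    """Hypothetical per-user state: enough to answer both sum and mean."""

    total: float = 0.0
    count: int = 0

    def add(self, amount: float) -> None:
        self.total += amount
        self.count += 1

    @property
    def mean(self) -> float:
        # Fall back to 0.0 (the default in the feature definition) when empty.
        return self.total / self.count if self.count else 0.0


# Keyed by user id, mirroring how Transaction.user_id selects the User.
state_by_user = defaultdict(BucketState)


def fold_message(msg: dict) -> None:
    """Apply one stream message to the matching user's aggregate state."""
    state_by_user[msg["user_id"]].add(msg["amount"])


for msg in [
    {"id": "t1", "user_id": "u1", "amount": 10.0},
    {"id": "t2", "user_id": "u1", "amount": 30.0},
    {"id": "t3", "user_id": "u2", "amount": 5.0},
]:
    fold_message(msg)

print(state_by_user["u1"].total, state_by_user["u1"].mean)  # 40.0 20.0
```

Each message touches only the state of its own user, and a single message contributes to both the sum and the mean.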

A few notes on which aggregates a resolver updates:

- Version match required. An aggregate whose aggregate_on column is a different
version of the column the resolver actually writes is skipped — see
Per-version isolation below.
- Opt-out per resolver. Set updates_materialized_aggregations=False on the stream
resolver to skip materialized aggregation updates for that resolver entirely.

### Per-version isolation

When a versioned scalar backs versioned aggregates, a stream resolver's writes only
flow into the aggregates sourced from the version that resolver actually writes. An
aggregate whose aggregate_on column is a different version of the freshly written
column is skipped for that batch.

```
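from pydantic import BaseModel
from chalk import DataFrame, FeatureTime, Primary, feature, features
from chalk.features import _
from chalk.features.resolver import make_stream_resolver
from chalk.streams import KafkaSource, Windowed, windowed

# Same Kafka source as in the previous example.
transaction_source = KafkaSource(name="transactions")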

@features
class Transaction:
    id: Primary[str]
    user_id: "User.id"
    amount: float = feature(
        versions={1: feature(), 2: feature()},
        default_version=1,
    )
    ts: FeatureTime

@features
class User:
    id: Primary[str]
    transactions: DataFrame[Transaction]
    total_spend: "Windowed[float]" = feature(
        versions={
            1: windowed(
                "30d",
                expression=_.transactions[_.amount].sum(),
                materialization=True,
                default=0.0,
            ),
            2: windowed(
                "30d",
                expression=_.transactions[_.amount @ 2].sum(),
                materialization=True,
                default=0.0,
            ),
        },
        default_version=1,
    )

class TransactionMsg(BaseModel):
    id: str
    user_id: str
    amount: float

# The stream resolver writes amount @ 2 only; amount @ 1 is not among its outputs.
make_stream_resolver(
    name="process_transaction_topic",
    source=transaction_source,
    message_type=TransactionMsg,
    output_features={
        Transaction.id: _.id,
        Transaction.user_id: _.user_id,
        Transaction.amount @ 2: _.amount,  # noqa: F722
    },
)
```

For each message, only User.total_spend["30d"] @ 2 is updated. User.total_spend["30d"]
(v1, sourced from _.amount) is left untouched, even though the new Transaction row
exists at the same primary key — its v1 column wasn't part of this resolver's output.

The isolation only kicks in when the source feature (the column referenced inside the
aggregation expression) is itself versioned. If the underlying scalar is unversioned,
every version of an aggregate that references it updates normally on every write.
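
The matching rule can be modeled in a few lines of plain Python. This is only a sketch of the rule as described on this page, not Chalk's internal logic: Column, Aggregate, and aggregates_to_update are hypothetical names, and an unversioned feature is represented by version=None.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Column:
    name: str
    version: Optional[int] = None  # None models an unversioned feature


@dataclass(frozen=True)
class Aggregate:
    name: str
    source: Column  # column referenced inside the aggregation expression


def aggregates_to_update(
    aggregates: List[Aggregate],
    written: List[Column],
    updates_materialized_aggregations: bool = True,
) -> List[str]:
    """Return the aggregates fed by the columns a resolver writes."""
    if not updates_materialized_aggregations:
        return []  # the resolver opted out entirely
    written_set = set(written)
    # An aggregate is updated only if its exact source column (name and
    # version) is among the resolver's outputs.
    return [agg.name for agg in aggregates if agg.source in written_set]


# Versioned source: the resolver writes amount @ 2 only.
versioned = [
    Aggregate("User.total_spend @ 1", Column("Transaction.amount", 1)),
    Aggregate("User.total_spend @ 2", Column("Transaction.amount", 2)),
]
writes = [
    Column("Transaction.id"),
    Column("Transaction.user_id"),
    Column("Transaction.amount", 2),
]
only_v2 = aggregates_to_update(versioned, writes)

# Unversioned source: every aggregate version references the same column.
unversioned = [
    Aggregate("User.total_spend @ 1", Column("Transaction.amount")),
    Aggregate("User.total_spend @ 2", Column("Transaction.amount")),
]
both = aggregates_to_update(unversioned, [Column("Transaction.amount")])

print(only_v2)
print(both)
```

In the versioned case only the version 2 aggregate is selected; in the unversioned case both aggregate versions are selected because they share one source column.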

For a full guide to defining, querying, and backfilling versioned windowed aggregations, see Versioned Windowed Aggregations.

### Online storage

As with materialized aggregations that are backfilled from a non-streaming data source, when the Chalk
streaming server receives a message for a stream resolver that feeds a materialized aggregation, it uses
the relevant data to update the correct bucket for that aggregation.

Each bucket stores only the minimum amount of data required to compute the aggregation. For example, a sum
aggregation stores the sum of all values in the bucket. When a new message arrives, the bucket that matches the
message's timestamp is updated with the new value. Because this data is stored online, you can access the
values through both online and offline queries.
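
The bucket bookkeeping can be illustrated with a small pure-Python sketch for a sum aggregation. It makes several assumptions that do not come from Chalk: the one-day bucket duration, the in-memory dictionary, and the names bucket_index, apply_message, and windowed_sum are all hypothetical, and the window edge is rounded to whole buckets.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

BUCKET = timedelta(days=1)  # assumed bucket duration, chosen for illustration
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# (user_id, bucket index) -> running sum; the only state a sum needs.
sum_buckets = defaultdict(float)


def bucket_index(ts: datetime) -> int:
    """Map a timestamp to the index of the bucket that contains it."""
    return (ts - EPOCH) // BUCKET


def apply_message(user_id: str, amount: float, ts: datetime) -> None:
    """Update the bucket selected by the message timestamp."""
    sum_buckets[(user_id, bucket_index(ts))] += amount


def windowed_sum(user_id: str, now: datetime, window: timedelta) -> float:
    """Sum every bucket that overlaps the window ending at `now`."""
    first, last = bucket_index(now - window), bucket_index(now)
    return sum(sum_buckets.get((user_id, i), 0.0) for i in range(first, last + 1))


now = datetime(2024, 3, 31, 12, 0, tzinfo=timezone.utc)
apply_message("u1", 10.0, datetime(2024, 3, 30, 9, 0, tzinfo=timezone.utc))
apply_message("u1", 5.0, datetime(2024, 3, 30, 18, 0, tzinfo=timezone.utc))  # same bucket
apply_message("u1", 7.0, datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc))  # outside 30d

total_30d = windowed_sum("u1", now, timedelta(days=30))
print(total_30d)  # 15.0
```

Three messages produce only two stored values, one per bucket, and the 30-day query reads bucket sums rather than individual transactions.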





