Streaming
Computing aggregate functions on streams.
Windowed features represent aggregated data pertaining to discrete time buckets. Using materialized aggregations in concert with streams, you can update aggregations on the fly as new streaming data arrives.
Materialized aggregations are a powerful tool to compute aggregations over time windows, combining representations of historical data with incoming real-time data to produce accurate and fresh results quickly. If you have a stream resolver that outputs a feature that is related to a feature that uses a materialized aggregation (more on this later), the materialized aggregations will be updated as new messages arrive.
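To make the bucket idea concrete, here is a minimal pure-Python sketch of how per-bucket partial aggregates could be updated as messages arrive and then combined over a trailing window. It illustrates the general technique only and is not Chalk's implementation: the class BucketedAggregate, the one-day bucket width, and the method names are all assumptions made for this example.

```python
from collections import defaultdict
from datetime import datetime, timedelta

BUCKET = timedelta(days=1)    # assumed bucket width, chosen for illustration
EPOCH = datetime(1970, 1, 1)  # reference point for numbering buckets


class BucketedAggregate:
    """Hypothetical store that keeps a running (sum, count) per user per bucket."""

    def __init__(self):
        # (user_id, bucket_index) -> [running_sum, running_count]
        self._buckets = defaultdict(lambda: [0.0, 0])

    def add(self, user_id, amount, ts):
        """Fold one value into the bucket that contains its timestamp."""
        bucket = (ts - EPOCH) // BUCKET
        cell = self._buckets[(user_id, bucket)]
        cell[0] += amount
        cell[1] += 1

    def window(self, user_id, now, days=30):
        """Return (sum, mean) over the buckets in the trailing window."""
        last = (now - EPOCH) // BUCKET
        total, count = 0.0, 0
        for bucket in range(last - days + 1, last + 1):
            cell = self._buckets.get((user_id, bucket))
            if cell is not None:
                total += cell[0]
                count += cell[1]
        # Fall back to 0.0 when the window is empty, mirroring default=0.0.
        mean = total / count if count else 0.0
        return total, mean


agg = BucketedAggregate()
now = datetime(2024, 3, 31, 12, 0)
agg.add("u1", 10.0, now - timedelta(days=40))  # historical, outside the window
agg.add("u1", 20.0, now - timedelta(days=5))   # historical, inside the window
agg.add("u1", 40.0, now)                       # freshly arrived stream message
total, mean = agg.window("u1", now)            # total == 60.0, mean == 30.0
```

Storing a sum and a count per bucket, rather than only a final value, is what lets both a sum and a mean be served from the same partial aggregates, and lets old buckets drop out of the window without recomputing from raw events.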
Setting up features for materialized aggregation with streaming is almost identical to the method described in the materialized_aggregations docs.
from datetime import datetime
from chalk import features, has_many, DataFrame, _
from chalk.streams import Windowed, windowed

@features
class Transaction:
    id: str
    user_id: 'User.id'
    amount: float
    ts: datetime

@features
class User:
    id: str
    name: str
    transactions: DataFrame[Transaction]
    sum_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].sum(),
        materialization=True,
        default=0.0,
    )
    mean_transaction_amount: Windowed[float] = windowed(
        "30d",
        expression=_.transactions[_.amount].mean(),
        materialization=True,
        default=0.0,
    )
Now, set up your streaming resolvers referencing these features.
from pydantic import BaseModel
from chalk import features, has_many, DataFrame, Features, _
from chalk.streams import stream, KafkaSource

class TransactionMsg(BaseModel):
    id: str
    user_id: str
    amount: float

@stream(source=KafkaSource(name='...'))
def process_transaction_topic(
    value: TransactionMsg,
) -> Features[Transaction.id, Transaction.user_id, Transaction.amount]:
    return Transaction(
        id=value.id,
        user_id=value.user_id,
        amount=value.amount,
    )
That’s it! Now, when messages arrive on the stream, Chalk infers whether the output features of the stream resolver are relevant to any materialized aggregations and, if so, updates the stored aggregations with the new data.
The resolver process_transaction_topic returns Transaction.id, Transaction.user_id, and Transaction.amount.
These features impact the materialized aggregate features User.sum_transaction_amount and User.mean_transaction_amount. Both require two features on Transaction: Transaction.user_id (referenced through the join from _.transactions) and Transaction.amount (referenced by _.amount in the aggregation expression).
When a streaming message comes through, the resolver process_transaction_topic will update the materialized aggregations with the new values for User.sum_transaction_amount and User.mean_transaction_amount.
To opt out of updating materialized aggregations, set updates_materialized_aggregations=False on the stream resolver.
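The inference and the opt-out described above can be pictured with a small pure-Python sketch. This is not how Chalk implements the check. The dataclasses AggregationSpec and StreamResolverSpec and the function aggregations_to_update are hypothetical names invented for this example, and the rule that a resolver must output every feature an aggregation requires is an assumption based on the statement that both aggregations require Transaction.user_id and Transaction.amount.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AggregationSpec:
    """Hypothetical description of one materialized aggregation."""
    name: str
    required: frozenset  # features the aggregation expression reads


@dataclass(frozen=True)
class StreamResolverSpec:
    """Hypothetical description of one stream resolver."""
    name: str
    outputs: frozenset  # features the resolver returns
    updates_materialized_aggregations: bool = True


def aggregations_to_update(resolver, aggregations):
    """Return the names of the aggregations this resolver would refresh."""
    if not resolver.updates_materialized_aggregations:
        return []  # the resolver opted out
    # Assumption: every required feature must appear in the resolver's outputs.
    return [a.name for a in aggregations if a.required <= resolver.outputs]


needed = frozenset({"Transaction.user_id", "Transaction.amount"})
aggregations = [
    AggregationSpec("User.sum_transaction_amount", needed),
    AggregationSpec("User.mean_transaction_amount", needed),
]

resolver = StreamResolverSpec(
    name="process_transaction_topic",
    outputs=frozenset(
        {"Transaction.id", "Transaction.user_id", "Transaction.amount"}
    ),
)
opted_out = StreamResolverSpec(
    name="process_transaction_topic",
    outputs=resolver.outputs,
    updates_materialized_aggregations=False,
)

updated = aggregations_to_update(resolver, aggregations)
skipped = aggregations_to_update(opted_out, aggregations)
```

In this sketch, updated lists both aggregate features because the resolver outputs everything they need, while skipped is empty because the flag short-circuits the check.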