Streaming
Computing aggregate functions on streams.
Windowed features can be used to aggregate data from streaming data sources. On this page, you’ll learn how to write resolvers to convert streaming data into features.
Streaming resolvers are defined as windowed if they output at least one windowed feature.
Streaming window aggregate resolvers produce features from a list or DataFrame of messages that have arrived within the window period.
The messages on your stream are collected for the window period and provided to your resolver, which then outputs the feature values.
Note that you don’t need to write a resolver for each of the windows — Chalk handles invoking your resolver for each of the supplied window periods. Stream resolver code should be agnostic to which window it is operating on, allowing Chalk to transparently perform performance optimizations.
You can request the data in many forms:
You can request the messages as a list of Pydantic BaseModels, where the Pydantic model specifies the schema of the JSON-encoded messages on the topic.
from pydantic import BaseModel

from chalk.features import features, Features
from chalk.streams import Windowed, stream, windowed


@features
class User:
    id: int
    num_failed_logins: Windowed[int] = windowed("10m", "30m")


class LoginMessage(BaseModel):
    user_id: int
    failed: bool


@stream(source=..., mode="tumbling")
def failed_logins(events: list[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    return User(
        id=events[0].user_id,
        num_failed_logins=sum(1 for x in events if x.failed)
    )
Instead of a list, you can request the messages as a DataFrame. If you’re expecting lots of messages in your window, prefer DataFrame to list, as it uses a more efficient encoding.
from chalk.features import DataFrame


@stream(...)
def failed_logins(events: DataFrame[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    return User(
        id=events["user_id"].max(),
        num_failed_logins=events["failed"].sum(),
    )
Resolvers that take their messages as a DataFrame can also execute SQL against the messages in the stream.
from chalk.features import DataFrame


@stream(...)
def failed_logins(events: DataFrame[LoginMessage]) -> DataFrame[
    User.id,
    User.num_failed_logins
]:
    return f"""
        select
            user_id as id,
            count(*) as num_failed_logins
        from {events}
        where failed = 1
        group by 1
    """
Simply interpolate the DataFrame into the SQL string using a Python f-string. Chalk uses DuckDB to execute SQL resolvers.
By default, Chalk computes tumbling window aggregations. However, Chalk also supports continuous windows. The aggregation mode is specified in the stream decorator.
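For example, reusing the failed-login resolver from above, only the mode argument changes between the two styles. This is a sketch: the resolver names are illustrative, and continuous windows additionally require the keys argument described below.

# Tumbling: Chalk invokes the resolver once per window bucket.
@stream(source=..., mode="tumbling")
def failed_logins_tumbling(events: list[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    ...


# Continuous: feature values are computed on demand from the messages in the window.
@stream(source=..., mode="continuous", keys=...)  # keys is described below
def failed_logins_continuous(events: DataFrame[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    ...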
Tumbling windows are fixed-size, contiguous and non-overlapping time intervals. You can think of tumbling windows as adjacently arranged, discrete bins of equal width.
Like mapping resolvers, tumbling windowed resolvers insert their outputs into the online and offline stores when max_staleness and etl_offline_to_online are set. Unlike mapping resolvers, tumbling windowed resolvers run only once per resolver and time window, rather than with every message.
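For instance, declaring the User class from the earlier example as follows (the staleness value here is illustrative) causes each tumbling bucket’s aggregate to be written to the online and offline stores:

@features(max_staleness="30m", etl_offline_to_online=True)
class User:
    id: int
    # Each tumbling window's aggregate is persisted once per bucket, not per message.
    num_failed_logins: Windowed[int] = windowed("10m", "30m")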
As opposed to tumbling windows, continuous windows are overlapping and exact. When you request the value of a continuous window feature via online query, Chalk gathers all the messages received in the window and computes the value on-demand.
When using continuous windows, the materialization parameter is ignored.
Chalk supplies an additional keys argument to the stream resolver decorator to define how aggregations are conducted. All messages with the same key value(s) are aggregated together and passed to a single resolver invocation. This feature enables users to redefine the keying of messages beyond the default keys specified by the stream source.
Continuous windows require the keys argument. The keys argument is a dictionary mapping from Pydantic BaseModel attributes to Chalk feature attributes. The BaseModel attributes belong to the input to your resolver, enabling Chalk to aggregate together all messages in the time period with the same attribute values. The Chalk features describe values that must be supplied upon query to filter the messages for aggregation. The keys mapping also allows us to group by multiple values.
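Schematically, reusing the LoginMessage and User classes from above, a single-key mapping might look like the following sketch (the resolver name is illustrative and its body abbreviated):

@stream(
    source=...,
    mode="continuous",
    keys={
        # LoginMessage.user_id on the stream -> User.id in the feature class
        "user_id": User.id,
    },
)
def failed_logins_by_user(events: DataFrame[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    ...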
Consider the following situation, where our stream messages are produced for every interaction between a card and a user. We would like to keep track of how many interactions between a specific card and user occur within the window, and ultimately query by the user id and card id pair. Note that the following resolver works for both window durations, and the inputs to the resolver are restricted to messages for a single user/card pair.
from chalk import features, has_one, DataFrame
from chalk.streams import Windowed, stream, windowed
from pydantic import BaseModel


@features(etl_offline_to_online=True, max_staleness="infinity")
class UserCardInteraction:
    id: str
    user_id: str
    card_id: str
    interaction_type: str
    count: Windowed[int] = windowed(days=[1, 7], max_staleness="0s", default=0)

    user: User = has_one(lambda: User.id == UserCardInteraction.user_id)
    card: Card = has_one(lambda: Card.id == UserCardInteraction.card_id)


class KafkaMessage(BaseModel):
    card_id: str
    user_id: str
    interaction_id: str


@stream(
    source=stream_source,
    mode="continuous",
    keys={
        "card_id": UserCardInteraction.card_id,
        "user_id": UserCardInteraction.user_id,
    },
)
def user_card_aggregation(
    messages: list[KafkaMessage],
) -> DataFrame[
    UserCardInteraction.id,
    UserCardInteraction.user_id,
    UserCardInteraction.card_id,
    UserCardInteraction.count,
]:
    if len(messages) == 0:
        return DataFrame([])
    user_id = messages[0].user_id
    card_id = messages[0].card_id
    return DataFrame(
        [
            UserCardInteraction(
                id=f"{user_id}-{card_id}",
                user_id=user_id,
                card_id=card_id,
                count=len(messages),
            )
        ]
    )
By default, most streaming systems (e.g., Kafka, Kinesis) associate an event with the time it was received by the system. However, in many cases, the event’s effective timestamp for windowed aggregations differs from the time it was received by the system.
If you need to specify a custom event timestamp, you can set the timestamp argument of @stream to the name of a property of the BaseModel input to your resolver. Crucially, custom timestamps must be properly timezoned.
from datetime import datetime
from pydantic import BaseModel


class Message(BaseModel):
    id: str
    value: str
    event_timestamp: datetime


@stream(source=source, timestamp="event_timestamp")
def stream_resolver(message: Message) -> Features[StreamFeature.id, StreamFeature.value]:
    return StreamFeature(id=message.id, value=message.value)
This will mark the returned features as being observed at event_timestamp when deciding whether a message lies within a specific time window.
Note that materialization is not supported for streaming resolvers.
Building off the previous example in streams, we’d like to additionally capture the count of messages with the same id that fall within continuous windows. We use the same parse function from the previous example, which enables us to specify a timezoned timestamp in the message body itself rather than relying on Kafka’s automatic timestamp.
Note that our keys argument contains the Chalk feature primary key, as we are planning to query our features by id.
from datetime import datetime, timezone
from dateutil import parser
from pydantic import BaseModel

from chalk.features import features, DataFrame
from chalk.streams import stream, KafkaSource, windowed, Windowed

source = KafkaSource(name="...")


class KafkaMessage(BaseModel):
    id: str
    value: str
    naive_timestamp_str: str


class ParsedMessage(BaseModel):
    id: str
    value: str
    event_timestamp: datetime


@features(max_staleness="1d", etl_offline_to_online=True)
class StreamFeature:
    id: str
    value: str
    event_timestamp: datetime
    count: Windowed[int] = windowed(days=[1, 7], default=0)


def parse_message(kafka_message: KafkaMessage) -> ParsedMessage:
    # Attach an explicit UTC timezone to the naive timestamp from the message body.
    parsed_timestamp: datetime = parser.parse(kafka_message.naive_timestamp_str)
    timezoned_timestamp = parsed_timestamp.replace(tzinfo=timezone.utc)
    return ParsedMessage(
        id=kafka_message.id,
        value=kafka_message.value,
        event_timestamp=timezoned_timestamp,
    )


@stream(
    source=source,
    parse=parse_message,  # pre-processing step that converts KafkaMessage into ParsedMessage
    mode="continuous",
    keys={"id": StreamFeature.id},
    timestamp="event_timestamp",
)
def stream_resolver(messages: DataFrame[ParsedMessage]) -> StreamFeature.count:
    return len(messages)
One more note about keys: in the above resolver, we don’t need to include StreamFeature.id in the return since it is specified in the keys argument. As long as it is specified there, the Chalk feature and key value will automatically be returned.
Now, we can run an online query for our windowed feature!
from chalk.client import ChalkClient

client = ChalkClient(...)
data = client.query(
    input={
        StreamFeature.id: "12345",
    },
    output=[StreamFeature.count["7d"]],
)
This code will output the number of messages with id = "12345" whose custom timestamp lies between seven days ago and now.
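Since count was declared with days=[1, 7], the one-day window can be requested the same way; for example:

# Request the 1-day window of the same feature for the same key.
data = client.query(
    input={StreamFeature.id: "12345"},
    output=[StreamFeature.count["1d"]],
)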