Streaming
Computing aggregate functions on streams.
With streams, it is common to compute aggregations over events that fall within a time window. Chalk provides a simple way to express these computations, known as streaming window aggregate functions.
Windowed features are features defined over time ranges. You can declare a feature to be computed with a window function using the chalk.streams.Windowed type.
from chalk.features import features
from chalk.streams import Windowed, windowed

@features
class User:
    num_failed_logins: Windowed[int] = windowed("10m", "30m", "1d")
    ...
Here, the num_failed_logins feature supports three windows: the number of failed logins in the last 10 minutes, 30 minutes, and 1 day.
A windowed feature is always referenced by the desired window. For example:
User.num_failed_logins("10m")
User.num_failed_logins("30m")
User.num_failed_logins("1d")
Windowed features support much of the same functionality as a normal feature. For example, you can specify a maximum staleness for a windowed feature, or add a description.
@features
class User:
    # Description of the num_failed_logins event
    # :owner: trust-and-safety
    num_failed_logins: Windowed[int] = windowed("10m", max_staleness="10m")
    ...
Windowed features can be required by resolvers much like a normal feature.
@realtime
def account_under_attack(
    failed_logins_30m: User.num_failed_logins('30m'),
    failed_logins_1d: User.num_failed_logins('1d'),
) -> ...:
    return failed_logins_30m > 10 or failed_logins_1d > 100
Streaming window aggregate resolvers produce feature values from the list of messages that arrive in the window period. The messages on your stream are collected for the window period and provided to your resolver, which outputs the feature values.
Note that you don't need to write a resolver for each of the windows: Chalk handles invoking the resolver above for each of the supplied window periods. Stream resolver code should be agnostic to which window it operates on, allowing Chalk to transparently perform performance optimizations.
You can request the data in several forms. First, you can request the messages as a list of Pydantic models, where the Pydantic model describes the JSON-encoded messages on the topic.
@features
class User:
    id: int
    num_failed_logins: Windowed[int] = windowed("10m", "30m")

class LoginMessage(BaseModel):
    user_id: int
    failed: bool

@stream(source=..., mode="tumbling")
def failed_logins(events: list[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    # Return both declared outputs: the user's id and the window's count.
    return User(
        id=events[0].user_id,
        num_failed_logins=sum(1 for x in events if x.failed),
    )
Instead of a list, you can request the features as a DataFrame. If you're expecting lots of messages in your window, prefer DataFrame to list, as it uses a more efficient encoding.
from chalk.features import DataFrame

@stream(...)
def failed_logins(events: DataFrame[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    return User(
        id=events["user_id"].max(),
        num_failed_logins=events["failed"].sum(),
    )
Resolvers that receive their messages as a DataFrame can execute SQL against the messages in the stream.
from chalk.features import DataFrame

@stream(...)
def failed_logins(events: DataFrame[LoginMessage]) -> DataFrame[
    User.id,
    User.num_failed_logins
]:
    return f"""
        select
            user_id as id,
            count(*) as num_failed_logins
        from {events}
        where failed = 1
        group by 1
    """
Simply interpolate the DataFrame into the SQL string using a Python f-string. Chalk uses DuckDB to execute SQL resolvers.
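Because the query is plain DuckDB SQL, you can sanity-check the aggregation outside of Chalk. Here is a minimal sketch using the duckdb package directly; the pandas DataFrame is our own stand-in for the collected stream messages, not anything produced by Chalk:

import duckdb
import pandas as pd

# Stand-in for the collected stream messages.
events = pd.DataFrame({"user_id": [1, 1, 2], "failed": [1, 0, 1]})

# DuckDB can query a pandas DataFrame in scope by its variable name.
result = duckdb.sql("""
    select user_id as id, count(*) as num_failed_logins
    from events
    where failed = 1
    group by 1
""").df()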
By default, Chalk computes tumbling window aggregations. However, Chalk also supports continuous windows. The aggregation mode is specified in the stream decorator.
Tumbling windows are fixed-size, contiguous, and non-overlapping time intervals. You can think of tumbling windows as adjacently arranged bins of equal width. Tumbling windows are most often used alongside max_staleness to allow the features to be sent to the online and offline stores after each window period.
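To make the binning concrete, here is an illustrative sketch (our own simplification, not Chalk internals) of how a timestamp maps to a fixed 10-minute tumbling bin:

from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=10)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_bin_start(ts: datetime) -> datetime:
    # Each timestamp falls into exactly one fixed-width, non-overlapping bin.
    # ts must be timezone-aware (UTC) here.
    elapsed = ts - EPOCH
    return EPOCH + (elapsed // WINDOW) * WINDOW

Note that two events a few minutes apart can land in different bins if a bin boundary falls between them; continuous windows, being exact, avoid this edge effect.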
Unlike tumbling windows, continuous windows are overlapping and exact. When you request the value of a continuous window feature, Chalk looks at all the messages received in the window and computes the value on-demand.
Continuous windows require an additional keys argument. Messages are grouped by key before being consumed by your resolver.
from chalk.streams import stream

@stream(..., keys={"user_id": Feature.user_id, "val": Feature.value}, mode="continuous")
def my_resolver(...) -> ...:
    ...
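Conceptually, the grouping behaves like the following illustrative sketch (our own simplification, not Chalk's implementation): each resolver invocation consumes only the messages that share a key.

from collections import defaultdict

def group_by_key(messages: list, key_field: str = "user_id") -> dict:
    # Simplified view of keyed grouping: bucket messages by the key field
    # so each resolver invocation sees a single group's messages.
    grouped = defaultdict(list)
    for message in messages:
        grouped[getattr(message, key_field)].append(message)
    return dict(grouped)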
Chalk lets you configure whether resolvers should accept late-arriving stream messages. By default, Chalk attempts to consider any late-arriving message in stream resolvers. However, you can tune this behavior with the late_arrival_deadline argument to your stream source:
from chalk.streams import KafkaSource
# By default, the late_arrival_deadline is set to "infinity".
source = KafkaSource(late_arrival_deadline="30m")
Even when a message is within the window, there are cases where late arrivals do not impact the aggregated value.
For continuous windows, the aggregation value is computed at query time. If a message arrives after query time, it is not possible for Chalk to consider it in the already-executed query. Otherwise, if the late-arriving message is within the late_arrival_deadline period, it is added to the continuous window.
Tumbling window resolvers are executed once the tumbling window has elapsed. If a message arrives after the resolver has executed, it is not possible for Chalk to consider it. Otherwise, if the late-arriving message is within the late_arrival_deadline period, it is added to the tumbling window.
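Putting the two rules together, a minimal sketch of the deadline check (our own illustration; the function and its arguments are hypothetical, not Chalk API):

from datetime import datetime, timedelta

def within_deadline(
    event_time: datetime,
    arrival_time: datetime,
    late_arrival_deadline: timedelta,
) -> bool:
    # A late message can be counted only if it arrives within the deadline
    # of its event time. Separately, it must arrive before the value is
    # computed: query time for continuous windows, resolver execution time
    # for tumbling windows.
    return arrival_time - event_time <= late_arrival_deadline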
A full example, putting these pieces together:

from chalk.features import features, Features
from chalk.streams import stream, Windowed, windowed, KafkaSource
from pydantic import BaseModel

@features
class User:
    id: int
    num_failed_logins: Windowed[int] = windowed("10m", "30m")

class LoginMessage(BaseModel):
    user_id: int
    failed: bool

source = KafkaSource(name="LOGINS")

# Continuous windows require the keys argument (see above), which maps
# message fields to features for grouping.
@stream(source=source, mode="continuous", keys={"user_id": User.id})
def failed_logins(events: list[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    # All events in one invocation share a user_id, since messages are
    # grouped by key before the resolver runs.
    return User(
        id=events[0].user_id,
        num_failed_logins=sum(1 for x in events if x.failed),
    )