Streaming Aggregations

With streams, it is common to compute aggregations over events that fall within a time window. Chalk provides a simple way to express these computations: streaming window aggregate functions.


Windowed Features

Windowed features are features defined over time ranges. You can declare a windowed feature with the chalk.streams.Windowed type annotation and the windowed() helper.

Example

from chalk.features import features
from chalk.streams import Windowed, windowed

@features
class User:
    num_failed_logins: Windowed[int] = windowed("10m", "30m", "1d")
    ...

Here, the num_failed_logins feature supports three windows: the number of failed logins in the last 10 minutes, 30 minutes, and 1 day.

A windowed feature is always referenced by the desired window. For example:

User.num_failed_logins("10m")
User.num_failed_logins("30m")
User.num_failed_logins("1d")
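
Windowed feature values can be requested in online queries in the same way. Below is a minimal sketch, assuming a deployed feature set and a configured client; the sample id value is hypothetical:

from chalk.client import ChalkClient

client = ChalkClient()

# Request two windows of the same feature for one user.
result = client.query(
    input={User.id: 1},
    output=[
        User.num_failed_logins("10m"),
        User.num_failed_logins("1d"),
    ],
)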

Windowed features support much of the same functionality as a normal feature. For example, you can specify a maximum staleness for a windowed feature or add a description.

@features
class User:
    # Description of the num_failed_logins event
    # :owner: trust-and-safety
    num_failed_logins: Windowed[int] = windowed("10m", max_staleness="10m")
    ...

Requesting Windowed Features

Windowed features can be required by resolvers much like normal features.

@realtime
def account_under_attack(
    failed_logins_30m: User.num_failed_logins("30m"),
    failed_logins_1d: User.num_failed_logins("1d"),
) -> ...:
    return failed_logins_30m > 10 or failed_logins_1d > 100

Windowed Stream Resolvers

Streaming window aggregate resolvers produce feature values from the messages received during the window period.

The messages on your stream are collected for the window period and provided to your resolver. The resolver then outputs the feature's value.

Note that you don’t need to write a resolver for each of the windows: Chalk invokes your resolver once for each of the supplied window periods. Stream resolver code should be agnostic to which window it is operating on, which allows Chalk to transparently perform performance optimizations.

You can request the data in many forms:

Using lists

You can request the messages as a list of Pydantic models, where the Pydantic model defines the schema of the JSON-encoded messages on the topic.

from chalk.features import features, Features
from chalk.streams import stream, Windowed, windowed
from pydantic import BaseModel

@features
class User:
    id: int
    num_failed_logins: Windowed[int] = windowed("10m", "30m")

class LoginMessage(BaseModel):
    user_id: int
    failed: bool

@stream(source=..., mode="tumbling")
def failed_logins(events: list[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    # Assumes all messages in this window belong to a single user.
    return User(
        id=events[0].user_id,
        num_failed_logins=sum(1 for x in events if x.failed),
    )

Using DataFrames

Instead of a list, you can request the features as a DataFrame. If you’re expecting lots of messages in your window, prefer using DataFrame to list, as it uses a more efficient encoding.

from chalk.features import DataFrame

@stream(...)
def failed_logins(events: DataFrame[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    return User(
        id=events["user_id"].max(),
        num_failed_logins=events["failed"].sum(),
    )

Using SQL

Resolvers that receive their messages as a DataFrame can execute SQL against the messages in the stream.

from chalk.features import DataFrame

@stream(...)
def failed_logins(events: DataFrame[LoginMessage]) -> DataFrame[
    User.id,
    User.num_failed_logins
]:
    return f"""
        select
          user_id as id,
          count(*) as num_failed_logins
        from {events}
        where failed = 1
        group by 1
    """

Simply interpolate the DataFrame into the SQL string using a Python f-string. Chalk uses DuckDB to execute SQL resolvers.


Window Modes

By default, Chalk computes tumbling window aggregations. However, Chalk also supports continuous windows. The aggregation mode is specified in the @stream decorator.

Tumbling Windowing

Tumbling windows are fixed-size, contiguous, and non-overlapping time intervals. You can think of them as adjacently arranged bins of equal width.

Tumbling windows are most often used alongside max_staleness to allow the features to be sent to the online store and offline store after each window period.
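
As a sketch of that pairing, reusing the failed-login features from the examples above (the "1d" staleness bound is an arbitrary choice):

from chalk.features import features
from chalk.streams import Windowed, windowed

@features
class User:
    id: int
    # Each window's value is written to the online and offline stores
    # when its tumbling window closes, and can then be served for up
    # to one day without recomputation.
    num_failed_logins: Windowed[int] = windowed(
        "10m",
        "30m",
        max_staleness="1d",
    )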

Continuous Windowing

Unlike tumbling windows, continuous windows are overlapping and exact. When you request the value of a continuous window feature, Chalk looks at all the messages received in the window and computes the value on-demand. Continuous windows require an additional keys argument, which maps message fields to features. Messages are grouped by key before being passed to your resolver.

from chalk.streams import stream

@stream(..., keys={"user_id": Feature.user_id, "val": Feature.value}, mode="continuous")
def my_resolver(...) -> ...:
    ...

Late arrivals

Chalk lets you configure whether resolvers should accept late-arriving stream messages. By default, Chalk attempts to consider any late-arriving messages in stream resolvers. However, you can tune this behavior with the late_arrival_deadline argument to your stream source:

from chalk.streams import KafkaSource

# The default late_arrival_deadline is "infinity"; here, we only
# accept messages that arrive up to 30 minutes late.
source = KafkaSource(late_arrival_deadline="30m")

Even when a message is within the window, there are cases where late arrivals do not impact the aggregated value.

Continuous windows

For continuous windows, the aggregation value is computed at query time. If a message arrives after the query has executed, it is not possible for Chalk to consider the late-arriving message in that already-executed query.

Otherwise, if the late-arriving message is within the late_arrival_deadline period, it is added to the continuous window.

Tumbling windows

Tumbling window resolvers are executed once the tumbling window has elapsed. If a message arrives after the resolver has been executed, it is not possible for Chalk to consider the late-arriving message.

Otherwise, if the late-arriving message is within the late_arrival_deadline period, it is added to the tumbling window.


Complete Example

from chalk.features import features, Features
from chalk.streams import stream, Windowed, windowed, KafkaSource
from pydantic import BaseModel

@features
class User:
    id: int
    num_failed_logins: Windowed[int] = windowed("10m", "30m")

class LoginMessage(BaseModel):
    user_id: int
    failed: bool

source = KafkaSource(name="LOGINS")

@stream(source=source, mode="continuous", keys={"user_id": User.id})
def failed_logins(events: list[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    # Continuous messages are grouped by key, so every event in this
    # list shares the same user_id.
    return User(
        id=events[0].user_id,
        num_failed_logins=sum(1 for x in events if x.failed),
    )