Chalk home page
Docs
API
CLI
  1. Streaming
  2. Aggregations

With streams, it is common to compute aggregations over events that fall within a time window. Chalk enables users to express these computations, known as streaming window aggregate functions.


Windowed Features

Windowed features are features defined over time ranges. You can declare a feature to compute with a window function using the typeclass chalk.streams.Windowed.

Example

from chalk.features import features
from chalk.streams import Windowed, windowed

@features
class User:
    num_failed_logins: Windowed[int] = windowed("10m", "30m", "1d", default=0)
    ...

Here, the num_failed_logins feature supports three windows: the number of failed logins in the last 10 minutes, 30 minutes, and 1 day.

A windowed feature can be referenced in a query or a resolver in the following, equivalent ways. Each column below shows the possible syntax variants for a given time window.

# Note: The last value for each list is the time converted to seconds
User.num_failed_logins("10m")    User.num_failed_logins("1d")       User.num_failed_logins("1h30m")
User.num_failed_logins["10m"]    User.num_failed_logins["1d"]       User.num_failed_logins["1h30m"]
User.num_failed_logins_10m       User.num_failed_logins_1d          User.num_failed_logins_1h30m
User.num_failed_logins__10m__    User.num_failed_logins__1d__       User.num_failed_logins__1h30m__
User.num_failed_logins__600__    User.num_failed_logins__86400__    User.num_failed_logins__5400__

Windowed features support much of the same functionality as a normal feature. Like features for mapping resolvers, windowed features are most often used alongside max_staleness and etl_offline_to_online to allow the features to be sent to online store and offline store after each window period. Windowed features are often set with default values for when there are no messages within a time period.

@features
class User:
    # Description of the num_failed_logins event
    # :owner: trust-and-safety
    num_failed_logins: Windowed[int] = windowed("10m", max_staleness="10m")
    ...

Requesting Windowed Features

Windowed features can be inputs to resolvers much like normal features.

@online
def account_under_attack(
    failed_logins_30m: User.num_failed_logins('30m'),
    failed_logins_1d: User.num_failed_logins('1d')
) -> ...:
    return failed_logins_30m > 10 or failed_logins_1d > 100

Windowed Stream Resolvers

Streaming resolvers are defined as windowed if they output at least one windowed feature. Streaming window aggregate resolvers produce features from a list or DataFrame of messages that have arrived within the window period.

The messages on your stream are collected for the window period, and provided to your resolver. The resolver then outputs the feature values.

Note that you don’t need to write a resolver for each of the windows — Chalk will handle invoking the resolver above for each of the supplied window periods. Stream resolver code should be agnostic to which window it is operating on, allowing Chalk to transparently perform performance optimizations.

You can request the data in many forms:

Using lists

You can request the features as a list of Pydantic BaseModels, where the Pydantic model for the list specifies JSON encoded messages on the topic.

@features
class User:
    id: int
    num_failed_logins: Windowed[int] = windowed("10m", "30m")

class LoginMessage(BaseModel):
    user_id: int
    failed: bool

@stream(source=..., mode="tumbling")
def failed_logins(events: list[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    return User(
        id=events[0].user_id,
        num_failed_logins=sum(1 for x in events if x.failed)
    )

Using DataFrames

Instead of a list, you can request the features as a DataFrame. If you’re expecting lots of messages in your window, prefer using DataFrame to list, as it uses a more efficient encoding.

from chalk.features import DataFrame

@stream(...)
def failed_logins(events: DataFrame[LoginMessage]) -> Features[
    User.id,
    User.num_failed_logins
]:
    return User(
        id=events["id"].max(),
        num_failed_logins=events["failed"].sum(),
    )

Using SQL

Resolvers that take their messages in as a DataFrame can execute SQL against the messages in the stream.

from chalk.features import DataFrame

@stream(...)
def failed_logins(events: DataFrame[LoginMessage]) -> DataFrame[
    User.id,
    User.num_failed_logins
]:
    return f"""
        select
          user_id as id,
          count(*) as num_failed_logins
        from {events}
        where failed = 1
        group by 1
    """

Simply add the DataFrame in to the SQL string using a Python f-string. Chalk uses DuckDB to execute the SQL resolvers.


Window Modes

By default, Chalk computes tumbling window aggregations. However, Chalk also supports continuous windows. Aggregation mode is specified in the stream decorator.

Tumbling Windowing

Tumbling windows are fixed-size, contiguous and non-overlapping time intervals. You can think of tumbling windows as adjacently arranged, discrete bins of equal width.

Like mapping resolvers, tumbling windowed resolvers insert their outputs into online and offline store when max_staleness and etl_offline_to_online are set. Unlike mapping resolvers, tumbling windowed resolvers only run once per resolver and time window, rather than with every message.

Continuous Windowing

As opposed to tumbling windows, continuous windows are overlapping and exact. When you request the value of a continuous window feature via online query, Chalk gathers all the messages received in the window and computes the value on-demand.

Keys

Chalk supplies an additional keys argument to the stream resolver decorator to define how aggregations are conducted. All messages with the same key value(s) are aggregated and invoked in a resolver. This feature enables users to redefine the keying of messages beyond the default keys specified by the stream source.

Continuous windows require the keys argument. The keys argument is a dictionary mapping from Pydantic BaseModel attribute to Chalk feature attribute. The BaseModel attributes belong to the input to your resolver, enabling Chalk to aggregate together all messages in the time period with the same attribute values. The Chalk features describe values that must be supplied upon query to filter the messages for aggregation.

This keys mapping argument also allows us to group by multiple values. Consider the following situation, where our stream messages are produced for every interaction between a card and a user. We would like to keep track of how many interactions between a specific card and user occur within the window, and ultimately query by the user id and card id pair.

Note that the following resolver works for both bucket window durations, and the inputs to the resolver will be restricted to the unique user/card pair.

from chalk import features, has_one, DataFrame
from chalk.streams import Windowed, stream, windowed

from pydantic import BaseModel

@features(etl_offline_to_online=True, max_staleness="infinity")
class UserCardInteraction:
    id: str
    user_id: str
    card_id: str
    interaction_type: str
    count: Windowed[int] = windowed(days=[1, 7], max_staleness="0s", default=0)
    user: User = has_one(lambda: User.id == UserCardInteraction.user_id)
    card: Card = has_one(lambda: Card.id == UserCardInteraction.card_id)

class KafkaMessage(BaseModel):
    card_id: str
    user_id: str
    interaction_id: str

@stream(
    source=stream_source,
    mode="continuous",
    keys={
        "card_id": UserCardInteraction.card_id,
        "user_id": UserCardInteraction.user_id,
    },
)
def user_card_aggregation(
    messages: list[KafkaMessage],
) -> DataFrame[
    UserCardInteraction.id,
    UserCardInteraction.user_id,
    UserCardInteraction.card_id,
    UserCardInteraction.count,
]:
    if len(messages) == 0:
        return DataFrame([])
    user_id = messages[0].user_id
    card_id = messages[0].card_id
    return DataFrame(
        [
            UserCardInteraction(
                id=f"{user_id}-{card_id}",
                user_id=user_id,
                card_id=card_id,
                count=len(messages)
            )
        ]
    )

Custom Event Timestamping

By default, most streaming systems (i.e. Kafka, Kinesis, etc) associate an event with the time it was received by the system. However, in many cases, the event’s effective timestamp to be used for windowed aggregations is different from the time it was received by the system.

If you need to specify a custom event timestamp, you can match the timestamp argument to a property of the BaseModel input to your @stream resolver. Crucially, custom timestamps must be properly timezoned.

from datetime import datetime
from pydantic import BaseModel

class Message(BaseModel):
    id: str
    value: str
    event_timestamp: datetime


@stream(source=source, timestamp="event_timestamp")
def stream_resolver(message: Message) -> Features[StreamFeature.id, StreamFeature.value]:
    return StreamFeature(id=message.id, value=message.value)

This will mark the returned features as being observed at event_timestamp when deciding whether a message lies within a specific time window.

Full Example

Building off the previous example in streams, we’d like to additionally capture the count of messages with the same id that fall within continuous windows. We use the same parse function from the previous example, which enables us to specify a timezoned timestamp in the message body itself rather than Kafka’s automatic timestamp.

Note that our keys argument contains the Chalk feature primary key, as we are planning to query our features by id.

from datetime import datetime, timezone
from dateutil import parser
from pydantic import BaseModel
from chalk import features, DataFrame
from chalk.streams import stream, KafkaSource, windowed, Windowed

source = KafkaSource(name="...")

class KafkaMessage(BaseModel):
    id: str
    value: str
    naive_timestamp_str: str

class ParsedMessage(BaseModel):
    id: str
    value: str
    event_timestamp: datetime


@features(max_staleness="1d", etl_offline_to_online=True)
class StreamFeature:
    id: str
    value: str
    event_timestamp: datetime
    count: Windowed[int] = windowed(days=[1, 7], default=0)

def parse_message(kafka_message: KafkaMessage) -> ParsedMessage:
    parsed_timestamp: datetime = parser.parse(kafka_message.naive_timestamp_str)
    timezoned_timestamp = parsed_timestamp.replace(tzinfo=timezone.utc)
    return ParsedMessage(
        id=kafka_message.id,
        value=kafka_message.value,
        event_timestamp=timezoned_timestamp
    )

@stream(source=source, mode="continuous", keys={"id": StreamFeature.id}, timestamp="event_timestamp")
def stream_resolver(messages: DataFrame[ParsedMessage]) -> StreamFeature.count:
    return len(messages)

Another thing about keys: in the above resolver, we don’t need to include StreamFeature.id in the return since it is specified in the keys argument. As long it is specified there, the Chalk feature and key value will automatically be returned.

Now, we can online query our windowed feature!

from chalk.client import ChalkClient


client = ChalkClient(...)
data = client.query(
        input={
            StreamFeature.id: "12345",
        },
        output=[StreamFeature.count["7d"]],
    )

This will output the number of messages with id = "12345" whose custom timestamp lies between seven days ago and now.