Chalk home page
Docs
API
CLI
  1. Streaming
  2. State

Many stream processing tasks are able to look at a single event and produce features. Sometimes, however, a resolver will need to remember information from many messages in a stream to produce feature values. These resolvers are called stateful resolvers.


State Schema

Chalk serializes your state and creates initial default values for new stream key values. To do so, it needs a schema for the state, which can be defined as a Python Dataclass or Pydantic Model.

To specify the state for a resolver function, wrap your schema with chalk.State:

Using Dataclass

from dataclasses import dataclass
from chalk import State
from chalk.streams import stream

@dataclass
class MyState:
    ...

@stream(...)
def stateful_stream(..., current: State[MyState]) -> ...:
    ...

Using Pydantic Models

from pydantic import BaseModel
from chalk import State
from chalk.streams import stream

class MyState(BaseModel):
    ...

@stream(...)
def stateful_stream(..., current: State[MyState]) -> ...:
    ...

Initial Values

The first time a stream resolver runs for a given key, Chalk needs to create an initial value for the state.

Default Argument

Chalk will look first for a default value for the state argument in the resolver to construct the initial state:

from dataclasses import dataclass

@dataclass
class LoginState:
    unique_ips: set[str]

@stream(...)
def login_stream(..., state: State[LoginState] = LoginState(unique_ips=set())) -> ...:
    ...

In the case above, the initial value for the state will be LoginState(unique_ips=set()).

Default Constructor

If an initial state is not given in the function signature, Chalk will try to construct the default state with no arguments. To construct the state dataclass or Pydantic model without arguments, all fields of the state class must have a default value:

from chalk import State
from dataclasses import field, dataclass

@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

@stream(...)
def login_stream(..., state: State[LoginState]) -> ...:
    ...

As in the first example, the initial value for the state will be LoginState(unique_ips=set()).


Updating State

State is updated by mutating the state argument of the resolver.

class LoginEvent(BaseModel):
    user_id: str
    ip_address: str

@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

@stream(source=source)
def track_unique_ips(event: LoginEvent, state: State[LoginState]) -> ...:
    state.unique_ips.add(event.ip_address)
    count = len(state.unique_ips)
    ...

The above stream resolver tracks all the unique IP addresses that have been seen for a specific key. The unique_ips set starts out empty, and on every message, the resolver adds the IP address from the event to the state. After the resolver has run, Chalk will save the updated state.


Rekeying

A unique state object is stored for each key in the stream. By default, the key is specified by the key on the stream message. In some cases, we may want to rekey the messages in a custom manner. Chalk supports rekeying based on the resolver inputs.

Given the input message LoginEvent, if we want to explicitly want to key by user_id, we can add a keys argument to the @stream decorator. This will track the number of unique IP addresses a user has visited over time. The keys argument must be a Python dictionary that maps from BaseModel attribute to Chalk feature.

Every time a message is processed with a new ip_address, we can add it to our State and update the User feature. Read more about keys here.

from typing import Set
from pydantic import BaseModel
from chalk import State
from chalk.features import features, Features
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="...")

class LoginEvent(BaseModel):
    user_id: str
    ip_address: str

@features
class User:
    id: str
    unique_ip_count: int

class LoginState(BaseModel):
    unique_ips: Set[str] = 0

@stream(source=source, keys={"user_id": User.id})
def track_unique_ips(event: LoginEvent, state: State[LoginState]) -> Features[
    User.id,
    User.unique_ip_count
]:
    state.unique_ips.add(event.ip_address)
    return User(id=event.user_id, unique_ip_count=len(state.unique_ips))