
Many stream processing tasks can produce feature values from a single event. Sometimes, however, a resolver needs to remember information from many messages in a stream to produce feature values. These resolvers are called stateful resolvers.


State Schema

Chalk serializes your state and creates initial default values for new partition ids. To do so, it needs a schema for the state, which can be defined as a Python dataclass or Pydantic model.

To specify the state for a resolver function, wrap your schema with chalk.State:

Using Dataclass

from dataclasses import dataclass
from chalk import State
from chalk.streams import stream

@dataclass
class MyState:
    ...

@stream(...)
def stateful_stream(..., current: State[MyState]) -> ...:
    ...

Using Pydantic Models

from pydantic import BaseModel
from chalk import State
from chalk.streams import stream

class MyState(BaseModel):
    ...

@stream(...)
def stateful_stream(..., current: State[MyState]) -> ...:
    ...

Initial Values

The first time a stream resolver runs for a given partition key, Chalk needs to create an initial value for the state.

Default Argument

Chalk first looks for a default value on the resolver's state argument and uses it to construct the initial state:

from dataclasses import dataclass

@dataclass
class LoginState:
    unique_ips: set[str]

@stream(...)
def login_stream(..., state: State[LoginState] = LoginState(unique_ips=set())) -> ...:
    ...

In the case above, the initial value for the state will be LoginState(unique_ips=set()).

Default Constructor

If an initial state is not given in the function signature, Chalk will try to construct the default state with no arguments. For this to work, every field of the state dataclass or Pydantic model must have a default value:

from chalk import State
from dataclasses import field, dataclass

@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

@stream(...)
def login_stream(..., state: State[LoginState]) -> ...:
    ...

As in the first example, the initial value for the state will be LoginState(unique_ips=set()).
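Because every field has a default, the state class can be constructed with no arguments — roughly what Chalk does internally when no default argument is supplied (the explicit `LoginState()` call here is illustrative, not Chalk's actual code path):

```python
from dataclasses import dataclass, field

@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

# No-argument construction succeeds because every field has a default.
initial = LoginState()
print(initial)  # LoginState(unique_ips=set())
```

Note the use of `field(default_factory=set)` rather than `= set()`: a bare mutable default on a dataclass field raises an error at class-definition time, and `default_factory` also guarantees each state instance gets its own fresh set.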


Updating State

State is updated by mutating the state argument of the resolver.

from dataclasses import dataclass, field

@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

@stream(source=source)
def track_unique_ips(event: LoginEvent, state: State[LoginState]) -> ...:
    state.unique_ips.add(event.ip_address)
    count = len(state.unique_ips)
    ...

The above stream resolver tracks all the unique IP addresses that have been seen for a user. The unique_ips set starts out empty, and on every message, the resolver adds the IP address from the event to the state. After the resolver has run, Chalk will save the updated state.
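The same mutation semantics can be sketched outside of Chalk in plain Python, with `LoginEvent` reduced to a bare IP string for illustration:

```python
from dataclasses import dataclass, field

@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

def track_unique_ips(ip_address: str, state: LoginState) -> int:
    # Mutate the state in place, as a Chalk stream resolver would.
    state.unique_ips.add(ip_address)
    return len(state.unique_ips)

state = LoginState()
counts = [track_unique_ips(ip, state) for ip in ["1.1.1.1", "2.2.2.2", "1.1.1.1"]]
print(counts)  # [1, 2, 2] — the duplicate IP doesn't increase the count
```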


Complete Example

from typing import Set
from pydantic import BaseModel
from chalk import State
from chalk.features import features, Features
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="...")

class LoginEvent(BaseModel):
    user_id: str
    ip_address: str

@features
class User:
    id: str
    unique_ip_count: int

class LoginState(BaseModel):
    unique_ips: Set[str] = set()

@stream(source=source)
def track_unique_ips(event: LoginEvent, state: State[LoginState]) -> Features[
    User.id,
    User.unique_ip_count
]:
    state.unique_ips.add(event.ip_address)
    return User(id=event.user_id, unique_ip_count=len(state.unique_ips))