Streaming
Carry state across aggregations
Many stream processing tasks can look at a single event in isolation and produce feature values. Sometimes, however, a resolver needs to remember information from many messages in a stream to produce feature values. Resolvers that do this are called stateful resolvers.
Chalk serializes your state and creates an initial default value for each new key in the stream. To do so, it needs a schema for the state, which can be defined as a Python dataclass or a Pydantic model.
To specify the state for a resolver function, wrap your schema with `chalk.State`:
```python
from dataclasses import dataclass

from chalk import State
from chalk.streams import stream

@dataclass
class MyState:
    ...

@stream(...)
def stateful_stream(..., current: State[MyState]) -> ...:
    ...
```
Or, equivalently, with a Pydantic model:

```python
from pydantic import BaseModel

from chalk import State
from chalk.streams import stream

class MyState(BaseModel):
    ...

@stream(...)
def stateful_stream(..., current: State[MyState]) -> ...:
    ...
```
The first time a stream resolver runs for a given key, Chalk needs to create an initial value for the state.
To construct the initial state, Chalk looks first for a default value on the resolver's state argument:
```python
from dataclasses import dataclass

from chalk import State
from chalk.streams import stream

@dataclass
class LoginState:
    unique_ips: set[str]

@stream(...)
def login_stream(..., state: State[LoginState] = LoginState(unique_ips=set())) -> ...:
    ...
```
In the case above, the initial value for the state will be `LoginState(unique_ips=set())`.
If an initial state is not given in the function signature, Chalk will try to construct the default state with no arguments. For the state dataclass or Pydantic model to be constructible without arguments, all of its fields must have default values:
```python
from dataclasses import dataclass, field

from chalk import State
from chalk.streams import stream

@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

@stream(...)
def login_stream(..., state: State[LoginState]) -> ...:
    ...
```
As in the first example, the initial value for the state will be `LoginState(unique_ips=set())`.
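As a quick sanity check (plain Python, runnable outside of Chalk, reusing the `LoginState` dataclass above), a dataclass whose fields all have defaults can be instantiated with no arguments:

```python
# All fields have defaults, so no-argument construction succeeds,
# and the dataclass-generated __eq__ confirms the two are equivalent.
assert LoginState() == LoginState(unique_ips=set())
```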
State is updated by mutating the state argument of the resolver.
```python
from dataclasses import dataclass, field
from pydantic import BaseModel
from chalk import State
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="...")

class LoginEvent(BaseModel):
    user_id: str
    ip_address: str

@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

@stream(source=source)
def track_unique_ips(event: LoginEvent, state: State[LoginState]) -> ...:
    state.unique_ips.add(event.ip_address)
    count = len(state.unique_ips)
    ...
```
The above stream resolver tracks all the unique IP addresses that have been seen for a specific key. The `unique_ips` set starts out empty, and on every message, the resolver adds the IP address from the event to the state. After the resolver has run, Chalk will save the updated state.
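As a rough sketch of this lifecycle (plain Python standing in for the Chalk runtime, with hypothetical IP addresses, reusing the `LoginState` above), successive invocations for the same key mutate one persistent state object:

```python
# Chalk constructs the state once per key, then persists it between
# invocations, so mutations accumulate across messages.
state = LoginState()

for ip in ["1.2.3.4", "5.6.7.8", "1.2.3.4"]:  # three messages, one key
    state.unique_ips.add(ip)

assert len(state.unique_ips) == 2  # the duplicate IP counts once
```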
A separate state object is stored for each key in the stream. By default, the key is the one set on the stream message itself. Sometimes, however, we may want to rekey the messages in a custom manner, and Chalk supports rekeying based on the resolver's inputs.
Given the input message `LoginEvent`, if we want to key explicitly by `user_id`, we can add a `keys` argument to the `@stream` decorator. This lets us track the number of unique IP addresses a user has logged in from over time. The `keys` argument must be a Python dictionary that maps `BaseModel` attributes to Chalk features. Every time a message is processed with a new `ip_address`, we add it to our state and update the `User` feature. Read more about keys here.
```python
from typing import Set

from pydantic import BaseModel

from chalk import State
from chalk.features import features, Features
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="...")

class LoginEvent(BaseModel):
    user_id: str
    ip_address: str

@features
class User:
    id: str
    unique_ip_count: int

class LoginState(BaseModel):
    unique_ips: Set[str] = set()

@stream(source=source, keys={"user_id": User.id})
def track_unique_ips(event: LoginEvent, state: State[LoginState]) -> Features[
    User.id,
    User.unique_ip_count
]:
    state.unique_ips.add(event.ip_address)
    return User(id=event.user_id, unique_ip_count=len(state.unique_ips))
```
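As a rough illustration of what rekeying means (plain Python standing in for the Chalk runtime, with hypothetical events, reusing the `LoginEvent` and `LoginState` classes above), each distinct `user_id` gets its own independent `LoginState`:

```python
from collections import defaultdict

# Hypothetical events; rekeying by user_id means each user's state
# evolves independently of every other user's.
events = [
    LoginEvent(user_id="u1", ip_address="1.2.3.4"),
    LoginEvent(user_id="u2", ip_address="1.2.3.4"),
    LoginEvent(user_id="u1", ip_address="5.6.7.8"),
]

states = defaultdict(LoginState)  # one state object per key
for event in events:
    states[event.user_id].unique_ips.add(event.ip_address)

assert len(states["u1"].unique_ips) == 2
assert len(states["u2"].unique_ips) == 1
```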