Streaming
Carry state across aggregations
Many stream processing resolvers can look at a single message and produce feature values. Sometimes, however, a resolver needs to remember information from many messages in a stream to produce feature values. These resolvers are called stateful resolvers.
Chalk serializes your state and creates initial default values for new partition IDs. To do so, it needs a schema for the state, which can be defined as a Python dataclass or a Pydantic model.
To specify the state for a resolver function, wrap your schema with chalk.State:
from dataclasses import dataclass
from chalk import State
from chalk.streams import stream

@dataclass
class MyState:
    ...

@stream(...)
def stateful_stream(..., current: State[MyState]) -> ...:
    ...
from pydantic import BaseModel
from chalk import State
from chalk.streams import stream

class MyState(BaseModel):
    ...

@stream(...)
def stateful_stream(..., current: State[MyState]) -> ...:
    ...
The first time a stream resolver runs for a given partition key, Chalk needs to create an initial value for the state.
To construct the initial state, Chalk first looks for a default value for the state argument in the resolver's signature:
from dataclasses import dataclass

from chalk import State
from chalk.streams import stream

@dataclass
class LoginState:
    unique_ips: set[str]

@stream(...)
def login_stream(..., state: State[LoginState] = LoginState(unique_ips=set())) -> ...:
    ...
In the case above, the initial value for the state will be LoginState(unique_ips=set()).
If an initial state is not given in the function signature, Chalk will try to construct the default state with no arguments. For the state dataclass or Pydantic model to be constructible without arguments, all of its fields must have default values:
from dataclasses import dataclass, field

from chalk import State
from chalk.streams import stream

@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

@stream(...)
def login_stream(..., state: State[LoginState]) -> ...:
    ...
As in the first example, the initial value for the state will be LoginState(unique_ips=set()).
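The same pattern works if the state schema is a Pydantic model rather than a dataclass. A minimal sketch, assuming the same LoginState schema as above, gives every field a default with Field(default_factory=...) so that LoginState() can be constructed with no arguments:

from typing import Set

from pydantic import BaseModel, Field

from chalk import State
from chalk.streams import stream

class LoginState(BaseModel):
    # Every field has a default, so Chalk can construct LoginState() for new partitions.
    unique_ips: Set[str] = Field(default_factory=set)

@stream(...)
def login_stream(..., state: State[LoginState]) -> ...:
    ...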
State is updated by mutating the state argument of the resolver.
@dataclass
class LoginState:
    unique_ips: set[str] = field(default_factory=set)

@stream(source=source)
def track_unique_ips(event: LoginEvent, state: State[LoginState]) -> ...:
    state.unique_ips.add(event.ip_address)
    count = len(state.unique_ips)
    ...
The above stream resolver tracks all the unique IP addresses that have been seen for a user. The unique_ips set starts out empty, and on every message, the resolver adds the IP address from the event to the state. After the resolver has run, Chalk will save the updated state.
from typing import Set

from pydantic import BaseModel

from chalk import State
from chalk.features import features, Features
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="...")

class LoginEvent(BaseModel):
    user_id: str
    ip_address: str

@features
class User:
    id: str
    unique_ip_count: int

class LoginState(BaseModel):
    # Default to an empty set so Chalk can construct the initial state with no arguments.
    unique_ips: Set[str] = set()

@stream(source=source)
def track_unique_ips(event: LoginEvent, state: State[LoginState]) -> Features[
    User.id,
    User.unique_ip_count,
]:
    # Mutate the state, then emit the updated count as a feature value.
    state.unique_ips.add(event.ip_address)
    return User(id=event.user_id, unique_ip_count=len(state.unique_ips))
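Once the resolver is deployed, the unique_ip_count it computes can be fetched like any other Chalk feature. A minimal sketch, assuming the Chalk Python client and a hypothetical user id "u_123":

from chalk.client import ChalkClient

client = ChalkClient()

# Fetch the most recent value computed by the stream resolver for this user.
result = client.query(
    input={User.id: "u_123"},
    output=[User.unique_ip_count],
)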