Stream resolvers function almost exactly like webhook resolvers.

The first step is to create a streaming source from one of Chalk’s integrations:

from chalk.streams import KafkaSource

source = KafkaSource(name="...")

If the messages on your topic are JSON-encoded, you can optionally specify a Pydantic model for them.
Chalk will validate each message value against the model.

from pydantic import BaseModel

class Message(BaseModel):
    user_id: str

source = KafkaSource(name="...")
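To make the validation step more concrete, the sketch below shows roughly what checking a JSON payload against the Message model involves. It uses Pydantic directly and does not call Chalk. The helper name validate_payload and the sample payloads are hypothetical, and Chalk's own handling of invalid messages may differ.

```python
import json
from typing import Optional

from pydantic import BaseModel, ValidationError


class Message(BaseModel):
    user_id: str


def validate_payload(raw: bytes) -> Optional[Message]:
    """Hypothetical helper: decode a JSON payload and validate it against Message."""
    try:
        return Message(**json.loads(raw))
    except (ValidationError, json.JSONDecodeError, TypeError):
        # The payload is not valid JSON, is not a JSON object,
        # or does not satisfy the model (for example, user_id is missing).
        return None


valid = validate_payload(b'{"user_id": "u_123"}')
invalid = validate_payload(b'{"account": "a_9"}')
```

Here valid is a Message instance, while invalid is None because the payload has no user_id field.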

After creating your stream source, you can start processing messages and creating feature values. To do so, write a resolver for stream updates with the @stream decorator:

from chalk.features import Features
from chalk.streams import stream

@stream(source=source)
def fn(message: Message) -> Features[...]:
    ...

Streams can produce feature values in the same manner as online and offline resolvers.

Late arrivals

Chalk lets you configure whether resolvers should accept late-arriving stream messages. By default, Chalk processes every late-arriving message in stream resolvers. You can tune this behavior with the late_arrival_deadline argument to your stream source:

from chalk.streams import KafkaSource

# By default, the late_arrival_deadline is set to "infinity".
source = KafkaSource(late_arrival_deadline="30m")

If a message is older than the late_arrival_deadline when it arrives, its resolver will not run.
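The rule above can be sketched in plain Python. This is an illustration of the stated behavior, not Chalk's implementation: the function name should_run_resolver is hypothetical, and the sketch assumes that a message's age is the gap between its event time and its arrival time, and that a message exactly at the deadline is still accepted. The source does not specify either detail.

```python
from datetime import datetime, timedelta, timezone


def should_run_resolver(event_time, arrival_time, deadline):
    """Hypothetical check: run the resolver unless the message is older than the deadline.

    A deadline of None stands in for the default of "infinity".
    """
    if deadline is None:
        return True
    # Assumption: age is measured as arrival time minus event time.
    return arrival_time - event_time <= deadline


deadline = timedelta(minutes=30)  # corresponds to late_arrival_deadline="30m"
arrival = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

on_time = should_run_resolver(arrival - timedelta(minutes=5), arrival, deadline)
too_late = should_run_resolver(arrival - timedelta(minutes=45), arrival, deadline)
no_limit = should_run_resolver(arrival - timedelta(days=7), arrival, None)
```

With a 30-minute deadline, the message that is 5 minutes old is processed and the one that is 45 minutes old is dropped. With no deadline, even a week-old message is processed.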


Some stream services, such as Kafka, support sending multiple message types in a single schema. To support this use case, Chalk lets you supply a parse function that preprocesses messages.

The parse function runs before the resolver, and can transform the message into a format that the stream resolver understands. If the parse function returns None, the resolver will be skipped.

In the example below, only UserEventMessage values that have a click_event set are processed by resolve_clicks.

from typing import Optional

# Child messages
class UserLoginEvent:
    ...

class UserClickEvent:
    ...

# Parent message contains one of the child message types
class UserEventMessage:
    login_event: Optional[UserLoginEvent]
    click_event: Optional[UserClickEvent]

# Returns None for messages without a click event, so the resolver is skipped
def get_click_event(event: UserEventMessage) -> Optional[UserClickEvent]:
    return event.click_event

# str_source is a stream source defined elsewhere
@stream(source=str_source, parse=get_click_event)
def resolve_clicks(message: UserClickEvent) -> Features[...]:
    ...
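The parse-then-resolve flow can be simulated without Chalk to show which messages reach the resolver. In this sketch, the dispatch function is a hypothetical stand-in for the stream runtime, the user_id and url fields are invented for illustration, and a plain dict takes the place of Features[...].

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class UserLoginEvent:
    user_id: str  # hypothetical field


@dataclass
class UserClickEvent:
    user_id: str  # hypothetical field
    url: str      # hypothetical field


@dataclass
class UserEventMessage:
    login_event: Optional[UserLoginEvent] = None
    click_event: Optional[UserClickEvent] = None


def get_click_event(event: UserEventMessage) -> Optional[UserClickEvent]:
    return event.click_event


def resolve_clicks(message: UserClickEvent) -> dict:
    # A plain dict stands in for Features[...] in this sketch.
    return {"user_id": message.user_id, "last_url": message.url}


def dispatch(messages, parse, resolver):
    """Hypothetical stand-in for the runtime: parse each message, skip None results."""
    results = []
    for message in messages:
        parsed = parse(message)
        if parsed is None:
            continue  # parse returned None, so the resolver is skipped
        results.append(resolver(parsed))
    return results


messages = [
    UserEventMessage(login_event=UserLoginEvent(user_id="u1")),
    UserEventMessage(click_event=UserClickEvent(user_id="u2", url="/pricing")),
]
outputs = dispatch(messages, get_click_event, resolve_clicks)
```

Only the second message carries a click event, so outputs contains a single result for user u2. The login-only message never reaches resolve_clicks.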

Full example

from pydantic import BaseModel
from chalk.streams import stream, KafkaSource

class StreamMessage(BaseModel):
    transaction_id: str
    status: str

source = KafkaSource(name="...")

@stream(source=source)
def fn(message: StreamMessage) -> Features[
    return Transaction(