Streams

Stream resolvers function almost exactly like webhook resolvers.

The first step is to create a streaming source from one of Chalk’s integrations:

from chalk.streams import KafkaSource

source = KafkaSource(name="...")

If the messages on your topic are JSON-encoded, you can optionally define a Pydantic model for them.
Chalk will validate each message value against the model.

from pydantic import BaseModel

class Message(BaseModel):
    user_id: str
    ...

source = KafkaSource(name="...")
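
To make the validation step concrete, here is a minimal sketch that decodes a JSON payload and checks it against a declared message shape using only the standard library. It illustrates the idea and is not Chalk's implementation: `MessageShape`, `REQUIRED_FIELDS`, and `validate_message` are hypothetical names, and a Pydantic model performs a richer version of the same check.

```python
import json
from dataclasses import dataclass


@dataclass
class MessageShape:
    # Hypothetical stand-in for the Pydantic model above.
    user_id: str


# Field names and the types they must have (assumed for this sketch).
REQUIRED_FIELDS = {"user_id": str}


def validate_message(raw: bytes) -> MessageShape:
    """Decode a JSON payload and reject it if a declared field is missing or mistyped."""
    payload = json.loads(raw)
    if not isinstance(payload, dict):
        raise ValueError("expected a JSON object")
    kwargs = {}
    for name, expected_type in REQUIRED_FIELDS.items():
        if name not in payload:
            raise ValueError(f"missing field: {name}")
        if not isinstance(payload[name], expected_type):
            raise ValueError(f"field {name} has the wrong type")
        kwargs[name] = payload[name]
    return MessageShape(**kwargs)
```

A payload such as `b'{"user_id": "u_1"}'` passes, while a payload with a missing or non-string `user_id` raises `ValueError` before any resolver logic would run.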

Once you’ve defined your stream source, you can process its messages and create feature values by writing a resolver with the @stream decorator:

from chalk.features import Features
from chalk.streams import stream

@stream(source=source)
def fn(message: Message) -> Features[...]:
    ...

Streams can produce feature values in the same manner as online and offline resolvers.


Late arrivals

Chalk lets you configure whether resolvers should accept late-arriving stream messages. By default, Chalk attempts to process every late-arriving message in stream resolvers. However, you can tune this behavior with the late_arrival_deadline argument to your stream source:

from chalk.streams import KafkaSource

# By default, the late_arrival_deadline is set to "infinity".
source = KafkaSource(late_arrival_deadline="30m")

If a message is older than the late_arrival_deadline when it arrives, its resolver will not run.
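
The rule above can be sketched as a small age check. This is an illustration under stated assumptions, not Chalk's implementation: `parse_deadline` and `should_run` are hypothetical helpers, and the unit suffixes other than the "30m" form shown above are assumed for the sketch.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed suffixes; only the "30m" form and "infinity" appear in the text above.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_deadline(text: str) -> Optional[timedelta]:
    """Convert a string such as "30m" to a timedelta; "infinity" means no deadline."""
    if text == "infinity":
        return None
    amount, unit = int(text[:-1]), text[-1]
    return timedelta(**{_UNITS[unit]: amount})


def should_run(message_time: datetime, arrival_time: datetime, deadline: str) -> bool:
    """Return False when the message is older than the deadline at arrival."""
    limit = parse_deadline(deadline)
    if limit is None:
        return True
    return arrival_time - message_time <= limit


arrival = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
on_time = should_run(arrival - timedelta(minutes=10), arrival, "30m")
too_late = should_run(arrival - timedelta(minutes=45), arrival, "30m")
```

With a 30-minute deadline, the message that is 10 minutes old is accepted and the one that is 45 minutes old is dropped; with "infinity", every message is accepted regardless of age.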


Parsing

Some stream services, such as Kafka, support sending multiple message types in a single schema. To support this use case, Chalk allows developers to supply a parse function to preprocess messages.

The parse function runs before the resolver, and can transform the message into a format that the stream resolver understands. If the parse function returns None, the resolver will be skipped.

In the example below, resolve_clicks only processes UserEventMessage values whose click_event property is set.

from typing import Optional

# Child messages
class UserLoginEvent:
    ...

class UserClickEvent:
    ...

# Parent message contains one of the child message types
class UserEventMessage:
    login_event: Optional[UserLoginEvent]
    click_event: Optional[UserClickEvent]

# Returns None when the message carries no click event, which skips the resolver
def get_click_event(event: UserEventMessage) -> Optional[UserClickEvent]:
    return event.click_event


@stream(source=str_source, parse=get_click_event)
def resolve_clicks(message: UserClickEvent) -> Features[...]:
    ...
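
The parse-then-resolve flow can be sketched without Chalk to show how a None result skips the resolver. Everything here is an assumption made for illustration: `dispatch` is a hypothetical runner, the `user_id` and `target` fields are invented to fill the elided class bodies, and the resolver returns a plain dict rather than Chalk features.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class UserLoginEvent:
    user_id: str  # hypothetical field


@dataclass
class UserClickEvent:
    user_id: str  # hypothetical field
    target: str   # hypothetical field


@dataclass
class UserEventMessage:
    login_event: Optional[UserLoginEvent] = None
    click_event: Optional[UserClickEvent] = None


def get_click_event(event: UserEventMessage) -> Optional[UserClickEvent]:
    return event.click_event


def resolve_clicks(message: UserClickEvent) -> dict:
    # Stand-in for a stream resolver.
    return {"user_id": message.user_id, "clicked": message.target}


def dispatch(message, parse: Callable, resolver: Callable):
    """Hypothetical runner: apply parse, and skip the resolver when it returns None."""
    parsed = parse(message)
    if parsed is None:
        return None
    return resolver(parsed)
```

Passing a message that only carries a login event through `dispatch` returns None without calling `resolve_clicks`, while a message with a click event reaches the resolver as a `UserClickEvent`.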

Full example

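from pydantic import BaseModel
from chalk.features import features, Features
from chalk.streams import stream, KafkaSource

# Assumed definition of the Transaction feature class used below
@features
class Transaction:
    uuid: str
    status: str

class StreamMessage(BaseModel):
    transaction_id: str
    status: str

source = KafkaSource(name="...")

@stream(source=source)
def fn(message: StreamMessage) -> Features[
    Transaction.uuid,
    Transaction.status,
]:
    # The resolver receives the validated StreamMessage, so read its fields directly
    return Transaction(
        uuid=message.transaction_id,
        status=message.status,
    )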