Integrate with streaming data sources.
Stream resolvers function almost exactly like webhook resolvers.
The first step is to create a streaming source from one of Chalk’s integrations:
from chalk.streams import KafkaSource

source = KafkaSource(name="...")
If you’re using JSON as the encoding for messages on your topic,
you can optionally also specify a Pydantic model
for the messages on the topic.
Chalk will validate each message value against the model.
from pydantic import BaseModel

class Message(BaseModel):
    user_id: str
    ...

source = KafkaSource(name="...")
After creating your streaming source, you can start processing messages and creating feature values.
Once you’ve defined your stream source,
you can write a resolver for processing
stream updates with the @stream decorator:
from chalk.streams import stream

@stream(source=source)
def fn(message: Message) -> Features[...]:
    ...
Streams can produce feature values in the same manner as online and offline resolvers.
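For instance, a stream resolver can map message fields directly onto a feature class. The sketch below is illustrative only: the User feature class and LoginMessage model are hypothetical and not part of the snippets above.

from pydantic import BaseModel

from chalk.features import Features, features
from chalk.streams import KafkaSource, stream

# Hypothetical feature class, shown only for illustration.
@features
class User:
    id: str
    last_login_ip: str

# Hypothetical message model for a login event.
class LoginMessage(BaseModel):
    user_id: str
    ip_address: str

source = KafkaSource(name="...")

@stream(source=source)
def resolve_login(message: LoginMessage) -> Features[User.id, User.last_login_ip]:
    # The return value is a set of feature values, exactly as an
    # online or offline resolver would produce.
    return User(id=message.user_id, last_login_ip=message.ip_address)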
Chalk lets you configure whether resolvers should accept late-arriving stream messages.
By default, Chalk attempts to consider any late-arriving messages in stream resolvers.
However, you can tune this behavior with the
late_arrival_deadline argument
to your stream source:
from chalk.streams import KafkaSource

# By default, the late_arrival_deadline is set to "infinity".
source = KafkaSource(late_arrival_deadline="30m")
If a message is older than the
late_arrival_deadline when it arrives,
its resolver will not run.
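As a sketch, a named source that drops messages older than ten minutes could be configured like this (the name is a placeholder):

from chalk.streams import KafkaSource

# Messages arriving more than ten minutes after they were produced
# are dropped rather than passed to stream resolvers.
source = KafkaSource(name="...", late_arrival_deadline="10m")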
Some stream services, such as Kafka, support sending multiple message types on a single topic.
To support this use case, Chalk allows developers to supply a
parse function to preprocess messages.
The parse function runs before the resolver, and can transform the message into a format that the
stream resolver understands. If the parse function returns
None, the resolver will be skipped.
In the example below, only UserEventMessages that have a click_event property will be processed by resolve_clicks.
from typing import Optional

from pydantic import BaseModel

from chalk.streams import stream

# Child messages
class UserLoginEvent(BaseModel):
    ...

class UserClickEvent(BaseModel):
    ...

# Parent message contains one of the child message types
class UserEventMessage(BaseModel):
    login_event: Optional[UserLoginEvent] = None
    click_event: Optional[UserClickEvent] = None

def get_click_event(event: UserEventMessage) -> Optional[UserClickEvent]:
    # Returning None skips the stream resolver for this message.
    return event.click_event

@stream(source=str_source, parse=get_click_event)
def resolve_clicks(message: UserClickEvent) -> Features[...]:
    ...
Putting it all together, a complete stream resolver looks like this:

from pydantic import BaseModel

from chalk.features import Features
from chalk.streams import KafkaSource, stream

class StreamMessage(BaseModel):
    transaction_id: str
    status: str

source = KafkaSource(name="...")

# Transaction is a feature class defined elsewhere in your project.
@stream(source=source)
def fn(message: StreamMessage) -> Features[
    Transaction.uuid,
    Transaction.status,
]:
    return Transaction(
        uuid=message.transaction_id,
        status=message.status,
    )