Integrate with streaming data sources.
Stream resolvers function almost exactly like webhook resolvers.
The first step is to create a streaming source from one of Chalk’s integrations:
from chalk.streams import KafkaSource

source = KafkaSource(name="...")
If you’re using JSON as the encoding for messages on your topic,
you can optionally also specify a Pydantic model
for the messages on the topic.
Chalk will validate each message value against the model.
from pydantic import BaseModel

class Message(BaseModel):
    user_id: str
    ...

source = KafkaSource(name="...")
After creating your streaming source, you can start processing messages and creating feature values.
Once you’ve defined your stream source,
you can write a resolver for processing
stream updates with the @stream decorator:
from chalk.streams import stream

@stream(source=source)
def fn(message: Message) -> Features[...]:
    ...
Streams can produce feature values in the same manner as online and offline resolvers.
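For instance, a stream resolver can map message fields directly onto a feature class. The sketch below is illustrative only: the User feature class and LoginMessage model are hypothetical and not part of the snippets above.

from pydantic import BaseModel

from chalk.features import Features, features
from chalk.streams import KafkaSource, stream

# Hypothetical feature class, shown only for illustration.
@features
class User:
    id: str
    last_login_ip: str

# Hypothetical message model for a login event.
class LoginMessage(BaseModel):
    user_id: str
    ip_address: str

source = KafkaSource(name="...")

@stream(source=source)
def resolve_login(message: LoginMessage) -> Features[User.id, User.last_login_ip]:
    # The return value is a set of feature values, exactly as an
    # online or offline resolver would produce.
    return User(id=message.user_id, last_login_ip=message.ip_address)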
Chalk lets you configure whether resolvers should accept late-arriving stream messages.
By default, Chalk attempts to consider any late-arriving messages in stream resolvers.
However, you can tune this behavior with the
late_arrival_deadline argument
to your stream source:
from chalk.streams import KafkaSource

# By default, the late_arrival_deadline is set to "infinity".
source = KafkaSource(late_arrival_deadline="30m")
If a message is older than the
late_arrival_deadline when it arrives,
its resolver will not run.
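As a sketch, a named source that drops messages older than ten minutes could be configured like this (the name is a placeholder):

from chalk.streams import KafkaSource

# Messages arriving more than ten minutes after they were produced
# are dropped rather than passed to stream resolvers.
source = KafkaSource(name="...", late_arrival_deadline="10m")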
Some stream services, such as Kafka, support sending multiple message types on a single topic.
To support this use case, Chalk allows developers to supply a
parse function to preprocess messages.
The parse function runs before the resolver, and can transform the message into a format that the
stream resolver understands. If the parse function returns
None, the resolver will be skipped.
In the example below, only UserEventMessages that have a click_event property will be processed by resolve_clicks.
from typing import Optional

from pydantic import BaseModel

from chalk.streams import stream

# Child messages
class UserLoginEvent(BaseModel):
    ...

class UserClickEvent(BaseModel):
    ...

# Parent message contains one of the child message types
class UserEventMessage(BaseModel):
    login_event: Optional[UserLoginEvent] = None
    click_event: Optional[UserClickEvent] = None

def get_click_event(event: UserEventMessage) -> Optional[UserClickEvent]:
    # Returning None skips the stream resolver for this message.
    return event.click_event

@stream(source=str_source, parse=get_click_event)
def resolve_clicks(message: UserClickEvent) -> Features[...]:
    ...
Putting it all together, a complete stream resolver looks like this:

from pydantic import BaseModel

from chalk.features import Features
from chalk.streams import KafkaSource, stream

class StreamMessage(BaseModel):
    transaction_id: str
    status: str

source = KafkaSource(name="...")

# Transaction is a feature class defined elsewhere in your project.
@stream(source=source)
def fn(message: StreamMessage) -> Features[
    Transaction.uuid,
    Transaction.status,
]:
    return Transaction(
        uuid=message.transaction_id,
        status=message.status,
    )