Chalk enables users to convert and aggregate streaming messages into Chalk features in realtime.
The first step towards building a streaming environment with Chalk is creating a streaming source from one of Chalk’s integrations. Chalk supports Kafka and Kinesis for streaming. Like other integration sources, source parameters and credentials may be specified in either the dashboard or the source instantiation in code.
from chalk.streams import KafkaSource, KinesisSource

kafka_source = KafkaSource(name="...")
kinesis_source = KinesisSource(name="...")
Now, let’s set up the Chalk features you’d like to materialize upon stream message ingestion.
from chalk import features, Features

@features(max_staleness="1d", etl_offline_to_online=True)
class StreamFeature:
    id: str
    value: str
Streaming resolvers produce feature values in a similar manner as online and offline resolvers.
These features must be marked with a max staleness in order to store result values in offline cache.
etl_offline_to_online should be set to
True to access these values in online cache as well.
If both of these parameters are set, then once these features are created,
they can be accessed via online and offline queries and used downstream by other online and offline resolvers.
Chalk supports three kinds of streaming resolvers: mapping, windowed, and continuous. On this page, we will focus on mapping, which creates one Chalk feature instance per message. Both windowed and continuous streaming will be covered in the aggregations page.
This is an example of a mapping resolver that maps from a
bytes message to an instance of StreamFeature:
from chalk.streams import stream

@stream(source=source)
def stream_resolver(message: bytes) -> Features[StreamFeature.id, StreamFeature.value]:
    id, value = some_bytes_processing_step(message)
    return StreamFeature(id=id, value=value)
In this resolver, we pass in the raw bytes from the stream. If you’re using JSON as the encoding for messages on your topic, you can optionally specify a Pydantic Model as a wrapper for messages on the topic. Chalk will validate the encoding against the model.
from pydantic import BaseModel

class Message(BaseModel):
    id: str
    value: str

...

@stream(source=source)
def stream_resolver(message: Message) -> Features[StreamFeature.id, StreamFeature.value]:
    return StreamFeature(id=message.id, value=message.value)
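The bytes-based resolver shown earlier calls some_bytes_processing_step, which this page leaves undefined. As a minimal sketch, assuming each payload is a UTF-8 JSON object with id and value fields (an assumption about the topic, not something Chalk requires), such a helper could look like this:

```python
import json
from typing import Tuple


def some_bytes_processing_step(message: bytes) -> Tuple[str, str]:
    """Hypothetical helper: decode a UTF-8 JSON payload into (id, value)."""
    payload = json.loads(message.decode("utf-8"))
    try:
        return str(payload["id"]), str(payload["value"])
    except (KeyError, TypeError) as exc:
        # The payload was valid JSON but not an object with the expected fields.
        raise ValueError(f"unexpected message shape: {payload!r}") from exc


print(some_bytes_processing_step(b'{"id": "user_1", "value": "clicked"}'))
```

Raising on malformed payloads is one possible design choice; it keeps bad messages visible rather than silently producing empty features.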
Chalk lets you configure whether resolvers should accept late-arriving stream messages.
By default, Chalk attempts to process any late-arriving messages in stream resolvers.
However, you can tune this behavior with the
late_arrival_deadline argument to your stream source:
from chalk.streams import KafkaSource

# By default, the late_arrival_deadline is set to "infinity".
source = KafkaSource(late_arrival_deadline="30m")
If a message is older than the
late_arrival_deadline when it is consumed,
its resolver will not run.
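The rule above can be pictured as a simple age comparison. The following is an illustrative sketch only: should_run_resolver is a hypothetical function, and which timestamp Chalk uses as the message time and whether the boundary is inclusive are assumptions, not documented behavior.

```python
from datetime import datetime, timedelta, timezone


def should_run_resolver(
    message_time: datetime,
    consumed_at: datetime,
    late_arrival_deadline: timedelta,
) -> bool:
    """Illustrative check: True if the message is no older than the deadline."""
    age = consumed_at - message_time
    return age <= late_arrival_deadline


deadline = timedelta(minutes=30)
consumed_at = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

on_time = datetime(2024, 1, 1, 11, 45, tzinfo=timezone.utc)  # 15 minutes old
too_late = datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc)  # 60 minutes old

print(should_run_resolver(on_time, consumed_at, deadline))   # True
print(should_run_resolver(too_late, consumed_at, deadline))  # False
```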
Sometimes, messages must be processed before resolver execution.
Streaming resolvers can optionally support a
parse function that preprocesses messages.
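Possible use cases include converting raw messages into a structured model and skipping messages that should not be processed.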
The parse function runs before the resolver, and can transform the message into a format that the
stream resolver understands. If the parse function returns
None, the resolver will be skipped.
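The order of operations described above can be sketched in plain Python. Here run_with_parse, parse_nonempty, and resolver are hypothetical stand-ins written for illustration; they are not Chalk APIs and do not reflect how Chalk dispatches messages internally.

```python
from typing import Callable, Optional, TypeVar

Raw = TypeVar("Raw")
Parsed = TypeVar("Parsed")
Result = TypeVar("Result")


def run_with_parse(
    raw: Raw,
    parse: Callable[[Raw], Optional[Parsed]],
    resolver: Callable[[Parsed], Result],
) -> Optional[Result]:
    """Hypothetical dispatch order: parse first, then resolve."""
    parsed = parse(raw)
    if parsed is None:
        # The parse function signalled that this message should be skipped.
        return None
    return resolver(parsed)


def parse_nonempty(data: bytes) -> Optional[str]:
    """Return the decoded text, or None for blank messages."""
    text = data.decode("utf-8").strip()
    return text or None


def resolver(text: str) -> dict:
    return {"value": text.upper()}


print(run_with_parse(b"hello", parse_nonempty, resolver))  # {'value': 'HELLO'}
print(run_with_parse(b"   ", parse_nonempty, resolver))    # None
```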
A simple parse function can ingest bytes into a
BaseModel, which will be used as input for the streaming resolver.
from pydantic import BaseModel

class Message(BaseModel):
    id: str
    value: str

def parse_bytes(data: bytes) -> Message:
    id, value = some_bytes_processing_step(data)
    return Message(id=id, value=value)

@stream(source=source, parse=parse_bytes)
def stream_resolver(message: Message) -> Features[StreamFeature.id, StreamFeature.value]:
    return StreamFeature(id=message.id, value=message.value)
In the example below, only
UserEventMessages that have a
click_event property will be processed by
resolve_clicks. Those where it is equal to
None will not be processed.
from typing import Optional

from pydantic import BaseModel

# Child messages
class UserLoginEvent(BaseModel):
    ...

class UserClickEvent(BaseModel):
    ...

# Parent message contains one of the child message types
class UserEventMessage(BaseModel):
    login_event: Optional[UserLoginEvent] = None
    click_event: Optional[UserClickEvent] = None

# Returns None for non-click events, which causes the resolver to be skipped.
def get_click_event(event: UserEventMessage) -> Optional[UserClickEvent]:
    return event.click_event

@stream(source=str_source, parse=get_click_event)
def resolve_clicks(message: UserClickEvent) -> Features[...]:
    ...
In this example, each of the messages from our Kafka source will be converted to a StreamFeature.
Our streaming message, embodied by
KafkaMessage, contains a string representation of a naive datetime
we would like to convert to a timezoned datetime.
Refer to the aggregations page for why timezoned timestamps
may be useful for streaming use cases.
Upon message arrival, the bytes are first parsed into
KafkaMessage, then run through the parse function parse_message.
The intermediate output
ParsedMessage is fed to the
stream_resolver, which produces Chalk features.
Because we have specified max staleness and
etl_offline_to_online, we can expect
StreamFeature to be queryable in both online and offline contexts.
from datetime import datetime, timezone

from dateutil import parser
from pydantic import BaseModel

from chalk import features, Features
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="...")

class KafkaMessage(BaseModel):
    id: str
    value: str
    naive_timestamp_str: str

class ParsedMessage(BaseModel):
    id: str
    value: str
    event_timestamp: datetime

@features(max_staleness="1d", etl_offline_to_online=True)
class StreamFeature:
    id: str
    value: str
    event_timestamp: datetime

def parse_message(kafka_message: KafkaMessage) -> ParsedMessage:
    # Parse the naive timestamp string, then mark it as UTC.
    parsed_timestamp: datetime = parser.parse(kafka_message.naive_timestamp_str)
    timezoned_timestamp = parsed_timestamp.replace(tzinfo=timezone.utc)
    return ParsedMessage(
        id=kafka_message.id,
        value=kafka_message.value,
        event_timestamp=timezoned_timestamp,
    )

# parse=parse_message runs before the resolver and supplies its ParsedMessage input.
@stream(source=source, parse=parse_message)
def stream_resolver(
    message: ParsedMessage,
) -> Features[StreamFeature.id, StreamFeature.value, StreamFeature.event_timestamp]:
    return StreamFeature(
        id=message.id,
        value=message.value,
        event_timestamp=message.event_timestamp,
    )
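The timestamp conversion in parse_message can be tried in isolation. This sketch uses only the standard library (datetime.fromisoformat instead of dateutil) and hypothetical helper names. It shows that replace(tzinfo=...) labels a naive timestamp with a zone without shifting the clock time, so it is only correct when the naive value was already expressed in that zone.

```python
from datetime import datetime, timedelta, timezone


def attach_utc(naive_timestamp_str: str) -> datetime:
    """Hypothetical helper: treat a naive ISO timestamp as already being in UTC."""
    naive = datetime.fromisoformat(naive_timestamp_str)
    return naive.replace(tzinfo=timezone.utc)


def local_to_utc(naive_timestamp_str: str, utc_offset_hours: int) -> datetime:
    """Hypothetical helper: the naive value is in a fixed-offset zone; convert to UTC."""
    naive = datetime.fromisoformat(naive_timestamp_str)
    local = naive.replace(tzinfo=timezone(timedelta(hours=utc_offset_hours)))
    return local.astimezone(timezone.utc)


print(attach_utc("2024-01-01T12:30:00").isoformat())        # 2024-01-01T12:30:00+00:00
print(local_to_utc("2024-01-01T12:30:00", -5).isoformat())  # 2024-01-01T17:30:00+00:00
```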
Testing stream resolvers can be difficult: streaming is a complex paradigm that involves multiple services that can be difficult to replicate.
If your streaming resolvers error in production, the errors will be logged and displayed on the dashboard for the resolver. But if you’re looking to unit test your stream resolvers, you have a few options.
One is to fetch data off your streaming source and feed it into the resolver directly.
If your streaming resolver takes
bytes, you can do this without manipulation,
since stream data is natively in bytes.
If it takes a
BaseModel, you will first have to make a
model.parse_raw() call on the raw bytes (model_validate_json() in Pydantic v2).
@stream(source=some_kinesis_source)
def kinesis_stream(
    message_raw: bytes,
) -> Features[KinesisPaymentFeatures.id, KinesisPaymentFeatures.amount]:
    ...
Assuming you have the above resolver, you can pull messages off Kinesis and pass them directly to your resolver. Like regular resolvers, stream resolvers should be unit-testable as regular Python functions.
import boto3

@stream(source=some_kinesis_source)
def kinesis_stream_resolver(
    message_raw: bytes,
) -> Features[KinesisPaymentFeatures.id, KinesisPaymentFeatures.amount]:
    ...

def fetch_messages():
    session = boto3.session.Session()
    stream_name = ...
    client = session.client(
        "kinesis",
        region_name=...,
        aws_access_key_id=...,
        aws_secret_access_key=...,
    )
    response = client.describe_stream(StreamName=stream_name)
    details = response["StreamDescription"]
    # Shards is a list; read from the first shard.
    response = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=details["Shards"][0]["ShardId"],
        ShardIteratorType="LATEST",
    )
    shard_iter = response["ShardIterator"]
    records = []
    while len(records) < 10:
        # Poll Kinesis until at least 10 records have been collected.
        response = client.get_records(ShardIterator=shard_iter, Limit=10)
        records.extend(response["Records"])
        shard_iter = response["NextShardIterator"]
    # Each record's "Data" field holds the raw message bytes.
    return [record["Data"] for record in records]

stream_messages = fetch_messages()
for message in stream_messages:
    result = kinesis_stream_resolver(message)
    print(result)
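When a live stream is not available, the same idea works with hand-built payloads. The sketch below is an assumption-laden illustration: payment_resolver is a plain, undecorated stand-in for a resolver body, the JSON payload shape is invented, and it returns a dict rather than Chalk features so that it runs without Chalk installed.

```python
import json
from typing import Dict, List


def payment_resolver(message_raw: bytes) -> Dict[str, object]:
    """Stand-in for the body of a bytes-based stream resolver."""
    payload = json.loads(message_raw)
    return {"id": payload["id"], "amount": float(payload["amount"])}


def canned_messages() -> List[bytes]:
    """Hand-built payloads that mimic what would arrive on the stream."""
    return [
        json.dumps({"id": "p1", "amount": "12.50"}).encode("utf-8"),
        json.dumps({"id": "p2", "amount": 3}).encode("utf-8"),
    ]


def test_payment_resolver() -> None:
    results = [payment_resolver(m) for m in canned_messages()]
    assert results == [
        {"id": "p1", "amount": 12.5},
        {"id": "p2", "amount": 3.0},
    ]


test_payment_resolver()
```

Canned payloads keep the test deterministic, while messages fetched from the real stream help catch encoding surprises; the two approaches complement each other.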
However, if you have a parse function, states, rekeying,
custom timestamps, or other advanced functionality you want to test,
you may have to send messages to the streaming server.
In that case, you can use the
ChalkClient to test streaming resolvers:
from chalk.client import ChalkClient

stream_messages = fetch_messages()
keys = ["my_key"] * len(stream_messages)

client = ChalkClient()
resp = client.test_streaming_resolver(
    resolver="kinesis_stream_resolver",
    message_keys=keys,
    message_bodies=stream_messages,
)
print(resp.features)