Streaming
Consuming Kafka streams.
Chalk supports Kafka as a streaming source, which works with @stream resolvers.
You can configure the Kafka-specific consumer options either explicitly or through the Chalk dashboard.
If configuring through the dashboard, your Kafka integration will be given a name. You can then reference this name in your stream configuration.
name: string
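For example, assuming you have created a Kafka integration named 'risk_kafka' in the dashboard (the name here is hypothetical), a minimal sketch looks like this:

from chalk.streams import KafkaSource

# 'risk_kafka' is a placeholder; use the integration name from your dashboard.
kafka_source = KafkaSource(name='risk_kafka')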
If you instead configure the Kafka source directly in your code, you must supply the following configuration parameters (a sketch follows the list):
bootstrap_server: string | string[]
topic: string | string[]
ssl_keystore_location: string?
client_id_prefix: string?
group_id_prefix: string?
security_protocol: 'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'
sasl_mechanism: 'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'
sasl_username: string?
sasl_password: string?
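For example, here is a sketch of an explicit configuration over SASL_SSL; the broker address, topic, and credentials are placeholders:

from chalk.streams import KafkaSource

kafka_source = KafkaSource(
    # Placeholder broker and topic; substitute your own.
    bootstrap_server='kafka.website.com:9092',
    topic='user_favorite_color_updates',
    # Authenticate over TLS with SCRAM credentials.
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_username='chalk-consumer',
    sasl_password='...',
)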
Chalk lets you configure how to handle late-arriving messages in your stream. The Kafka source supports a keyword argument late_arrival_deadline, which can be set to a Duration or datetime.timedelta. By default, late_arrival_deadline is set to "infinity", meaning Chalk will try to handle every message, regardless of how late it arrives.
How late arrivals impact your resolver depends on the streaming mode. See Late Arrivals for Window Streams and Late Arrivals for Mapping Streams for detailed information.
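For example, a sketch that discards messages arriving more than one hour late (the one-hour deadline is illustrative; the broker and topic are placeholders):

from datetime import timedelta

from chalk.streams import KafkaSource

kafka_source = KafkaSource(
    bootstrap_server='kafka.website.com:9092',
    topic='user_favorite_color_updates',
    # Messages that arrive more than an hour late are skipped rather than processed.
    late_arrival_deadline=timedelta(hours=1),
)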
from pydantic import BaseModel

from chalk import stream
from chalk.features import Features
from chalk.streams import KafkaSource


class UserUpdateBody(BaseModel):
    user_id: str
    favorite_color: str

...
kafka_source = KafkaSource(
    bootstrap_server='kafka.website.com:9092',
    topic='user_favorite_color_updates',
)
@stream(source=kafka_source)
def fn(message: UserUpdateBody) -> Features[User.uid, User.favorite_color]:
    return User(
        uid=message.user_id,
        favorite_color=message.favorite_color,
    )