Kafka

Chalk supports Kafka as a streaming source for `@stream` resolvers.


Configuration options

You can configure the Kafka-specific consumer options either explicitly in code or through a named integration in the Chalk dashboard.

Named integration

If you configure Kafka through the dashboard, the integration is given a name. You can then reference this name in your stream configuration.

Options
`name`: string
The name of the integration as configured in your Chalk dashboard.

Explicit configuration

If you instead configure the Kafka source directly in your code, supply the following configuration parameters. A type ending in `?` marks an optional parameter.

Options
`bootstrap_server`: string | string[]
The Kafka broker's host name without the security protocol. You can specify multiple brokers by passing a list of strings. Example: `"localhost:9092"` or `["localhost:9092", "localhost:9093"]`

`topic`: string | string[]
The topic or topics to subscribe to.

`ssl_keystore_location`: string?
An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.

`client_id_prefix`: string?
Optionally, you may specify a prefix that will be used for client ids generated by `@stream` resolvers that consume this source.

`group_id_prefix`: string?
Optionally, you may specify a prefix that will be used for group ids generated by `@stream` resolvers that consume this source.

`security_protocol`: 'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'
Security protocol passed directly to Kafka. Defaults to 'PLAINTEXT'.

`sasl_mechanism`: 'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'
Authentication mechanism when `security_protocol` is set to 'SASL_PLAINTEXT' or 'SASL_SSL'. Defaults to 'PLAIN'.

`sasl_username`: string?
Username for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication. Defaults to null.

`sasl_password`: string?
Password for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication. Defaults to null.
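
As a rough illustration of how these options combine, the sketch below assembles an explicit configuration for a SASL_SSL cluster and checks the enumerated values before they are handed to `KafkaSource`. The helper `build_kafka_options`, the broker host names, the user name, and the `KAFKA_SASL_PASSWORD` environment variable are hypothetical. Only the option names and their allowed values come from the list above.

```python
import os

# Allowed values, copied from the option list above.
SECURITY_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
SASL_MECHANISMS = {"PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER"}


def build_kafka_options(bootstrap_server, topic, security_protocol="PLAINTEXT",
                        sasl_mechanism="PLAIN", sasl_username=None, sasl_password=None):
    """Hypothetical helper: validate explicit options and return them as a dict."""
    if security_protocol not in SECURITY_PROTOCOLS:
        raise ValueError(f"unknown security_protocol: {security_protocol!r}")
    options = {
        "bootstrap_server": bootstrap_server,
        "topic": topic,
        "security_protocol": security_protocol,
    }
    # SASL settings are only included for the two SASL protocols.
    if security_protocol.startswith("SASL_"):
        if sasl_mechanism not in SASL_MECHANISMS:
            raise ValueError(f"unknown sasl_mechanism: {sasl_mechanism!r}")
        options["sasl_mechanism"] = sasl_mechanism
        options["sasl_username"] = sasl_username
        options["sasl_password"] = sasl_password
    return options


options = build_kafka_options(
    bootstrap_server=["broker-1.example.com:9092", "broker-2.example.com:9092"],
    topic="user_favorite_color_updates",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_username="chalk-consumer",
    # Read the secret from the environment rather than hard-coding it.
    sasl_password=os.environ.get("KAFKA_SASL_PASSWORD"),
)

# With chalkpy installed, the dict can be passed straight through:
#   from chalk.streams import KafkaSource
#   kafka_source = KafkaSource(**options)
```

Validating up front is a design choice for this sketch, not a Chalk requirement. It surfaces a misspelled protocol or mechanism before the consumer tries to connect.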

Late arrivals

Chalk lets you configure how to handle late-arriving messages in your stream. The Kafka source accepts a keyword argument, `late_arrival_deadline`, which can be set to a `Duration` or a `datetime.timedelta`. By default, `late_arrival_deadline` is set to "infinity", meaning Chalk will try to handle every message, regardless of how late it arrives.

How late arrivals impact your resolver depends on the streaming mode. See Late Arrivals for Window Streams and Late Arrivals for Mapping Streams for detailed information.
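
To make the deadline concrete, here is a minimal sketch of the kind of check a late-arrival deadline implies. It is not Chalk's implementation: the helper `is_late`, and the choice to measure lateness as arrival time minus event time, are assumptions made for illustration. Only `late_arrival_deadline`, its `datetime.timedelta` type, and the "infinity" default come from the text above.

```python
from datetime import datetime, timedelta, timezone

INFINITY = "infinity"  # the documented default: no message is treated as late


def is_late(event_time, arrival_time, late_arrival_deadline=INFINITY):
    """Hypothetical check: True when a message arrived after its deadline."""
    if late_arrival_deadline == INFINITY:
        return False
    # Assumption: lateness is the gap between event time and arrival time.
    return arrival_time - event_time > late_arrival_deadline


deadline = timedelta(minutes=5)
event_time = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

on_time = is_late(event_time, event_time + timedelta(minutes=2), deadline)
too_late = is_late(event_time, event_time + timedelta(minutes=9), deadline)
no_deadline = is_late(event_time, event_time + timedelta(days=30))

# The same timedelta would be passed to the source as a keyword argument:
#   KafkaSource(bootstrap_server=..., topic=..., late_arrival_deadline=deadline)
```

Under these assumptions, a message two minutes behind its event time is accepted, one nine minutes behind is late, and with the default nothing is ever late. What Chalk does with a late message depends on the streaming mode.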


Example

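from pydantic import BaseModel
from chalk import stream
from chalk.features import Features, features
from chalk.streams import KafkaSource

# Feature class populated by the resolver (assumed definition; not shown in the original).
@features
class User:
    uid: str
    favorite_color: str

# Schema of the message body published to the topic.
class UserUpdateBody(BaseModel):
    user_id: str
    favorite_color: str
    ...

kafka_source = KafkaSource(
    bootstrap_server='kafka.website.com:9092',
    topic='user_favorite_color_updates'
)

# The resolver receives each message already parsed as a UserUpdateBody,
# so its fields are read directly from `message`.
@stream(source=kafka_source)
def fn(message: UserUpdateBody) -> Features[User.uid, User.favorite_color]:
    return User(
        uid=message.user_id,
        favorite_color=message.favorite_color
    )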