Chalk supports Kafka, Kinesis, and PubSub as streaming sources for @stream resolvers.


Configuration options

You can configure the Kafka-, Kinesis-, or PubSub-specific options in two places: through the Chalk dashboard as a named integration, or explicitly in code on the source object. See "Choosing where to configure" below for the trade-offs.

Named integration

If you configure the source through the dashboard, you must give the integration a name. You then reference that name in your stream configuration through the source's name option.

Options
name: string
The name of the integration as configured in your Chalk dashboard.

Explicit configuration

Alternatively, you can configure the source directly in code. Each source type accepts the following options.

Kafka Options
bootstrap_server: string | string[]
The Kafka broker's host name and port, without the security protocol prefix. You can specify multiple brokers by passing a list of strings. Example: "localhost:9092" or ["localhost:9092", "localhost:9093"]
topic: string | string[]
The topic or topics to subscribe to.
ssl_keystore_location: string?
An S3 or GCS URI that points to the keystore file to use when connecting to the brokers. You must configure the appropriate AWS or GCP integration so that Chalk can access this file.
client_id_prefix: string?
An optional prefix for the client IDs generated by @stream resolvers that consume this source.
group_id_prefix: string?
An optional prefix for the consumer group IDs generated by @stream resolvers that consume this source.
security_protocol: 'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'
The security protocol, passed directly to Kafka. Defaults to 'PLAINTEXT'.
sasl_mechanism: 'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'
The authentication mechanism used when security_protocol is SASL_PLAINTEXT or SASL_SSL. Defaults to 'PLAIN'.
sasl_username: string?
Username for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication. Defaults to null.
sasl_password: string?
Password for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication. Defaults to null.
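The rules above can be checked before the source is constructed: a SASL mechanism only matters when security_protocol is SASL_PLAINTEXT or SASL_SSL, and the PLAIN and SCRAM mechanisms authenticate with a username and password. The following plain-Python sketch performs that check. build_kafka_source_kwargs is a hypothetical helper (not part of Chalk), and treating missing PLAIN/SCRAM credentials as an error is an assumption. It returns keyword arguments that you could pass to KafkaSource(**kwargs).

```python
from typing import Optional, Sequence, Union

SECURITY_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
SASL_MECHANISMS = {"PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER"}
# Mechanisms that authenticate with a username/password pair.
PASSWORD_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def build_kafka_source_kwargs(
    bootstrap_server: Union[str, Sequence[str]],
    topic: Union[str, Sequence[str]],
    security_protocol: str = "PLAINTEXT",
    sasl_mechanism: str = "PLAIN",
    sasl_username: Optional[str] = None,
    sasl_password: Optional[str] = None,
) -> dict:
    """Hypothetical helper: validate options and return kwargs for KafkaSource."""
    servers = [bootstrap_server] if isinstance(bootstrap_server, str) else list(bootstrap_server)
    if not servers:
        raise ValueError("at least one bootstrap server is required")
    for server in servers:
        # Brokers are host:port, without a protocol such as "SASL_SSL://".
        if "://" in server:
            raise ValueError(f"drop the protocol prefix from {server!r}")
    if security_protocol not in SECURITY_PROTOCOLS:
        raise ValueError(f"unknown security_protocol {security_protocol!r}")

    kwargs = {
        "bootstrap_server": servers if len(servers) > 1 else servers[0],
        "topic": topic,
        "security_protocol": security_protocol,
    }
    # SASL settings are only relevant for the SASL_* protocols.
    if security_protocol.startswith("SASL_"):
        if sasl_mechanism not in SASL_MECHANISMS:
            raise ValueError(f"unknown sasl_mechanism {sasl_mechanism!r}")
        # Assumption: PLAIN/SCRAM are unusable without both credentials.
        if sasl_mechanism in PASSWORD_MECHANISMS and not (sasl_username and sasl_password):
            raise ValueError(f"{sasl_mechanism} requires sasl_username and sasl_password")
        kwargs.update(
            sasl_mechanism=sasl_mechanism,
            sasl_username=sasl_username,
            sasl_password=sasl_password,
        )
    return kwargs
```

For example, build_kafka_source_kwargs(["b1:9092", "b2:9092"], "transactions.v1", security_protocol="SASL_SSL", sasl_mechanism="SCRAM-SHA-512", sasl_username="svc", sasl_password=password) returns both brokers as a list plus the SASL settings. Keeping the checks in a plain function means they can also run in CI without Chalk installed.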

Kinesis Options
stream_name: string?
The name of your stream. Either this or stream_arn must be specified.
stream_arn: string?
The ARN of your stream. Either this or stream_name must be specified.
region_name: string
The AWS region, e.g. "us-east-2".
aws_access_key_id: string?
The AWS access key ID credential, if not already set in the environment.
aws_secret_access_key: string?
The AWS secret access key credential, if not already set in the environment.
aws_session_token: string?
The AWS session token credential, if not already set in the environment.
endpoint_url: string?
An optional custom endpoint URL for connecting to the Kinesis service.
consumer_role_arn: string?
An optional ARN of an IAM role for the consumer to assume.
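Since a Kinesis source needs either stream_name or stream_arn, and explicit credentials are optional when they are already set in the environment, a small pre-flight check can catch a misconfigured source early. The plain-Python sketch below is hypothetical (build_kinesis_source_kwargs is not a Chalk function). It assumes that an explicit access key ID and secret key should be supplied together, and it checks that a stream ARN, whose standard AWS form is arn:aws:kinesis:<region>:<account>:stream/<name>, names the same region as region_name.

```python
from typing import Optional


def build_kinesis_source_kwargs(
    region_name: str,
    stream_name: Optional[str] = None,
    stream_arn: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    endpoint_url: Optional[str] = None,
    consumer_role_arn: Optional[str] = None,
) -> dict:
    """Hypothetical helper: validate Kinesis options before building the source."""
    if not (stream_name or stream_arn):
        raise ValueError("specify stream_name or stream_arn")
    if stream_arn:
        # arn:aws:kinesis:<region>:<account>:stream/<name> splits into six parts.
        parts = stream_arn.split(":", 5)
        if len(parts) != 6 or parts[2] != "kinesis" or not parts[5].startswith("stream/"):
            raise ValueError(f"not a Kinesis stream ARN: {stream_arn!r}")
        if parts[3] != region_name:
            raise ValueError(f"ARN region {parts[3]!r} does not match {region_name!r}")
    # Explicit keys must come as a pair; otherwise rely on the environment.
    if bool(aws_access_key_id) != bool(aws_secret_access_key):
        raise ValueError("set both aws_access_key_id and aws_secret_access_key, or neither")
    if aws_session_token and not aws_access_key_id:
        raise ValueError("aws_session_token requires an access key pair")

    candidates = {
        "region_name": region_name,
        "stream_name": stream_name,
        "stream_arn": stream_arn,
        "aws_access_key_id": aws_access_key_id,
        "aws_secret_access_key": aws_secret_access_key,
        "aws_session_token": aws_session_token,
        "endpoint_url": endpoint_url,
        "consumer_role_arn": consumer_role_arn,
    }
    # Drop unset options so they are simply not passed to the source.
    return {key: value for key, value in candidates.items() if value is not None}
```

For instance, build_kinesis_source_kwargs("us-east-2", stream_name="user-events") returns {"region_name": "us-east-2", "stream_name": "user-events"}, leaving credentials to the environment as described above.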

PubSub Options
project_id: string?
The ID of the Google Cloud project that your PubSub source resides in.
subscription_id: string?
The ID of the PubSub subscription to consume from.
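In Google Cloud, a Pub/Sub subscription's full resource name has the form projects/{project_id}/subscriptions/{subscription_id}, so the two options above together identify exactly one subscription. The sketch below builds that resource name and splits one back into the two options, which can help when copying a subscription path from the Google Cloud console. Both helper names are hypothetical and not part of Chalk.

```python
import re
from typing import Tuple

# Full resource name format used by Google Cloud Pub/Sub for subscriptions.
_SUBSCRIPTION_PATH = re.compile(r"^projects/([^/]+)/subscriptions/([^/]+)$")


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Build the full subscription resource name from the two source options."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def split_subscription_path(path: str) -> Tuple[str, str]:
    """Hypothetical helper: recover (project_id, subscription_id) from a full path."""
    match = _SUBSCRIPTION_PATH.match(path)
    if match is None:
        raise ValueError(f"not a subscription path: {path!r}")
    return match.group(1), match.group(2)
```

split_subscription_path("projects/my-project/subscriptions/user-updates") returns ("my-project", "user-updates"), which map to project_id and subscription_id respectively.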

Choosing where to configure

Both approaches resolve to the same source configuration at runtime, so the choice is about where the values live and how they are managed, not about capability.

| | Named integration (dashboard) | Explicit configuration (code) |
|---|---|---|
| Where values live | Stored in your environment and backed by your cloud secret manager | In your source code / repository |
| Secrets (passwords, keys) | Encrypted at rest, never committed to the repo | Encrypted at rest as a secret; can manage state via repo or agent, or through the environment Secret page |
| Changing a data source value | Edit in the dashboard and chalk apply | Edit code and chalk apply |
| Discoverability | Lives under data sources in the environment | Lives in repo and/or Secret page in UI |
| Referenced in code as | KafkaSource(name="...") | KafkaSource(bootstrap_server=..., topic=..., ...) |

When to use a named integration

  • The source carries secrets (SASL password, SSL keystore, AWS keys). The dashboard stores them in your cloud secret manager and injects them into the running containers, so credentials never live in the repo.
  • You want all information about the Kafka data source to live alongside its definition in the UI. The codebase only needs to reference the source by name and topic, which keeps environment secret management out of the code.

For more information on configuring data sources and previewing messages from a stream, see Configuring Data Sources.

from chalk.streams import KafkaSource

transaction_stream_source = KafkaSource(
    name="transaction_stream",  # name of Kafka data source in UI
)

When to configure explicitly in code

  • You want to manage secrets through your codebase and upload them with the chalk secret command.
  • You want agents to be able to rewrite secrets.
  • You are an existing (legacy) user of Chalk streams.

import os
from chalk.streams import KafkaSource

# Connection settings are read from environment variables (for example,
# secrets set in the environment) rather than hard-coded in the repo.
transaction_stream_source = KafkaSource(
    name="transaction_stream",
    bootstrap_server=os.getenv("CHALK_KAFKA_BOOTSTRAP_SERVER"),
    sasl_username=os.getenv("CHALK_KAFKA_SASL_USERNAME"),
    sasl_password=os.getenv("CHALK_KAFKA_SASL_PASSWORD"),
    sasl_mechanism=os.getenv("CHALK_KAFKA_SASL_MECHANISM"),
    security_protocol=os.getenv("CHALK_KAFKA_SECURITY_PROTOCOL"),
    topic="transactions.v1",
)
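os.getenv returns None for a variable that is not set, so with the example above a missing secret only surfaces later, when the consumer tries to connect. A minimal, stdlib-only sketch of a fail-fast alternative follows. load_kafka_settings is a hypothetical helper; the variable names are the ones used above, and storing multiple brokers as one comma-separated string is an assumption.

```python
import os
from typing import Mapping, Optional

# Maps KafkaSource keyword arguments to the environment variables used above.
KAFKA_ENV_VARS = {
    "bootstrap_server": "CHALK_KAFKA_BOOTSTRAP_SERVER",
    "sasl_username": "CHALK_KAFKA_SASL_USERNAME",
    "sasl_password": "CHALK_KAFKA_SASL_PASSWORD",
    "sasl_mechanism": "CHALK_KAFKA_SASL_MECHANISM",
    "security_protocol": "CHALK_KAFKA_SECURITY_PROTOCOL",
}


def load_kafka_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Hypothetical helper: read every variable, failing fast if any is missing."""
    env = os.environ if env is None else env
    missing = [var for var in KAFKA_ENV_VARS.values() if not env.get(var)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    settings = {arg: env[var] for arg, var in KAFKA_ENV_VARS.items()}
    # Assumption: multiple brokers are stored as one comma-separated string.
    servers = [s.strip() for s in settings["bootstrap_server"].split(",") if s.strip()]
    if not servers:
        raise RuntimeError("CHALK_KAFKA_BOOTSTRAP_SERVER lists no brokers")
    settings["bootstrap_server"] = servers if len(servers) > 1 else servers[0]
    return settings
```

The result could then be unpacked alongside the fixed arguments, e.g. KafkaSource(name="transaction_stream", topic="transactions.v1", **load_kafka_settings()), so that a missing secret fails at import time instead of at connect time.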

Example

from pydantic import BaseModel
from chalk import stream, feature
from chalk.streams import KafkaSource
from chalk.features import features, Features

@features
class User:
    id: str
    favorite_color: str = feature(max_staleness='30m')

class UserUpdateBody(BaseModel):
    user_id: str
    favorite_color: str

kafka_source = KafkaSource(name="user_favorite_color_updates")

@stream(source=kafka_source)
def fn(message: UserUpdateBody) -> Features[User.id, User.favorite_color]:
    # `message` is the message value, parsed into UserUpdateBody
    return User(
        id=message.user_id,
        favorite_color=message.favorite_color,
    )
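In the example, the resolver's UserUpdateBody annotation determines the shape each message must have. To see what a producer has to publish for that to work, here is a stdlib-only sketch that assumes JSON-encoded message values and uses a dataclass as a stand-in for the pydantic model. parse_update and to_feature_values are hypothetical names; in Chalk you would not write this parsing step yourself.

```python
import json
from dataclasses import dataclass


@dataclass
class UserUpdateBody:
    """Stand-in for the pydantic model in the example above."""
    user_id: str
    favorite_color: str


def parse_update(raw: bytes) -> UserUpdateBody:
    """Decode one Kafka message value, assumed to be a JSON object."""
    payload = json.loads(raw)
    # A missing or unexpected key raises TypeError in this stand-in
    # (pydantic, by default, ignores extra keys instead).
    return UserUpdateBody(**payload)


def to_feature_values(message: UserUpdateBody) -> dict:
    """Mirror the resolver: map message fields onto the User features."""
    return {"id": message.user_id, "favorite_color": message.favorite_color}
```

parse_update(b'{"user_id": "u_1", "favorite_color": "blue"}') returns UserUpdateBody(user_id='u_1', favorite_color='blue'), and to_feature_values maps it to {"id": "u_1", "favorite_color": "blue"}, the same two features the resolver returns.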

Additional Configurations

For Kafka data sources, Chalk supports the Kafka configuration values listed below. You can set them in either of two ways: in the dashboard, enter a dict of the form {"your.kafka.config": "config_value"} in the ADDITIONAL_ENGINE_ARGUMENTS field of your Kafka data source; or, in a code-based data source configuration, pass them through KafkaSource(additional_kafka_args=...).

Supported Kafka Configurations

Fetch & Polling Configuration

  • fetch.min.bytes - Minimum amount of data the server should return for a fetch request
  • fetch.max.bytes - Maximum amount of data the server should return for a fetch request
  • fetch.max.wait.ms - Maximum time the server will block before answering a fetch request when there is not enough data to satisfy fetch.min.bytes
  • max.partition.fetch.bytes - Maximum amount of data per-partition the server will return
  • max.poll.records - Maximum number of records returned in a single call to poll()
  • max.poll.interval.ms - Maximum delay between invocations of poll()

Timeout Configuration

  • session.timeout.ms - Timeout used to detect consumer failures
  • heartbeat.interval.ms - Expected time between heartbeats to the consumer coordinator
  • request.timeout.ms - Maximum amount of time the client will wait for the response of a request
  • consumer.timeout.ms - Timeout for consumer operations
  • poll.timeout - User-friendly alias for consumer.timeout.ms
  • connections.max.idle.ms - Close idle connections after this many milliseconds
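Some of these timeouts constrain each other. Per the Apache Kafka consumer documentation, heartbeat.interval.ms must be lower than session.timeout.ms and is typically set no higher than one third of it. The sketch below is a hypothetical pre-flight check (check_timeouts is not a Chalk function) for a configuration dict of the kind described above; it makes no Chalk or Kafka calls.

```python
from typing import List, Mapping, Union


def check_timeouts(config: Mapping[str, Union[int, str]]) -> List[str]:
    """Return warnings for heartbeat/session timeout combinations."""
    warnings: List[str] = []
    if "session.timeout.ms" not in config or "heartbeat.interval.ms" not in config:
        return warnings  # Nothing to compare unless both are set explicitly.
    session = int(config["session.timeout.ms"])
    heartbeat = int(config["heartbeat.interval.ms"])
    if heartbeat >= session:
        warnings.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat * 3 > session:
        warnings.append("heartbeat.interval.ms is above one third of session.timeout.ms")
    return warnings
```

For example, session.timeout.ms=30000 with heartbeat.interval.ms=15000 produces the one-third warning, while 45000 with 3000 passes cleanly.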

Retry & Reliability

  • retry.backoff.ms - Time to wait before attempting to retry a failed request
  • reconnect.backoff.ms - Base amount of time to wait before attempting to reconnect
  • reconnect.backoff.max.ms - Maximum amount of time to wait when reconnecting
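Per the Apache Kafka consumer documentation, the reconnect delay for a host starts at reconnect.backoff.ms and increases exponentially with each consecutive connection failure, up to reconnect.backoff.max.ms, with random jitter added. The deterministic sketch below assumes the delay doubles per failure and leaves out the jitter, so treat its numbers as approximate; reconnect_delays is a hypothetical helper.

```python
from typing import List


def reconnect_delays(base_ms: int, max_ms: int, attempts: int) -> List[int]:
    """Delay before each reconnect attempt, doubling per failure, capped at max_ms.

    Jitter is omitted so the schedule is deterministic.
    """
    if base_ms <= 0 or max_ms < base_ms:
        raise ValueError("expected 0 < base_ms <= max_ms")
    return [min(base_ms * 2 ** n, max_ms) for n in range(attempts)]
```

With reconnect.backoff.ms=50 and reconnect.backoff.max.ms=1000, reconnect_delays(50, 1000, 7) gives [50, 100, 200, 400, 800, 1000, 1000]: the base controls how quickly the first retries happen, while the maximum bounds the wait during a long outage.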

Metadata & Discovery

  • metadata.max.age.ms - Period of time before forcing a refresh of metadata

Message Processing

  • check.crcs - Automatically check the CRC32 of the records consumed
  • isolation.level - Controls how to read messages written transactionally
  • exclude.internal.topics - Whether records from internal topics (such as consumer offsets) are hidden from the consumer unless it subscribes to them explicitly

Protocol Settings

  • receive.buffer.bytes - Size of the TCP receive buffer (SO_RCVBUF)
  • send.buffer.bytes - Size of the TCP send buffer (SO_SNDBUF)
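Because only the keys listed above are supported, it can help to check a configuration dict before pasting it into ADDITIONAL_ENGINE_ARGUMENTS or passing it through additional_kafka_args. A minimal sketch follows. SUPPORTED_KAFKA_CONFIGS copies the list above, validate_additional_kafka_args is a hypothetical helper, the two isolation.level values come from Apache Kafka, and converting every value to a string (booleans as "true"/"false") is an assumption based on the {"your.kafka.config": "config_value"} form. The example values are illustrative, not recommendations.

```python
from typing import Dict, Mapping, Union

# Copied from the "Supported Kafka Configurations" list above.
SUPPORTED_KAFKA_CONFIGS = frozenset({
    "fetch.min.bytes", "fetch.max.bytes", "fetch.max.wait.ms",
    "max.partition.fetch.bytes", "max.poll.records", "max.poll.interval.ms",
    "session.timeout.ms", "heartbeat.interval.ms", "request.timeout.ms",
    "consumer.timeout.ms", "poll.timeout", "connections.max.idle.ms",
    "retry.backoff.ms", "reconnect.backoff.ms", "reconnect.backoff.max.ms",
    "metadata.max.age.ms",
    "check.crcs", "isolation.level", "exclude.internal.topics",
    "receive.buffer.bytes", "send.buffer.bytes",
})
ISOLATION_LEVELS = {"read_committed", "read_uncommitted"}


def validate_additional_kafka_args(
    config: Mapping[str, Union[str, int, bool]],
) -> Dict[str, str]:
    """Reject unsupported keys and return the config with string values."""
    unknown = sorted(set(config) - SUPPORTED_KAFKA_CONFIGS)
    if unknown:
        raise ValueError("unsupported Kafka configs: " + ", ".join(unknown))
    if "isolation.level" in config and config["isolation.level"] not in ISOLATION_LEVELS:
        raise ValueError("isolation.level must be read_committed or read_uncommitted")
    # Assumption: values are passed as strings, as in {"your.kafka.config": "config_value"}.
    return {
        key: str(value).lower() if isinstance(value, bool) else str(value)
        for key, value in config.items()
    }
```

json.dumps(validate_additional_kafka_args({"max.poll.records": 500, "isolation.level": "read_committed"})) produces text in the dict form shown above for the dashboard field; in code, the returned dict could be passed as additional_kafka_args, assuming that argument accepts the same mapping.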