Chalk supports Kafka, Kinesis, and PubSub as streaming sources for @stream resolvers.


Configuration options

You can configure the Kafka-, Kinesis-, or PubSub-specific options either explicitly or through the Chalk dashboard.

Named integration

If configuring through the dashboard, your Kafka, Kinesis, or PubSub integration must be given a name. You can then reference this name in your stream configuration.

Options
name (string)
The name of the integration as configured in your Chalk dashboard.
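
For example, if you created a Kafka integration named "risk" in the dashboard (the name here is a placeholder), you can reference it from code by name alone:

from chalk.streams import KafkaSource

# "risk" is a placeholder for whatever name you gave the integration in the dashboard.
# Connection details are resolved from the named integration's configuration.
source = KafkaSource(name="risk")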

Explicit configuration

Alternatively, you can configure the source directly in your code for the following source types.

Kafka Options
bootstrap_server (string | string[])
The Kafka broker's host name without the security protocol. You can specify multiple brokers by passing a list of strings. Example: "localhost:9092" or ["localhost:9092", "localhost:9093"]
topic (string | string[])
The topic or topics to subscribe to.
ssl_keystore_location (string?)
An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
client_id_prefix (string?)
Optionally, you may specify a prefix that will be used for client ids generated by @stream resolvers that consume this source.
group_id_prefix (string?)
Optionally, you may specify a prefix that will be used for group ids generated by @stream resolvers that consume this source.
security_protocol ('PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL')
Security protocol passed directly to Kafka. Defaults to 'PLAINTEXT'.
sasl_mechanism ('PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER')
Authentication mechanism when security_protocol is configured for 'SASL_PLAINTEXT' or 'SASL_SSL'. Defaults to 'PLAIN'.
sasl_username (string?)
Username for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication. Defaults to null.
sasl_password (string?)
Password for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication. Defaults to null.
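
As a sketch, an explicitly configured Kafka source using the options above might look like the following; the broker addresses, topic name, and credential environment variables are placeholders:

import os

from chalk.streams import KafkaSource

# All values below are illustrative placeholders.
kafka_source = KafkaSource(
    bootstrap_server=["broker-1.example.com:9092", "broker-2.example.com:9092"],
    topic="user_events",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_username=os.environ.get("KAFKA_SASL_USERNAME"),
    sasl_password=os.environ.get("KAFKA_SASL_PASSWORD"),
    group_id_prefix="chalk-",
)
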
Kinesis Options
stream_name (string?)
The name of your stream. Either this or the stream_arn must be specified.
stream_arn (string?)
The ARN of your stream. Either this or the stream_name must be specified.
region_name (string)
The AWS region, e.g. "us-east-2".
aws_access_key_id (string?)
The AWS access key id credential, if not already set in the environment.
aws_secret_access_key (string?)
The AWS secret access key credential, if not already set in the environment.
aws_session_token (string?)
The AWS session token credential, if not already set in the environment.
endpoint_url (string?)
An optional endpoint URL to use when connecting to Kinesis.
consumer_role_arn (string?)
An optional role ARN for the consumer to assume.
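
A corresponding sketch for a Kinesis source, assuming the KinesisSource class in chalk.streams; the stream name, region, and role ARN are placeholders, and credentials may instead be picked up from the environment:

from chalk.streams import KinesisSource

# Placeholder values; either stream_name or stream_arn must be provided.
kinesis_source = KinesisSource(
    stream_name="user-events",
    region_name="us-east-2",
    # aws_access_key_id, aws_secret_access_key, and aws_session_token may be
    # omitted when they are already set in the environment.
    consumer_role_arn="arn:aws:iam::123456789012:role/chalk-kinesis-consumer",
)
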
PubSub Options
project_id (string?)
The id of the GCP project that your PubSub source resides in.
subscription_id (string?)
The id of the PubSub subscription to consume from.
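
And a sketch for a PubSub source, assuming a PubSubSource class in chalk.streams that mirrors the other sources; the project and subscription ids are placeholders:

from chalk.streams import PubSubSource

# Placeholder ids for the GCP project and PubSub subscription.
pubsub_source = PubSubSource(
    project_id="my-gcp-project",
    subscription_id="user-events-sub",
)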

Example

from pydantic import BaseModel
from chalk import stream, feature
from chalk.streams import KafkaSource
from chalk.features import features, Features

@features
class User:
    id: str
    favorite_color: str = feature(max_staleness='30m')

class UserUpdateBody(BaseModel):
    user_id: str
    favorite_color: str

kafka_source = KafkaSource(name="user_favorite_color_updates")

@stream(source=kafka_source)
def fn(message: UserUpdateBody) -> Features[User.id, User.favorite_color]:
    return User(
        id=message.user_id,
        favorite_color=message.favorite_color,
    )

Additional Configurations

For Kafka data sources, Chalk supports the following Kafka configuration values for your integration. You can set these configurations within your Kafka Data Source either by constructing a dict of {"your.kafka.config": "config_value"} and setting that under the ADDITIONAL_ENGINE_ARGUMENTS field, or by passing in KafkaSource(additional_kafka_args=...) in your code-based data source configuration.
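
For example, a code-based Kafka source might tune fetch and timeout behavior as follows; the keys come from the list below and the values are illustrative, not recommendations:

from chalk.streams import KafkaSource

kafka_source = KafkaSource(
    name="user_favorite_color_updates",
    # Keys are standard Kafka consumer configuration names from the list below.
    additional_kafka_args={
        "fetch.min.bytes": "1048576",
        "max.poll.records": "500",
        "session.timeout.ms": "45000",
    },
)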

Supported Kafka Configurations

Fetch & Polling Configuration

  • fetch.min.bytes - Minimum amount of data the server should return for a fetch request
  • fetch.max.bytes - Maximum amount of data the server should return for a fetch request
  • fetch.max.wait.ms - Maximum time the server will block before answering the fetch request
  • max.partition.fetch.bytes - Maximum amount of data per-partition the server will return
  • max.poll.records - Maximum number of records returned in a single call to poll()
  • max.poll.interval.ms - Maximum delay between invocations of poll()

Timeout Configuration

  • session.timeout.ms - Timeout used to detect consumer failures
  • heartbeat.interval.ms - Expected time between heartbeats to the consumer coordinator
  • request.timeout.ms - Maximum amount of time the client will wait for the response of a request
  • consumer.timeout.ms - Timeout for consumer operations
  • poll.timeout - User-friendly alias for consumer.timeout.ms
  • connections.max.idle.ms - Close idle connections after this many milliseconds

Retry & Reliability

  • retry.backoff.ms - Time to wait before attempting to retry a failed request
  • reconnect.backoff.ms - Base amount of time to wait before attempting to reconnect
  • reconnect.backoff.max.ms - Maximum amount of time to wait when reconnecting

Metadata & Discovery

  • metadata.max.age.ms - Period of time before forcing a refresh of metadata

Message Processing

  • check.crcs - Automatically check the CRC32 of the records consumed
  • isolation.level - Controls how to read messages written transactionally
  • exclude.internal.topics - Whether internal topics should be exposed to the consumer

Protocol Settings

  • receive.buffer.bytes - Size of the TCP receive buffer (SO_RCVBUF)
  • send.buffer.bytes - Size of the TCP send buffer (SO_SNDBUF)