Chalk home page
Docs
API
CLI
  1. Streaming
  2. Stream Sources

Chalk supports Kafka and Kinesis as streaming sources for @stream resolvers.


Configuration options

You can configure the Kafka- or Kinesis-specific options either explicitly or through the Chalk dashboard.

Named integration

If configuring through the dashboard, your Kafka/Kinesis integration must be given a name. You can then reference this name in your stream configuration.

Options
namestring
The name of the integration as configured in your Chalk dashboard.

Explicit configuration

You can instead choose to configure the source in your code directly. For Kafka, the following configuration parameters must be set:

Kafka Options
bootstrap_serverstring | string[]
The Kafka broker's host name without the security protocol. You can specify multiple brokers by passing a list of strings. Example: `"localhost:9092"` or `["localhost:9092", "localhost:9093"]`
topicstring | string[]
The topic or topics to subscribe to.
ssl_keystore_locationstring?
An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
client_id_prefixstring?
Optionally, you may specify a prefix that will be used for client ids generated by `@stream` resolvers that consume this source.
group_id_prefixstring?
Optionally, you may specify a prefix that will be used for group ids generated by `@stream` resolvers that consume this source.
security_protocol'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'
Security protocol passed directly to Kafka. Defaults to 'PLAINTEXT'.
sasl_mechanism
'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'
Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Defaults to 'PLAIN'.
sasl_usernamestring?
Username for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication. Defaults to null.
sasl_passwordstring?
Password for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication. Defaults to null.

For Kinesis, these parameters must be set:

Kinesis Options
stream_namestring?
The name of your stream. Either this or the stream_arn must be specified.
stream_arnstring?
The ARN of your stream. Either this or the stream_name must be specified.
region_namestring
The AWS region, e.g. "us-east-2".
aws_access_key_idstring?
The AWS access key id credential, if not already set in the environment.
aws_secret_access_keystring?
The AWS secret access key credential, if not already set in the environment.
aws_session_tokenstring?
The AWS session token credential, if not already set in the environment.
endpoint_urlstring?
An optional endpoint to hit the Kinesis server.
consumer_role_arnstring?
An optional role ARN for the consumer to assume.

Example

from pydantic import BaseModel
from chalk import stream, Features, feature
from chalk.streams import KafkaSource
from chalk.features import features

@features
class User:
    id: str
    favorite_color: str = feature(max_staleness='30m')

class UserUpdateBody(BaseModel):
    user_id: str
    favorite_color: str

kafka_source = KafkaSource(
    bootstrap_server='kafka.website.com:9092',
    topic='user_favorite_color_updates'
)

@stream(source=kafka_source)
def fn(message: UserUpdateBody) -> Features[User.uid, User.favorite_color]:
    return User(
        id=message.value.user_id,
        favorite_color=message.value.favorite_color
    )