Feature Engine
Consuming Kafka streams.
Chalk supports Kafka, Kinesis, and PubSub as streaming sources
for @stream resolvers.
You can configure the Kafka-, Kinesis-, or PubSub-specific options in two places: through the Chalk dashboard as a named integration, or explicitly in code on the source object. See Choosing where to configure for the trade-offs.
If configuring through the dashboard, your Kafka/Kinesis integration must be given a name. You can then reference this name in your stream configuration.
name: string

You can instead choose to configure the source in your code directly for the following source types.
Kafka

bootstrap_server: string | string[] - "localhost:9092" or ["localhost:9092", "localhost:9093"]
topic: string | string[]
ssl_keystore_location: string?
client_id_prefix: string? - ... @stream resolvers that consume this source.
group_id_prefix: string? - ... @stream resolvers that consume this source.
security_protocol: 'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'
sasl_mechanism: 'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'
sasl_username: string?
sasl_password: string?

Kinesis

stream_name: string?
stream_arn: string?
region_name: string
aws_access_key_id: string?
aws_secret_access_key: string?
aws_session_token: string?
endpoint_url: string?
consumer_role_arn: string?

PubSub

project_id: string?
subscription_id: string?

Choosing where to configure

Both approaches resolve to the same source configuration at runtime, so the choice is about where the values live and how they are managed, not about capability.
| | Named integration (dashboard) | Explicit configuration (code) |
|---|---|---|
| Where values live | Stored in your environment and backed by your cloud secret manager | In your source code / repository |
| Secrets (passwords, keys) | Encrypted at rest, never committed to the repo | Encrypted at rest as a secret; can be managed via the repo or an agent, or through the environment's Secret page |
| Changing a data source value | Edit in the dashboard, then run chalk apply | Edit the code, then run chalk apply |
| Discoverability | Lives under data sources in the environment | Lives in repo and/or Secret page in UI |
| Referenced in code as | KafkaSource(name="...") | KafkaSource(bootstrap_server=..., topic=..., ...) |
For more information on configuring data sources and previewing messages from a stream, check out Configuring Data Sources.
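For the explicit-configuration column, connection values usually come from environment variables or secrets rather than literals in the repo. The sketch below is one way to collect them and fail fast when one is unset, since `os.getenv` returns `None` for a missing variable. The helper name `kafka_kwargs_from_env` and the `CHALK_KAFKA_*` variable names are illustrative assumptions, not part of Chalk's API; the dictionary keys mirror the Kafka options listed above.

```python
import os
from typing import Dict, Mapping, Optional

# Maps KafkaSource keyword arguments to the environment variables that hold them.
# The variable names are illustrative assumptions, not names Chalk requires.
ENV_VARS: Dict[str, str] = {
    "bootstrap_server": "CHALK_KAFKA_BOOTSTRAP_SERVER",
    "sasl_username": "CHALK_KAFKA_SASL_USERNAME",
    "sasl_password": "CHALK_KAFKA_SASL_PASSWORD",
    "sasl_mechanism": "CHALK_KAFKA_SASL_MECHANISM",
    "security_protocol": "CHALK_KAFKA_SECURITY_PROTOCOL",
}


def kafka_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Collect KafkaSource keyword arguments from the environment.

    Raises KeyError naming every unset (or empty) variable, instead of
    silently passing None through to the source configuration.
    """
    env = os.environ if env is None else env
    missing = sorted(var for var in ENV_VARS.values() if not env.get(var))
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {kwarg: env[var] for kwarg, var in ENV_VARS.items()}
```

Under these assumptions, the result could be unpacked into the constructor, for example `KafkaSource(topic="transactions.v1", **kafka_kwargs_from_env())`.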
from chalk.streams import KafkaSource

transaction_stream_source = KafkaSource(
    name="transaction_stream",  # name of Kafka data source in UI
)

... chalk secret command.

import os
from chalk.streams import KafkaSource

transaction_stream_source = KafkaSource(
    name="transaction_stream",
    bootstrap_server=os.getenv("CHALK_KAFKA_BOOTSTRAP_SERVER"),
    sasl_username=os.getenv("CHALK_KAFKA_SASL_USERNAME"),
    sasl_password=os.getenv("CHALK_KAFKA_SASL_PASSWORD"),
    sasl_mechanism=os.getenv("CHALK_KAFKA_SASL_MECHANISM"),
    security_protocol=os.getenv("CHALK_KAFKA_SECURITY_PROTOCOL"),
    topic="transactions.v1",
)

from pydantic import BaseModel
from chalk import stream, feature
from chalk.streams import KafkaSource
from chalk.features import features, Features


@features
class User:
    id: str
    favorite_color: str = feature(max_staleness='30m')


# Shape of each message on the stream.
class UserUpdateBody(BaseModel):
    user_id: str
    favorite_color: str


kafka_source = KafkaSource(name="user_favorite_color_updates")


@stream(source=kafka_source)
def fn(message: UserUpdateBody) -> Features[User.id, User.favorite_color]:
    # The message is typed as UserUpdateBody, so its fields are read directly.
    return User(
        id=message.user_id,
        favorite_color=message.favorite_color,
    )

For Kafka data sources, Chalk supports the following Kafka configuration values
for your integration. You can set these configurations within your Kafka Data Source either by constructing a dict of
{"your.kafka.config": "config_value"} and setting that under the ADDITIONAL_ENGINE_ARGUMENTS field, or by passing in
KafkaSource(additional_kafka_args=...) in your code-based data source configuration.
Fetch & Polling Configuration

fetch.min.bytes - Minimum amount of data the server should return for a fetch request
fetch.max.bytes - Maximum amount of data the server should return for a fetch request
fetch.max.wait.ms - Maximum time the server will block before answering the fetch request
max.partition.fetch.bytes - Maximum amount of data per-partition the server will return
max.poll.records - Maximum number of records returned in a single call to poll()
max.poll.interval.ms - Maximum delay between invocations of poll()

Timeout Configuration

session.timeout.ms - Timeout used to detect consumer failures
heartbeat.interval.ms - Expected time between heartbeats to the consumer coordinator
request.timeout.ms - Maximum amount of time the client will wait for the response of a request
consumer.timeout.ms - Timeout for consumer operations
poll.timeout - User-friendly alias for consumer.timeout.ms
connections.max.idle.ms - Close idle connections after this many milliseconds

Retry & Reliability

retry.backoff.ms - Time to wait before attempting to retry a failed request
reconnect.backoff.ms - Base amount of time to wait before attempting to reconnect
reconnect.backoff.max.ms - Maximum amount of time to wait when reconnecting

Metadata & Discovery

metadata.max.age.ms - Period of time before forcing a refresh of metadata

Message Processing

check.crcs - Automatically check the CRC32 of the records consumed
isolation.level - Controls how to read messages written transactionally
exclude.internal.topics - Whether internal topics should be exposed to the consumer

Protocol Settings

receive.buffer.bytes - Size of the TCP receive buffer (SO_RCVBUF)
send.buffer.bytes - Size of the TCP send buffer (SO_SNDBUF)
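Since the list above names the configuration values Chalk supports, it can help to check a dict of extra arguments against it before deploying. The following is a minimal sketch, not part of Chalk: the helper name `validate_kafka_args` is hypothetical, the example values are arbitrary, and rendering the dict as JSON for the ADDITIONAL_ENGINE_ARGUMENTS field is an assumption about how the dashboard expects it to be entered.

```python
import json
from typing import Dict, Mapping

# Configuration keys copied from the list above.
SUPPORTED_KAFKA_ARGS = frozenset({
    "fetch.min.bytes", "fetch.max.bytes", "fetch.max.wait.ms",
    "max.partition.fetch.bytes", "max.poll.records", "max.poll.interval.ms",
    "session.timeout.ms", "heartbeat.interval.ms", "request.timeout.ms",
    "consumer.timeout.ms", "poll.timeout", "connections.max.idle.ms",
    "retry.backoff.ms", "reconnect.backoff.ms", "reconnect.backoff.max.ms",
    "metadata.max.age.ms",
    "check.crcs", "isolation.level", "exclude.internal.topics",
    "receive.buffer.bytes", "send.buffer.bytes",
})


def validate_kafka_args(args: Mapping[str, str]) -> Dict[str, str]:
    """Return a copy of args, raising ValueError if any key is not in the list."""
    unknown = sorted(set(args) - SUPPORTED_KAFKA_ARGS)
    if unknown:
        raise ValueError("unsupported Kafka configuration keys: " + ", ".join(unknown))
    return dict(args)


# Example values are arbitrary and only illustrate the dict shape.
extra_args = validate_kafka_args({
    "max.poll.records": "500",
    "session.timeout.ms": "30000",
    "isolation.level": "read_committed",
})

# In code, the dict would be passed as KafkaSource(..., additional_kafka_args=extra_args).
# For the dashboard field, a JSON rendering of the same dict is assumed here.
print(json.dumps(extra_args, sort_keys=True))
```

Keeping the allowed keys in one set means a typo such as `max.poll.record` is caught locally rather than surfacing later as a consumer misconfiguration.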