Feature Engine
Consuming Kafka streams.
Chalk supports Kafka, Kinesis, and PubSub as streaming sources
for @stream resolvers.
You can configure the Kafka-, Kinesis-, or PubSub-specific options either explicitly or through the Chalk dashboard.
If configuring through the dashboard, your Kafka/Kinesis integration must be given a name. You can then reference this name in your stream configuration.
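For example, a resolver can reference dashboard-configured integrations purely by name. The integration names below are hypothetical, and the KinesisSource usage is assumed to mirror the KafkaSource example shown later on this page:

from chalk.streams import KafkaSource, KinesisSource

# Hypothetical integration names; each must match a named integration
# created in the Chalk dashboard.
kafka_source = KafkaSource(name="user_favorite_color_updates")
kinesis_source = KinesisSource(name="user_events")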
name (string) - The name of the integration, as configured in your dashboard.

You can instead choose to configure the source directly in your code for the following source types.

Kafka source options:
bootstrap_server (string | string[]) - e.g. "localhost:9092" or ["localhost:9092", "localhost:9093"]
topic (string | string[])
ssl_keystore_location (string?)
client_id_prefix (string?) - Prefix for the client id used by @stream resolvers that consume this source.
group_id_prefix (string?) - Prefix for the group id used by @stream resolvers that consume this source.
security_protocol ('PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL')
sasl_mechanism ('PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER')
sasl_username (string?)
sasl_password (string?)

Kinesis source options:
stream_name (string?)
stream_arn (string?)
region_name (string)
aws_access_key_id (string?)
aws_secret_access_key (string?)
aws_session_token (string?)
endpoint_url (string?)
consumer_role_arn (string?)

PubSub source options:
project_id (string?)
subscription_id (string?)

For example, to consume messages from a Kafka integration configured in the dashboard:

from pydantic import BaseModel
from chalk import stream, feature
from chalk.streams import KafkaSource
from chalk.features import features, Features
@features
class User:
    id: str
    favorite_color: str = feature(max_staleness='30m')

class UserUpdateBody(BaseModel):
    user_id: str
    favorite_color: str

kafka_source = KafkaSource(name="user_favorite_color_updates")

@stream(source=kafka_source)
def fn(message: UserUpdateBody) -> Features[User.id, User.favorite_color]:
    return User(
        id=message.user_id,
        favorite_color=message.favorite_color,
    )
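If you are not using a dashboard-configured integration, you can instead pass the connection options listed above directly to the source in code. The broker addresses, topic, and credentials below are illustrative placeholders, not values Chalk requires; this is a sketch of explicit configuration using the Kafka options documented earlier:

from chalk.streams import KafkaSource

# A sketch of configuring the source explicitly in code. All values shown
# (servers, topic, credentials) are illustrative placeholders.
explicit_kafka_source = KafkaSource(
    bootstrap_server=["broker-1:9092", "broker-2:9092"],
    topic="user_favorite_color_updates",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_username="chalk-consumer",  # typically injected from a secret
    sasl_password="********",        # typically injected from a secret
    group_id_prefix="chalk-",        # prefix for the consumer group id
)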
For Kafka data sources, Chalk supports the following Kafka configuration values
for your integration. You can set these configuration values on your Kafka data source either by constructing a dict of
{"your.kafka.config": "config_value"} and setting it under the ADDITIONAL_ENGINE_ARGUMENTS field, or by passing
KafkaSource(additional_kafka_args=...) in your code-based data source configuration.
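For example, a code-configured source might forward a handful of the settings documented below through additional_kafka_args; the particular keys and values here are illustrative, not recommendations:

from chalk.streams import KafkaSource

# Sketch: forwarding Kafka consumer settings via additional_kafka_args.
# The keys come from the configuration reference below; the values are
# illustrative placeholders.
tuned_kafka_source = KafkaSource(
    name="user_favorite_color_updates",
    additional_kafka_args={
        "fetch.min.bytes": "1048576",      # ask the broker for at least 1 MiB per fetch
        "max.poll.interval.ms": "600000",  # allow up to 10 minutes between polls
        "session.timeout.ms": "45000",     # consumer failure detection window
    },
)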
Fetch & Polling Configuration
fetch.min.bytes - Minimum amount of data the server should return for a fetch request
fetch.max.bytes - Maximum amount of data the server should return for a fetch request
fetch.max.wait.ms - Maximum time the server will block before answering the fetch request
max.partition.fetch.bytes - Maximum amount of data per-partition the server will return
max.poll.records - Maximum number of records returned in a single call to poll()
max.poll.interval.ms - Maximum delay between invocations of poll()

Timeout Configuration
session.timeout.ms - Timeout used to detect consumer failures
heartbeat.interval.ms - Expected time between heartbeats to the consumer coordinator
request.timeout.ms - Maximum amount of time the client will wait for the response of a request
consumer.timeout.ms - Timeout for consumer operations
poll.timeout - User-friendly alias for consumer.timeout.ms
connections.max.idle.ms - Close idle connections after this many milliseconds

Retry & Reliability
retry.backoff.ms - Time to wait before attempting to retry a failed request
reconnect.backoff.ms - Base amount of time to wait before attempting to reconnect
reconnect.backoff.max.ms - Maximum amount of time to wait when reconnecting

Metadata & Discovery
metadata.max.age.ms - Period of time before forcing a refresh of metadata

Message Processing
check.crcs - Automatically check the CRC32 of the records consumed
isolation.level - Controls how to read messages written transactionally
exclude.internal.topics - Whether internal topics should be exposed to the consumer

Protocol Settings
receive.buffer.bytes - Size of the TCP receive buffer (SO_RCVBUF)
send.buffer.bytes - Size of the TCP send buffer (SO_SNDBUF)