Streaming
Consuming Kafka and Kinesis streams.
Chalk supports Kafka and Kinesis as streaming sources for @stream resolvers.
You can configure the Kafka- or Kinesis-specific options either explicitly in code or through the Chalk dashboard. If you configure the integration through the dashboard, it must be given a name, which you then reference in your stream source configuration:

name: string
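For example, if you created a Kafka integration in the dashboard, a minimal sketch of referencing it from code looks like this (the name 'risk_kafka' is a placeholder for whatever name you chose):

from chalk.streams import KafkaSource

# 'risk_kafka' is a placeholder; use the name you gave the integration in the dashboard.
kafka_source = KafkaSource(name='risk_kafka')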
You can instead configure the source directly in your code. For Kafka, the following configuration parameters can be set (optional parameters are marked with ?; a sketch follows the list):
bootstrap_server: string | string[]
topic: string | string[]
ssl_keystore_location: string?
client_id_prefix: string?
group_id_prefix: string?
security_protocol: 'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'
sasl_mechanism: 'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'
sasl_username: string?
sasl_password: string?
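As a sketch, an explicitly configured Kafka source that authenticates with SASL over SSL might look like the following. The broker address, topic, and credential environment variables are placeholders; in practice, secrets should come from your environment or a secret manager rather than being hard-coded.

import os

from chalk.streams import KafkaSource

# All values below are placeholders for illustration.
kafka_source = KafkaSource(
    bootstrap_server='kafka.website.com:9092',
    topic='user_favorite_color_updates',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_username=os.environ['KAFKA_SASL_USERNAME'],
    sasl_password=os.environ['KAFKA_SASL_PASSWORD'],
)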
For Kinesis, the following parameters can be set (a sketch follows the list):
stream_name: string?
stream_arn: string?
region_name: string
aws_access_key_id: string?
aws_secret_access_key: string?
aws_session_token: string?
endpoint_url: string?
consumer_role_arn: string?
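Similarly, here is a minimal sketch of an explicitly configured Kinesis source, assuming KinesisSource is importable from chalk.streams alongside KafkaSource. The stream name, region, and credential environment variables are placeholders:

import os

from chalk.streams import KinesisSource

# All values below are placeholders for illustration.
kinesis_source = KinesisSource(
    stream_name='user_favorite_color_updates',
    region_name='us-east-1',
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
)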
A complete example that consumes user update messages from Kafka and maps them onto the User feature class:

from pydantic import BaseModel

from chalk.features import Features, feature, features
from chalk.streams import KafkaSource, stream


# Feature class that the stream resolver writes to.
@features
class User:
    id: str
    favorite_color: str = feature(max_staleness='30m')


# Schema of the messages on the Kafka topic.
class UserUpdateBody(BaseModel):
    user_id: str
    favorite_color: str


kafka_source = KafkaSource(
    bootstrap_server='kafka.website.com:9092',
    topic='user_favorite_color_updates',
)


# Each message is parsed into a UserUpdateBody and resolved into User features.
@stream(source=kafka_source)
def fn(message: UserUpdateBody) -> Features[User.id, User.favorite_color]:
    return User(
        id=message.user_id,
        favorite_color=message.favorite_color,
    )