Streaming
Consuming Kafka, Kinesis, and PubSub streams.
Chalk supports Kafka, Kinesis, and PubSub as streaming sources for @stream resolvers.
You can configure the Kafka-, Kinesis-, or PubSub-specific options either explicitly or through the Chalk dashboard.
If configuring through the dashboard, your Kafka/Kinesis integration must be given a name. You can then reference this name in your stream configuration.
name: string
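For example, a source configured in the dashboard can be referenced by that name alone; a minimal sketch, assuming a dashboard integration named "prod_kafka":

from chalk.streams import KafkaSource

# Reference a Kafka integration configured in the Chalk dashboard.
# "prod_kafka" is a hypothetical integration name.
kafka_source = KafkaSource(name='prod_kafka')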
Alternatively, you can configure the source directly in your code. The options below are available for each source type.
Kafka options:
bootstrap_server: string | string[]
topic: string | string[]
ssl_keystore_location: string?
client_id_prefix: string?
group_id_prefix: string?
security_protocol: 'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'
sasl_mechanism: 'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'
sasl_username: string?
sasl_password: string?
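As a sketch, an explicitly configured Kafka source using SASL over SSL might look like the following; the broker address, topic, and credential environment variables are placeholders:

import os

from chalk.streams import KafkaSource

# Explicit Kafka configuration; all values below are placeholders.
kafka_source = KafkaSource(
    bootstrap_server='broker-1.example.com:9096',
    topic='user_favorite_color_updates',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_username=os.environ['KAFKA_USERNAME'],
    sasl_password=os.environ['KAFKA_PASSWORD'],
)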
Kinesis options:
stream_name: string?
stream_arn: string?
region_name: string
aws_access_key_id: string?
aws_secret_access_key: string?
aws_session_token: string?
endpoint_url: string?
consumer_role_arn: string?
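A Kinesis source can be configured the same way; a minimal sketch, assuming a KinesisSource class in chalk.streams and relying on the deployment's IAM role rather than explicit credentials:

from chalk.streams import KinesisSource

# Explicit Kinesis configuration; the stream name and region are placeholders.
kinesis_source = KinesisSource(
    stream_name='user_favorite_color_updates',
    region_name='us-east-1',
)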
PubSub options:
project_id: string?
subscription_id: string?
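And for PubSub; a minimal sketch, assuming a PubSubSource class in chalk.streams, with placeholder project and subscription IDs:

from chalk.streams import PubSubSource

# Explicit PubSub configuration; both IDs are placeholders.
pubsub_source = PubSubSource(
    project_id='my-gcp-project',
    subscription_id='user-favorite-color-updates',
)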
from pydantic import BaseModel

from chalk.features import feature, features, Features
from chalk.streams import stream, KafkaSource


# Feature class whose values are produced by the stream resolver below.
@features
class User:
    id: str
    favorite_color: str = feature(max_staleness='30m')


# Schema of the messages published to the Kafka topic.
class UserUpdateBody(BaseModel):
    user_id: str
    favorite_color: str


# Explicitly configured Kafka source.
kafka_source = KafkaSource(
    bootstrap_server='kafka.website.com:9092',
    topic='user_favorite_color_updates',
)


# Map each incoming message to User features.
@stream(source=kafka_source)
def fn(message: UserUpdateBody) -> Features[User.id, User.favorite_color]:
    return User(
        id=message.user_id,
        favorite_color=message.favorite_color,
    )
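Because the resolver body is plain Python, one way to sanity-check the mapping is to call it with a sample payload; a minimal sketch, assuming the decorated resolver remains directly callable like other Chalk resolvers:

# Hypothetical check: feed a sample message through the resolver.
sample = UserUpdateBody(user_id='u_123', favorite_color='green')
user = fn(sample)
assert user.id == 'u_123'
assert user.favorite_color == 'green'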