Define streaming resolvers with Chalk expressions.
Stream resolvers consume messages from a streaming source, such as Kafka, Kinesis, or PubSub, and write Chalk features in real time. Resolvers are defined with statically typed expressions that are compiled and executed as vectorized C++, enabling high-throughput processing.
For source configuration (credentials, topic names, late-arrival policies), see Stream Sources. For local and end-to-end testing of resolvers, see Testing Stream Resolvers.
Stream resolvers are defined using the make_stream_resolver() function. The definition takes a message type and a mapping from Chalk features to expressions. In production, raw streaming messages are first converted into the given message type. Then, the output features of the resolver are computed based on the given expressions.
Let’s start with a simple example that processes user data from a Kafka stream:
from chalk import Primary, features
from chalk.features import _
from chalk.features.resolver import make_stream_resolver
from chalk.streams import KafkaSource
from chalk import functions as F
from pydantic import BaseModel
import datetime as dt
# Define the Kafka stream source
kafka_source = KafkaSource(name="user_updates")
# Define the feature class
@features(max_staleness="30d", etl_offline_to_online=True)
class User:
    user_id: Primary[int]
    full_name: str
    email_address: str | None
    updated_at: dt.datetime | None
# Define the message schema
class ContactInfo(BaseModel):
    phone_number: str
    email_address: str | None
class UserMessage(BaseModel):
    id: int  # maps to user_id
    name: str
    updated_at: int  # epoch microseconds
    contact_info: ContactInfo
# Create a stream resolver
users_streaming_resolver = make_stream_resolver(
    name="get_users_stream",  # The resolver name
    source=kafka_source,  # Your Kafka stream source
    message_type=UserMessage,
    output_features={
        User.user_id: _.id,
        User.full_name: _.name,
        User.email_address: _.contact_info.email_address,
        User.updated_at: F.from_unix_seconds(_.updated_at / 1_000_000),
    },
)
Here, stream messages arrive as JSON strings whose structure matches the UserMessage model. The output features are simple field accesses into the JSON struct, and nested access, such as _.contact_info.email_address, works as expected. As in feature definitions, scalar functions can be used to transform the raw message fields. Here, we convert a raw integer timestamp in epoch microseconds (divided by 1_000_000 to get seconds) into a UTC datetime.
The expressions here are essentially the same as those used in feature definitions, but their semantics differ slightly. The root (_) refers not to a namespace of features but to a table column whose datatype is derived from the stream resolver's message_type. Thus, _.amount does not mean "get the Chalk feature amount in the current namespace" but rather "access the struct field amount on this column." In addition, since these expressions are projections over a single column, dataframe operations such as _.transactions[_.amount, _.amount > 100].sum() don't apply here.
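To make the projection semantics concrete, here is a plain-Python emulation of the resolver above. It is only an illustration under stated assumptions, not how Chalk executes resolvers (Chalk compiles the expressions to vectorized C++): each output feature is modeled as a function of one parsed message, and the feature-name strings such as "user.user_id" are hypothetical placeholders.

```python
import datetime as dt
import json
from typing import Any, Callable

# Hypothetical stand-in for the resolver's output_features mapping: each
# feature is a function of ONE parsed message (a projection, not an aggregation).
OUTPUT_FEATURES: dict[str, Callable[[dict[str, Any]], Any]] = {
    "user.user_id": lambda m: m["id"],
    "user.full_name": lambda m: m["name"],
    # Nested struct access, like _.contact_info.email_address
    "user.email_address": lambda m: m["contact_info"]["email_address"],
    # Epoch microseconds -> timezone-aware UTC datetime
    "user.updated_at": lambda m: dt.datetime.fromtimestamp(
        m["updated_at"] / 1_000_000, tz=dt.timezone.utc
    ),
}


def resolve(raw: str) -> dict[str, Any]:
    """Parse a JSON message, then evaluate every output expression on it."""
    message = json.loads(raw)
    return {name: expr(message) for name, expr in OUTPUT_FEATURES.items()}


raw_message = json.dumps({
    "id": 123,
    "name": "John Smith",
    "updated_at": 1700000000000000,
    "contact_info": {
        "phone_number": "555-555-5555",
        "email_address": "john.smith@gmail.com",
    },
})
print(resolve(raw_message))
```

Note that no expression can look at a second message: that is why dataframe-style aggregations have no meaning in this context.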
Here’s what an incoming message might look like:
{
  "id": 123,
  "name": "John Smith",
  "updated_at": 1700000000000000,
  "contact_info": {
    "phone_number": "555-555-5555",
    "email_address": "john.smith@gmail.com"
  }
}
This message is parsed into the following feature values:
User(
    user_id=123,
    full_name="John Smith",
    email_address="john.smith@gmail.com",
    updated_at=datetime.datetime(2023, 11, 14, 22, 13, 20, tzinfo=datetime.timezone.utc),
)
The following message types are supported:
- Pydantic BaseModels or Python @dataclasses: incoming string messages are deserialized from JSON into structs.
- str or bytes: incoming messages can also be treated as plain strings or bytes. In this case, your feature expressions can't access struct fields on the message, but they can use scalar string/binary functions:
{
    Event.timestamp: _.timestamp,  # INVALID -- strings don't have a 'timestamp' field
    Event.raw_data_cleaned: F.trim(F.lower(_)),  # Here, `_` refers to the string message as a whole.
    Event.timestamp: F.json_value(_, "$.metadata.timestamp"),  # Parses the input string as JSON and extracts a single field.
}
Note: Not all Pydantic or protobuf types are supported. The message type must be convertible into a well-defined PyArrow schema. Recursive types, which can be nested arbitrarily deep, fail to parse. Similarly, Pydantic types annotated with Any or with complex union types are not supported. To process such types in a stream resolver, use a custom parse function that extracts only specific fields with well-known datatypes.
from pydantic import BaseModel
class PhoneInfo(BaseModel):
    number: str
    area_code: str
class EmailInfo(BaseModel):
    address: str
class Parent(BaseModel):
    id: int
    parent: "Parent | None"  # self-reference, so the type is recursive
class UsersMessage(BaseModel):
    id: int
    account_id: int | str  # INVALID -- 'account_id' field's datatype is not well-defined
    card_id: int | None  # Nullable types OK
    contact: PhoneInfo | EmailInfo  # INVALID -- complex union types not supported
    parent: Parent | None  # INVALID -- even if finitely nested in practice, the schema can't be determined ahead of time, since it may contain arbitrarily many nested sub-columns.
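The workaround from the note above, reading only well-typed fields instead of the whole recursive structure, can be sketched in plain Python. The helper json_value_sketch is hypothetical: it approximates what F.json_value(_, "$.metadata.timestamp") is described as doing, supports only dotted object paths (no array indices or wildcards), and may differ from Chalk's actual semantics.

```python
import json
from typing import Any, Optional


def json_value_sketch(raw: str, path: str) -> Optional[Any]:
    """Extract one scalar from a JSON string using a simple '$.a.b' path.

    Only dotted object keys are supported (no array indices or wildcards).
    Returns None when a key is missing or the value is not a scalar.
    """
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path!r}")
    node: Any = json.loads(raw)
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    # Only scalar leaves have a fixed, well-known datatype.
    return node if isinstance(node, (str, int, float, bool)) else None


# A message whose `parent` chain is recursive and arbitrarily deep...
raw = json.dumps({
    "id": 1,
    "parent": {"id": 2, "parent": {"id": 3, "parent": None}},
    "metadata": {"timestamp": 1700000000},
})
# ...from which we pull out only fields with well-known types.
print(json_value_sketch(raw, "$.id"), json_value_sketch(raw, "$.parent.id"))
```

Each extracted field has a fixed type, so the resulting output schema is known ahead of time even though the full message schema is not.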
By default, the message parsing logic is inferred automatically from the message_type. Pydantic types and dataclasses are parsed from JSON, and protobuf message classes are parsed from the protobuf's binary representation. However, it is also possible to provide custom parsing logic. Besides handling custom message formats, this lets the stream resolver skip a subset of messages. For instance, one might have multiple message types on the same Kafka topic, or a stream of events where only events of type "CREATE" are relevant.
The make_stream_resolver() function has an optional parse argument that takes an expression. If provided, this expression will be used to convert raw message bytes into the expected message type. If the expression returns None, that message will be skipped.
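The skip-on-None contract can be illustrated in plain Python. This sketch mimics, rather than reproduces, how a parse expression filters a batch, and it assumes a hypothetical wire format: a 6-byte prefix followed by a JSON body, standing in for protobuf so that no generated classes are needed.

```python
import json
from typing import Any, Iterable, Optional

PREFIX_LEN = 6  # hypothetical fixed-size framing prefix


def parse(raw: bytes) -> Optional[dict[str, Any]]:
    """Strip the prefix, decode the body, and return None to skip a message."""
    message = json.loads(raw[PREFIX_LEN:])
    if message.get("user_type") == "GUEST":
        return None  # skipped: no features are written for this message
    return message


def run(batch: Iterable[bytes]) -> list[dict[str, Any]]:
    """Apply `parse` to each raw message and keep only the non-None results."""
    parsed = (parse(raw) for raw in batch)
    return [m for m in parsed if m is not None]


batch = [
    b"HDR001" + json.dumps({"id": 1, "user_type": "MEMBER"}).encode(),
    b"HDR002" + json.dumps({"id": 2, "user_type": "GUEST"}).encode(),
]
print(run(batch))  # only the MEMBER message survives
```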
For example, consider the following protobuf definition:
syntax = "proto3";
message UserMessage {
  enum UserType {
    USER_TYPE_UNSPECIFIED = 0;
    USER_TYPE_GUEST = 1;
    USER_TYPE_MEMBER = 2;
  }
  message ContactInfo {
    optional string phone_number = 1;
    optional string email_address = 2;
  }
  uint32 id = 1;
  string name = 2;
  UserType user_type = 3;
  optional ContactInfo contact_info = 4;
  uint64 updated_at = 5;
}
Suppose our system produces protobuf messages in an unusual format, with a 6-byte prefix prepended to each message. We want to strip the prefix before parsing and also ignore messages for users of type GUEST. A custom parse expression can do both:
from messages.user_pb2 import UserMessage  # Import the generated protobuf classes
from chalk import functions as F
from chalk.features import _
from chalk.features.resolver import make_stream_resolver
# Strip the first 6 bytes, then parse the protobuf message
parsed_message = F.proto_deserialize(F.substr(_, 6, None), UserMessage)
# Return None (skip the message) for any UserMessage whose user_type is GUEST
parse_expression = F.if_then_else(
    parsed_message.user_type == UserMessage.USER_TYPE_GUEST, None, parsed_message
)
users_streaming_resolver = make_stream_resolver(
    name="get_users_stream",
    source=kafka_source,
    message_type=UserMessage,
    parse=parse_expression,  # Custom parse expression provided here
    output_features={
        User.user_id: _.id,
        User.full_name: _.name,
        User.email_address: _.contact_info.email_address,
        User.updated_at: F.from_unix_seconds(_.updated_at / 1_000_000),
    },
)
A single Kafka topic often carries multiple message types, for example a User event and a Transaction event distinguished by a header like X-Chalk-MessageType. The recommended way to route each type to the right resolver is the message_header_filters argument of make_stream_resolver. Each resolver declares the (header_name, header_value) pairs it wants to consume, and the streaming server drops any message whose Kafka headers don't match before it is deserialized.
users_streaming_resolver = make_stream_resolver(
    name="get_users_stream",
    source=kafka_source,
    message_type=UserMessage,
    output_features={
        User.user_id: _.id,
        User.full_name: _.name,
    },
    message_header_filters=[("X-Chalk-MessageType", b"User")],
)
transactions_streaming_resolver = make_stream_resolver(
    name="get_transactions_stream",
    source=kafka_source,  # same topic
    message_type=TransactionMessage,
    output_features={
        Transaction.id: _.id,
        Transaction.amount: _.amount,
    },
    message_header_filters=[("X-Chalk-MessageType", b"Transaction")],
)
A few things to keep in mind:
- Header values are bytes, not strings: use b"User", not "User".
- … (name, value) pair to be processed.
Prefer message_header_filters over inspecting headers inside a custom parse expression. Header filtering runs before deserialization, so messages destined for other resolvers are discarded cheaply without ever being parsed.
If your filtering logic needs to look at the message body — not just headers — fall back to a parse expression that returns None for messages you want to skip (see Custom Parse Functions). You can also combine both: pre-filter on headers, then run additional body-level filtering inside parse.
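The header check itself is simple enough to sketch in plain Python. The function passes_header_filters is hypothetical: it compares names and values exactly, treats values as raw bytes, and assumes that when several pairs are listed, matching any one of them is enough. With a single pair per resolver, as in the examples above, that assumption doesn't come into play.

```python
from typing import Sequence

Header = tuple[str, bytes]


def passes_header_filters(headers: Sequence[Header], filters: Sequence[Header]) -> bool:
    """Return True if a message with these Kafka headers should be processed.

    Assumption: a message is kept when at least one (name, value) filter pair
    appears among its headers. Values are compared as raw bytes.
    """
    header_set = set(headers)
    return any(pair in header_set for pair in filters)


user_filters = [("X-Chalk-MessageType", b"User")]
msg_headers = [("X-Chalk-MessageType", b"User"), ("trace-id", b"abc123")]
print(passes_header_filters(msg_headers, user_filters))  # True
# A str value never equals the bytes header value, so this filter matches nothing.
print(passes_header_filters(msg_headers, [("X-Chalk-MessageType", "User")]))  # False
```

The second call shows why the bytes-versus-string distinction matters: a filter written with "User" silently drops every message.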
Stream resolvers compile to vectorized C++, so a single CPU core can handle thousands of messages per second, and processing is parallelized across cores automatically. Expressions are validated at compile time, so schema and type mismatches surface before deployment rather than at runtime.
Stream resolvers are ideal for:
For complex business logic that requires external API calls or elaborate error handling, a regular Python @online resolver downstream of a stream resolver is usually a better fit.
Stream resolvers can write computed features to downstream Kafka topics using the sink parameter, which takes a Sink object. This enables chaining of stream processing stages, where one resolver's output becomes another resolver's input. This pattern is particularly useful for:
The sink specifies which output features should be written to the destination stream. These features are computed by the resolver or by downstream resolvers that depend on the initial resolver’s outputs.
This example demonstrates a two-stage pipeline where items are first processed through a stream resolver, then routed to a GPU-accelerated resolver for embedding generation:
from chalk.streams import KafkaSource
from chalk.features.resolver import Sink, make_stream_resolver
from chalk.features import _
from chalk import online
from pydantic import BaseModel
from src.models import Item
import numpy as np
class ItemMessage(BaseModel):
    id: int
    name: str
    price: float
    description: str
    image_url: str
new_items = KafkaSource(
    name="new_items",
)
embedding_stream = KafkaSource(
    name="item_embeddings",
)
make_stream_resolver(
    name="process_new_items",
    source=new_items,
    message_type=ItemMessage,
    output_features={
        Item.id: _.id,
        Item.name: _.name,
        Item.price: _.price,
        Item.description: _.description,
        Item.image_url: _.image_url,
    },
    sink=Sink(
        send_to=embedding_stream,
        output_features=[Item.id, Item.embedding],
    ),
)
@online(resource_hint="gpu")
def run_item_embedding_model(
    price: Item.price,
    description: Item.description,
    state: Item.state,
    image_url: Item.image_url,
) -> Item.embedding:
    """Mock: generate an item embedding using a pre-trained model."""
    # In a real implementation, this function would load a pre-trained model
    # and generate an embedding based on the input features or make an API
    # call to an external service.
    return np.random.rand(128).tolist()
In this example:
1. The process_new_items resolver consumes messages from the new_items Kafka source and extracts basic item features.
2. The sink specifies that Item.id and Item.embedding should be written to the item_embeddings stream.
3. Chalk computes Item.embedding by calling the run_item_embedding_model resolver, which runs on a GPU-enabled instance.
4. The results are written to the item_embeddings topic.
This pattern allows the lightweight stream resolver to handle high-throughput message parsing while delegating compute-intensive embedding generation to specialized GPU resources. The sink acts as a bridge, automatically triggering the downstream computation and routing results to the appropriate stream.
Sink messages can be encoded either as Arrow IPC streams or as JSON. Fully qualified feature names are used as the column names in the output stream or as the JSON keys.
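For the JSON encoding, a sink record can be pictured as the resolved features filtered down to the sink's output_features and keyed by fully qualified name. The helper encode_sink_record is a hypothetical sketch: it assumes fully qualified names take the form "item.id", and it does not cover the Arrow IPC encoding.

```python
import json
from typing import Any, Mapping, Sequence


def encode_sink_record(features: Mapping[str, Any], sink_features: Sequence[str]) -> bytes:
    """Keep only the sink's features and encode them as one JSON object.

    Keys are fully qualified feature names (assumed format: "item.id").
    """
    missing = [name for name in sink_features if name not in features]
    if missing:
        raise KeyError(f"features not computed: {missing}")
    return json.dumps({name: features[name] for name in sink_features}).encode("utf-8")


resolved = {
    "item.id": 42,
    "item.name": "Desk lamp",
    "item.embedding": [0.1, 0.2, 0.3],
}
payload = encode_sink_record(resolved, ["item.id", "item.embedding"])
print(payload.decode())
```

Features that were resolved but not listed in the sink (here item.name) are simply not written to the output topic.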
Stream resolvers can drop duplicate messages by passing a Deduplication policy to make_stream_resolver. For each incoming message, Chalk evaluates the on expression to compute a deduplication key. If that key has already been seen within the within retention window, the message is skipped — no features are written and no downstream computation is triggered.
from chalk import functions as F
from chalk.features import _
from chalk.features.resolver import Deduplication, make_stream_resolver
transactions_streaming_resolver = make_stream_resolver(
    name="get_transactions_stream",
    source=kafka_source,
    message_type=TransactionMessage,
    output_features={
        Transaction.id: _.id,
        Transaction.amount: _.amount,
        Transaction.submitted: F.from_unix_seconds(_.submitted / 1_000_000),
    },
    deduplication=Deduplication(on=_.id, within="24h"),
)
The on expression is a regular underscore expression over the parsed message, so the dedup key can be any field or computed value: for example _.id, a composite like F.concat(_.user_id, "-", _.event_id), or a hash. The within argument accepts a Chalk duration string ("24h", "7d") or a datetime.timedelta.
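The dedup check can be modeled with an in-memory dictionary standing in for Redis Lightning or DynamoDB. In this sketch, DedupWindow and parse_duration are hypothetical helpers: the duration parser handles only single-unit strings such as "24h" or "7d", a duplicate does not extend the window (an assumption), and expired keys are never evicted, whereas a real key-value store would expire them.

```python
import datetime as dt
import re
from typing import Hashable, Optional, Union

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days", "w": "weeks"}


def parse_duration(value: Union[str, dt.timedelta]) -> dt.timedelta:
    """Parse a single-unit duration like "24h" or "7d" (or pass a timedelta through)."""
    if isinstance(value, dt.timedelta):
        return value
    match = re.fullmatch(r"(\d+)([smhdw])", value)
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    amount, unit = match.groups()
    return dt.timedelta(**{_UNITS[unit]: int(amount)})


class DedupWindow:
    """In-memory stand-in for the key-value store used for deduplication."""

    def __init__(self, within: Union[str, dt.timedelta]) -> None:
        self.within = parse_duration(within)
        # key -> time the key was first recorded (never evicted in this sketch)
        self._first_seen: dict[Hashable, dt.datetime] = {}

    def should_skip(self, key: Hashable, now: dt.datetime) -> bool:
        """Return True if `key` was already recorded within the window."""
        seen_at: Optional[dt.datetime] = self._first_seen.get(key)
        if seen_at is not None and now - seen_at < self.within:
            return True
        # First sighting, or the old entry has expired: record it and process.
        self._first_seen[key] = now
        return False


t0 = dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc)
window = DedupWindow("24h")
print(window.should_skip(7, t0))                          # False: first sighting
print(window.should_skip(7, t0 + dt.timedelta(hours=1)))  # True: duplicate
print(window.should_skip(7, t0 + dt.timedelta(hours=25))) # False: window expired
```

Every call performs a lookup and, for new keys, a write; in production each of those is a round-trip to the online store, which is the cost discussed next.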
Deduplication is not free: every message incurs an extra round-trip to a key-value store (Redis Lightning or DynamoDB, depending on your online store configuration) to check for and record the dedup key. This adds latency per message and consumes storage proportional to the volume of unique keys seen within the retention window. Consider the tradeoff before enabling deduplication on very high-throughput streams, and keep the within window as short as your duplicate-tolerance requirements allow.
Deduplication is currently supported only when the online store is Redis Lightning or DynamoDB.
If you already store a historical log of your stream messages in a data warehouse, you can disable stream persistence to the Chalk offline store. This can be done by setting the skip_offline parameter of make_stream_resolver:
users_streaming_resolver = make_stream_resolver(
    name="get_users_stream",
    source=kafka_source,
    message_type=UserMessage,
    output_features={
        User.user_id: _.id,
        User.full_name: _.name,
        User.email_address: _.contact_info.email_address,
        User.updated_at: F.from_unix_seconds(_.updated_at / 1_000_000),
    },
    skip_offline=True,  # don't write resolved events from the stream to the offline store
)
A stream resolver can be restricted to specific environments through the environments parameter of make_stream_resolver:
@features(max_staleness="30d", etl_offline_to_online=True)
class Transaction:
    transaction_id: Primary[int]
    submitted: dt.datetime
    amount: int
class TransactionMessage(BaseModel):
    id: int  # maps to transaction_id
    submitted: int  # epoch microseconds
    amount: int
# New resolver to experiment with the new Transaction class
experimental_streaming_resolver = make_stream_resolver(
    name="new_transaction_stream_resolver",
    source=kafka_source,
    message_type=TransactionMessage,
    output_features={
        Transaction.transaction_id: _.id,
        Transaction.submitted: F.from_unix_seconds(_.submitted / 1_000_000),
        Transaction.amount: _.amount,
    },
    environments=["dev"],  # only deploy this resolver in the "dev" environment
)
Earlier versions of Chalk used the @stream decorator to define streaming resolvers in Python. Existing @stream resolvers continue to run, but new resolvers should use make_stream_resolver, which compiles to vectorized C++ and offers significantly better performance for high-throughput streams.
Most @stream resolvers translate directly to make_stream_resolver by replacing the decorated function body with an output_features mapping:
from chalk.features.resolver import make_stream_resolver
from chalk.features import _
from chalk import stream, Features
# Old: Python @stream decorator
@stream(source=user_stream)
def process_users(message: UsersMessage) -> Features[Users]:
    return Users(
        user_id=message.id,
        has_been_evicted_bool=message.qualification.has_been_evicted,
        monthly_income_min=message.qualification.monthly_income.min,
        # ... more processing
    )
# New: make_stream_resolver
process_users_native = make_stream_resolver(
    name="process_users_native",
    message_type=UsersMessage,
    output_features={
        Users.user_id: _.id,
        Users.has_been_evicted_bool: _.qualification.has_been_evicted,
        Users.monthly_income_min: _.qualification.monthly_income.min,
        # ... same mappings using underscore expressions
    },
    source=user_stream,
)
Both resolvers produce identical results.