
Native Streaming

Native streaming resolvers provide a way to process streaming data using statically-defined expressions instead of Python functions. Similar to Chalk’s expression-based feature definitions, these expressions are compiled and executed as vectorized C++, enabling high-throughput processing of stream messages.

Overview

Native streaming resolvers can be defined using the make_stream_resolver() function. The definition takes a message type and a mapping from Chalk features to expressions. In production, raw streaming messages are first converted into the given message type. Then, the output features of the resolver are computed based on the given expressions.
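The two stages described above can be pictured with a plain-Python model. This is only a conceptual sketch, not how Chalk executes resolvers: `parse_message` and `project` are hypothetical helpers, the feature names are plain strings, and ordinary lambdas stand in for compiled expressions.

```python
import json
from typing import Any, Callable

# Hypothetical stand-ins: in Chalk, parsing is derived from `message_type`
# and the projections are compiled expressions, not Python callables.
def parse_message(raw: bytes) -> dict[str, Any]:
    """Stage 1: convert the raw stream payload into a structured message."""
    return json.loads(raw)

def project(message: dict[str, Any], outputs: dict[str, Callable[[dict], Any]]) -> dict[str, Any]:
    """Stage 2: compute each output feature from the parsed message."""
    return {feature: expr(message) for feature, expr in outputs.items()}

outputs = {
    "user.user_id": lambda m: m["id"],
    "user.full_name": lambda m: m["name"],
}

raw = b'{"id": 123, "name": "John Smith"}'
row = project(parse_message(raw), outputs)
print(row)  # {'user.user_id': 123, 'user.full_name': 'John Smith'}
```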

Basic Example

Let’s start with a simple example that processes user data from a Kafka stream:

from chalk import Features, Primary, features
from chalk.features import _
from chalk.features.resolver import make_stream_resolver
from chalk.streams import KafkaSource
from chalk import functions as F
from pydantic import BaseModel
import pyarrow as pa
import datetime as dt

# Define the Kafka stream source
kafka_source = KafkaSource(
    bootstrap_server='kafka.website.com:9092',
    topic='user_updates_topic'
)

# Define the feature class
@features(max_staleness="30d", etl_offline_to_online=True)
class User:
    user_id: Primary[int]
    full_name: str
    email_address: str | None
    updated_at: dt.datetime | None

# Define the message schema
class ContactInfo(BaseModel):
    phone_number: str
    email_address: str | None

class UserMessage(BaseModel):
    id: int  # maps to user_id
    name: str
    updated_at: int  # epoch microseconds
    contact_info: ContactInfo


# Create a native streaming resolver
users_streaming_resolver = make_stream_resolver(
    name="get_users_stream", # The resolver name
    source=kafka_source,  # Your Kafka stream source
    message_type=UserMessage,
    output_features={
        User.user_id: _.id,
        User.full_name: _.name,
        User.email_address: _.contact_info.email_address,
        User.updated_at: F.from_unix_seconds(_.updated_at / 1_000_000),
    },
)

Here, stream messages will come in as JSON strings whose structure matches the definition of the UserMessage model. The output features are defined as simple field accesses into the JSON struct. Note that nested access, such as _.contact_info.email_address, works as expected. Similar to feature definitions, scalar functions can be used to process the raw message fields. Here, we convert a raw integer timestamp in microseconds into a UTC datetime.

Note that the expressions here are essentially the same as those used in feature definitions. However, the semantics are slightly different: the "root" here (`_`) refers not to a namespace of features, but to a table column whose datatype is based on the stream resolver's `message_type`. Thus, `_.amount` does not mean "get me Chalk feature `amount` in the current namespace" but rather "access struct field `amount` on this column." In addition, since these expressions represent projections on single columns, dataframe operations such as `_.transactions[_.amount, _.amount > 100].sum()` don't apply here.
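To make the "struct field access" reading concrete, here is a toy model of a root object that records a chain of field accesses and later evaluates it against one parsed message. `FieldPath` is a hypothetical class written for illustration. It is not Chalk's `_`, and the null handling shown is an assumption of this sketch.

```python
from typing import Any

class FieldPath:
    """Hypothetical stand-in for `_`: records a chain of struct field accesses."""

    def __init__(self, path: tuple = ()):
        self._path = path

    def __getattr__(self, name: str) -> "FieldPath":
        # Only reached for attributes not found normally, i.e. message fields.
        if name.startswith("_"):
            raise AttributeError(name)
        return FieldPath(self._path + (name,))

    def evaluate(self, message: dict) -> Any:
        """Walk the recorded path through one parsed message."""
        value: Any = message
        for key in self._path:
            if value is None:
                return None  # in this sketch, a null parent struct yields a null field
            value = value.get(key)
        return value

root = FieldPath()
expr = root.contact_info.email_address  # builds a path; nothing is evaluated yet

message = {"id": 123, "contact_info": {"email_address": "john.smith@gmail.com"}}
print(expr.evaluate(message))     # john.smith@gmail.com
print(root.id.evaluate(message))  # 123
```

The expression is built once and applied to every message, which is why it can only describe projections of a single message and not aggregations across rows.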

Example Stream Messages

Here’s what an incoming message might look like:

{
  "id": 123,
  "name": "John Smith",
  "updated_at": 1700000000000000,
  "contact_info": {
    "phone_number": "555-555-5555",
    "email_address": "john.smith@gmail.com"
  }
}

This message will get parsed into the following data:

User(
    user_id=123,
    full_name="John Smith",
    email_address="john.smith@gmail.com",
    updated_at=datetime.datetime(2023, 11, 14, 22, 13, 20, tzinfo=datetime.timezone.utc)
)
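The `updated_at` value above can be double-checked with the standard library. This sketch mirrors the microseconds-to-seconds division used in the resolver, but uses `datetime` rather than any Chalk function.

```python
import datetime as dt

updated_at_micros = 1_700_000_000_000_000  # value from the sample message

# Divide by 1_000_000 to get seconds, then interpret the result as a UTC timestamp.
updated_at = dt.datetime.fromtimestamp(updated_at_micros / 1_000_000, tz=dt.timezone.utc)
print(updated_at.isoformat())  # 2023-11-14T22:13:20+00:00
```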

Supported Message Types

The following message types are supported:

  • Pydantic BaseModels or Python dataclasses (@dataclass): Incoming string messages will be deserialized from JSON into structs.
  • Protocol buffer messages: The incoming binary messages will be deserialized into structs with the various fields.
  • str or bytes: The incoming messages can also be treated as simple strings or bytes. In this case, your feature expressions can’t access struct fields on the message, but they can use scalar string/binary functions:
{
    Event.timestamp: _.timestamp,  # INVALID -- strings don't have a 'timestamp' field
    Event.raw_data_cleaned: F.trim(F.lower(_)),  # Here, `_` refers to the string message as a whole.
    Event.timestamp: F.json_value(_, "$.metadata.timestamp"),  # Parses the input string as JSON and extracts a single field.
}
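For intuition, the two valid expressions above correspond roughly to the following plain-Python operations on a raw string. The `json_value` helper here is a hypothetical, deliberately tiny lookup that only understands paths of the form `$.a.b`. It is not the Chalk function and makes no claim about that function's return type.

```python
import json
from typing import Any

def json_value(raw: str, path: str) -> Any:
    """Illustrative lookup for paths like "$.a.b"; not a full JSONPath engine."""
    if not path.startswith("$."):
        raise ValueError("this sketch only supports paths of the form $.a.b")
    value: Any = json.loads(raw)
    for key in path[2:].split("."):
        if not isinstance(value, dict) or key not in value:
            return None  # missing field
        value = value[key]
    return value

raw_message = '{"metadata": {"timestamp": "2023-11-14T22:13:20Z"}}'
print(json_value(raw_message, "$.metadata.timestamp"))  # 2023-11-14T22:13:20Z

# Rough equivalent of trimming and lowercasing the whole message.
print("  SOME Raw Text ".lower().strip())  # some raw text
```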

Note: Not all Pydantic or protobuf types are supported. The message type needs to be converted into a well-defined PyArrow schema. Types that are recursive and can be arbitrarily nested will fail to be parsed. Similarly, Pydantic types with type annotations such as `Any` or complex union types are not supported. To process these types in a native streaming resolver, use a custom parse function to extract only specific fields with well-known datatypes.


class PhoneInfo(BaseModel): 
    number: str
    area_code: str

class EmailInfo(BaseModel): 
    address: str

class Parent(BaseModel):
    id: int
    parent: "Parent | None"  # quoted: forward reference to the class being defined

class UsersMessage(BaseModel):
    id: int
    account_id: int | str # INVALID -- 'account_id' field's datatype is not well-defined
    card_id: int | None # Nullable types OK
    contact: PhoneInfo | EmailInfo # INVALID -- complex union types not supported
    parent: Parent | None # INVALID -- even if finitely nested in practice, the schema of this can't be determined ahead of time since it contains arbitrarily many sub-columns.
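The rules above can be approximated with a small pre-flight check. The sketch below uses standard-library dataclasses and a hypothetical `check_schema` helper. It is not Chalk's actual validation, and the set of accepted scalar types is an assumption made for illustration.

```python
import types
import typing
from dataclasses import dataclass, is_dataclass
from typing import Optional, Union

_UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))
_SCALARS = (int, float, str, bytes, bool)  # assumed scalar set for this sketch

def check_schema(cls: type, _stack: tuple = ()) -> list:
    """Return the problems that prevent a fixed schema from being derived."""
    problems = []
    stack = _stack + (cls.__name__,)
    for name, annotation in cls.__annotations__.items():
        # `X | None` is fine; a union of several non-null types is not.
        if typing.get_origin(annotation) in _UNION_ORIGINS:
            members = [a for a in typing.get_args(annotation) if a is not type(None)]
            if len(members) > 1:
                problems.append(f"{cls.__name__}.{name}: union of several types")
                continue
            annotation = members[0]
        # A reference to a class that is already being visited means recursion.
        if isinstance(annotation, str):
            ref = annotation
        else:
            ref = getattr(annotation, "__forward_arg__", getattr(annotation, "__name__", None))
        if ref in stack:
            problems.append(f"{cls.__name__}.{name}: recursive type")
        elif is_dataclass(annotation):
            problems.extend(check_schema(annotation, stack))
        elif annotation not in _SCALARS:
            problems.append(f"{cls.__name__}.{name}: unsupported type {annotation!r}")
    return problems

@dataclass
class Parent:
    id: int
    parent: Optional["Parent"]

@dataclass
class UsersMessage:
    id: int
    account_id: Union[int, str]
    card_id: Optional[int]
    parent: Optional[Parent]

for problem in check_schema(UsersMessage):
    print(problem)
# UsersMessage.account_id: union of several types
# Parent.parent: recursive type
```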

Custom Parse Functions

By default, the message parsing logic is inferred automatically from the message_type. For Pydantic types or dataclasses, JSON parsing is used. Protobuf message classes are parsed from the protobuf’s binary representation. However, it is also possible to provide custom parsing logic. In addition to handling custom message formats, this also allows the stream resolver to skip a subset of messages. For instance, one might have multiple message types on the same Kafka topic, or perhaps a stream of events where only events of type “CREATE” are relevant.

The make_stream_resolver() function has an optional parse argument that takes an expression. If provided, this expression will be used to convert raw message bytes into the expected message type. If the expression returns None, that message will be skipped.

For example, consider the following protobuf definition:

message UserMessage {
  enum UserType {
    USER_TYPE_UNSPECIFIED = 0;
    USER_TYPE_GUEST = 1;
    USER_TYPE_MEMBER = 2;
  }

  message ContactInfo {
    optional string phone_number = 1;
    optional string email_address = 2;
  }

  uint32 id = 1; 
  string name = 2;
  UserType user_type = 3;
  optional ContactInfo contact_info = 4;
  uint64 updated_at = 5;
}

Let’s say our system produces protobuf messages with an unusual format (a 6-byte prefix is added). We would like to strip the prefix before parsing and also ignore messages for users of type “GUEST”. We can use a custom parse expression to do this:

from messages.user_pb2 import UserMessage # Import the generated protobuf classes

# Strip first 6 bytes, then parse the protobuf message
parsed_message = F.proto_deserialize(F.substr(_, 6, None), UserMessage)
# Exclude any UserMessage whose user_type is GUEST
parse_expression = F.if_then_else(
    parsed_message.user_type == UserMessage.UserType.USER_TYPE_GUEST, None, parsed_message
)

users_streaming_resolver = make_stream_resolver(
    name="get_users_stream",
    source=kafka_source,
    message_type=UserMessage,
    parse=parse_expression, # Custom parse expression provided here
    output_features={
        User.user_id: _.id,
        User.full_name: _.name,
        User.email_address: _.contact_info.email_address,
        User.updated_at: F.from_unix_seconds(_.updated_at / 1_000_000),
    },
)
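The skip behavior of a parse step (returning None drops the message) can be modeled in plain Python. This is a sketch of the semantics only: `parse_event` is a hypothetical function, and the `type` and `payload` field names are invented for the "only CREATE events are relevant" scenario mentioned earlier.

```python
import json
from typing import Optional

def parse_event(raw: bytes) -> Optional[dict]:
    """Hypothetical parse step: return the message, or None to skip it."""
    event = json.loads(raw)
    if event.get("type") != "CREATE":
        return None  # skipped: no output features are computed for this message
    return event["payload"]

stream = [
    b'{"type": "CREATE", "payload": {"id": 1}}',
    b'{"type": "DELETE", "payload": {"id": 2}}',
    b'{"type": "CREATE", "payload": {"id": 3}}',
]

# Only messages for which the parse step returned a value reach the projections.
kept = [msg for raw in stream if (msg := parse_event(raw)) is not None]
print(kept)  # [{'id': 1}, {'id': 3}]
```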

Testing Native Streaming Resolvers

Running native streaming resolvers locally is currently not supported. However, native streaming resolvers can be tested with sample messages from a notebook or client against the branch server or a production streaming server. In addition, native streaming resolvers defined in notebooks can be uploaded to Chalk servers for testing, without affecting the production deployment.

from chalk.client import ChalkClient
from chalk.features.resolver import make_stream_resolver

sample_messages = [
    '{"id": 123, "name": "John Smith"}',
    '{"id": 456, "name": "Alice Liddell"}',
]

client = ChalkClient()

# 1. Test messages against production resolver:
result = client.test_streaming_resolver(resolver="get_users_stream", message_bodies=sample_messages)
# 'test_streaming_resolver' returns a signed URL to a parquet file in cloud storage -- `result.features` downloads this and extracts the table. 
print(result.features)

# 2. Test messages against a stream resolver on a branch:
result = client.test_streaming_resolver(resolver="get_users_stream", message_bodies=sample_messages, branch="new_users_stream_logic")
print(result.features)

# 3. Create a new resolver and test it:
new_users_resolver = make_stream_resolver(
    source=...,
    name="get_users_streaming_new", 
    message_type=...,
    output_features={...},
)
result = client.test_streaming_resolver(resolver=new_users_resolver, message_bodies=sample_messages)
print(result.features)
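Hand-written JSON strings are easy to get subtly wrong, so it can help to generate sample bodies from dictionaries. The sketch below assumes the resolver under test expects the full UserMessage schema from the basic example. `make_user_message` is a hypothetical helper, not part of the Chalk SDK, and the phone number is a placeholder.

```python
import json
from typing import Optional

def make_user_message(
    user_id: int, name: str, updated_at_micros: int, email: Optional[str] = None
) -> str:
    """Build one JSON message body shaped like the UserMessage model."""
    return json.dumps({
        "id": user_id,
        "name": name,
        "updated_at": updated_at_micros,  # epoch microseconds
        "contact_info": {"phone_number": "555-555-5555", "email_address": email},
    })

sample_messages = [
    make_user_message(123, "John Smith", 1_700_000_000_000_000, "john.smith@gmail.com"),
    make_user_message(456, "Alice Liddell", 1_700_000_000_000_000),
]
print(sample_messages[1])
```

The resulting list can be passed as `message_bodies` in the calls shown above.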

Performance Considerations

Benefits of Native Streaming

  1. High Throughput: Process thousands of messages per second per CPU core
  2. Low Latency: Compiled native processing eliminates Python GIL overhead
  3. Parallel Processing: Automatic parallelization across CPU cores
  4. Type Safety: Compile-time validation of expressions

When to Use Native Streaming

Native streaming is ideal for:

  • High-volume data ingestion (>1000 messages/second)
  • Low-latency feature computation (<10ms)
  • Simple to moderate transformations
  • Stateless processing

Consider traditional Python resolvers for:

  • Complex business logic requiring external API calls
  • Complex error handling and recovery logic

Migration from Traditional Streaming

If you have an existing Python streaming resolver, you can create an equivalent native version:

# Traditional Python resolver
@stream(source=user_stream)
def process_users(message: UsersMessage) -> Features[Users]:
    return Users(
        user_id=message.id,
        has_been_evicted_bool=message.qualification.has_been_evicted,
        monthly_income_min=message.qualification.monthly_income.min,
        # ... more processing
    )

# Equivalent native streaming resolver
process_users_native = make_stream_resolver(
    name="process_users_native",
    message_type=UsersMessage,
    output_features={
        Users.user_id: _.id,
        Users.has_been_evicted_bool: _.qualification.has_been_evicted,
        Users.monthly_income_min: _.qualification.monthly_income.min,
        # ... same mappings using underscore expressions
    },
    source=user_stream,
)

Both resolvers produce identical results, but the native version offers significantly better performance for high-throughput scenarios.