Locally verify stream resolver parsers and end-to-end scenarios.
Chalk provides three tools for testing stream resolvers, each with a different scope. Pick the one that matches what you want to verify:
| Tool | Scope | When to use |
|---|---|---|
| check_stream_parsing | Parser only, runs locally | Fast iteration on the parse expression and output_features mapping |
| check_stream_scenario | Full pipeline against a branch, multi-step | Verify materialized windowed aggregates, or behavior that depends on prior online-store state |
| test_streaming_resolver | Full resolver against a deployed branch | Sanity-check a deployed resolver against real message bytes; inspect the resulting Arrow table |
The parsing logic of a stream resolver can be tested locally via chalk.testing.check_stream_parsing. This verifies that your parse expression and output_features mappings produce the expected results, without deploying to a branch or running the full resolver pipeline.
```python
import datetime
import json

import pyarrow as pa
from pydantic import BaseModel

from chalk import functions as F
from chalk.features import _
from chalk.features.resolver import make_stream_resolver
from chalk.testing import StreamMessage, check_stream_parsing

# Assumes a `User` feature class (with user_id, full_name, and updated_at)
# and a `kafka_source` stream source are defined elsewhere in your project.


class UserMessage(BaseModel):
    id: int
    name: str
    updated_at_us: int


users_streaming_resolver = make_stream_resolver(
    name="get_users_streaming",
    source=kafka_source,
    message_type=UserMessage,
    # Decode the raw bytes as UTF-8 JSON and extract each field into a struct.
    parse=F.struct_pack(
        {
            "id": F.cast(F.json_value(F.bytes_to_string(_, "utf-8"), "$.id"), pa.int64()),
            "name": F.json_value(F.bytes_to_string(_, "utf-8"), "$.name"),
            "updated_at_us": F.cast(
                F.json_value(F.bytes_to_string(_, "utf-8"), "$.updated_at_us"), pa.int64()
            ),
        }
    ),
    output_features={
        User.user_id: _.id,
        User.full_name: _.name,
        # updated_at_us is in microseconds; convert to seconds before building a timestamp.
        User.updated_at: F.from_unix_seconds(_.updated_at_us / 1_000_000),
    },
)

# Each StreamMessage pairs raw message bytes with the features the parser should produce.
check_stream_parsing(
    users_streaming_resolver,
    [
        StreamMessage(
            message=json.dumps(
                {"id": 123, "name": "John Smith", "updated_at_us": 1_700_000_000_000_000}
            ).encode(),
            parsed=User(
                user_id=123,
                full_name="John Smith",
                updated_at=datetime.datetime(
                    2023, 11, 14, 22, 13, 20, tzinfo=datetime.timezone.utc
                ),
            ),
        ),
        StreamMessage(
            message=json.dumps(
                {"id": 456, "name": "Alice Liddell", "updated_at_us": 1_710_000_000_000_000}
            ).encode(),
            parsed=User(
                user_id=456,
                full_name="Alice Liddell",
                updated_at=datetime.datetime(
                    2024, 3, 9, 16, 0, 0, tzinfo=datetime.timezone.utc
                ),
            ),
        ),
    ],
)
```

Each StreamMessage contains the raw message bytes and the expected parsed feature outputs. If the resolver should skip a message, set parsed=None.
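Writing the expected parsed values by hand means converting microsecond timestamps yourself. As a cross-check, here is a minimal pure-Python sketch that mirrors the parse expression and output_features mapping above. The function name parse_user_message and the EPOCH constant are hypothetical; this is not how Chalk evaluates the expression, and it complements rather than replaces check_stream_parsing. It uses integer timedelta arithmetic instead of the resolver's float division by 1_000_000, which gives the same result for whole-second values like these.

```python
import datetime
import json

# Unix epoch as a timezone-aware UTC datetime.
EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def parse_user_message(raw: bytes) -> dict:
    """Hypothetical pure-Python mirror of the parse expression and output_features mapping."""
    # Equivalent of bytes_to_string(_, "utf-8") followed by json_value(...) per field.
    payload = json.loads(raw.decode("utf-8"))
    return {
        "user_id": int(payload["id"]),  # cast to int64
        "full_name": payload["name"],
        # Integer microseconds added to the epoch: exact, no float rounding.
        "updated_at": EPOCH + datetime.timedelta(microseconds=int(payload["updated_at_us"])),
    }


raw = json.dumps(
    {"id": 456, "name": "Alice Liddell", "updated_at_us": 1_710_000_000_000_000}
).encode()
print(parse_user_message(raw)["updated_at"])  # 2024-03-09 16:00:00+00:00
```

The printed timestamp matches the updated_at value expected for the second StreamMessage in the example.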
check_stream_parsing verifies the parser in isolation, but it does not exercise the rest of the streaming pipeline: feature uploads, materialized windowed aggregates, or any computation that depends on prior online-store state. To test those behaviors, use ChalkClient.check_stream_scenario, which runs the full pipeline against a branch.
The method accepts a sequence of step objects that are processed left-to-right:
- StreamMessage(message=..., stream_resolver=..., timestamp=...): parses message locally using the resolver’s parse expression (the same path as check_stream_parsing), then uploads the resulting features to the branch with materialized-aggregate updates enabled. Set timestamp to control the feature time of the uploaded row, which is useful for out-of-order or windowed tests.
- UploadFeatures({Feature: value_or_list, ...}): primes the branch online store directly, without going through a resolver. Each value can be a scalar (uploaded as one row) or a list (bulk upload, one row per element). All features in a single UploadFeatures step must agree on row count. This path does not refresh materialized aggregates.
- FeatureAssertion(input=..., output=...): runs an online query whose inputs are the set fields of input and whose requested outputs are the set fields of output, then compares each returned value against its expected value. A mismatch raises AssertionError. For Windowed[...] features, pass a {window: value} dict on either side, e.g. transaction_count={"1d": 1, "30d": 7}; each bucket is checked independently.

check_stream_scenario parses each StreamMessage locally through the same chalkdf-backed path as check_stream_parsing, so the chalkdf package must be installed in the test environment.
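The UploadFeatures row-count rule can be sketched in plain Python. The helper upload_rows is hypothetical and uses string keys as stand-ins for Chalk feature references; it only illustrates the stated rule (a scalar is one row, a list is one row per element, and every feature must resolve to the same count). The documentation does not say whether a scalar is broadcast against a longer list, so this sketch assumes it is not and rejects the mix.

```python
from __future__ import annotations

from typing import Any, Mapping


def upload_rows(values: Mapping[str, Any]) -> list[dict[str, Any]]:
    """Hypothetical helper: expand an UploadFeatures-style mapping into rows."""
    # A scalar counts as exactly one row; a list contributes one row per element.
    columns = {
        name: list(v) if isinstance(v, list) else [v]
        for name, v in values.items()
    }
    counts = {name: len(col) for name, col in columns.items()}
    # Every feature in a single step must agree on row count.
    if len(set(counts.values())) > 1:
        raise ValueError(f"row counts disagree: {counts}")
    n_rows = next(iter(counts.values()), 0)
    # Zip the columns back together into one dict per uploaded row.
    return [{name: col[i] for name, col in columns.items()} for i in range(n_rows)]


rows = upload_rows({
    "Merchant.id": [501, 502, 503],
    "Merchant.category": ["grocery", "restaurant", "fuel"],
})
print(len(rows))  # 3
```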
Uploads and assertions run against a Chalk branch, so the target environment must have a branch online store configured. Without it, the uploads produced by each StreamMessage and UploadFeatures step have nowhere to land and the subsequent FeatureAssertion queries will not see them.
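Once the uploads land, each FeatureAssertion compares the returned values against the expected ones. The sketch below is a hypothetical illustration of that comparison, not Chalk's implementation: windowed features are checked bucket by bucket (it assumes only the buckets you list are checked), and floats use the float_rel_tolerance / float_abs_tolerance defaults of 1e-6 and 1e-12, passing if either tolerance is satisfied. It relies on Python's math.isclose, which measures the relative tolerance against the larger magnitude of the two values; whether Chalk uses the same reference value is an assumption.

```python
import math
from typing import Any


def _is_number(x: Any) -> bool:
    # bool is a subclass of int in Python; exclude it from numeric comparison.
    return isinstance(x, (int, float)) and not isinstance(x, bool)


def values_match(expected: Any, actual: Any, rel_tol: float = 1e-6, abs_tol: float = 1e-12) -> bool:
    """Hypothetical sketch of a FeatureAssertion-style value check."""
    if isinstance(expected, dict):
        # Windowed feature: compare each expected {window: value} bucket independently.
        return isinstance(actual, dict) and all(
            window in actual and values_match(value, actual[window], rel_tol, abs_tol)
            for window, value in expected.items()
        )
    if _is_number(expected) and _is_number(actual) and (
        isinstance(expected, float) or isinstance(actual, float)
    ):
        # math.isclose passes if EITHER the relative or the absolute tolerance holds.
        return math.isclose(expected, actual, rel_tol=rel_tol, abs_tol=abs_tol)
    return expected == actual


print(values_match({"1d": 1, "30d": 7}, {"1d": 1, "30d": 7}))  # True
print(values_match({"1d": 1}, {"1d": 2}))  # False
```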
The example below seeds three merchant rows, then drives a transaction stream and a user-creation stream, and finally asserts that the windowed transaction_count aggregate reflects the single transaction processed:
```python
from datetime import datetime

from chalk.client import ChalkClient
from chalk.testing import FeatureAssertion, StreamMessage, UploadFeatures

# Merchant, User, transaction_stream, and create_user are assumed to be
# defined in your project's feature and resolver modules.
ChalkClient().check_stream_scenario(
    # Seed three merchants directly into the branch online store.
    UploadFeatures({
        Merchant.id: [501, 502, 503],
        Merchant.category: ["grocery", "restaurant", "fuel"],
    }),
    # One transaction for user 1 at merchant 501.
    StreamMessage(
        timestamp=datetime(2026, 1, 1, 12, 20),
        message=b'{"id": 1, "user_id": 1, "merchant_id": 501, "amount": 10.0}',
        stream_resolver=transaction_stream,
    ),
    # Create user 1.
    StreamMessage(
        timestamp=datetime(2026, 1, 1, 12, 20),
        message=b'{"id": 1, "name": "Bartleby"}',
        stream_resolver=create_user,
    ),
    # The 1-day window should count exactly one transaction.
    FeatureAssertion(
        input=User(id=1),
        output=User(name="Bartleby", transaction_count={"1d": 1}),
    ),
    branch="my-branch",
)
```

For the common case where you only need to prime the online store before the first stream message, pass seed_online_store= instead of writing an explicit UploadFeatures step. This is equivalent to prepending a single UploadFeatures step to the scenario:
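```python
from datetime import datetime

from chalk.client import ChalkClient
from chalk.testing import FeatureAssertion, StreamMessage

ChalkClient().check_stream_scenario(
    StreamMessage(
        message=b'{"user_id": 1, "amount": 10.0}',
        stream_resolver=transaction_stream,
        timestamp=datetime(2026, 1, 1, 12, 20),
    ),
    FeatureAssertion(
        input=User(id=1),
        output=User(transaction_count={"1d": 1}),
    ),
    # Equivalent to a leading UploadFeatures step with these values.
    seed_online_store={User.id: [1, 2, 3], User.name: ["Alice", "Bob", "Carol"]},
    branch="my-branch",
)
```

Other useful parameters: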
- show_table=True: print the Chalk Feature Value Check Table for every FeatureAssertion step, not only on failure. On a mismatch the table is always printed, regardless of this flag.
- float_rel_tolerance / float_abs_tolerance: tolerances applied to float comparisons inside FeatureAssertion checks (defaults 1e-6 and 1e-12, respectively). A value passes if either tolerance is satisfied.
- environment: pin the scenario to a specific Chalk environment; otherwise the client’s default environment is used.

To test against a deployed environment or branch, use ChalkClient.test_streaming_resolver:
```python
from chalk.client import ChalkClient
from chalk.features.resolver import make_stream_resolver

sample_messages = [
    '{"id": 123, "name": "John Smith"}',
    '{"id": 456, "name": "Alice Liddell"}',
]

client = ChalkClient()

# 1. Test messages against the production resolver:
result = client.test_streaming_resolver(resolver="get_users_streaming", message_bodies=sample_messages)
# test_streaming_resolver returns a signed URL to a Parquet file in cloud storage;
# `result.features` downloads that file and extracts the table.
print(result.features)

# 2. Test messages against a stream resolver on a branch:
result = client.test_streaming_resolver(
    resolver="get_users_streaming",
    message_bodies=sample_messages,
    branch="new_users_stream_logic",
)
print(result.features)

# 3. Create a new resolver and test it:
new_users_resolver = make_stream_resolver(
    source=...,
    name="get_users_streaming_new",
    message_type=...,
    output_features={...},
)
result = client.test_streaming_resolver(resolver=new_users_resolver, message_bodies=sample_messages)
print(result.features)
```