Chalk provides three tools for testing stream resolvers, each with a different scope. Pick the one that matches what you want to verify:

| Tool | Scope | When to use |
|---|---|---|
| check_stream_parsing | Parser only, runs locally | Fast iteration on the parse expression and output_features mapping |
| check_stream_scenario | Full pipeline against a branch, multi-step | Verify materialized windowed aggregates, or behavior that depends on prior online-store state |
| test_streaming_resolver | Full resolver, deployed in production or on a branch | Sanity-check a deployed resolver against real message bytes and inspect the resulting Arrow table |

Local parser checks with check_stream_parsing

The parsing logic of a stream resolver can be tested locally via chalk.testing.check_stream_parsing. This verifies that your parse expression and output_features mappings produce the expected results, without deploying to a branch or running the full resolver pipeline.

import datetime
import json
import pyarrow as pa
from pydantic import BaseModel

from chalk import functions as F
from chalk.features import _
from chalk.features.resolver import make_stream_resolver
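from chalk.testing import StreamMessage, check_stream_parsing

# Assumes a User feature class (with user_id, full_name, and updated_at) and a
# stream source named kafka_source are defined elsewhere in your project.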


class UserMessage(BaseModel):
    id: int
    name: str
    updated_at_us: int


users_streaming_resolver = make_stream_resolver(
    name="get_users_streaming",
    source=kafka_source,
    message_type=UserMessage,
    parse=F.struct_pack(
        {
            "id": F.cast(F.json_value(F.bytes_to_string(_, "utf-8"), "$.id"), pa.int64()),
            "name": F.json_value(F.bytes_to_string(_, "utf-8"), "$.name"),
            "updated_at_us": F.cast(
                F.json_value(F.bytes_to_string(_, "utf-8"), "$.updated_at_us"), pa.int64()
            ),
        }
    ),
    output_features={
        User.user_id: _.id,
        User.full_name: _.name,
        User.updated_at: F.from_unix_seconds(_.updated_at_us / 1_000_000),
    },
)

check_stream_parsing(
    users_streaming_resolver,
    [
        StreamMessage(
            message=json.dumps(
                {"id": 123, "name": "John Smith", "updated_at_us": 1_700_000_000_000_000}
            ).encode(),
            parsed=User(
                user_id=123,
                full_name="John Smith",
                updated_at=datetime.datetime(
                    2023, 11, 14, 22, 13, 20, tzinfo=datetime.timezone.utc
                ),
            ),
        ),
        StreamMessage(
            message=json.dumps(
                {"id": 456, "name": "Alice Liddell", "updated_at_us": 1_710_000_000_000_000}
            ).encode(),
            parsed=User(
                user_id=456,
                full_name="Alice Liddell",
                updated_at=datetime.datetime(
                    2024, 3, 9, 16, 0, 0, tzinfo=datetime.timezone.utc
                ),
            ),
        ),
    ],
)

Each StreamMessage contains the raw message bytes and the expected parsed feature outputs. If a message should be skipped, set parsed=None.
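The expected timestamp is the value most easily gotten wrong when writing these cases by hand. The sketch below is a plain-Python mirror of the example resolver's parse expression and output_features mapping, which you could use to compute the parsed side of each StreamMessage. It is illustrative only: expected_user_fields is a hypothetical helper, not part of chalk.testing, and it assumes the same three JSON fields as the UserMessage model above.

```python
import json
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def expected_user_fields(raw: bytes) -> dict:
    """Hypothetical helper: plain-Python mirror of the example's parse and
    output_features mapping, used only to compute expected test values."""
    payload = json.loads(raw.decode("utf-8"))
    return {
        "user_id": int(payload["id"]),
        "full_name": payload["name"],
        # Add whole microseconds to the epoch instead of dividing by 1_000_000,
        # so the expected timestamp is exact.
        "updated_at": UNIX_EPOCH + timedelta(microseconds=int(payload["updated_at_us"])),
    }


raw = json.dumps({"id": 123, "name": "John Smith", "updated_at_us": 1_700_000_000_000_000}).encode()
print(expected_user_fields(raw)["updated_at"])  # 2023-11-14 22:13:20+00:00
```

A case could then be written as StreamMessage(message=raw, parsed=User(**expected_user_fields(raw))), assuming User accepts these keyword arguments as in the example. Note that the helper uses integer arithmetic rather than the resolver's division, so it serves as an independent cross-check rather than a copy of the resolver logic.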


End-to-end scenarios with check_stream_scenario

check_stream_parsing verifies the parser in isolation, but it does not exercise the rest of the streaming pipeline: feature uploads, materialized windowed aggregates, or any computation that depends on prior online-store state. To test those behaviors, use ChalkClient.check_stream_scenario, which runs the full pipeline against a branch.

The method takes a sequence of step objects as positional arguments and processes them in order:

  • StreamMessage(message=..., stream_resolver=..., timestamp=...) — parses message locally using the resolver’s parse expression (the same path as check_stream_parsing), then uploads the resulting features to the target branch with materialized-aggregate updates enabled. Set timestamp to control the feature time of the uploaded row (useful for out-of-order or windowed tests).
  • UploadFeatures({Feature: value_or_list, ...}) — primes the branch online store directly without going through a resolver. Each value can be a scalar (uploaded as one row) or a list (bulk upload, one row per element). All features in a single UploadFeatures step must agree on row count. This path does not refresh materialized aggregates.
  • FeatureAssertion(input=..., output=...) — runs an online query whose inputs are the set fields of input and whose requested outputs are the set fields of output, then compares each returned value against the expected value. A mismatch raises AssertionError. For Windowed[...] features, pass a {window: value} dict on either side, e.g. transaction_count={"1d": 1, "30d": 7}, and each bucket is checked independently.
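The UploadFeatures row-count rule can be checked before a scenario ever reaches the branch. The following is a minimal sketch of that rule as stated above (a scalar is one row, a list is one row per element, and all features in the step must agree). upload_row_count is a hypothetical helper, not a Chalk API, and plain strings stand in for Chalk feature references.

```python
from typing import Any, Mapping


def upload_row_count(values: Mapping[Any, Any]) -> int:
    """Hypothetical pre-flight check for one UploadFeatures step.

    A scalar counts as one row and a list counts as one row per element;
    every feature in the step must produce the same number of rows.
    """
    if not values:
        raise ValueError("an UploadFeatures step needs at least one feature")
    counts = {len(v) if isinstance(v, list) else 1 for v in values.values()}
    if len(counts) != 1:
        raise ValueError(f"features disagree on row count: {sorted(counts)}")
    return counts.pop()


# Strings stand in for the Merchant.id / Merchant.category feature references.
print(upload_row_count({
    "Merchant.id": [501, 502, 503],
    "Merchant.category": ["grocery", "restaurant", "fuel"],
}))  # 3
```

The same check applies to the seed_online_store mapping described below, since it is equivalent to a leading UploadFeatures step.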

check_stream_scenario parses each StreamMessage locally via the same chalkdf-backed path as check_stream_parsing, so the chalkdf package must be installed in the test environment.
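Because of that dependency, a test suite may prefer to skip scenario tests cleanly when chalkdf is missing. A minimal sketch using the standard library's unittest, assuming the package is importable under the module name chalkdf (distribution and import names can differ); StreamScenarioTest and its body are hypothetical placeholders.

```python
import importlib.util
import unittest

# True only if a module named "chalkdf" can be found in this environment.
HAS_CHALKDF = importlib.util.find_spec("chalkdf") is not None


@unittest.skipUnless(HAS_CHALKDF, "check_stream_scenario requires the chalkdf package")
class StreamScenarioTest(unittest.TestCase):
    def test_scenario(self) -> None:
        # Hypothetical placeholder: build and run
        # ChalkClient().check_stream_scenario(...) here.
        ...
```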

Uploads and assertions run against a Chalk branch, so the target environment must have a branch online store configured. Without it, the uploads produced by each StreamMessage and UploadFeatures step have nowhere to land and the subsequent FeatureAssertion queries will not see them.

The example below seeds three merchant rows, then drives a transaction stream and a user-creation stream, and finally asserts that the windowed transaction_count aggregate reflects the single transaction processed:

from datetime import datetime

from chalk.client import ChalkClient
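from chalk.testing import FeatureAssertion, StreamMessage, UploadFeatures

# Assumes the Merchant and User feature classes and the transaction_stream and
# create_user stream resolvers are defined elsewhere in your project.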

ChalkClient().check_stream_scenario(
    UploadFeatures({
        Merchant.id: [501, 502, 503],
        Merchant.category: ["grocery", "restaurant", "fuel"],
    }),
    StreamMessage(
        timestamp=datetime(2026, 1, 1, 12, 20),
        message=b'{"id": 1, "user_id": 1, "merchant_id": 501, "amount": 10.0}',
        stream_resolver=transaction_stream,
    ),
    StreamMessage(
        timestamp=datetime(2026, 1, 1, 12, 20),
        message=b'{"id": 1, "name": "Bartleby"}',
        stream_resolver=create_user,
    ),
    FeatureAssertion(
        input=User(id=1),
        output=User(name="Bartleby", transaction_count={"1d": 1}),
    ),
    branch="my-branch",
)

For the common case where you just want to prime the online store before the first stream message, pass seed_online_store= instead of writing an explicit UploadFeatures step. It is equivalent to prepending one UploadFeatures step at the head of the scenario:

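from datetime import datetime

from chalk.client import ChalkClient
from chalk.testing import FeatureAssertion, StreamMessage

ChalkClient().check_stream_scenario(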
    StreamMessage(
        message=b'{"user_id": 1, "amount": 10.0}',
        stream_resolver=transaction_stream,
        timestamp=datetime(2026, 1, 1, 12, 20),
    ),
    FeatureAssertion(
        input=User(id=1),
        output=User(transaction_count={"1d": 1}),
    ),
    seed_online_store={User.id: [1, 2, 3], User.name: ["Alice", "Bob", "Carol"]},
    branch="my-branch",
)

Other useful parameters:

  • show_table=True — print the Chalk Feature Value Check Table for every FeatureAssertion step, not only on failure. On a mismatch the table is always printed, regardless of this flag.
  • float_rel_tolerance / float_abs_tolerance — tolerances applied to float comparisons inside FeatureAssertion checks (defaults 1e-6 and 1e-12, respectively). A value passes if either tolerance is satisfied.
  • environment — pin the scenario to a specific Chalk environment; otherwise the client’s default is used.
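To reason about which values will pass, the comparison rules above can be approximated in plain Python. This sketch assumes Chalk's "either tolerance" rule behaves like math.isclose, which passes when the difference is within rel_tol times the larger magnitude or within abs_tol; the exact reference magnitude Chalk uses is an assumption here. windowed_mismatches is a hypothetical helper that checks each window bucket on its own and, as a further assumption, compares only the buckets named in the expected dict.

```python
import math
from typing import Mapping


def values_match(expected: float, actual: float,
                 rel_tol: float = 1e-6, abs_tol: float = 1e-12) -> bool:
    # math.isclose passes if EITHER the relative or the absolute bound holds.
    return math.isclose(actual, expected, rel_tol=rel_tol, abs_tol=abs_tol)


def windowed_mismatches(expected: Mapping[str, float],
                        actual: Mapping[str, float], **tol: float) -> dict:
    """Hypothetical helper: return {window: (expected, actual)} for each
    bucket that is missing or outside tolerance; each bucket is checked alone."""
    return {
        window: (want, actual.get(window))
        for window, want in expected.items()
        if window not in actual or not values_match(want, actual[window], **tol)
    }


print(windowed_mismatches({"1d": 1, "30d": 7}, {"1d": 1, "30d": 6}))  # {'30d': (7, 6)}
```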

Testing against a deployed branch with test_streaming_resolver

To run sample messages through a deployed stream resolver, either in production or on a branch, use ChalkClient.test_streaming_resolver:

from chalk.client import ChalkClient
from chalk.features.resolver import make_stream_resolver

sample_messages = [
    '{"id": 123, "name": "John Smith", "updated_at_us": 1700000000000000}',
    '{"id": 456, "name": "Alice Liddell", "updated_at_us": 1710000000000000}',
]

client = ChalkClient()

# 1. Test messages against the production resolver:
result = client.test_streaming_resolver(resolver="get_users_streaming", message_bodies=sample_messages)
# test_streaming_resolver returns a signed URL to a Parquet file in cloud storage;
# result.features downloads that file and extracts the table.
print(result.features)

# 2. Test messages against a stream resolver on a branch:
result = client.test_streaming_resolver(resolver="get_users_streaming", message_bodies=sample_messages, branch="new_users_stream_logic")
print(result.features)

# 3. Create a new resolver and test it:
new_users_resolver = make_stream_resolver(
    source=...,
    name="get_users_streaming_new",
    message_type=...,
    output_features={...},
)
result = client.test_streaming_resolver(resolver=new_users_resolver, message_bodies=sample_messages)
print(result.features)
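Hand-written JSON strings like sample_messages are easy to break with a stray quote or trailing comma. A small sketch that builds message_bodies from Python dicts with json.dumps instead; to_message_bodies is a hypothetical helper, and it assumes the resolver expects JSON bodies like the examples above.

```python
import json
from typing import Any, Iterable


def to_message_bodies(records: Iterable[dict[str, Any]]) -> list[str]:
    """Hypothetical helper: serialize each record to a JSON string body."""
    return [json.dumps(record) for record in records]


sample_messages = to_message_bodies([
    {"id": 123, "name": "John Smith"},
    {"id": 456, "name": "Alice Liddell"},
])
print(sample_messages[0])  # {"id": 123, "name": "John Smith"}
```

The resulting list can be passed directly as message_bodies=sample_messages.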