Build vectorized batch feature computations using chalkdf and static resolvers
This tutorial shows you how to use chalkdf with Chalk’s static=True resolver pattern to compute batch features across multiple entities in a single, vectorized pass.
A static DF resolver, defined with static=True, receives a batch of entities as a DataFrame and returns a DataFrame of results. Static resolvers are the right choice when you need to compute features for many entities at once in a single vectorized pass, rather than running resolver logic row by row.
Define feature classes for User and Transaction. User has a has-many relationship to Transaction, and two output features that the static resolver will populate.
```python
from chalk.features import features, DataFrame


@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float


@features
class User:
    id: int
    email: str

    # Has-many relationship to transactions
    transactions: DataFrame[Transaction]

    # Output features computed by the static resolver
    transaction_count: int
    total_spend: float
```

The input type annotation declares every feature the resolver needs, including the has-many relationship projected down to the specific columns it uses.
Note that the primary key of the feature class must appear in both the input and output type annotations.
```python
from chalk import online, DataFrame
from chalk.features import _
import chalk.functions as F

from src.features import User, Transaction


@online(static=True)
def compute_transaction_stats(
    df: DataFrame[
        User.id,
        User.transactions[
            Transaction.id,
            Transaction.user_id,
            Transaction.amount,
        ],
    ],
) -> DataFrame[User.id, User.transaction_count, User.total_spend]:
    # Explode the has-many relationship into one row per transaction
    df_exploded = df.explode(str(User.transactions))
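    # Illustration of the shapes involved (sample values from this
    # tutorial's test data, not produced by this code):
    #   before explode: one row per user, with a list column
    #     user.id | user.transactions
    #     1       | [{id: 1, amount: 25.0}, {id: 2, amount: 50.0}, ...]
    #   after explode: one row per (user, transaction) pair
    #     user.id | user.transactions
    #     1       | {id: 1, amount: 25.0}
    #     1       | {id: 2, amount: 50.0}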
    # Lift the nested amount field into a top-level column
    txns = df_exploded.with_columns({
        "amount": _.transactions.amount,
    })

    # Aggregate per user
    stats = txns.agg(
        [str(User.id)],
        _.count().alias("txn_count"),
        _.amount.sum().alias("spend_total"),
    )
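    # Illustration (assumed shape, same sample data): one row per user
    # that had at least one transaction:
    #   user.id | txn_count | spend_total
    #   1       | 3         | 150.0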
    # Join back to the original df so users with no transactions still appear
    return (
        df.join(stats, on=[str(User.id)], how="left")
        .with_columns({
            "user.transaction_count": F.coalesce(_.txn_count, 0),
            "user.total_spend": F.coalesce(_.spend_total, 0.0),
        })
        .select(str(User.id), "user.transaction_count", "user.total_spend")
    )
```

A few things worth noting:
- `df.explode()`: Flattens the has-many list into individual rows, one per transaction.
- `with_columns`: Lifts a field out of the nested struct into a plain top-level column so it can be used in expressions.
- `agg`: Groups by user and computes the count and sum in one pass.
- The left join back to the original `df`: Ensures every user appears in the output. `F.coalesce` fills in 0 for users with no transactions.

chalkdf ships a `Testing` class for asserting equality between DataFrames. Because static resolvers take and return plain DataFrame objects, you can test them directly; no network calls or running Chalk environment required.
```python
from chalkdf import DataFrame, Testing

from src.resolvers import compute_transaction_stats


def test_compute_transaction_stats():
    transactions_data = [
        {"transaction.id": 1, "transaction.user_id": 1, "transaction.amount": 25.00},
        {"transaction.id": 2, "transaction.user_id": 1, "transaction.amount": 50.00},
        {"transaction.id": 3, "transaction.user_id": 1, "transaction.amount": 75.00},
    ]
    input_df = DataFrame({
        "user.id": [1],
        "user.transactions": [transactions_data],
    })

    result_df = compute_transaction_stats(input_df)

    expected = DataFrame({
        "user.id": [1],
        "user.transaction_count": [3],
        "user.total_spend": [150.0],
    })
    Testing.assert_frame_equal(result_df, expected, check_row_order=False)
```

A few details on constructing the input:
"user.id", "user.transactions"."transaction.id", "transaction.amount", etc.check_row_order=False makes the assertion order-independent.Install chalkdf locally with `pip install "chalkdf[chalkpy]"` to run tests without deploying.
Run the tests with:
```bash
pytest tests/test_resolvers.py -v
```

Beyond computing features on-demand, you can use static resolvers to bulk-ingest historical feature data from parquet files in S3. This is the right pattern when you have existing data, such as warehouse exports, data lake snapshots, or third-party feeds, that you want to make available for training set generation without recomputing it on every query.
Use ChalkDF.scan() to lazily read one or more parquet files, then .select() to rename raw columns to Chalk feature paths. Reusing the same Transaction feature class from above:
```python
from chalk import offline, DataFrame
from chalkdf import DataFrame as ChalkDF
from chalk.features import _

from src.features import Transaction

S3_PATH = "s3://my-bucket/data/transactions/*.parquet"
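# Assumed raw parquet schema for illustration: txn_id (int), uid (int),
# amt (float). Substitute your own column names below.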
@offline(static=True)
def ingest_transactions() -> DataFrame[
    Transaction.id,
    Transaction.user_id,
    Transaction.amount,
]:
    return (
        ChalkDF.scan([S3_PATH])
        .select(
            _.txn_id.alias("transaction.id"),
            _.uid.alias("transaction.user_id"),
            _.amt.alias("transaction.amount"),
        )
    )
```

A few things to note:
- `ChalkDF.scan()`: Accepts a list of S3 URIs; glob patterns like `*.parquet` are supported. Files are read lazily, and only the columns referenced in `.select()` are fetched.
- `.alias()`: Maps each raw column name to its Chalk feature path (e.g. `"transaction.amount"`). The alias must match the dotted feature path exactly.
- You can chain other chalkdf transformations between `.scan()` and the final return (filters, type casts, derived columns) before returning the resulting features; see the sketch below.

Explore `filter`, `join`, `agg`, `project`, and more in the chalkdf getting started guide.
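For instance, here is a hedged sketch of an ingest that filters and derives a column between `.scan()` and the return. It reuses the imports and `S3_PATH` from the block above; the raw `status` and `amt_cents` columns, the `"settled"` value, and the exact shape of the `filter` call are assumptions for illustration, not part of the dataset or API shown above.

```python
@offline(static=True)
def ingest_settled_transactions() -> DataFrame[
    Transaction.id,
    Transaction.user_id,
    Transaction.amount,
]:
    return (
        ChalkDF.scan([S3_PATH])
        # Keep only settled transactions (hypothetical status column)
        .filter(_.status == "settled")
        .select(
            _.txn_id.alias("transaction.id"),
            _.uid.alias("transaction.user_id"),
            # Derive dollars from a hypothetical integer-cents column
            (_.amt_cents / 100.0).alias("transaction.amount"),
        )
    )
```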