Write feature values directly to Chalk's online store.
upload_features lets you write feature values directly to the online store and, optionally,
update materialized windowed aggregations. This is useful for bootstrapping the online store
from an external pipeline, pushing real-time events, or syncing data computed outside of Chalk.
ChalkGRPCClient.upload_features is the recommended way to upload features. It is a
synchronous, blocking call that accepts bulk data in a single request and offers
significantly higher throughput than the HTTP API.
```python
import pyarrow as pa

from chalk.client import ChalkGRPCClient
from chalk.features import features, Primary


@features
class Transaction:
    id: Primary[str]
    amount: float
    merchant: str


client = ChalkGRPCClient()

# Option 1: mapping of feature → list of values
response = client.upload_features(
    inputs={
        Transaction.id: ["txn_1", "txn_2"],
        Transaction.amount: [49.99, 12.50],
        Transaction.merchant: ["Acme", "Globex"],
    }
)

# Option 2: Arrow Table or RecordBatch
batch = pa.RecordBatch.from_pydict({
    "transaction.id": ["txn_3"],
    "transaction.amount": [7.00],
    "transaction.merchant": ["Initech"],
})
response = client.upload_features(inputs=batch)
```

The inputs parameter accepts:

- a mapping of features to lists of values
- a pyarrow Table or RecordBatch
- a DataFrame

Additional parameters:
| Parameter | Type | Description |
|---|---|---|
| inputs | mapping / DataFrame / Arrow | Feature data to upload |
| request_timeout | float \| None | Network-level timeout in seconds |
| headers | mapping / sequence | Extra headers forwarded with the request |
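Because the mapping form of inputs is column-oriented, every feature's list must contain the same number of rows. A quick pre-flight check might look like the following sketch (validate_upload_columns is a hypothetical helper, not part of the Chalk SDK):

```python
def validate_upload_columns(inputs: dict) -> int:
    """Check that every feature column has the same row count.

    Hypothetical helper: upload_features expects each list in a
    column-oriented mapping to be one column of equal length.
    """
    lengths = {name: len(values) for name, values in inputs.items()}
    distinct = set(lengths.values())
    if len(distinct) != 1:
        raise ValueError(f"ragged columns: {lengths}")
    return distinct.pop()


# Two rows across every column: passes and returns the row count.
rows = validate_upload_columns({
    "transaction.id": ["txn_1", "txn_2"],
    "transaction.amount": [49.99, 12.50],
})
```

Catching ragged columns client-side gives a clearer error than a rejected upload.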
The call blocks until the server acknowledges the write and returns an UploadFeaturesResponse
containing any errors that occurred.
By default, upload_features writes to the online store only. You can control this
behaviour — as well as materialized aggregation updates — via keyword arguments:
| Parameter | Default | Description |
|---|---|---|
| write_online | True | Write feature values to the online store |
| write_offline | False | Write feature values to the offline store |
| update_mataggs | False | Update materialized windowed aggregations |
When update_mataggs=True, Chalk will recompute materialized windowed aggregation buckets
from the uploaded child records rather than just storing the raw feature values. This is
the primary way to keep pre-aggregated features (e.g. sum, count, approx_top_k over
a time window) up to date without running a full resolver.
The uploaded records must include a FeatureTime column so Chalk knows which aggregation
buckets to update.
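The FeatureTime value determines which bucket each record lands in. As an illustration only (Chalk's internal bucketing logic may differ), flooring a timestamp to a 1d bucket_duration looks like:

```python
from datetime import datetime, timedelta, timezone


def bucket_start(ts: datetime, bucket: timedelta) -> datetime:
    """Floor a timestamp to the start of its aggregation bucket.

    Illustrative sketch of bucket assignment, not Chalk's actual
    implementation.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    whole_buckets = (ts - epoch) // bucket
    return epoch + whole_buckets * bucket


ts = datetime(2024, 5, 17, 15, 30, tzinfo=timezone.utc)
start = bucket_start(ts, timedelta(days=1))
# start == 2024-05-17 00:00:00 UTC
```

Records with timestamps in the same bucket are aggregated together, which is why omitting FeatureTime makes the update ambiguous.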
```python
from datetime import datetime, timezone

import pyarrow as pa

from chalk import DataFrame, FeatureTime, Primary, Windowed, windowed
from chalk.client import ChalkGRPCClient
from chalk.features import _, features


@features
class Transaction:
    id: Primary[int]
    user_id: "User.id"
    amount: float
    ts: FeatureTime


@features
class User:
    id: Primary[int]
    transactions: DataFrame[Transaction]
    total_spend: Windowed[float] = windowed(
        "7d", "30d",
        materialization={"bucket_duration": "1d"},
        expression=_.transactions[_.amount].sum(),
    )


client = ChalkGRPCClient()
now = datetime.now(timezone.utc)

batch = pa.RecordBatch.from_pydict({
    "transaction.id": [101, 102, 103],
    "transaction.user_id": [42, 42, 42],
    "transaction.amount": [10.0, 25.0, 5.0],
    "transaction.ts": [now, now, now],
})

response = client.upload_features(inputs=batch, update_mataggs=True)
```

After this call, User.total_spend["7d"] and User.total_spend["30d"] will reflect the
newly uploaded transactions for user 42.
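As a local sanity check on what the materialized buckets should contain, you can total the uploaded amounts per user in plain Python (no Chalk calls involved):

```python
from collections import defaultdict

# The three transactions uploaded in the batch above.
rows = [
    {"user_id": 42, "amount": 10.0},
    {"user_id": 42, "amount": 25.0},
    {"user_id": 42, "amount": 5.0},
]

# Sum amounts per user, mirroring the windowed sum expression.
totals: dict[int, float] = defaultdict(float)
for row in rows:
    totals[row["user_id"]] += row["amount"]

# totals[42] == 40.0, the value User.total_spend["7d"] should reflect
# once the uploaded records land in the current buckets.
```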
ChalkClient.upload_features sends a single record over HTTP and writes to both the
online and offline stores. It is lower throughput than gRPC and does not support the
UploadFeaturesOptions described above.
```python
from chalk.client import ChalkClient

ChalkClient().upload_features(
    input={
        Transaction.id: 104,
        Transaction.amount: 22.00,
    }
)
```

For multiple records over HTTP, use ChalkClient.multi_upload_features, which accepts a
list of mappings, a column-oriented mapping, or a pandas, polars, or chalk DataFrame. Both
HTTP methods write to the online and offline stores.
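If your data arrives row-by-row but you want the column-oriented mapping shape, converting between the two is straightforward. The helper below is illustrative, not part of the Chalk SDK; it assumes every record carries the same fully-qualified feature names:

```python
def rows_to_columns(rows: list[dict]) -> dict[str, list]:
    """Convert row-oriented records into a column-oriented mapping.

    Hypothetical helper; assumes each row has the same keys, which
    are fully-qualified feature names.
    """
    columns: dict[str, list] = {}
    for row in rows:
        for key, value in row.items():
            columns.setdefault(key, []).append(value)
    return columns


columns = rows_to_columns([
    {"transaction.id": 104, "transaction.amount": 22.00},
    {"transaction.id": 105, "transaction.amount": 9.99},
])
# columns == {"transaction.id": [104, 105], "transaction.amount": [22.0, 9.99]}
```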
ChalkGRPCClient.upload_features_bulk is a separate endpoint designed specifically for
updating materialized windowed aggregations. Use it when you need to push raw child
records that feed into pre-aggregated bucket values rather than uploading finished
feature values directly.