This reference documents the Chalk DataFrame API for offline feature computation and data processing.
Lightweight DataFrame wrapper around Chalk's execution engine.
The DataFrame class constructs query plans backed by libchalk and
can materialize them into Arrow tables. Operations build a lazy query plan
that executes only when you call run or to_arrow.
Column expressions can be written with _ (underscore) attribute syntax
or using functions. See the
Python function reference for the full list.
Logical representation of tabular data for query operations.
DataFrame provides a lazy evaluation model where operations build up a query
plan that executes only when materialized via run or
to_arrow. Each operation returns a new DataFrame, leaving the
original unchanged.
Most users should use class methods like from_dict,
from_arrow, or scan to create DataFrames rather than
calling the constructor directly.
Column expressions use _ (underscore) or F function syntax.
from chalkdf import DataFrame
from chalk.features import _
import chalk.functions as F
df = DataFrame.from_dict({"x": [1, 2, 3], "price": [10.0, 20.0, 30.0]})
# Underscore syntax
doubled = df.with_columns({"x2": _.x * 2})
# F function syntax
capped = df.with_columns({"price": F.least(_.price, 25.0)})
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
filtered = df.filter(_.x > 1)
result = filtered.run()
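The lazy model described above can be illustrated without chalkdf at all. The following is a minimal sketch in plain Python, assuming a hypothetical `ToyLazyFrame` class (not part of chalkdf) that records steps and only does work in `run`; it is meant to show why each operation returns a new object and leaves the original unchanged, not how libchalk plans queries.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

Rows = List[Dict[str, object]]


@dataclass(frozen=True)
class ToyLazyFrame:
    """Hypothetical stand-in for a lazy frame; not part of chalkdf."""

    source: Tuple[Dict[str, object], ...]
    steps: Tuple[Callable[[Rows], Rows], ...] = ()

    def filter(self, predicate: Callable[[Dict[str, object]], bool]) -> "ToyLazyFrame":
        # Record the step and return a NEW frame; self is left untouched.
        step = lambda rows: [r for r in rows if predicate(r)]
        return ToyLazyFrame(self.source, self.steps + (step,))

    def run(self) -> Rows:
        # Work happens only here, when the plan is materialized.
        rows = [dict(r) for r in self.source]
        for step in self.steps:
            rows = step(rows)
        return rows


base = ToyLazyFrame(({"x": 1}, {"x": 2}, {"x": 3}))
filtered = base.filter(lambda r: r["x"] > 1)

print(len(base.steps), len(filtered.steps))  # 0 1
print(filtered.run())                        # [{'x': 2}, {'x': 3}]
```

Because `filter` never mutates `base`, the same starting plan can be branched into several independent downstream plans.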
Create a schema-only placeholder DataFrame for a named table.
The returned DataFrame contains no data; it is a logical reference
that must be supplied with actual Arrow data at execution time via the
tables argument of run or to_arrow. This is
useful when you want to build a reusable query plan against a
well-known schema and inject different data at runtime.
import pyarrow as pa
from chalkdf import DataFrame
schema = pa.schema([("user_id", pa.int64()), ("score", pa.float64())])
df = DataFrame.named_table("users", schema)
# Build a query plan
from chalk.features import _
result_plan = df.filter(_.score > 0.5)
# Inject real data at execution time
data = pa.table({"user_id": [1, 2, 3], "score": [0.3, 0.8, 0.6]})
result = result_plan.run(tables={"users": data})
Load a Chalk offline-query dataset's parquet output as a DataFrame.
Resolves a Chalk dataset revision to its signed parquet output URLs
using the server-streaming
DatasetMetadataService.StreamDatasetRevisionDownloadLinks RPC,
downloads the parquet partitions, and returns a materialized DataFrame
backed by the concatenated tables.
Exactly one of dataset_id or revision_id must be provided. When
dataset_id is given, the dataset's most recent revision is loaded.
from chalkdf import DataFrame
df = DataFrame.from_dataset(revision_id="rev_abc123")
Create a DataFrame from a Python async generator function.
This method allows you to create a DataFrame by streaming data from a custom Python async generator. The generator can yield data as PyArrow RecordBatches, pydicts, or pylists, and the method will handle conversion and schema alignment automatically. If the UDF yields an invalid batch, no further batches will be processed.
An async generator function that yields data batches. Each yielded value
can be a pyarrow.RecordBatch, a dictionary (will be converted using
pyarrow.RecordBatch.from_pydict), or a list (will be converted using
pyarrow.RecordBatch.from_pylist). The generator should yield None
or complete iteration to signal completion.
The expected PyArrow schema for the output data. If yielded batches have columns in a different order, they will be automatically reordered to match this schema.
Maximum time in seconds to wait for the output handler to accept each
batch. Prevents deadlocks when the consumer is blocked. Default is 300
seconds (5 minutes). Set to None to disable timeout (not recommended).
If sending a batch to the output handler exceeds the timeout.
import pyarrow as pa
from chalkdf import DataFrame
async def generate_data():
    # Yield three batches as plain dicts; each is converted to a RecordBatch
    for i in range(3):
        yield {"x": [i * 10, i * 10 + 1], "y": [i, i]}
schema = pa.schema([("x", pa.int64()), ("y", pa.int64())])
df = DataFrame.from_python_udf(generate_data, schema)
result = df.run()
# Example with PyArrow RecordBatches
async def generate_batches():
    batch1 = pa.RecordBatch.from_pydict({"a": [1, 2], "b": [3, 4]})
    batch2 = pa.RecordBatch.from_pydict({"a": [5, 6], "b": [7, 8]})
    yield batch1
    yield batch2
# Use a separate name so the x/y schema above stays available for generate_data
batch_schema = pa.schema([("a", pa.int64()), ("b", pa.int64())])
df = DataFrame.from_python_udf(generate_batches, batch_schema)
# Example with custom timeout
df = DataFrame.from_python_udf(generate_data, schema, output_timeout=60.0)
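The column reordering and per-batch timeout described above can be sketched with the standard library alone. This is an illustration under assumptions, not chalkdf's implementation: `SCHEMA_ORDER`, `produce`, and the `asyncio.Queue` standing in for the output handler are all hypothetical names chosen for the example.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Optional

SCHEMA_ORDER = ["x", "y"]  # hypothetical expected column order


async def generate_data() -> AsyncIterator[Dict[str, List[int]]]:
    for i in range(3):
        # Keys are deliberately yielded in a different order than the schema.
        yield {"y": [i, i], "x": [i * 10, i * 10 + 1]}


async def produce(queue: asyncio.Queue, timeout_s: Optional[float]) -> None:
    async for batch in generate_data():
        # Reorder columns to match the expected schema.
        aligned = {name: batch[name] for name in SCHEMA_ORDER}
        # Raise instead of hanging if the consumer stops accepting batches.
        await asyncio.wait_for(queue.put(aligned), timeout=timeout_s)


async def run_with_consumer() -> List[Dict[str, List[int]]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    producer = asyncio.create_task(produce(queue, timeout_s=1.0))
    received = [await queue.get() for _ in range(3)]
    await producer
    return received


async def run_without_consumer() -> bool:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    try:
        await produce(queue, timeout_s=0.05)  # nobody drains the queue
    except asyncio.TimeoutError:
        return True
    return False


batches = asyncio.run(run_with_consumer())
timed_out = asyncio.run(run_without_consumer())
```

With a consumer draining the queue, all three batches arrive with columns in schema order. Without one, the second `put` blocks on the full queue and the timeout fires, which is the kind of deadlock a bounded per-batch timeout is meant to surface.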
Scan files and return a DataFrame.
Currently supports CSV (with headers), Parquet, Delta, and Iceberg.
Schema of the data. Required for CSV files, optional for Parquet. For Delta and Iceberg, inferred from table metadata when omitted.
Scan inference mode:
"auto": infer scan type from the URI/path. Uses suffixes for CSV/Parquet and
can also recognize Delta table roots.
"hive": expand Hive/glob paths without Delta inference fallback.
"delta": treat the input as a Delta table root (requires exactly one URI).
"iceberg": treat the input as an Iceberg table root (requires exactly one URI).
To read from a catalog instead of the file system, use scan_iceberg.
Optional Bernoulli row-level sampling rate in (0, 1]. For example, 0.01
yields ~1% of rows. Passed through to the Velox Hive connector's
sampleRate; sampling is non-uniform and skips rows during read (it
does not skip files or row groups, so I/O is not reduced proportionally).
None disables sampling.
from chalkdf import DataFrame
# Scan Parquet files
df = DataFrame.scan(["data/sales_2024.parquet"], name="sales_data")
# Scan CSV with explicit schema
import pyarrow as pa
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan(["data/users.csv"], schema=schema)
# Scan ~1% of rows
df = DataFrame.scan(["data/big.parquet"], row_sample=0.01)
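To make the row-level sampling description concrete, here is a small sketch of Bernoulli sampling in plain Python. It assumes a hypothetical helper `bernoulli_sample` and a fixed seed for repeatability; it says nothing about how the Velox connector implements `sampleRate`. The point it illustrates is that every row is still visited, and each is kept independently with probability `rate`, so the output size is only approximately `rate` times the input size.

```python
import random
from typing import Iterable, Iterator, Optional


def bernoulli_sample(
    rows: Iterable[int], rate: Optional[float], seed: int = 0
) -> Iterator[int]:
    """Hypothetical helper: keep each row independently with probability `rate`."""
    if rate is None:
        # None disables sampling entirely.
        yield from rows
        return
    if not 0.0 < rate <= 1.0:
        raise ValueError("rate must be in (0, 1]")
    rng = random.Random(seed)
    for row in rows:  # every row is still read; only the output shrinks
        if rng.random() < rate:
            yield row


kept = list(bernoulli_sample(range(100_000), rate=0.01))
print(len(kept))  # close to 1,000, but not exactly
```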
Scan an Iceberg table that is registered in a catalog.
storage_options follows the Apache Iceberg catalog + FileIO
properties spec — the same key namespace accepted by PyIceberg,
iceberg-rust, and Trino. See the full list of standard keys at
https://iceberg.apache.org/docs/latest/configuration/.
Only type='glue' (AWS Glue catalog) is supported today. Example
for Glue:
{
    "type": "glue",
    "glue.id": "123456789012",
    "region_name": "us-east-1",
    "client.assume-role.arn": "arn:aws:iam::123456789012:role/chalk-reader",
    "warehouse": "s3://my-bucket/warehouse",
}
client.assume-role.arn is optional; when absent, ambient AWS
credentials are used (instance profile / IRSA / env vars / ~/.aws).
Apache Iceberg catalog + FileIO properties. None uses the
ambient catalog configuration from the host engine's environment.
Pin the scan to a specific snapshot id. None selects the
current snapshot at plan time.
Scan a Delta Lake table by URI or Unity Catalog three-part name.
Object-store and/or Unity Catalog configuration forwarded to the
underlying object_store. None falls back to ambient credentials
resolved from process env.
To read from a Databricks Unity Catalog-registered table, pass:
type='unity'
unity.workspace_url='https://<workspace>.cloud.databricks.com'
unity.token='<PAT or service-principal bearer token>'
unity.operation='READ' (optional, default READ)
When type='unity', table is interpreted as the three-part
identifier catalog.schema.table and the resolver looks up the
actual storage location (S3/GCS/ABFSS) via the Unity Catalog REST
API. Short-lived cloud credentials are vended via the
temporary-table-credentials endpoint and forwarded to
object_store automatically.
When type is omitted, table must be a filesystem URI (e.g.
s3://bucket/delta/path, file:///abs/path, or a local path),
and storage_options may carry explicit cloud credential keys
(aws_access_key_id, azure_storage_sas_token, etc.).
Write this DataFrame to an Iceberg table.
Iceberg catalog + FileIO properties. See scan_iceberg.
None uses ambient configuration from the host engine's environment.
A passthrough DataFrame (same data as input); run it to execute the write.
Create a DataFrame from the result of executing a SQL query (DuckDB dialect).
Pass DataFrames or Arrow tables as keyword arguments to make them
available as named tables inside the query. If no keyword arguments
are provided, from_sql will attempt to auto-register any
DataFrames found in the calling scope.
from chalkdf import DataFrame
orders = DataFrame.from_dict({"order_id": [1, 2, 3], "amount": [10.0, 20.0, 5.0]})
result = DataFrame.from_sql(
    "SELECT order_id, amount FROM orders WHERE amount > 8",
    orders=orders,
)
Join two DataFrames with SQL:
users = DataFrame.from_dict({"id": [1, 2], "name": ["Alice", "Bob"]})
purchases = DataFrame.from_dict({"user_id": [1, 1, 2], "item": ["a", "b", "c"]})
result = DataFrame.from_sql(
    "SELECT u.name, p.item FROM users u JOIN purchases p ON u.id = p.user_id",
    users=users,
    purchases=purchases,
)
import pyarrow as pa
from chalkdf import DataFrame
from chalk.sql import PostgreSQLSource
source = PostgreSQLSource(...)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.from_datasource(source, "SELECT * FROM users", schema)
Create a DataFrame by executing a SQL SELECT and scanning the resulting parquet files.
The connection wraps the SELECT in dialect-specific EXPORT DATA / COPY INTO
syntax, writes parquet to output_uri_prefix, and reads the files via a
deferred TableScan node. Filter and projection pushdown is applied
automatically when the DataFrame is composed with .filter() or
.project() before execution.
URI prefix where the exported parquet output will be written
(e.g., gs://bucket/path/ or s3://bucket/prefix/). Alternatively, can be something that
the SQL operation can unload to, such as a stage in Snowflake defined via CREATE STAGE.
import pyarrow as pa
from chalkdf import DataFrame
from libchalk.sql import ConnectionPool
from libchalk.sql.bigquery import make_bigquery_connection_factory
factory = make_bigquery_connection_factory(project_id="my-project")
pool = ConnectionPool(factory, max_pool_size=1)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan_from_sql(
    "SELECT user_id, name FROM `my_dataset.users`",
    pool=pool,
    output_uri_prefix="gs://my-bucket/unload-output/",
    schema=schema,
)
Create a DataFrame by pulling messages from a streaming source.
This method connects to a Kafka, Kinesis, or PubSub source and pulls up to n
messages, returning them as a DataFrame.
A streaming source configuration. Can be one of:
KafkaSource: Kafka topic configuration
KinesisSource: Kinesis stream configuration
PubSubSource: Google PubSub subscription configuration
from chalkdf import DataFrame
from chalk.streams import KafkaSource
source = KafkaSource(
    name="my_kafka",
    bootstrap_server="localhost:9092",
    topic="my_topic",
)
# Pull 100 messages, just the raw bytes
df = DataFrame.from_stream_source(source, n=100)
# Pull with full metadata
df = DataFrame.from_stream_source(source, n=100, include_metadata=True)
Return input/output schema metadata for an ONNX model.
Requires the chalkdf-onnx-runtime package to be installed so
that libchalk_onnx_module.so can be dlopened.
A dict with keys input_names (list[str]) and
output_names (list[str]).
If the ONNX module is not available or the model cannot be loaded.
from chalkdf import DataFrame
meta = DataFrame.get_onnx_model_metadata("model.onnx")
meta["input_names"]   # ['input']
meta["output_names"]  # ['output']
Append an "Unbound" online-store write to this plan.
Wraps the underlying ChalkTable in an UploadFeaturesUDF that
records the destination name + per-call config. The runtime
OnlineStoreWriter is resolved at plan compile time by looking
destination up against the active BindingRegistry (the
engine populates one per request; ad-hoc local runs construct one
with register_redis_lightning_client / register_dynamodb_client
as appropriate). All arguments round-trip cleanly through the
lazyframe proto.
Supported destinations:
"redis_online_store"
"dynamodb_online_store"
chalkdf write_to is flat-scalar-only today: input columns must be
present in column_to_feature. Nested has-one / has-many / vector
feature persistence is handled by the engine's own persist flow, not
through chalkdf.
Run ONNX model inference on this DataFrame.
The DataFrame must contain a column for each model input (matching
the names from get_onnx_model_metadata) plus a __cidx__
column. The returned DataFrame contains the model's output columns
along with __cidx__ and __valid__ columns.
Requires the chalkdf-onnx-runtime package to be installed.
A new DataFrame with the model's output columns, __cidx__,
and __valid__.
If the ONNX module is not available or the model cannot be loaded.
import pyarrow as pa
from chalkdf import DataFrame
meta = DataFrame.get_onnx_model_metadata("model.onnx")
input_name = meta["input_names"][0]
# One row whose input is a list of ten float32 values
input_values = pa.array(
    [[1.0] * 10],
    type=pa.list_(pa.float32()),
)
df = DataFrame.from_arrow(
    pa.table(
        {
            input_name: input_values,
            "__cidx__": [0],
        }
    )
)
result = (
    df.onnx_inference_udf(onnx_model_path="model.onnx")
    .to_arrow()
)
result.column_names  # ['output', '__valid__', '__cidx__']
Add or replace columns while keeping all existing columns.
Unlike project, which returns only the columns you specify,
with_columns keeps every existing column and either adds new ones
or replaces columns whose names match.
Accepts multiple forms:
A dict mapping column names to expressions
(name, expression) tuples
Expressions named with .alias(<name>)
from chalkdf import DataFrame
from chalk.features import _
import chalk.functions as F
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Add a new column using underscore syntax
df2 = df.with_columns({"z": _.x + _.y})
# Add a column using an F function
df3 = df.with_columns({"z_capped": F.least(_.x + _.y, 8)})
# Add a column using .alias()
df4 = df.with_columns((_.x * 2).alias("x_doubled"))
# df2, df3, and df4 each still contain x and y in addition to the new column
Force caching of this subcomputation during execution.
replay is an escape hatch for expensive subplans that are reused
downstream. It inserts an explicit replay boundary into the logical
plan, asking libchalk to compute this DataFrame once and serve later
consumers from the replay cache instead of duplicating the work.
Most plans should rely on the optimizer to place replay nodes automatically. Use this only when a test or carefully inspected plan needs a stable caching boundary.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3]})
cached = df.with_columns({"x2": df.col("x") * 2}).replay("shared_x2")
Return a column expression for the named column.
df.col("name") is equivalent to _.name but validates that
"name" exists in the DataFrame's schema at call time and is
therefore useful when the column name is a runtime string variable
rather than a literal attribute access.
If column is not present in the DataFrame's schema.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Reference a column by name to build expressions
col_x = df.col("x")
df_filtered = df.filter(col_x > 1)
# Useful when the column name comes from a variable
target = "y"
df2 = df.with_columns({"doubled": df.col(target) * 2})
Return a column expression for the named column.
Alias for col.
If column is not present in the DataFrame's schema.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Compute a sum from two columns referenced by name
df2 = df.with_columns({"sum": df.col("x") + df.col("y")})
Combine this DataFrame with one or more others by stacking rows.
All DataFrames must have the same schema (different column order is
allowed - the output will have the same column order as self).
Duplicates are retained. Row order is not preserved.
If no other DataFrames are provided, or if schemas don't match.
df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
df3 = DataFrame({"x": [5], "y": [50]})
result = df1.union_all(df2, df3)
# result contains all 5 rows from df1, df2, and df3, in any order
Combine this DataFrame with another by stacking rows.
Convenience method for unioning with a single DataFrame.
Equivalent to union_all(other).
Both DataFrames must have the same schema (different column order is
allowed - the output will have the same column order as self).
Duplicates are retained. Row order is not preserved.
See also union_all, which unions with multiple DataFrames at once.
If schemas don't match.
df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
result = df1.union(df2)
# result contains all 4 rows from df1 and df2, in any order
Project to an exact set of output columns using expressions.
Unlike with_columns, which keeps all existing columns and only
adds or replaces the ones you name, project returns only the
columns you specify. Columns not listed in columns are dropped.
Use project when you want to reshape or rename the schema
entirely; use with_columns when you only want to augment it.
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
# Keep only "sum" and "x"; z is dropped
projected = df.project({"sum": _.x + _.y, "x": _.x})
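The difference between with_columns and project can be sketched on plain dictionaries of columns. The helpers `toy_with_columns` and `toy_project` below are hypothetical and only mirror the keep-versus-drop behavior described above; they do not use chalkdf expressions.

```python
from typing import Callable, Dict, List

Columns = Dict[str, List[int]]
Expr = Callable[[Columns], List[int]]


def toy_with_columns(cols: Columns, new: Dict[str, Expr]) -> Columns:
    out = dict(cols)  # keep every existing column
    for name, expr in new.items():
        out[name] = expr(cols)  # add, or replace when the name matches
    return out


def toy_project(cols: Columns, new: Dict[str, Expr]) -> Columns:
    # Only the listed columns survive; everything else is dropped.
    return {name: expr(cols) for name, expr in new.items()}


data = {"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]}
add = lambda c: [a + b for a, b in zip(c["x"], c["y"])]

augmented = toy_with_columns(data, {"sum": add})
projected = toy_project(data, {"sum": add, "x": lambda c: c["x"]})

print(list(augmented))  # ['x', 'y', 'z', 'sum']
print(list(projected))  # ['sum', 'x']
```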
Join this DataFrame with another.
Join keys. Can be specified in multiple ways:
on=["col1", "col2"]
on={"left_col": "right_col"}
left_on and right_on separately.
Join type. Supported values:
"inner": Keep only rows that match in both DataFrames (default)
"left": Keep all rows from left DataFrame
"right": Keep all rows from right DataFrame
"outer" or "full": Keep all rows from both DataFrames
"semi": Return rows from left that have matches in right (no right columns)
"anti": Return rows from left that have no matches in right
"cross": Cartesian product (do not pass on)
Optional suffix applied to right-hand columns when names collide.
For example, if both DataFrames have a column "value" and right_suffix="_right",
the result will have "value" and "value_right".
Perform an as-of join with another DataFrame.
An as-of join is similar to a left join, but instead of matching on equality, it matches on the nearest key from the right DataFrame. This is commonly used for time-series data where you want to join with the most recent observation.
Important: Both DataFrames must be sorted by the on (or left_on/right_on)
column before calling this method. Use .order_by(on) to sort if needed.
Column name to use as the as-of join key (must be sorted).
This column is used for both left and right DataFrames.
The join finds the nearest match according to the strategy.
Either on or both left_on and right_on must be specified.
Column name in left DataFrame for the as-of join key. Only used when on
is None. Must be paired with right_on.
Column name in right DataFrame for the as-of join key. Can be used with on
(to specify a different right column name) or with left_on (when on is None).
Additional exact-match columns (optional). These columns must match exactly
before performing the as-of match on the on column. Can be specified as:
by=["col1", "col2"]
by={"left_col": "right_col"}
left_by and right_by separately.
Column names in left DataFrame for exact-match conditions. Only used when
by is None. Must be paired with right_by.
Column names in right DataFrame for exact-match conditions. Only used when
by is None. Must be paired with left_by.
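As a rough illustration of the as-of semantics described above, the sketch below implements a backward-looking match in plain Python: for each left row it picks the last right row whose key is less than or equal to the left key, within the same exact-match group. The "backward" strategy, the `toy_asof_join` helper, and the sample column names are assumptions made for the example; the text above does not enumerate the strategies chalkdf supports.

```python
from bisect import bisect_right
from collections import defaultdict
from typing import Dict, List

Row = Dict[str, object]


def toy_asof_join(left: List[Row], right: List[Row], on: str, by: str) -> List[Row]:
    """Hypothetical backward as-of join; both inputs must be sorted by `on`."""
    groups: Dict[object, List[Row]] = defaultdict(list)
    for row in right:
        groups[row[by]].append(row)
    extra_cols = [c for c in right[0] if c not in (on, by)] if right else []

    joined = []
    for row in left:
        candidates = groups.get(row[by], [])
        keys = [c[on] for c in candidates]
        idx = bisect_right(keys, row[on]) - 1  # last right key <= left key
        match = candidates[idx] if idx >= 0 else None
        out = dict(row)
        for col in extra_cols:
            # Like a left join: unmatched rows get nulls for right columns.
            out[col] = match[col] if match else None
        joined.append(out)
    return joined


trades = [{"sym": "A", "t": 5}, {"sym": "A", "t": 12}, {"sym": "B", "t": 3}]
quotes = [
    {"sym": "A", "t": 1, "px": 10.0},
    {"sym": "A", "t": 10, "px": 11.0},
    {"sym": "B", "t": 4, "px": 20.0},
]
result = toy_asof_join(trades, quotes, on="t", by="sym")
print([r["px"] for r in result])  # [10.0, 11.0, None]
```

The binary search is the reason sorted input matters: on unsorted keys `bisect_right` would return an arbitrary position and the match would be wrong without any error being raised.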
Compute window (analytic) expressions partitioned by by and ordered by order_by.
Window operations evaluate each WindowExpr over a partition of
rows (defined by by) sorted within that partition (by order_by).
The result columns are appended to the existing schema; original columns
are preserved.
Overlap between by and order_by columns is not allowed.
Column names that define the partition boundaries. Rows with the same combination of values in these columns form one partition.
Column names (or (name, direction) / (name, direction, nulls_order)
tuples) that define the sort order within each partition.
Direction can be "asc" (default) or "desc". nulls_order
can be "nulls_first"/"first" or "nulls_last"/"last"
(defaults to nulls last).
from chalkdf import DataFrame
from libchalk.chalktable import WindowExpr
df = DataFrame.from_dict({
    "idx": [1, 1, 2, 2],
    "v": [10, 20, 30, 40],
})
# Partition by "idx", sort by "v" ascending, shift "v" by -1 into "v_shifted"
result = df.window(["idx"], ["v"], WindowExpr.shift("v", "v_shifted", -1))
# result schema: idx, v, v_shifted
# v_shifted contains the *next* value of v within each idx partition
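The partition, sort, and shift steps in the example above can be traced by hand with a short plain-Python sketch. `toy_shift_window` is a hypothetical helper; filling out-of-range positions with None is an assumption of this sketch rather than documented chalkdf behavior.

```python
from collections import defaultdict
from typing import Dict, List, Optional

Row = Dict[str, Optional[int]]


def toy_shift_window(
    rows: List[Row], by: str, order_by: str, column: str, output: str, offset: int
) -> List[Row]:
    # 1. Split rows into partitions keyed by the `by` column.
    partitions: Dict[Optional[int], List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[by]].append(row)

    out: List[Row] = []
    for part in partitions.values():
        # 2. Sort within each partition.
        ordered = sorted(part, key=lambda r: r[order_by])
        # 3. Look `offset` rows back; a negative offset looks ahead.
        for i, row in enumerate(ordered):
            j = i - offset
            value = ordered[j][column] if 0 <= j < len(ordered) else None
            out.append({**row, output: value})  # original columns are preserved
    return out


rows = [{"idx": 1, "v": 10}, {"idx": 1, "v": 20}, {"idx": 2, "v": 30}, {"idx": 2, "v": 40}]
shifted = toy_shift_window(rows, by="idx", order_by="v", column="v", output="v_shifted", offset=-1)
print([r["v_shifted"] for r in shifted])  # [20, None, 40, None]
```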
Create a GroupBy object for chained aggregation operations.
This method returns a GroupBy object that can be used to apply
aggregation expressions via the .agg() method. This provides
an alternative syntax to df.agg(by, *aggregations).
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
grouped = df.group_by("group").agg(_.value.sum().alias("total"))
Multiple grouping columns:
df2 = DataFrame.from_dict({"g1": ["A", "A", "B"], "g2": ["X", "Y", "X"], "val": [1, 2, 3]})
result = df2.group_by("g1", "g2").agg(_.val.sum().alias("sum"))
Using underscore expressions:
result = df.group_by(_.group).agg(_.value.mean().alias("avg"))
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
agg_df = df.agg(["group"], _.value.sum().alias("total"))
# Or with a single column:
agg_df = df.agg("group", _.value.sum().alias("total"))
Multi-set aggregation matching SQL GROUP BY ROLLUP(b1, b2, ...).
Expands to grouping sets [(b1,...,bN), (b1,...,bN-1), ..., (b1,), ()]
— every prefix of by plus the empty (grand-total) set. The
output schema is (b1, ..., bN, aggregations..., grouping_id_col):
the by-columns are nullable (carrying NULL in rows from sets that
omit them) and grouping_id_col is an int64 0-based index into
the expansion above (set 0 is the full key, the grand total is the
last index).
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"a": ["A", "A", "B"], "b": ["X", "Y", "X"], "v": [1, 2, 3]})
rolled = df.rollup("a", "b").agg(_.v.sum().alias("sv"))
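The prefix expansion and the meaning of the grouping id can be checked with a small plain-Python sketch. The helper names and the output column name `grouping_id` are hypothetical; only the expansion order and the NULL-filling of omitted by-columns follow the description above.

```python
from typing import Dict, List, Optional, Sequence, Tuple

Row = Dict[str, object]


def rollup_sets(by: Sequence[str]) -> List[Tuple[str, ...]]:
    # Every prefix of `by`, longest first, ending with the empty grand-total set.
    return [tuple(by[:n]) for n in range(len(by), -1, -1)]


def toy_rollup_sum(rows: List[Row], by: Sequence[str], value: str) -> List[Row]:
    out: List[Row] = []
    for gid, keys in enumerate(rollup_sets(by)):
        totals: Dict[Tuple[object, ...], int] = {}
        for row in rows:
            group = tuple(row[k] for k in keys)
            totals[group] = totals.get(group, 0) + row[value]
        for group, total in totals.items():
            rec: Dict[str, Optional[object]] = {k: None for k in by}  # omitted keys stay NULL
            rec.update(zip(keys, group))
            rec["sv"] = total
            rec["grouping_id"] = gid  # 0-based index into rollup_sets(by)
            out.append(rec)
    return out


rows = [
    {"a": "A", "b": "X", "v": 1},
    {"a": "A", "b": "Y", "v": 2},
    {"a": "B", "b": "X", "v": 3},
]
print(rollup_sets(["a", "b"]))  # [('a', 'b'), ('a',), ()]
rolled = toy_rollup_sum(rows, ["a", "b"], "v")
print(rolled[-1])  # {'a': None, 'b': None, 'sv': 6, 'grouping_id': 2}
```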
Multi-set aggregation matching SQL GROUP BY CUBE(b1, b2, ...).
Expands to all 2^N subsets of by, emitted in descending
size and lexicographic-by-index order within each size. The output
schema and grouping_id_col semantics match rollup.
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"a": ["A", "B"], "b": ["X", "Y"], "v": [1, 2]})
cubed = df.cube("a", "b").agg(_.v.sum().alias("sv"))
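The subset ordering described above (descending size, then lexicographic by column index) matches what `itertools.combinations` produces over column indices. The helper `cube_sets` below is a hypothetical sketch of that expansion only; it does not perform any aggregation.

```python
from itertools import combinations
from typing import List, Sequence, Tuple


def cube_sets(by: Sequence[str]) -> List[Tuple[str, ...]]:
    # All 2^N subsets: largest first, index-lexicographic within each size.
    return [
        tuple(by[i] for i in idx)
        for size in range(len(by), -1, -1)
        for idx in combinations(range(len(by)), size)
    ]


sets = cube_sets(["a", "b", "c"])
print(len(sets))  # 8
print(sets[:4])   # [('a', 'b', 'c'), ('a', 'b'), ('a', 'c'), ('b', 'c')]
```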
Multi-set aggregation matching SQL GROUP BY GROUPING SETS (...).
Each inner sequence is a grouping set; the empty inner sequence
denotes the grand-total () set. The by-clause is the
column-order union of all sets (first-appearance order across the
supplied sets). The output schema and grouping_id_col semantics
match rollup.
Sequence of grouping sets. At least two sets are required; a
single set is equivalent to df.group_by(...).agg(...) and
should use that form instead.
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"a": ["A", "A", "B"], "b": ["X", "Y", "X"], "v": [1, 2, 3]})
result = df.grouping_sets([("a", "b"), ("a",), ()]).agg(_.v.sum().alias("sv"))
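The "first-appearance order" rule for the by-clause can be sketched as an order-preserving union. `by_clause` is a hypothetical helper, and raising ValueError for fewer than two sets is an assumption of this sketch; the text above states the requirement but not the exception type.

```python
from typing import Dict, List, Sequence


def by_clause(grouping_sets: Sequence[Sequence[str]]) -> List[str]:
    if len(grouping_sets) < 2:
        # Assumed error type; a single set should use group_by(...).agg(...).
        raise ValueError("at least two grouping sets are required")
    seen: Dict[str, None] = {}  # dicts preserve insertion order
    for keys in grouping_sets:
        for col in keys:
            seen.setdefault(col, None)
    return list(seen)


print(by_clause([("a", "b"), ("a",), ()]))  # ['a', 'b']
print(by_clause([("b",), ("a", "b"), ()]))  # ['b', 'a']
```

The second call shows that the order of the supplied sets, not the order of columns in the source data, determines the order of the by-columns in the output.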
Remove duplicate rows based on the specified partition columns.
For each unique combination of values in columns, exactly one
row is emitted. Which row is kept within a partition is not
guaranteed — the engine may choose any row. If you need a
deterministic choice, sort the DataFrame first with order_by
before calling distinct_on.
If no columns are provided.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 1, 2], "y": [10, 20, 30]})
unique = df.distinct_on("x") # one row per unique x value
Sort the DataFrame by one or more columns.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [3, 1, 2], "y": [30, 10, 20]})
# Sort by x ascending
sorted_df = df.order_by("x")
# Sort by x descending, then y ascending
sorted_df = df.order_by(("x", "desc"), "y")
# Sort by x descending, nulls first
sorted_df = df.order_by(("x", "desc", "nulls_first"))
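The tuple forms accepted in the examples above can be read as (name, direction, nulls_order) with defaults filled in. The sketch below mimics that in plain Python using stable sorts applied from the least significant key to the most significant one. `normalize` and `toy_order_by` are hypothetical helpers, and defaulting to nulls last is an assumption here.

```python
from typing import Dict, List, Optional, Tuple, Union

Row = Dict[str, Optional[int]]
Spec = Union[str, Tuple[str, str], Tuple[str, str, str]]


def normalize(spec: Spec) -> Tuple[str, str, str]:
    # Fill in the assumed defaults: ascending, nulls last.
    if isinstance(spec, str):
        return (spec, "asc", "nulls_last")
    if len(spec) == 2:
        return (spec[0], spec[1], "nulls_last")
    return (spec[0], spec[1], spec[2])


def toy_order_by(rows: List[Row], *specs: Spec) -> List[Row]:
    out = list(rows)
    for spec in reversed(specs):  # stable sorts, least significant key first
        name, direction, nulls = normalize(spec)
        null_rows = [r for r in out if r[name] is None]
        rest = [r for r in out if r[name] is not None]
        rest.sort(key=lambda r: r[name], reverse=(direction == "desc"))
        out = null_rows + rest if nulls == "nulls_first" else rest + null_rows
    return out


rows = [{"x": 3, "y": 30}, {"x": 1, "y": 10}, {"x": None, "y": 5}, {"x": 3, "y": 20}]
by_x_desc_then_y = toy_order_by(rows, ("x", "desc"), "y")
nulls_first = toy_order_by(rows, ("x", "desc", "nulls_first"))
print([(r["x"], r["y"]) for r in by_x_desc_then_y])  # [(3, 20), (3, 30), (1, 10), (None, 5)]
print([r["x"] for r in nulls_first])                 # [None, 3, 3, 1]
```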
Build a lazy write plan without executing it.
Returns a new DataFrame whose query plan ends with a
TableWrite operator. No files are written until you call
run or to_arrow on the returned DataFrame.
For immediate execution use write instead.
Execute the DataFrame plan and write the output files immediately.
This is the eager counterpart to write_lazy: it builds the
write plan and runs it in one step.
By default (return_table_write_result=False) the method returns
None after the write completes. Pass
return_table_write_result=True to receive the raw
TableWrite result MaterializedDataFrame instead.
Write the DataFrame as Parquet files using an auto-configured connector.
Convenience method that simplifies writing Parquet files compared to
the more general write. The connector is selected
automatically based on the URI scheme.
By default (return_table_write_result=False) the method returns
None after the write completes. Pass
return_table_write_result=True to receive the raw
TableWrite result MaterializedDataFrame instead.
URI prefix where Parquet files will be written.
Supports local (file://), S3 (s3://), and GCS (gs://) URIs.
Whether to skip validation at planning time (default: False).
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
df.write_parquet("file:///tmp/output/") # returns None
result = df.write_parquet("gs://my-bucket/output/", return_table_write_result=True)
Compile the current plan if necessary.
Configuration is resolved from multiple sources in priority order:
config parameter (highest priority)
compilation_config context manager
set_compilation_defaults
Environment variables (e.g. CHALK_USE_VELOX_PARQUET_READER)
If a different configuration is provided than the previous compilation, the plan will be automatically recompiled.
from chalkdf import DataFrame
from chalkdf.config import CompilationConfig
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
compiled = filtered.compile(config=CompilationConfig(use_online_hash_join=True))
print(compiled.explain_logical())
Submit this DataFrame plan for remote execution as an asynchronous Chalk job.
The DataFrame must have been created via a serializable constructor
(from_dict, from_arrow, scan, from_datasource, etc.) or a
chain of operations on such a DataFrame. If the plan cannot be serialized a
ValueError is raised.
Requires chalkpy to be installed.
A non-blocking handle. Call wait
to block until the job finishes and retrieve results.
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
materialized = filtered.run()
job = filtered.run(remote=True)
Class methods for constructing new DataFrame instances from various sources.
named_table (data injected at run time)
Methods for selecting, transforming, and manipulating columns.
Column expressions accept _ (underscore) syntax or functions — for
example _.price * _.qty or F.coalesce(_.value, 0). See the
Python function reference for
available functions.
select / drop — pick or remove columns by namewith_columns — add or replace columns while keeping existing onesproject — replace all columns with a new set of expressionscol / column — reference a column by runtime name stringrename — rename one or more columnsexplode — expand a list column into one row per elementwith_unique_id — append a monotonically increasing ID columnReturn a column expression for the named column.
df.col("name") is equivalent to _.name but validates that
"name" exists in the DataFrame's schema at call time and is
therefore useful when the column name is a runtime string variable
rather than a literal attribute access.
If column is not present in the DataFrame's schema.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Reference a column by name to build expressions
col_x = df.col("x")
df_filtered = df.filter(col_x > 1)
# Useful when the column name comes from a variable
target = "y"
df2 = df.with_columns({"doubled": df.col(target) * 2})
Return a column expression for the named column.
Alias for col.
Raises an error if column is not present in the DataFrame's schema.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Compute a sum from two columns referenced by name
df2 = df.with_columns({"sum": df.col("x") + df.col("y")})
Add or replace columns while keeping all existing columns.
Unlike project, which returns only the columns you specify,
with_columns keeps every existing column and either adds new ones
or replaces columns whose names match.
Accepts multiple forms:
A dict mapping column names to expressions
(name, expression) tuples
Expressions named with .alias(<name>)
from chalkdf import DataFrame
from chalk.features import _
import chalk.functions as F
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Add a new column using underscore syntax
df2 = df.with_columns({"z": _.x + _.y})
# Add a column using an F function
df3 = df.with_columns({"z_capped": F.least(_.x + _.y, 8)})
# Add a column using .alias()
df4 = df.with_columns((_.x * 2).alias("x_doubled"))
# df2, df3, and df4 each still contain x and y in addition to the new column
Project to an exact set of output columns using expressions.
Unlike with_columns, which keeps all existing columns and only
adds or replaces the ones you name, project returns only the
columns you specify. Columns not listed in columns are dropped.
Use project when you want to reshape or rename the schema
entirely; use with_columns when you only want to augment it.
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
# Keep only "sum" and "x"; z is dropped
projected = df.project({"sum": _.x + _.y, "x": _.x})
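The difference between with_columns and project can be sketched without chalkdf at all. The snippet below is a plain-Python stand-in, assuming a table is just a dict of column name to list of values; the helper names with_columns and project here are hypothetical local functions, not the library's implementation.

```python
# Plain-Python illustration only: a "table" is a dict of column -> values.
# These helpers are hypothetical stand-ins, not chalkdf's implementation.

def with_columns(table, new_columns):
    """Keep every existing column; add new ones or replace on a name match."""
    result = dict(table)  # copy, so the input table is left unchanged
    result.update(new_columns)
    return result


def project(table, new_columns):
    """Return only the columns that were explicitly listed."""
    return dict(new_columns)


table = {"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]}
sums = [a + b for a, b in zip(table["x"], table["y"])]

augmented = with_columns(table, {"sum": sums})
projected = project(table, {"sum": sums, "x": table["x"]})

print(list(augmented))  # ['x', 'y', 'z', 'sum']
print(list(projected))  # ['sum', 'x']
```

Both helpers return a new dict, mirroring the statement that each DataFrame operation leaves the original unchanged.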
Methods for filtering, ordering, and deduplicating rows.
filter: keep rows matching a boolean expression
order_by: sort rows by one or more columns
slice: select a positional range of rows
distinct_on: deduplicate by a set of key columns
Sort the DataFrame by one or more columns.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [3, 1, 2], "y": [30, 10, 20]})
# Sort by x ascending
sorted_df = df.order_by("x")
# Sort by x descending, then y ascending
sorted_df = df.order_by(("x", "desc"), "y")
# Sort by x descending, nulls first
sorted_df = df.order_by(("x", "desc", "nulls_first"))
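To make the sort specifications above concrete, here is a minimal plain-Python sketch of how a bare name, a (name, direction) pair, and a (name, direction, nulls_order) triple could be interpreted. It operates on a list of dict rows rather than a DataFrame. The helpers normalize and order_by are hypothetical, and the nulls-last default is an assumption of this sketch, not something the order_by documentation states.

```python
# Illustration only: rows are dicts, None stands for null.
# Assumption: nulls sort last unless "nulls_first" is requested.

def normalize(spec):
    """Expand "x", ("x", "desc") or ("x", "desc", "nulls_first") to a 3-tuple."""
    if isinstance(spec, str):
        spec = (spec,)
    name = spec[0]
    direction = spec[1] if len(spec) > 1 else "asc"
    nulls = spec[2] if len(spec) > 2 else "nulls_last"
    return name, direction, nulls


def order_by(rows, *specs):
    rows = list(rows)
    # Python's sort is stable, so applying the keys from last to first
    # produces a correct multi-key ordering.
    for name, direction, nulls in reversed([normalize(s) for s in specs]):
        present = [r for r in rows if r[name] is not None]
        missing = [r for r in rows if r[name] is None]
        present.sort(key=lambda r: r[name], reverse=(direction == "desc"))
        rows = missing + present if nulls == "nulls_first" else present + missing
    return rows


rows = [
    {"x": 3, "y": 30},
    {"x": 1, "y": 10},
    {"x": None, "y": 5},
    {"x": 3, "y": 20},
]
by_x_desc_then_y = order_by(rows, ("x", "desc"), "y")
nulls_first = order_by(rows, ("x", "desc", "nulls_first"))

print([(r["x"], r["y"]) for r in by_x_desc_then_y])
print([r["x"] for r in nulls_first])
```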
Remove duplicate rows based on the specified partition columns.
For each unique combination of values in columns, exactly one
row is emitted. Which row is kept within a partition is not
guaranteed — the engine may choose any row. If you need a
deterministic choice, sort the DataFrame first with order_by
before calling distinct_on.
Raises an error if no columns are provided.
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 1, 2], "y": [10, 20, 30]})
unique = df.distinct_on("x") # one row per unique x value
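The "sort first, then deduplicate" advice can be illustrated in plain Python. The sketch below assumes a deduplication step that keeps the first row it encounters for each key; that is an assumption made for illustration, since the engine itself makes no promise about which row survives. The distinct_on helper and its ValueError are hypothetical.

```python
# Illustration only: rows are dicts; this helper keeps the FIRST row per key.
# That "first wins" behaviour is an assumption of the sketch.

def distinct_on(rows, *columns):
    if not columns:
        raise ValueError("at least one column is required")
    seen = {}
    for row in rows:
        key = tuple(row[c] for c in columns)
        seen.setdefault(key, row)  # later rows with the same key are ignored
    return list(seen.values())


rows = [{"x": 1, "y": 20}, {"x": 1, "y": 10}, {"x": 2, "y": 30}]

# Sorting by (x, y) first fixes which row is "first" within each x partition.
ordered = sorted(rows, key=lambda r: (r["x"], r["y"]))
unique = distinct_on(ordered, "x")

print(unique)  # [{'x': 1, 'y': 10}, {'x': 2, 'y': 30}]
```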
Methods for combining rows from multiple DataFrames.
Both DataFrames must share the same schema (column order may differ). Duplicates are retained; row order is not guaranteed.
Combine this DataFrame with another by stacking rows.
Convenience method for unioning with a single DataFrame.
Equivalent to union_all(other).
Both DataFrames must have the same schema (column order may differ;
the output uses the column order of self).
Duplicates are retained. Row order is not preserved.
See also union_all, which unions with multiple DataFrames at once.
Raises an error if the schemas don't match.
from chalkdf import DataFrame
df1 = DataFrame.from_dict({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame.from_dict({"x": [3, 4], "y": [30, 40]})
result = df1.union(df2)
# result contains all 4 rows from df1 and df2, in any order
Combine this DataFrame with one or more others by stacking rows.
All DataFrames must have the same schema (column order may differ;
the output uses the column order of self).
Duplicates are retained. Row order is not preserved.
Raises an error if no other DataFrames are provided, or if schemas don't match.
from chalkdf import DataFrame
df1 = DataFrame.from_dict({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame.from_dict({"x": [3, 4], "y": [30, 40]})
df3 = DataFrame.from_dict({"x": [5], "y": [50]})
result = df1.union_all(df2, df3)
# result contains all 5 rows from df1, df2, and df3, in any order
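The schema rule for unions (same columns, any order, output in the order of self) can be sketched in plain Python. The snippet assumes tables are dicts of column name to list and compares column names only, not types. The union_all helper and its ValueError are hypothetical stand-ins.

```python
# Illustration only: a "table" is a dict of column -> values.
# The sketch checks column NAMES only; a real engine would also compare types.

def union_all(first, *others):
    if not others:
        raise ValueError("union_all needs at least one other table")
    columns = list(first)  # output keeps the column order of the first table
    result = {name: list(values) for name, values in first.items()}
    for other in others:
        if set(other) != set(columns):
            raise ValueError(f"schema mismatch: {sorted(other)} vs {sorted(columns)}")
        for name in columns:
            result[name].extend(other[name])
    return result


t1 = {"x": [1, 2], "y": [10, 20]}
t2 = {"y": [30, 40], "x": [3, 4]}  # same columns, different order
t3 = {"x": [5], "y": [50]}

combined = union_all(t1, t2, t3)
print(list(combined))  # ['x', 'y']
print(combined["x"])   # [1, 2, 3, 4, 5]
```

Unlike this sketch, which appends rows in argument order, the documented methods make no guarantee about row order.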
Methods for combining two DataFrames based on matching keys.
join: standard equality joins (inner, left, right, outer, semi, anti, cross)
join_asof: temporal / nearest-key join for time-series data
Join this DataFrame with another.
Join keys. Can be specified in multiple ways:
on=["col1", "col2"]
on={"left_col": "right_col"}
left_on and right_on separately.
Join type. Supported values:
"inner": Keep only rows that match in both DataFrames (default)
"left": Keep all rows from left DataFrame
"right": Keep all rows from right DataFrame
"outer" or "full": Keep all rows from both DataFrames
"semi": Return rows from left that have matches in right (no right columns)
"anti": Return rows from left that have no matches in right
"cross": Cartesian product (do not pass in on)
Optional suffix applied to right-hand columns when names collide.
For example, if both DataFrames have a column "value" and right_suffix="_right",
the result will have "value" and "value_right".
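The join types and the suffix rule described above can be illustrated with a small plain-Python sketch over lists of dict rows. It covers only "inner", "left", "semi", and "anti" with shared key names. The join helper is hypothetical, and its "_right" default is a choice made for this sketch rather than a documented default.

```python
# Illustration only: rows are dicts; `on` is a list of key names shared by
# both sides. Only a subset of join types is sketched here.

def join(left, right, on, how="inner", right_suffix="_right"):
    index = {}
    for rrow in right:
        index.setdefault(tuple(rrow[k] for k in on), []).append(rrow)

    right_cols = [c for c in (right[0] if right else {}) if c not in on]
    left_cols = set(left[0]) if left else set()
    out = []
    for lrow in left:
        matches = index.get(tuple(lrow[k] for k in on), [])
        if how == "semi":
            if matches:
                out.append(dict(lrow))  # left columns only
        elif how == "anti":
            if not matches:
                out.append(dict(lrow))
        elif how in ("inner", "left"):
            if not matches and how == "left":
                matches = [{c: None for c in right_cols}]  # null-filled right side
            for rrow in matches:
                row = dict(lrow)
                for c in right_cols:
                    # Colliding right-hand names receive the suffix.
                    row[c + right_suffix if c in left_cols else c] = rrow[c]
                out.append(row)
        else:
            raise ValueError(f"join type not covered by this sketch: {how}")
    return out


left = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
right = [{"id": 1, "value": "A"}, {"id": 3, "value": "C"}]

inner = join(left, right, on=["id"])
left_join = join(left, right, on=["id"], how="left")
semi = join(left, right, on=["id"], how="semi")
anti = join(left, right, on=["id"], how="anti")

print(inner)  # [{'id': 1, 'value': 'a', 'value_right': 'A'}]
```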
Perform an as-of join with another DataFrame.
An as-of join is similar to a left join, but instead of matching on equality, it matches on the nearest key from the right DataFrame. This is commonly used for time-series data where you want to join with the most recent observation.
Important: Both DataFrames must be sorted by the on (or left_on/right_on)
column before calling this method. Use .order_by(on) to sort if needed.
Column name to use as the as-of join key (must be sorted).
This column is used for both left and right DataFrames.
The join finds the nearest match according to the strategy.
Either on or both left_on and right_on must be specified.
Column name in left DataFrame for the as-of join key. Only used when on
is None. Must be paired with right_on.
Column name in right DataFrame for the as-of join key. Can be used with on
(to specify a different right column name) or with left_on (when on is None).
Additional exact-match columns (optional). These columns must match exactly
before performing the as-of match on the on column. Can be specified as:
by=["col1", "col2"]
by={"left_col": "right_col"}
left_by and right_by separately.
Column names in left DataFrame for exact-match conditions. Only used when
by is None. Must be paired with right_by.
Column names in right DataFrame for exact-match conditions. Only used when
by is None. Must be paired with left_by.
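An as-of join is easier to follow with a worked example. The plain-Python sketch below assumes a "backward" strategy (take the latest right row whose key is less than or equal to the left key), which matches the "most recent observation" use case; the strategy names the library actually accepts are not listed here, so treat that as an assumption. The helper join_asof_backward is hypothetical and expects the right side to be sorted by the on column, as required above.

```python
from bisect import bisect_right

# Illustration only: rows are dicts, `right` must already be sorted by `on`.
# Assumption: "backward" matching, i.e. the latest right key <= the left key.

def join_asof_backward(left, right, on, by=()):
    right_cols = [c for c in (right[0] if right else {}) if c != on and c not in by]

    # Bucket right rows by their exact-match columns, keeping sort order.
    groups = {}
    for rrow in right:
        groups.setdefault(tuple(rrow[c] for c in by), []).append(rrow)
    keys = {g: [r[on] for r in rows] for g, rows in groups.items()}

    out = []
    for lrow in left:
        g = tuple(lrow[c] for c in by)
        pos = bisect_right(keys.get(g, []), lrow[on])
        match = groups[g][pos - 1] if pos > 0 else None
        # Like a left join: unmatched left rows get nulls for right columns.
        out.append({**lrow, **{c: (match[c] if match else None) for c in right_cols}})
    return out


trades = [
    {"sym": "A", "t": 3, "qty": 10},
    {"sym": "B", "t": 4, "qty": 1},
    {"sym": "A", "t": 7, "qty": 5},
]
quotes = [
    {"sym": "A", "t": 1, "px": 100.0},
    {"sym": "A", "t": 5, "px": 101.0},
    {"sym": "B", "t": 6, "px": 50.0},
]

matched = join_asof_backward(trades, quotes, on="t", by=["sym"])
print([r["px"] for r in matched])  # [100.0, None, 101.0]
```

The trade for "B" at t=4 has no quote at or before that time, so its px is null, which is the left-join-like behavior described above.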
Methods for computing group summaries and window (analytic) expressions.
group_by: returns a GroupBy object for chained aggregations
agg: group by columns and apply aggregation expressions directly
window: compute analytic (window) expressions partitioned by key columns
Create a GroupBy object for chained aggregation operations.
This method returns a GroupBy object that can be used to apply
aggregation expressions via the .agg() method. This provides
an alternative syntax to df.agg(by, *aggregations).
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
grouped = df.group_by("group").agg(_.value.sum().alias("total"))
Multiple grouping columns:
df2 = DataFrame.from_dict({"g1": ["A", "A", "B"], "g2": ["X", "Y", "X"], "val": [1, 2, 3]})
result = df2.group_by("g1", "g2").agg(_.val.sum().alias("sum"))
Using underscore expressions:
result = df.group_by(_.group).agg(_.value.mean().alias("avg"))
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
agg_df = df.agg(["group"], _.value.sum().alias("total"))
# Or with a single column:
agg_df = df.agg("group", _.value.sum().alias("total"))
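For readers who want to see what a grouped sum computes, here is a plain-Python sketch over dict rows. The agg_sum helper is hypothetical and handles only a sum aggregation; it is meant to show the shape of the result (one row per group, with the aggregate under its alias), not how the engine evaluates it.

```python
from collections import defaultdict

# Illustration only: one output row per distinct combination of `by` values.

def agg_sum(rows, by, value, alias):
    totals = defaultdict(int)
    for row in rows:
        totals[tuple(row[c] for c in by)] += row[value]
    return [dict(zip(by, key), **{alias: total}) for key, total in totals.items()]


rows = [
    {"group": "A", "value": 1},
    {"group": "A", "value": 2},
    {"group": "B", "value": 3},
]
totals = agg_sum(rows, by=["group"], value="value", alias="total")
print(totals)  # [{'group': 'A', 'total': 3}, {'group': 'B', 'total': 3}]
```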
Compute window (analytic) expressions partitioned by by and ordered by order_by.
Window operations evaluate each WindowExpr over a partition of
rows (defined by by) sorted within that partition (by order_by).
The result columns are appended to the existing schema; original columns
are preserved.
Overlap between by and order_by columns is not allowed.
Column names that define the partition boundaries. Rows with the same combination of values in these columns form one partition.
Column names (or (name, direction) / (name, direction, nulls_order)
tuples) that define the sort order within each partition.
Direction can be "asc" (default) or "desc". nulls_order
can be "nulls_first"/"first" or "nulls_last"/"last"
(defaults to nulls last).
from chalkdf import DataFrame
from libchalk.chalktable import WindowExpr
df = DataFrame.from_dict({
"idx": [1, 1, 2, 2],
"v": [10, 20, 30, 40],
})
# Partition by "idx", sort by "v" ascending, shift "v" by -1 into "v_shifted"
result = df.window(["idx"], ["v"], WindowExpr.shift("v", "v_shifted", -1))
# result schema: idx, v, v_shifted
# v_shifted contains the *next* value of v within each idx partition
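The shift example above can be traced by hand with a plain-Python sketch. It follows the sign convention stated in the comment (an offset of -1 reads the next row in the partition). Filling positions that fall outside the partition with None is an assumption of this sketch. The window_shift helper is hypothetical.

```python
from collections import defaultdict

# Illustration only: partition rows, sort inside each partition, then read
# the value `offset` rows away. Out-of-range positions become None (assumed).

def window_shift(rows, by, order_by, column, output, offset):
    partitions = defaultdict(list)
    for row in rows:
        partitions[tuple(row[c] for c in by)].append(row)

    out = []
    for part in partitions.values():
        part = sorted(part, key=lambda r: tuple(r[c] for c in order_by))
        for i, row in enumerate(part):
            j = i - offset  # offset -1 looks one row ahead
            shifted = part[j][column] if 0 <= j < len(part) else None
            out.append({**row, output: shifted})
    return out


rows = [
    {"idx": 1, "v": 10},
    {"idx": 1, "v": 20},
    {"idx": 2, "v": 30},
    {"idx": 2, "v": 40},
]
shifted = window_shift(rows, ["idx"], ["v"], "v", "v_shifted", -1)
print([r["v_shifted"] for r in shifted])  # [20, None, 40, None]
```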
Methods for executing query plans and inspecting DataFrame structure.
run: execute and return a materialized DataFrame
to_arrow: execute and return a pyarrow.Table
write / write_parquet: execute and persist output files
explain_logical / explain_physical: inspect the query plan
get_plan / get_tables: access internal plan and table state
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
materialized = filtered.run()
job = filtered.run(remote=True)
Execute the DataFrame plan and write the output files immediately.
This is the eager counterpart to write_lazy: it builds the
write plan and runs it in one step.
By default (return_table_write_result=False) the method returns
None after the write completes. Pass
return_table_write_result=True to receive the raw
TableWrite result as a MaterializedDataFrame instead.
Write the DataFrame as Parquet files using an auto-configured connector.
Convenience method that simplifies writing Parquet files compared to
the more general write. The connector is selected
automatically based on the URI scheme.
By default (return_table_write_result=False) the method returns
None after the write completes. Pass
return_table_write_result=True to receive the raw
TableWrite result as a MaterializedDataFrame instead.
URI prefix where Parquet files will be written.
Supports local (file://), S3 (s3://), and GCS (gs://) URIs.
Whether to skip validation at planning time (default: False).
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
df.write_parquet("file:///tmp/output/") # returns None
result = df.write_parquet("gs://my-bucket/output/", return_table_write_result=True)
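Selecting a connector from the URI scheme can be pictured with the standard library alone. The sketch below is a guess at the general idea, not the library's code: the CONNECTORS mapping, the connector labels, and the pick_connector helper are all hypothetical, and only the three schemes listed above are recognized.

```python
from urllib.parse import urlparse

# Hypothetical mapping from URI scheme to a connector label.
CONNECTORS = {"file": "local", "s3": "s3", "gs": "gcs"}


def pick_connector(uri):
    """Return the connector label for a file://, s3:// or gs:// URI."""
    scheme = urlparse(uri).scheme
    if scheme not in CONNECTORS:
        raise ValueError(f"unsupported URI scheme: {scheme!r}")
    return CONNECTORS[scheme]


print(pick_connector("file:///tmp/output/"))     # local
print(pick_connector("gs://my-bucket/output/"))  # gcs
```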
High-level interface for calling functions deployed on Chalk scaling groups.
Remote lets you invoke a deployed function by name from within a resolver
or from client code. Two calling modes are supported:
Direct: call the function and receive its result.
Deferred: enqueue the call and receive a FunctionCallHandle that you can get() later.
High-level interface for calling remote functions.
Direct call (scalar):
remote = Remote(client)
fn = remote.function("add")
result = fn(1, 2)
Direct call from async code:
fn = remote.function("add")
result = await fn(1, 2)
Generator (sync):
fn = remote.function("count-up")
for batch in fn(5):
print(batch)
Generator (async):
fn = remote.function("async-count-up")
async for batch in fn(5):
print(batch)
Deferred (any function type):
handle = fn.defer(5)
result = handle.get()
for batch in handle.stream(): ...
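The relationship between a direct call and a deferred call can be mimicked locally with a thread pool. The sketch below is only an analogy: SketchFunction and SketchHandle are hypothetical classes, the pool stands in for the function queue, and generator output is fully materialized before stream() yields anything, whereas the real handle is described as yielding results as they arrive.

```python
import inspect
from concurrent.futures import ThreadPoolExecutor

_POOL = ThreadPoolExecutor(max_workers=2)  # stands in for the function queue


def _run(fn, args):
    result = fn(*args)
    # Materialize generator output so it can be handed back across threads.
    return list(result) if inspect.isgenerator(result) else result


class SketchHandle:
    """Hypothetical stand-in for a deferred-call handle."""

    def __init__(self, future):
        self._future = future

    def get(self):
        return self._future.result()  # blocks until the call has finished

    def stream(self):
        yield from self._future.result()


class SketchFunction:
    """Hypothetical stand-in for a deployed function."""

    def __init__(self, fn):
        self._fn = fn

    def __call__(self, *args):  # direct call: run now, return the result
        return self._fn(*args)

    def defer(self, *args):  # deferred call: enqueue and return a handle
        return SketchHandle(_POOL.submit(_run, self._fn, args))


def add(a, b):
    return a + b


def count_up(n):
    yield from range(1, n + 1)


add_fn = SketchFunction(add)
count_fn = SketchFunction(count_up)

direct = add_fn(1, 2)
deferred = add_fn.defer(1, 2).get()
batches = list(count_fn.defer(3).stream())
_POOL.shutdown()

print(direct, deferred, batches)  # 3 3 [1, 2, 3]
```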
Look up a deployed function by name and return a callable.
The function's traits (sync/async, scalar/generator) are discovered from the catalog metadata. The calling convention mirrors Python:
result = fn(args) in sync code or result = await fn(args) in async code.
def f() -> Iterator: for x in fn(args): ...
async def f() -> AsyncIterator: async for x in fn(args): ...
Use fn.defer(args) for deferred execution via the function queue.
Drop pending items from every per-function queue for the tenant.
Returns {function_name: items_removed} for every queue that had
at least one pending item. Empty queues are omitted from the dict.
Already-popped (in-flight) calls continue to completion; only items
still pending in the per-function Redis queues are removed.
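The return shape described above (a count per queue, with empty queues omitted) can be shown with in-memory queues. The sketch uses collections.deque in place of the Redis queues, and purge_pending is a hypothetical name; in-flight work is out of scope because only items still sitting in a queue are touched.

```python
from collections import deque

# Illustration only: in-memory deques stand in for the per-function queues.

def purge_pending(queues):
    """Clear every queue and report how many items each non-empty one held."""
    removed = {}
    for name, queue in queues.items():
        if queue:  # empty queues are left out of the report
            removed[name] = len(queue)
            queue.clear()
    return removed


queues = {
    "add": deque(["call-1", "call-2", "call-3"]),
    "count-up": deque(),
    "multiply": deque(["call-4"]),
}
removed = purge_pending(queues)
print(removed)  # {'add': 3, 'multiply': 1}
```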
A deployed remote function.
Returned by function. The calling convention mirrors Python:
def f() -> int: result = fn(args)
async def f() -> int: result = await fn(args)
def f() -> Iterator: for x in fn(args): ...
async def f() -> AsyncIterator: async for x in fn(args): ...
Use defer for deferred execution via the function queue.
Supports both direct calls and deferred calls.
Calling Convention:
result = fn(args) in sync code or result = await fn(args) in async code.
def f() -> Iterator[int]: for x in fn(args): ...
async def f() -> AsyncIterator[int]: async for x in fn(args): ...
All function types also support deferred execution via the function queue:
handle = fn.defer(args)
result = handle.get() or for x in handle.stream(): ...
Call the function directly through the scaling group via Velox.
Returns the appropriate type based on the function's signature:
Handle to a deferred function call.
Returned by defer.
Use get to block until the result is ready, or
stream to iterate over generator results as they arrive.
Handle to a deferred function call enqueued via the function queue.