This reference documents the complete Chalk Python SDK for building and managing feature pipelines.
Lightweight DataFrame wrapper around Chalk's execution engine.
The DataFrame class constructs query plans backed by libchalk and
can materialize them into Arrow tables. It offers a minimal API similar to
other DataFrame libraries while delegating heavy lifting to the underlying
engine.
A DataFrame wraps a plan and a mapping of materialized Arrow tables.
Operations construct new plans and return new DataFrame instances, leaving
previous ones untouched.
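As a minimal illustration of that immutability (using the from_dict and with_columns methods documented below):
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3]})
# with_columns builds a new plan and returns a new DataFrame; df is untouched.
df2 = df.with_columns({"y": _.x + _.x})
# df still exposes only column "x"; df2 exposes "x" and "y".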
Logical representation of tabular data for query operations.
DataFrame provides a lazy evaluation model where operations build up a query
plan that executes only when materialized. Most users should use the class
methods like from_dict, from_arrow, or scan to create
DataFrames rather than calling the constructor directly.
from chalkdf import DataFrame
from chalk.features import _
# Create from a dictionary
df = DataFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})
# Apply operations
filtered = df.filter(_.x > 1)
result = filtered.run()
Create a DataFrame from a dictionary, Arrow table, or query plan.
For most use cases, prefer using class methods like from_dict,
from_arrow, or scan instead of calling this constructor directly.
from chalkdf import DataFrame
# Simple dictionary input
df = DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
# Or use the explicit class method (recommended)
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
Return the number of rows if this DataFrame has already been materialized.
Raising TypeError for non-materialized frames matches Python's default
behavior while avoiding implicitly executing the plan.
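A minimal sketch of this behavior; whether run() returns a materialized DataFrame or an Arrow table, the second len() call below is expected to work, but that is an assumption rather than something this reference confirms:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3]})
filtered = df.filter(_.x > 1)   # lazy: builds a plan, does not execute it
try:
    print(len(filtered))        # raises TypeError if the frame was never materialized
except TypeError:
    print(len(filtered.run()))  # assumption: run() yields an object with a defined length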
Create a DataFrame from a Python async generator function.
This method allows you to create a DataFrame by streaming data from a custom Python async generator. The generator can yield data as PyArrow RecordBatches, pydicts, or pylists, and the method will handle conversion and schema alignment automatically. If the UDF yields an invalid batch, no further batches are processed.
An async generator function that yields data batches. Each yielded value
can be a pyarrow.RecordBatch, a dictionary (will be converted using
pyarrow.RecordBatch.from_pydict), or a list (will be converted using
pyarrow.RecordBatch.from_pylist). The generator should yield None
or complete iteration to signal completion.
The expected PyArrow schema for the output data. If yielded batches have columns in a different order, they will be automatically reordered to match this schema.
Maximum time in seconds to wait for the output handler to accept each
batch. Prevents deadlocks when the consumer is blocked. Default is 300
seconds (5 minutes). Set to None to disable timeout (not recommended).
If sending a batch to the output handler exceeds the timeout.
import pyarrow as pa
from chalkdf import DataFrame
async def generate_data():
    for i in range(3):
        yield {"x": [i * 10, i * 10 + 1], "y": [i, i]}
schema = pa.schema([("x", pa.int64()), ("y", pa.int64())])
df = DataFrame.from_python_udf(generate_data, schema)
result = df.run()
# Example with PyArrow RecordBatches
async def generate_batches():
    batch1 = pa.RecordBatch.from_pydict({"a": [1, 2], "b": [3, 4]})
    batch2 = pa.RecordBatch.from_pydict({"a": [5, 6], "b": [7, 8]})
    yield batch1
    yield batch2
schema = pa.schema([("a", pa.int64()), ("b", pa.int64())])
df = DataFrame.from_python_udf(generate_batches, schema)
# Example with custom timeout
df = DataFrame.from_python_udf(generate_data, schema, output_timeout=60.0)
Scan files and return a DataFrame.
Currently supports CSV (with headers) and Parquet file formats.
from chalkdf import DataFrame
# Scan Parquet files
df = DataFrame.scan(["data/sales_2024.parquet"], name="sales_data")
# Scan CSV with explicit schema
import pyarrow as pa
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan(["data/users.csv"], schema=schema)
Create a DataFrame by executing a SQL query against a data source.
import pyarrow as pa
from chalkdf import DataFrame
from chalk.sql import PostgreSQLSource
source = PostgreSQLSource(...)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.from_datasource(source, "SELECT * FROM users", schema)
Compile the current plan if necessary.
Configuration is resolved from multiple sources in priority order:
1. The config parameter (highest priority)
2. The compilation_config context manager
3. set_compilation_defaults
4. Environment variables (e.g., CHALK_USE_VELOX_PARQUET_READER)

If a configuration different from the one used for the previous compilation is provided, the plan will be automatically recompiled.
from chalkdf import DataFrame
from chalkdf.config import CompilationConfig
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
compiled = df.compile(config=CompilationConfig(use_online_hash_join=True))
print(compiled.explain_logical())
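The other configuration sources listed above can be sketched as follows. Note that the import location of compilation_config and set_compilation_defaults, and the exact arguments they accept, are assumptions here rather than something this reference confirms:
from chalkdf import DataFrame
# Assumption: these helpers live in chalkdf.config and accept a CompilationConfig.
from chalkdf.config import CompilationConfig, compilation_config, set_compilation_defaults
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Process-wide defaults: lower priority than the context manager or config=.
set_compilation_defaults(CompilationConfig(use_online_hash_join=True))
# Scoped override: applies to plans compiled inside the block.
with compilation_config(CompilationConfig(use_online_hash_join=False)):
    compiled = df.compile()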
Add or replace columns.
Accepts multiple forms:
- A dict mapping new column names to expressions
- One or more expressions named via .alias(<name>)

from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Add a new column using a dict with _ syntax
df2 = df.with_columns({"z": _.x + _.y})
# Add a new column using alias
df3 = df.with_columns((_.x + _.y).alias("z"))
Combine this DataFrame with one or more others by stacking rows.
All DataFrames must have the same schema (different column order is
allowed - the output will have the same column order as self).
Duplicates are retained. Row order is not preserved.
If no other DataFrames are provided, or if schemas don't match.
df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
df3 = DataFrame({"x": [5], "y": [50]})
result = df1.union_all(df2, df3)
# result contains all 5 rows from df1, df2, and df3, in any order
Combine this DataFrame with another by stacking rows.
Convenience method for unioning with a single DataFrame.
Equivalent to union_all(other).
Both DataFrames must have the same schema (different column order is
allowed - the output will have the same column order as self).
Duplicates are retained. Row order is not preserved.
union_all : Union with multiple DataFrames at once.
If schemas don't match.
df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
result = df1.union(df2)
# result contains all 4 rows from df1 and df2, in any order
Join this DataFrame with another.
Join keys. Can be specified in multiple ways:
- A list of column names shared by both DataFrames: on=["col1", "col2"]
- A mapping from left column names to right column names: on={"left_col": "right_col"}
- left_on and right_on, specified separately.

Join type. Supported values:
- "inner": Keep only rows that match in both DataFrames (default)
- "left": Keep all rows from left DataFrame
- "right": Keep all rows from right DataFrame
- "outer" or "full": Keep all rows from both DataFrames
- "semi": Return rows from left that have matches in right (no right columns)
- "anti": Return rows from left that have no matches in right
- "cross": Cartesian product (do not pass in on)
Perform an as-of join with another DataFrame.
An as-of join is similar to a left join, but instead of matching on equality, it matches on the nearest key from the right DataFrame. This is commonly used for time-series data where you want to join with the most recent observation.
Important: Both DataFrames must be sorted by the on (or left_on/right_on)
column before calling this method. Use .order_by(on) to sort if needed.
Column name to use as the as-of join key (must be sorted).
This column is used for both left and right DataFrames.
The join finds the nearest match according to the strategy.
Either on or both left_on and right_on must be specified.
Column name in left DataFrame for the as-of join key. Only used when on
is None. Must be paired with right_on.
Column name in right DataFrame for the as-of join key. Can be used with on
(to specify a different right column name) or with left_on (when on is None).
Additional exact-match columns (optional). These columns must match exactly
before performing the as-of match on the on column. Can be specified as:
by=["col1", "col2"]by={"left_col": "right_col"}left_by and right_by separately.Column names in left DataFrame for exact-match conditions. Only used when
by is None. Must be paired with right_by.
Column names in right DataFrame for exact-match conditions. Only used when
by is None. Must be paired with left_by.
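No example accompanies the as-of join description, so a minimal sketch follows; it assumes the method is exposed as join_asof, while the on and by parameters come from the description above:
from chalkdf import DataFrame
trades = DataFrame.from_dict({"ts": [1, 5, 10], "sym": ["A", "A", "A"], "price": [100.0, 101.0, 102.0]})
quotes = DataFrame.from_dict({"ts": [0, 4, 9], "sym": ["A", "A", "A"], "bid": [99.5, 100.5, 101.5]})
# Both sides must already be sorted by the as-of key ("ts" here).
# For each trade, match the nearest quote (per symbol) according to the strategy.
joined = trades.join_asof(quotes, on="ts", by=["sym"])
result = joined.run()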
Write the DataFrame as Parquet files using an auto-configured connector.
This is a convenience method that simplifies writing Parquet files compared
to the more general write() method. It automatically configures the
appropriate connector based on the URI prefix.
URI prefix where Parquet files will be written. Examples:
"file:///path/to/dir/" for local filesystem"s3://bucket/prefix/" for S3"gs://bucket/prefix/" for Google Cloud Storagefrom chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Write to local filesystem
write_df = df.write_parquet("file:///tmp/output/")
result = write_df.run()
Class methods for constructing new DataFrame instances from various data sources.
These methods provide multiple ways to create DataFrames: from in-memory data (from_dict, from_arrow), from files (scan), from async Python generators (from_python_udf), and from SQL data sources (from_datasource).
Methods for selecting, transforming, and manipulating columns.
These operations allow you to add, replace, and transform columns using expressions, as shown in with_columns above.
Methods for filtering and ordering rows.
These operations allow you to filter rows with expressions (filter) and control row ordering (order_by).
Methods for combining DataFrames and performing group-by operations.
Join operations combine two DataFrames based on matching keys. Aggregation operations group rows and compute summary statistics.
Methods for executing query plans and inspecting DataFrame structure.
These methods allow you to compile and inspect query plans (compile, explain_logical) and execute them (run).