Chalk SDK Reference

This reference documents the complete Chalk Python SDK for building and managing feature pipelines.

Lightweight DataFrame wrapper around Chalk's execution engine.

The DataFrame class constructs query plans backed by libchalk and can materialize them into Arrow tables. It offers a minimal API similar to other DataFrame libraries while delegating heavy lifting to the underlying engine.

A DataFrame wraps a plan and a mapping of materialized Arrow tables. Operations construct new plans and return new DataFrame instances, leaving previous ones untouched.

Logical representation of tabular data for query operations.

DataFrame provides a lazy evaluation model where operations build up a query plan that executes only when materialized. Most users should use the class methods like from_dict, from_arrow, or scan to create DataFrames rather than calling the constructor directly.

Examples

from chalkdf import DataFrame
from chalk.features import _
# Create from a dictionary
df = DataFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})
# Apply operations
filtered = df.filter(_.x > 1)
result = filtered.run()
Attributes

Return a list of the column names on this DataFrame.

column_dtypes
list[pyarrow.DataType]

Return a list of the column data types on this DataFrame.

Return the schema of this DataFrame.

Return the number of columns on this DataFrame.
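
For example, the column metadata can be inspected without executing the plan. This is a small sketch assuming a DataFrame built with from_dict; column_dtypes is the only attribute named in this reference, so the other attributes are not shown.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
# Inspect column data types without materializing the plan
print(df.column_dtypes)  # Arrow types for the int64 and string columns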

Functions

Create a DataFrame from a dictionary, Arrow table, or query plan.

For most use cases, prefer using class methods like from_dict, from_arrow, or scan instead of calling this constructor directly.

Parameters
root:
ChalkTable | MaterializedTable | dict

Data source for the DataFrame. Can be:

  • dict: Dictionary mapping column names to lists of values
  • PyArrow Table or RecordBatch: In-memory Arrow data
  • ChalkTable: Query plan (advanced usage)
tables:
dict[str, MaterializedTable] | None
= None

Optional mapping of additional table names to Arrow data. Used internally for query execution with multiple tables.

from chalkdf import DataFrame
# Simple dictionary input
df = DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
# Or use the explicit class method (recommended)
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})

Return the number of rows if this DataFrame has already been materialized.

Raising TypeError for non-materialized frames matches Python's default behavior while avoiding implicitly executing the plan.
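
A minimal sketch of this behavior: len() works on a materialized result, while calling it on a lazy frame raises TypeError rather than silently executing the plan.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3]})
materialized = df.run()
print(len(materialized))  # 3 rows once materialized
# len(df) on the lazy frame would raise TypeError instead of running the plan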

Serialize this DataFrame into a LogicalExprNode proto.

Return a LazyFramePlaceholder when lazy recording is enabled.

Compose a LazyFramePlaceholder on top of this DataFrame.

Create a DataFrame for a named table.

Parameters

Table identifier.

Arrow schema describing the table.

sorted_by: = None
Returns
type:
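
A hypothetical sketch of declaring a named table and supplying its data at execution time. The constructor name table below is an assumption (this reference does not show the method name); the tables mapping accepted by run() is documented later on this page.

import pyarrow as pa
from chalkdf import DataFrame
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
# NOTE: the method name `table` is assumed for illustration
df = DataFrame.table("users", schema)
data = pa.table({"id": [1, 2], "name": ["a", "b"]})
result = df.run(tables={"users": data})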

Construct a DataFrame from an in-memory Arrow object.

Parameters
data:
MaterializedTable

PyArrow Table or RecordBatch to convert into a DataFrame.

Returns
import pyarrow as pa
from chalkdf import DataFrame
table = pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]})
df = DataFrame.from_arrow(table)

Construct a DataFrame from a Python dictionary.

Parameters

Dictionary mapping column names to lists of values.

Returns
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": ["a", "b", "c"]})

Create a DataFrame from a Python async generator function.

This method allows you to create a DataFrame by streaming data from a custom Python async generator. The generator can yield data as PyArrow RecordBatches, pydicts, or pylists, and the method will handle conversion and schema alignment automatically. If the UDF yields an invalid batch, no further batches will be processed.

Notes

  • The UDF runs in a separate thread with its own event loop
  • Column reordering is automatic if batch columns don't match schema order
  • The generator is consumed lazily during DataFrame execution
  • A timeout is applied to prevent backpressure deadlocks with the output handler
Parameters
udf:
Callable[[], typing.AsyncGenerator[pyarrow.RecordBatch | dict | list, None]]

An async generator function that yields data batches. Each yielded value can be a pyarrow.RecordBatch, a dictionary (will be converted using pyarrow.RecordBatch.from_pydict), or a list (will be converted using pyarrow.RecordBatch.from_pylist). The generator should yield None or complete iteration to signal completion.

The expected PyArrow schema for the output data. If yielded batches have columns in a different order, they will be automatically reordered to match this schema.

Maximum time in seconds to wait for the output handler to accept each batch. Prevents deadlocks when the consumer is blocked. Default is 300 seconds (5 minutes). Set to None to disable timeout (not recommended).

Returns
type:
Raises
error:
asyncio.TimeoutError

If sending a batch to the output handler exceeds the timeout.

import pyarrow as pa
from chalkdf import DataFrame
async def generate_data():
    for i in range(3):
        yield {"x": [i * 10, i * 10 + 1], "y": [i, i]}
schema = pa.schema([("x", pa.int64()), ("y", pa.int64())])
df = DataFrame.from_python_udf(generate_data, schema)
result = df.run()
# Example with PyArrow RecordBatches
async def generate_batches():
    batch1 = pa.RecordBatch.from_pydict({"a": [1, 2], "b": [3, 4]})
    batch2 = pa.RecordBatch.from_pydict({"a": [5, 6], "b": [7, 8]})
    yield batch1
    yield batch2
schema = pa.schema([("a", pa.int64()), ("b", pa.int64())])
df = DataFrame.from_python_udf(generate_batches, schema)
# Example with custom timeout
df = DataFrame.from_python_udf(generate_data, schema, output_timeout=60.0)

Scan files and return a DataFrame.

Currently supports CSV (with headers) and Parquet file formats.

Parameters
input_uris:
typing.Sequence[str | Path] | str | Path

File path/URI or list of paths/URIs to scan. Supports local paths and file:// URIs.

name:
typing.Optional[str]
= None

Optional name to assign to the table being scanned.

Schema of the data. Required for CSV files, optional for Parquet.

Returns
type:
from chalkdf import DataFrame
# Scan Parquet files
df = DataFrame.scan(["data/sales_2024.parquet"], name="sales_data")
# Scan CSV with explicit schema
import pyarrow as pa
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan(["data/users.csv"], schema=schema)

Load data from an AWS Glue Iceberg table.

Parameters

Fully qualified database.table name.

schema:
typing.Mapping[str, pyarrow.DataType]

Mapping of column names to Arrow types.

Number of rows per batch.

aws_catalog_account_id:
typing.Optional[str]
= None

AWS account hosting the Glue catalog.

aws_catalog_region:
typing.Optional[str]
= None

Region of the Glue catalog.

aws_role_arn:
typing.Optional[str]
= None

IAM role to assume for access.

filter_predicate:
typing.Optional[Expr]
= None

Optional filter applied during scan.

parquet_scan_range_column:
typing.Optional[str]
= None

Column used for range-based reads.

custom_partitions:
typing.Optional[dict[str, tuple[typing.Literal['date_trunc(day)'], str]]]
= None

Additional partition definitions.

partition_column:
typing.Optional[str]
= None

Column name representing partitions.

Returns
type:
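
A sketch of reading from Glue, assuming the method is exposed as from_glue_table (the name is not shown in this reference). The keyword arguments use the parameter names documented above; the first positional argument is the fully qualified database.table name.

import pyarrow as pa
from chalkdf import DataFrame
# NOTE: the method name `from_glue_table` is assumed for illustration
df = DataFrame.from_glue_table(
    "analytics.events",
    schema={"event_id": pa.int64(), "ts": pa.timestamp("us")},
    aws_catalog_account_id="123456789012",
    aws_catalog_region="us-east-1",
    aws_role_arn="arn:aws:iam::123456789012:role/glue-reader",
)
result = df.run()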

Create a DataFrame from a Chalk SQL catalog table.

Parameters

Name of the table in the catalog.

catalog:
ChalkSqlCatalog

ChalkSqlCatalog instance containing the table.

Returns
type:
from chalkdf import DataFrame
from libchalk.chalksql import ChalkSqlCatalog
catalog = ChalkSqlCatalog()
df = DataFrame.from_catalog_table("users", catalog=catalog)

Create a DataFrame from the result of executing a SQL query (DuckDB dialect).

Parameters

SQL query string (DuckDB dialect).

tables:
CompatibleFrameType
= {}
Returns
type:
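
A sketch of running a DuckDB-dialect query over existing DataFrames. The method name from_sql is an assumption (this reference does not show it), and the tables argument is assumed to map the names used in the query to DataFrames.

from chalkdf import DataFrame
users = DataFrame.from_dict({"id": [1, 2, 3], "score": [10, 20, 30]})
# NOTE: the method name `from_sql` is assumed for illustration
df = DataFrame.from_sql(
    "SELECT id, score * 2 AS doubled FROM users WHERE score > 10",
    tables={"users": users},
)
result = df.run()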

Create a DataFrame from the result of querying a SQL data source.

Parameters
source:
BaseSQLSource

SQL source to query (e.g., PostgreSQL, Snowflake, BigQuery).

SQL query to execute against the data source.

Output schema of the query result. The datasource's driver converts the native query result to this schema.

Returns
import pyarrow as pa
from chalkdf import DataFrame
from chalk.sql import PostgreSQLSource
source = PostgreSQLSource(...)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.from_datasource(source, "SELECT * FROM users", schema)

Compile the current plan if necessary.

Configuration is resolved from multiple sources in priority order:

  1. Explicit config parameter (highest priority)
  2. Active compilation_config context manager
  3. Global defaults from set_compilation_defaults
  4. Environment variables (e.g., CHALK_USE_VELOX_PARQUET_READER)
  5. Built-in fallback defaults

If the configuration differs from the one used for the previous compilation, the plan is automatically recompiled.
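
A sketch of the resolution order above. CompilationConfig is imported as in the example below; the compilation_config context manager and set_compilation_defaults helper are assumed to live in chalkdf.config as well, and use_online_hash_join is used only as an illustrative option.

from chalkdf import DataFrame
from chalkdf.config import CompilationConfig, compilation_config, set_compilation_defaults
set_compilation_defaults(CompilationConfig(use_online_hash_join=False))  # global default (priority 3)
df = DataFrame.from_dict({"x": [1, 2, 3]})
with compilation_config(CompilationConfig(use_online_hash_join=True)):   # context manager (priority 2)
    compiled = df.compile()
compiled = df.compile(config=CompilationConfig(use_online_hash_join=True))  # explicit config (priority 1)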

Parameters

Explicit compilation configuration (highest priority).

recompile: = False

Force recompilation even if a plan exists.

Returns
type:
CompiledPlan
from chalkdf import DataFrame
from chalkdf.config import CompilationConfig
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
compiled = df.compile(config=CompilationConfig(use_online_hash_join=True))
print(compiled.explain_logical())

Return a string representation of the logical query plan.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
print(filtered.explain_logical())

Return a string representation of the physical execution plan.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
print(filtered.explain_physical())

Compute plan JSON for debugging the structure of the computation.

Expose the underlying ChalkTable plan.

Return the mapping of materialized tables for this DataFrame.

Add or replace columns.

Accepts multiple forms:

  • A mapping of column names to expressions
  • Positional tuples of (name, expression)
  • Bare positional expressions that must include .alias(<name>)
Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Add a new column using a dict with _ syntax
df2 = df.with_columns({"z": _.x + _.y})
# Add a new column using alias
df3 = df.with_columns((_.x + _.y).alias("z"))

Add a monotonically increasing unique identifier column.

Parameters

Name of the new ID column.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [10, 20, 30]})
df_with_id = df.with_unique_id("row_id")

Filter rows based on a boolean expression.

Parameters
expr:
Expr | Underscore

Boolean expression to filter rows. Only rows where the expression evaluates to True are kept.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
filtered = df.filter(_.x > 2)

Return a subset of rows starting at a specific position.

Parameters

Zero-based index where the slice begins.

length: = None

Number of rows to include. If None, includes all remaining rows.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3, 4, 5]})
# Get rows 1-3 (indices 1, 2, 3)
sliced = df.slice(1, 3)

Get a column expression from the DataFrame.

Parameters

Name of the column to retrieve.

Returns
type:
Underscore
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Use col to reference columns in expressions
df_filtered = df.filter(_.x > 1)

Get a column expression from the DataFrame.

Alias for col() method.

Parameters

Name of the column to retrieve.

Returns
type:
Underscore
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
df_sum = df.with_columns({"sum": _.x + _.y})

Combine this DataFrame with one or more others by stacking rows.

All DataFrames must have the same schema (different column order is allowed - the output will have the same column order as self). Duplicates are retained. Row order is not preserved.

Returns
type:
Raises
error:

If no other DataFrames are provided, or if schemas don't match.

df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
df3 = DataFrame({"x": [5], "y": [50]})
result = df1.union_all(df2, df3)
# result contains all 5 rows from df1, df2, and df3, in any order

Combine this DataFrame with another by stacking rows.

Convenience method for unioning with a single DataFrame. Equivalent to union_all(other).

Both DataFrames must have the same schema (different column order is allowed - the output will have the same column order as self). Duplicates are retained. Row order is not preserved.

See Also

union_all : Union with multiple DataFrames at once.

Parameters

DataFrame to union with this DataFrame.

Returns
type:
Raises
error:

If schemas don't match.

df1 = DataFrame({"x": [1, 2], "y": [10, 20]})
df2 = DataFrame({"x": [3, 4], "y": [30, 40]})
result = df1.union(df2)
# result contains all 4 rows from df1 and df2, in any order

Project to a new set of columns using expressions.

Parameters
columns:
typing.Mapping[str, Expr | Underscore]

Mapping of output column names to expressions that define them.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
projected = df.project({"sum": _.x + _.y, "x": _.x})

Select existing columns by name.

Parameters
columns:
str | Underscore
= ()
strict: = True

If True, raise an error if any column doesn't exist. If False, silently ignore missing columns.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
selected = df.select("x", "y")

Drop specified columns from the DataFrame.

Parameters
columns:
str | Underscore
= ()
strict: = True

If True, raise an error if any column doesn't exist. If False, silently ignore missing columns.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
df_dropped = df.drop("z")

Explode a list or array column into multiple rows.

Each element in the list becomes a separate row, with other column values duplicated.

Parameters
column:
str | Underscore

Name of the list/array column to explode.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"id": [1, 2], "items": [[10, 20], [30]]})
exploded = df.explode("items")

Join this DataFrame with another.

Parameters
on:
dict[str | Underscore, str | Underscore] | typing.Sequence[str | Underscore] | None
= None

Join keys. Can be specified in multiple ways:

  • A sequence of column names (same names on both sides): on=["col1", "col2"]
  • A mapping of left->right column names: on={"left_col": "right_col"}
  • If None, must specify left_on and right_on separately.
left_on:
typing.Sequence[str | Underscore] | None
= None

Column names for left DataFrame join keys. Only used when on is None. Must be paired with right_on.

right_on:
typing.Sequence[str | Underscore] | None
= None

Column names for right DataFrame join keys. Only used when on is None. Must be paired with left_on.

how:
JoinType
= 'inner'

Join type. Supported values:

  • "inner": Keep only rows that match in both DataFrames (default)
  • "left": Keep all rows from left DataFrame
  • "right": Keep all rows from right DataFrame
  • "outer" or "full": Keep all rows from both DataFrames
  • "semi": Return rows from left that have matches in right (no right columns)
  • "anti": Return rows from left that have no matches in right
  • "cross": Cartesian product (do not pass in on)

Optional suffix applied to right-hand columns when names collide. For example, if both DataFrames have a column "value" and right_suffix="_right", the result will have "value" and "value_right".

Returns
type:
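
A sketch of an inner join; the method name join is an assumption (this reference documents the parameters but not the name). The on, how, and right_suffix arguments follow the descriptions above.

from chalkdf import DataFrame
left = DataFrame.from_dict({"id": [1, 2, 3], "value": [10, 20, 30]})
right = DataFrame.from_dict({"id": [2, 3, 4], "value": [200, 300, 400]})
# NOTE: the method name `join` is assumed for illustration
joined = left.join(right, on=["id"], how="inner", right_suffix="_right")
# Result has columns id, value, and value_right for the matching ids 2 and 3
result = joined.run()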

Perform an as-of join with another DataFrame.

An as-of join is similar to a left join, but instead of matching on equality, it matches on the nearest key from the right DataFrame. This is commonly used for time-series data where you want to join with the most recent observation.

Important: Both DataFrames must be sorted by the on (or left_on/right_on) column before calling this method. Use .order_by(on) to sort if needed.

Parameters

Right-hand DataFrame to join with.

on:
str | Underscore | None
= None

Column name to use as the as-of join key (must be sorted). This column is used for both left and right DataFrames. The join finds the nearest match according to the strategy. Either on or both left_on and right_on must be specified.

left_on:
str | Underscore | None
= None

Column name in left DataFrame for the as-of join key. Only used when on is None. Must be paired with right_on.

right_on:
str | Underscore | None
= None

Column name in right DataFrame for the as-of join key. Can be used with on (to specify a different right column name) or with left_on (when on is None).

by:
dict[str | Underscore, str | Underscore] | typing.Sequence[str | Underscore] | None
= None

Additional exact-match columns (optional). These columns must match exactly before performing the as-of match on the on column. Can be specified as:

  • A sequence of column names (same names on both sides): by=["col1", "col2"]
  • A mapping of left->right column names: by={"left_col": "right_col"}
  • If None, can specify left_by and right_by separately.
left_by:
typing.Sequence[str | Underscore] | None
= None

Column names in left DataFrame for exact-match conditions. Only used when by is None. Must be paired with right_by.

right_by:
typing.Sequence[str | Underscore] | None
= None

Column names in right DataFrame for exact-match conditions. Only used when by is None. Must be paired with left_by.

strategy:
AsOfJoinStrategy | typing.Literal['forward', 'backward']
= 'backward'

Join strategy controlling which match to select:

  • "backward" (default): Match with the most recent past value
  • "forward": Match with the nearest future value Can also pass AsOfJoinStrategy.BACKWARD or AsOfJoinStrategy.FORWARD.

Suffix to add to overlapping column names from the right DataFrame.

coalesce: = True

Whether to coalesce the join keys (default True).

Returns
type:
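
A sketch of a backward as-of join on time-series data; the method name asof_join is an assumption. Both inputs are ordered by the join key first, as required, and on, by, and strategy follow the parameter descriptions above.

from chalkdf import DataFrame
trades = DataFrame.from_dict({"ts": [3, 7, 10], "symbol": ["A", "A", "A"]})
quotes = DataFrame.from_dict(
    {"ts": [1, 5, 9], "symbol": ["A", "A", "A"], "price": [100, 101, 102]}
)
# NOTE: the method name `asof_join` is assumed for illustration
joined = trades.order_by("ts").asof_join(
    quotes.order_by("ts"),
    on="ts",
    by=["symbol"],
    strategy="backward",
)
# Each trade picks up the most recent quote at or before its timestamp
result = joined.run()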

Compute the windowed expressions given in expressions over the by columns, ordered by the order_by columns. A column may not appear in both by and order_by.

Group by columns and apply aggregation expressions.

Parameters
by:
typing.Sequence[str | Underscore]

Column names to group by.

aggregations:
AggExpr | Underscore
= ()
Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
agg_df = df.agg(["group"], _.value.sum().alias("total"))

Remove duplicate rows based on specified columns.

For rows with identical values in the specified columns, only one row is kept (chosen arbitrarily).

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 1, 2], "y": [10, 20, 30]})
unique = df.distinct_on("x")

Sort the DataFrame by one or more columns.

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [3, 1, 2], "y": [30, 10, 20]})
# Sort by x ascending
sorted_df = df.order_by("x")
# Sort by x descending, then y ascending
sorted_df = df.order_by(("x", "desc"), "y")

Persist the DataFrame plan using Velox's Hive connector.

Parameters

Directory to write output files.

Optional explicit file name.

file_format: = 'parquet'

Output format (default [Parquet](https://parquet.apache.org/)).

serde_parameters:
typing.Mapping[str, str] | None
= None

Optional SerDe options for text formats.

compression: = None

Optional compression codec.

Ensure writers emit files even if no rows were produced.

Optional connector id override.

Returns
type:
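
A minimal sketch of persisting results with write() (the method name is referenced by write_parquet below). The output directory is passed positionally here because this reference does not name that parameter; file_format matches the documented default.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Directory argument assumed positional; format defaults to Parquet
write_df = df.write("/tmp/chalk_output/", file_format="parquet")
result = write_df.run()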

Write the DataFrame as Parquet files using an auto-configured connector.

This is a convenience method that simplifies writing Parquet files compared to the more general write() method. It automatically configures the appropriate connector based on the URI prefix.

Parameters

URI prefix where Parquet files will be written. Examples:

  • "file:///path/to/dir/" for local filesystem
  • "s3://bucket/prefix/" for S3
  • "gs://bucket/prefix/" for Google Cloud Storage

Whether to skip validation at planning time (default: False).

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Write to local filesystem
write_df = df.write_parquet("file:///tmp/output/")
result = write_df.run()

Rename columns in the DataFrame.

Parameters
new_names:
dict[str | Underscore, str]

Dictionary mapping old column names to new column names. Both keys and values can be either strings or underscore column references (e.g., _.col_name).

Returns
type:
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
renamed = df.rename({"x": "id", "y": "value"})
# Can also use underscore syntax for keys
renamed = df.rename({_.x: "id", _.y: "value"})

Execute the query plan and return the result as a PyArrow Table.

Parameters
tables:
typing.Mapping[str, MaterializedTable]
= _empty_table_dict

Optional mapping of table names to materialized Arrow data for execution.

Returns
type:
pyarrow.Table
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
arrow_table = filtered.to_arrow()

Execute the query plan and return a materialized DataFrame.

Parameters
tables:
typing.Mapping[str, MaterializedTable]
= _empty_table_dict

Optional mapping of table names to materialized Arrow data for execution.

Returns
type:
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
materialized = filtered.run()

Class methods for constructing new DataFrame instances from various data sources.

These methods provide multiple ways to create DataFrames:

  • From Arrow tables in memory
  • From Parquet files on disk or cloud storage
  • From AWS Glue Iceberg tables
  • From SQL datasources
  • From Chalk SQL catalog tables

Methods for selecting, transforming, and manipulating columns.

These operations allow you to:

  • Select specific columns by name
  • Add or replace columns with new expressions
  • Rename columns
  • Project columns to new names
  • Get column expressions for use in filters and transformations
  • Add unique identifier columns
  • Explode array columns into multiple rows

Methods for filtering and ordering rows.

These operations allow you to:

  • Filter rows based on conditions
  • Sort rows by one or more columns
  • Select a subset of rows by position

Methods for combining DataFrames and performing group-by operations.

Join operations combine two DataFrames based on matching keys. Aggregation operations group rows and compute summary statistics.

Methods for executing query plans and inspecting DataFrame structure.

These methods allow you to:

  • Execute the DataFrame plan and materialize results
  • View the logical query plan
  • View the physical execution plan
  • Access the underlying ChalkTable plan
  • Get materialized tables
