Chalk SDK Reference

Lightweight DataFrame wrapper around Chalk's execution engine.

The DataFrame class constructs query plans backed by libchalk and can materialize them into Arrow tables. It offers a minimal API similar to other DataFrame libraries while delegating heavy lifting to the underlying engine.

A DataFrame wraps a plan and a mapping of materialized Arrow tables. Operations construct new plans and return new DataFrame instances, leaving previous ones untouched.

DataFrame

Logical representation of tabular data for query operations.

DataFrame provides a lazy evaluation model where operations build up a query plan that executes only when materialized. Most users should use the class methods like from_dict, from_arrow, or scan to create DataFrames rather than calling the constructor directly.

Examples

from chalkdf import DataFrame
from chalk.features import _
# Create from a dictionary
df = DataFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})
# Apply operations
filtered = df.filter(_.x > 1)
result = filtered.run()
Attributes

Return a list of the column names on this DataFrame.

column_dtypes
list[pyarrow.DataType]

Return a list of the column data types on this DataFrame.

Return the schema of this DataFrame.

Return the number of columns on this DataFrame.

Functions

Create a DataFrame from a dictionary, Arrow table, or query plan.

For most use cases, prefer using class methods like from_dict, from_arrow, or scan instead of calling this constructor directly.

Parameters
root:
ChalkTable | MaterializedTable | dict

Data source for the DataFrame. Can be:

  • dict: Dictionary mapping column names to lists of values
  • PyArrow Table or RecordBatch: In-memory Arrow data
  • ChalkTable: Query plan (advanced usage)
tables:
dict[str, MaterializedTable] | None
= None

Optional mapping of additional table names to Arrow data. Used internally for query execution with multiple tables.

from chalkdf import DataFrame
# Simple dictionary input
df = DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
# Or use the explicit class method (recommended)
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})

Return the number of rows if this DataFrame has already been materialized.

For a non-materialized frame, len() raises TypeError rather than implicitly executing the plan, matching Python's default behavior for objects without a defined length.
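
A short sketch of the documented behavior: len() is defined only once the frame is materialized, and a lazy frame raises TypeError instead of silently executing its plan.

from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3]})
# len(df) here would raise TypeError: the plan has not been executed yet
materialized = df.run()
print(len(materialized))  # 3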

Create a DataFrame for a named table.

Parameters

Table identifier.

Arrow schema describing the table.

Returns
type:
DataFrame
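
A hedged sketch of how a named table pairs with the tables argument of run(); the method name from_name is assumed here for illustration.

import pyarrow as pa
from chalkdf import DataFrame
from chalk.features import _
# `from_name` is an assumed method name for illustration
plan = DataFrame.from_name("events", pa.schema([("x", pa.int64())])).filter(_.x > 1)
# Bind concrete Arrow data to the named table at execution time
result = plan.run(tables={"events": pa.table({"x": [1, 2, 3]})})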

Construct a DataFrame from an in-memory Arrow object.

Parameters
data:
MaterializedTable

PyArrow Table or RecordBatch to convert into a DataFrame.

Returns
type:
DataFrame
import pyarrow as pa
from chalkdf import DataFrame
table = pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]})
df = DataFrame.from_arrow(table)

Construct a DataFrame from a Python dictionary.

Parameters

Dictionary mapping column names to lists of values.

Returns
type:
DataFrame
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": ["a", "b", "c"]})

Scan files and return a DataFrame.

Currently supports CSV (with headers) and Parquet file formats.

Parameters

Name to assign to the table being scanned.

input_uris:
typing.Sequence[str | Path]

List of file paths or URIs to scan. Supports local paths and file:// URIs.

schema: = None

Schema of the data. Required for CSV files, optional for Parquet.

Returns
type:
DataFrame
from chalkdf import DataFrame
# Scan Parquet files
df = DataFrame.scan("sales_data", ["data/sales_2024.parquet"])
# Scan CSV with explicit schema
import pyarrow as pa
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
df = DataFrame.scan("users", ["data/users.csv"], schema=schema)

Load data from an AWS Glue Iceberg table.

Parameters

Fully qualified database.table name.

schema:
typing.Mapping[str, pyarrow.DataType]

Mapping of column names to Arrow types.

Number of rows per batch.

aws_catalog_account_id:
typing.Optional[str]
= None

AWS account hosting the Glue catalog.

aws_catalog_region:
typing.Optional[str]
= None

Region of the Glue catalog.

aws_role_arn:
typing.Optional[str]
= None

IAM role to assume for access.

filter_predicate:
typing.Optional[Expr]
= None

Optional filter applied during scan.

parquet_scan_range_column:
typing.Optional[str]
= None

Column used for range-based reads.

custom_partitions:
typing.Optional[dict[str, tuple[typing.Literal['date_trunc(day)'], str]]]
= None

Additional partition definitions.

partition_column:
typing.Optional[str]
= None

Column name representing partitions.

Returns
type:
DataFrame
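
A hedged sketch under stated assumptions: the method name from_glue_iceberg_table and its positional table argument are assumed for illustration; the keyword names follow the parameters listed above.

import pyarrow as pa
from chalkdf import DataFrame
# Method name and positional table argument are assumed for illustration
df = DataFrame.from_glue_iceberg_table(
    "analytics.events",  # fully qualified database.table name
    schema={"event_id": pa.int64(), "ts": pa.timestamp("us")},
    aws_catalog_account_id="123456789012",
    aws_catalog_region="us-east-1",
    aws_role_arn="arn:aws:iam::123456789012:role/glue-reader",
)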

Create a DataFrame from a Chalk SQL catalog table.

Parameters

Name of the table in the catalog.

catalog:
ChalkSqlCatalog

ChalkSqlCatalog instance containing the table.

Returns
type:
DataFrame
from chalkdf import DataFrame
from libchalk.chalksql import ChalkSqlCatalog
catalog = ChalkSqlCatalog()
df = DataFrame.from_catalog_table("users", catalog=catalog)

Create a DataFrame from the result of executing a SQL query (DuckDB dialect).

Parameters

SQL query string (DuckDB dialect).

tables:
CompatibleFrameType
= {}

Optional mapping of table names to data that the query can reference.

Returns
type:
DataFrame
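
A hedged sketch; the method name from_sql is assumed for illustration, and the tables mapping makes each entry queryable by name in the DuckDB-dialect SQL.

from chalkdf import DataFrame
# `from_sql` is an assumed method name for illustration
inputs = DataFrame.from_dict({"x": [1, 2, 3]})
df = DataFrame.from_sql(
    "SELECT x * 2 AS doubled FROM inputs",  # DuckDB dialect
    tables={"inputs": inputs},
)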

Create a DataFrame from the result of querying a SQL data source.

Parameters
source:
BaseSQLSource

SQL source to query (e.g., PostgreSQL, Snowflake, BigQuery).

SQL query to execute against the data source.

Output schema of the query result. The datasource's driver converts the native query result to this schema.

Returns
type:
DataFrame
import pyarrow as pa
from chalkdf import DataFrame
from chalk.sql import PostgreSQLSource
source = PostgreSQLSource(...)
schema = pa.schema([("user_id", pa.int64()), ("name", pa.string())])
df = DataFrame.from_datasource(source, "SELECT * FROM users", schema)

Return a string representation of the logical query plan.

Returns
type:
str
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
print(filtered.explain_logical())

Return a string representation of the physical execution plan.

Returns
type:
str
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
print(filtered.explain_physical())

Compute the plan as JSON for debugging the structure of the computation.

Expose the underlying ChalkTable plan.

Return the mapping of materialized tables for this DataFrame.

Add or replace columns.

Accepts multiple forms:

  • A mapping of column names to expressions
  • Positional tuples of (name, expression)
  • Bare positional expressions that must include .alias(<name>)
Returns
type:
DataFrame
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Add a new column using a dict with _ syntax
df2 = df.with_columns({"z": _.x + _.y})
# Add a new column using alias
df3 = df.with_columns((_.x + _.y).alias("z"))

Add a monotonically increasing unique identifier column.

Parameters

Name of the new ID column.

Returns
type:
DataFrame
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [10, 20, 30]})
df_with_id = df.with_unique_id("row_id")

Filter rows based on a boolean expression.

Parameters
expr:
Expr | Underscore

Boolean expression to filter rows. Only rows where the expression evaluates to True are kept.

Returns
type:
DataFrame
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
filtered = df.filter(_.x > 2)

Return a subset of rows starting at a specific position.

Parameters

Zero-based index where the slice begins.

length: = None

Number of rows to include. If None, includes all remaining rows.

Returns
type:
DataFrame
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3, 4, 5]})
# Get rows 1-3 (indices 1, 2, 3)
sliced = df.slice(1, 3)

Get a column expression from the DataFrame.

Parameters

Name of the column to retrieve.

Returns
type:
Underscore
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Use col to reference columns in expressions
df_filtered = df.filter(df.col("x") > 1)

Get a column expression from the DataFrame.

Alias for col() method.

Parameters

Name of the column to retrieve.

Returns
type:
Underscore
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
# Bracket access returns a column expression, same as col()
df_sum = df.with_columns({"sum": df["x"] + df["y"]})

Project to a new set of columns using expressions.

Parameters
columns:
typing.Mapping[str, Expr | Underscore]

Mapping of output column names to expressions that define them.

Returns
type:
DataFrame
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
projected = df.project({"sum": _.x + _.y, "x": _.x})

Select existing columns by name.

Parameters
columns: = ()

Names of the columns to select.

strict: = True

If True, raise an error if any column doesn't exist. If False, silently ignore missing columns.

Returns
type:
DataFrame
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
selected = df.select("x", "y")

Drop specified columns from the DataFrame.

Parameters
columns: = ()

Names of the columns to drop.

strict: = True

If True, raise an error if any column doesn't exist. If False, silently ignore missing columns.

Returns
type:
DataFrame
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8, 9]})
df_dropped = df.drop("z")

Explode a list or array column into multiple rows.

Each element in the list becomes a separate row, with other column values duplicated.

Parameters

Name of the list/array column to explode.

Returns
type:
DataFrame
from chalkdf import DataFrame
df = DataFrame.from_dict({"id": [1, 2], "items": [[10, 20], [30]]})
exploded = df.explode("items")

Join this DataFrame with another.

Parameters
on:
dict[str, str] | typing.Sequence[str]

Column names or mapping of left->right join keys.

how: = 'inner'

Join type (e.g. "inner" or "left").

Optional suffix applied to right-hand columns when names collide.

Returns
type:
DataFrame
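
A minimal sketch, assuming the method is named join and takes the right-hand DataFrame as its first positional argument:

from chalkdf import DataFrame
users = DataFrame.from_dict({"id": [1, 2], "name": ["ann", "bob"]})
orders = DataFrame.from_dict({"user_id": [1, 1, 2], "amount": [10, 20, 30]})
# Map the left key "id" to the right key "user_id"; "left" keeps unmatched users
joined = users.join(orders, on={"id": "user_id"}, how="left")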

Perform an as-of join with another DataFrame.

An as-of join is similar to a left join, but instead of matching on equality, it matches on the nearest key from the right DataFrame. This is commonly used for time-series data where you want to join with the most recent observation.

Important: Both DataFrames must be sorted by the on column before calling this method. Use .order_by(on) to sort if needed.

Parameters

Right-hand DataFrame to join with.

on:

Column name in the left DataFrame to join on (must be sorted).

right_on: = None

Column name in the right DataFrame to join on. If None, uses on.

by: = None

Additional exact-match columns for left DataFrame (optional).

right_by: = None

Additional exact-match columns for right DataFrame. If None, uses by.

strategy:
AsOfJoinStrategy | typing.Literal['forward', 'backward']
= 'backward'

Join strategy - "backward" (default) matches with the most recent past value, "forward" matches with the nearest future value. Can also pass AsOfJoinStrategy enum.

Suffix to add to overlapping column names from the right DataFrame.

coalesce: = True

Whether to coalesce the join keys (default True).

Returns
type:
DataFrame
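
A hedged sketch; the method name join_asof and its positional right-hand argument are assumed for illustration, while the keyword names follow the parameters above.

from chalkdf import DataFrame
# `join_asof` is an assumed method name for illustration
trades = DataFrame.from_dict({"ts": [1, 5, 10], "qty": [100, 200, 300]})
quotes = DataFrame.from_dict({"ts": [0, 4, 9], "price": [9.9, 10.1, 10.3]})
# Both frames are already sorted by "ts"; "backward" matches each trade
# with the most recent quote at or before its timestamp
joined = trades.join_asof(quotes, on="ts", strategy="backward")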

Compute windowed expressions ('expressions') partitioned by the 'by' columns and ordered by the 'order_by' columns. A column may not appear in both 'by' and 'order_by'.

Group by columns and apply aggregation expressions.

Parameters
by:
typing.Sequence[str]

Column names to group by.

aggregations:
AggExpr | Underscore
= ()

Aggregation expressions to compute for each group.

Returns
type:
DataFrame
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"group": ["A", "A", "B"], "value": [1, 2, 3]})
agg_df = df.agg(["group"], _.value.sum().alias("total"))

Remove duplicate rows based on specified columns.

For rows with identical values in the specified columns, only one row is kept (chosen arbitrarily).

Returns
type:
DataFrame
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 1, 2], "y": [10, 20, 30]})
unique = df.distinct_on("x")

Sort the DataFrame by one or more columns.

Returns
type:
DataFrame
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [3, 1, 2], "y": [30, 10, 20]})
# Sort by x ascending
sorted_df = df.order_by("x")
# Sort by x descending, then y ascending
sorted_df = df.order_by(("x", "desc"), "y")

Persist the DataFrame plan using Velox's Hive connector.

Parameters

Directory to write output files.

Optional explicit file name.

file_format: = 'parquet'

Output format (default Parquet).

serde_parameters:
typing.Mapping[str, str] | None
= None

Optional SerDe options for text formats.

compression: = None

Optional compression codec.

Ensure writers emit files even if no rows were produced.

Optional connector id override.

Returns
type:
DataFrame
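
A hedged sketch; the method name persist and its positional output-directory argument are assumed for illustration.

from chalkdf import DataFrame
# `persist` and the output directory argument are assumed for illustration
df = DataFrame.from_dict({"x": [1, 2, 3]})
persisted = df.persist("/tmp/chalk_output", file_format="parquet", compression="zstd")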

Rename columns in the DataFrame.

Parameters

Dictionary mapping old column names to new column names.

Returns
type:
DataFrame
from chalkdf import DataFrame
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
renamed = df.rename({"x": "id", "y": "value"})

Execute the query plan and return the result as a PyArrow Table.

Parameters
tables:
typing.Mapping[str, MaterializedTable]
= _empty_table_dict

Optional mapping of table names to materialized Arrow data for execution.

Returns
type:
pyarrow.Table
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
arrow_table = filtered.to_arrow()

Execute the query plan and return a materialized DataFrame.

Parameters
tables:
typing.Mapping[str, MaterializedTable]
= _empty_table_dict

Optional mapping of table names to materialized Arrow data for execution.

Returns
type:
DataFrame
from chalkdf import DataFrame
from chalk.features import _
df = DataFrame.from_dict({"x": [1, 2, 3], "y": [4, 5, 6]})
filtered = df.filter(_.x > 1)
materialized = filtered.run()

Method Groups

Class methods for constructing new DataFrame instances from various data sources.

These methods provide multiple ways to create DataFrames:

  • From Arrow tables in memory
  • From Parquet files on disk or cloud storage
  • From AWS Glue Iceberg tables
  • From SQL datasources
  • From Chalk SQL catalog tables

Methods for selecting, transforming, and manipulating columns.

These operations allow you to:

  • Select specific columns by name
  • Add or replace columns with new expressions
  • Rename columns
  • Project columns to new names
  • Get column expressions for use in filters and transformations
  • Add unique identifier columns
  • Explode array columns into multiple rows

Methods for filtering and ordering rows.

These operations allow you to:

  • Filter rows based on conditions
  • Sort rows by one or more columns
  • Select a subset of rows by position

Methods for combining DataFrames and performing group-by operations.

Join operations combine two DataFrames based on matching keys. Aggregation operations group rows and compute summary statistics.

Methods for executing query plans and inspecting DataFrame structure.

These methods allow you to:

  • Execute the DataFrame plan and materialize results
  • View the logical query plan
  • View the physical execution plan
  • Access the underlying ChalkTable plan
  • Get materialized tables
