Overview

Embedding models typically accept batches of inputs and return batches of vectors. When you need to embed a large dataset, sending one row at a time wastes round-trips and leaves GPU capacity idle. The chunked_call method on AsyncFunction splits a PyArrow table into chunks, submits them to the function queue with bounded concurrency, and reassembles the results — all in a single call.


Define the embedding function

The remote function receives its input as a columnar batch. Declare a list[str] or pyarrow.Table parameter so it can accept multiple rows at once:

Your embedding function must accept batch input — either a `list[str]`, a `list[list[float]]`, or a `pyarrow.Table`. A function that accepts a single `str` will only process one row per call, defeating the purpose of batching.

import chalkcompute

@chalkcompute.function(
    name="embed",
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install sentence-transformers",
    ),
    cpu=2,
    memory="4Gi",
    gpu="nvidia-l4",
    max_batching_size=25,
    max_buffer_duration=2000,
    retries=3,
)
def embed(text: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(text).tolist()

When max_batching_size or max_buffer_duration is set, the function queue automatically buffers incoming items and delivers them to the handler as a batch. The handler is invoked when either the buffer reaches max_batching_size items or max_buffer_duration milliseconds have elapsed — whichever comes first.

Deploy the function:

chalk compute deploy embed.py

Call with chunked_call

Once the function is deployed, use Remote to get a handle and call chunked_call to process a large dataset in batches:

import pyarrow as pa
from chalkdf import Remote

remote = Remote()
embed = remote.function("embed", output_type=pa.large_list(pa.float32()), is_async=True)

# Build the input table — one column per function argument.
texts = [f"Document number {i}: some text to embed." for i in range(1000)]
args = pa.table({"text": pa.array(texts, type=pa.large_utf8())})

# Split into 25-row chunks, run up to 4 chunks concurrently.
result = embed.chunked_call(
    args,
    chunk_size=25,
    concurrent_chunks=4,
    timeout=120,
)

print(f"Embedded {len(result)} rows")
print(f"Embedding dim: {len(result.column('result')[0].as_py())}")

Parameters

ParameterDescription
argsA pyarrow.Table whose columns match the function’s expected inputs.
chunk_sizeMax rows per chunk. Mutually exclusive with num_chunks.
num_chunksSplit into exactly this many chunks. Mutually exclusive with chunk_size.
concurrent_chunksMax chunks in flight at once. Higher = more throughput, more queue pressure.
timeoutSeconds to wait per chunk. None = wait forever.

Choosing chunk_size vs num_chunks

Use chunk_size when you know the optimal batch size for your model (e.g. the embedding model’s max batch size). Use num_chunks when you want to control parallelism directly and let the chunk sizes adjust to the dataset.

# Fixed chunk size — good when the model has a known optimal batch size
result = embed.chunked_call(args, chunk_size=64, concurrent_chunks=4)

# Fixed chunk count — good when you want exactly N parallel requests
result = embed.chunked_call(args, num_chunks=8, concurrent_chunks=4)

Lower-level: defer_batch

If you need finer control over submission and result collection, use defer_batch directly. It accepts a pyarrow.RecordBatch and returns an AsyncFunctionCallHandle:

import pyarrow as pa
from chalkdf import Remote

remote = Remote()
embed = remote.function("embed", output_type=pa.large_list(pa.float32()), is_async=True)

batch = pa.record_batch(
    [pa.array(["hello world", "another sentence"], type=pa.large_utf8())],
    schema=pa.schema([("text", pa.large_utf8())]),
)

handle = embed.defer_batch(batch)
result = handle.get(timeout=60)  # blocks until done
print(f"Got {len(result)} embeddings")

This is useful when you want to manage concurrency yourself, retry individual batches, or interleave submission with other work.


End-to-end example

A complete script that embeds a list of documents and writes the results to Parquet:

import pyarrow as pa
import pyarrow.parquet as pq
from chalkdf import Remote

remote = Remote()
embed = remote.function("embed", output_type=pa.large_list(pa.float32()), is_async=True)

# Load documents — in practice, read from a file or database.
documents = [f"Document {i}: content goes here." for i in range(500)]

args = pa.table({"text": pa.array(documents, type=pa.large_utf8())})

result = embed.chunked_call(
    args,
    chunk_size=50,
    concurrent_chunks=4,
    timeout=300,
)

# Combine input and output into a single table.
output = pa.table({
    "text": args.column("text"),
    "embedding": result.column("result"),
})

pq.write_table(output, "embeddings.parquet")
print(f"Wrote {len(output)} embeddings to embeddings.parquet")

`chunked_call` preserves row order — the output table's rows correspond 1:1 with the input table's rows, regardless of how chunks were split or in what order they completed.