Use chunked_call to efficiently batch embedding requests with bounded concurrency.
Embedding models typically accept batches of inputs and return batches of vectors.
When you need to embed a large dataset, sending one row at a time wastes round-trips
and leaves GPU capacity idle. The chunked_call method on AsyncFunction splits a
PyArrow table into chunks, submits them to the function queue with bounded concurrency,
and reassembles the results — all in a single call.
The remote function receives its input as a columnar batch, so it must accept batch input: a `list[str]`, a `list[list[float]]`, or a `pyarrow.Table`. A function that accepts a single `str` processes only one row per call, defeating the purpose of batching.
```python
import chalkcompute

@chalkcompute.function(
    name="embed",
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install sentence-transformers",
    ),
    cpu=2,
    memory="4Gi",
    gpu="nvidia-l4",
    max_batching_size=25,
    max_buffer_duration=2000,
    retries=3,
)
def embed(text: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(text).tolist()
```

When `max_batching_size` or `max_buffer_duration` is set, the function queue automatically buffers incoming items and delivers them to the handler as a batch. The handler is invoked when either the buffer reaches `max_batching_size` items or `max_buffer_duration` milliseconds have elapsed, whichever comes first.
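The "whichever comes first" flush rule can be sketched in plain Python. This is a toy model of the queue's buffering behavior, not the actual implementation; the class and method names are illustrative:

```python
import time

class BatchBuffer:
    """Toy model of a queue that flushes on size or age, whichever comes first."""

    def __init__(self, max_batching_size: int, max_buffer_duration_ms: int, handler):
        self.max_size = max_batching_size
        self.max_age = max_buffer_duration_ms / 1000.0
        self.handler = handler
        self.items: list = []
        self.first_item_at: float | None = None

    def push(self, item) -> None:
        if not self.items:
            self.first_item_at = time.monotonic()
        self.items.append(item)
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        full = len(self.items) >= self.max_size
        stale = (
            self.first_item_at is not None
            and time.monotonic() - self.first_item_at >= self.max_age
        )
        if full or stale:
            batch, self.items, self.first_item_at = self.items, [], None
            self.handler(batch)  # handler receives the whole batch at once

batches = []
buf = BatchBuffer(max_batching_size=3, max_buffer_duration_ms=2000, handler=batches.append)
for i in range(7):
    buf.push(i)
# Seven items with max size 3: two full batches flushed, one item still buffered.
```

A real queue would also flush the trailing partial batch once its age limit expires; the sketch only checks the condition on each push.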
Deploy the function:

```shell
chalk compute deploy embed.py
```

Once the function is deployed, use `Remote` to get a handle and call `chunked_call` to process a large dataset in batches:
```python
import pyarrow as pa

from chalkdf import Remote

remote = Remote()
embed = remote.function("embed", output_type=pa.large_list(pa.float32()), is_async=True)

# Build the input table, one column per function argument.
texts = [f"Document number {i}: some text to embed." for i in range(1000)]
args = pa.table({"text": pa.array(texts, type=pa.large_utf8())})

# Split into 25-row chunks, run up to 4 chunks concurrently.
result = embed.chunked_call(
    args,
    chunk_size=25,
    concurrent_chunks=4,
    timeout=120,
)
print(f"Embedded {len(result)} rows")
print(f"Embedding dim: {len(result.column('result')[0].as_py())}")
```

| Parameter | Description |
|---|---|
| `args` | A `pyarrow.Table` whose columns match the function's expected inputs. |
| `chunk_size` | Max rows per chunk. Mutually exclusive with `num_chunks`. |
| `num_chunks` | Split into exactly this many chunks. Mutually exclusive with `chunk_size`. |
| `concurrent_chunks` | Max chunks in flight at once. Higher means more throughput but more queue pressure. |
| `timeout` | Seconds to wait per chunk. `None` waits forever. |
Use chunk_size when you know the optimal batch size for your model (e.g. the
embedding model’s max batch size). Use num_chunks when you want to control
parallelism directly and let the chunk sizes adjust to the dataset.
```python
# Fixed chunk size: good when the model has a known optimal batch size
result = embed.chunked_call(args, chunk_size=64, concurrent_chunks=4)

# Fixed chunk count: good when you want exactly N parallel requests
result = embed.chunked_call(args, num_chunks=8, concurrent_chunks=4)
```

If you need finer control over submission and result collection, use `defer_batch` directly. It accepts a `pyarrow.RecordBatch` and returns an `AsyncFunctionCallHandle`:
```python
import pyarrow as pa

from chalkdf import Remote

remote = Remote()
embed = remote.function("embed", output_type=pa.large_list(pa.float32()), is_async=True)

batch = pa.record_batch(
    [pa.array(["hello world", "another sentence"], type=pa.large_utf8())],
    schema=pa.schema([("text", pa.large_utf8())]),
)

handle = embed.defer_batch(batch)
result = handle.get(timeout=60)  # blocks until done
print(f"Got {len(result)} embeddings")
```

This is useful when you want to manage concurrency yourself, retry individual batches, or interleave submission with other work.
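One such pattern is a bounded submission loop with per-batch retries. The sketch below shows the shape of the control flow only; `submit` and `FakeHandle` are stand-in stubs, not chalkdf APIs:

```python
def run_with_retries(batches, submit, max_in_flight=4, retries=3):
    """Submit batches with bounded concurrency; resubmit any batch that fails."""
    results = [None] * len(batches)
    pending = list(enumerate(batches))   # (index, batch) not yet submitted
    in_flight = []                       # (index, attempt, handle)

    while pending or in_flight:
        # Top up to the concurrency limit.
        while pending and len(in_flight) < max_in_flight:
            idx, batch = pending.pop(0)
            in_flight.append((idx, 1, submit(batch)))

        # Wait on the oldest in-flight handle; resubmit on failure.
        idx, attempt, handle = in_flight.pop(0)
        try:
            results[idx] = handle.get(timeout=60)
        except Exception:
            if attempt >= retries:
                raise
            in_flight.append((idx, attempt + 1, submit(batches[idx])))
    return results

class FakeHandle:
    """Stub standing in for an AsyncFunctionCallHandle."""
    def __init__(self, batch):
        self.batch = batch
    def get(self, timeout=None):
        return [len(s) for s in self.batch]

results = run_with_retries([["a", "bb"], ["ccc"]], submit=FakeHandle, max_in_flight=2)
# results == [[1, 2], [3]]
```

Swapping `FakeHandle` for `embed.defer_batch` would give the same loop over real handles, at the cost of blocking on the oldest handle rather than the first to finish.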
A complete script that embeds a list of documents and writes the results to Parquet:
```python
import pyarrow as pa
import pyarrow.parquet as pq

from chalkdf import Remote

remote = Remote()
embed = remote.function("embed", output_type=pa.large_list(pa.float32()), is_async=True)

# Load documents; in practice, read from a file or database.
documents = [f"Document {i}: content goes here." for i in range(500)]
args = pa.table({"text": pa.array(documents, type=pa.large_utf8())})

result = embed.chunked_call(
    args,
    chunk_size=50,
    concurrent_chunks=4,
    timeout=300,
)

# Combine input and output into a single table.
output = pa.table({
    "text": args.column("text"),
    "embedding": result.column("result"),
})
pq.write_table(output, "embeddings.parquet")
print(f"Wrote {len(output)} embeddings to embeddings.parquet")
```

`chunked_call` preserves row order: the output table's rows correspond 1:1 with the input table's rows, regardless of how chunks were split or in what order they completed.
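That guarantee is what makes out-of-order chunk completion safe to ignore. A minimal sketch of order-preserving reassembly, keyed by chunk index rather than completion order (illustrative, not the library's code):

```python
def reassemble(chunk_results: dict[int, list]) -> list:
    """Rebuild row order from chunk results keyed by original chunk index."""
    rows = []
    for idx in sorted(chunk_results):
        rows.extend(chunk_results[idx])
    return rows

# Chunks may finish in any order; sorting by chunk index restores row order.
finished = {2: ["e"], 0: ["a", "b"], 1: ["c", "d"]}
reassemble(finished)  # -> ["a", "b", "c", "d", "e"]
```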