# Batched Calls for Embeddings
source: https://docs.chalk.ai/docs/compute/batched-embeddings

## Use chunked_call to efficiently batch embedding requests with bounded concurrency.

### Overview

Embedding models typically accept batches of inputs and return batches of vectors.
When you need to embed a large dataset, sending one row at a time wastes round-trips
and leaves GPU capacity idle. The chunked_call method on AsyncFunction splits a
PyArrow table into chunks, submits them to the function queue with bounded concurrency,
and reassembles the results — all in a single call.

### Define the embedding function

The remote function receives its input as a columnar batch. Declare a
list[str] or pyarrow.Table parameter so it can accept multiple rows at once:

Your embedding function must accept batch input — either a list[str], a
list[list[float]], or a pyarrow.Table. A function that accepts a single str
will only process one row per call, defeating the purpose of batching.

```
import chalkcompute

@chalkcompute.function(
    name="embed",
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install sentence-transformers",
    ),
    cpu="2",
    memory="4Gi",
    gpu="nvidia-l4",
    max_batching_size=25,
    max_buffer_duration=2000,
    retries=3,
)
def embed(text: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(text).tolist()
```

When max_batching_size or max_buffer_duration is set, the function queue
automatically buffers incoming items and delivers them to the handler as a batch.
The handler is invoked when either the buffer reaches max_batching_size items or
max_buffer_duration milliseconds have elapsed — whichever comes first.

Deploy the function:

```
python embed.py
```

### Call with chunked_call

Once the function is deployed, use Remote to get a handle and call
chunked_call to process a large dataset in batches:

```
import pyarrow as pa
from chalkdf import Remote

remote = Remote()
embed = remote.function("embed", output_type=pa.large_list(pa.float32()), is_async=True)

# Build the input table — one column per function argument.
texts = [f"Document number {i}: some text to embed." for i in range(1000)]
args = pa.table({"text": pa.array(texts, type=pa.large_utf8())})

# Split into 25-row chunks, run up to 4 chunks concurrently.
result = embed.chunked_call(
    args,
    chunk_size=25,
    concurrent_chunks=4,
    timeout=120,
)

print(f"Embedded {len(result)} rows")
print(f"Embedding dim: {len(result.column('result')[0].as_py())}")
```

### Parameters

| Parameter           | Description                                                                  |
| ------------------- | ---------------------------------------------------------------------------- |
| `args`              | A `pyarrow.Table` whose columns match the function's expected inputs.        |
| `chunk_size`        | Max rows per chunk. Mutually exclusive with `num_chunks`.                    |
| `num_chunks`        | Split into exactly this many chunks. Mutually exclusive with `chunk_size`.   |
| `concurrent_chunks` | Max chunks in flight at once. Higher = more throughput, more queue pressure. |
| `timeout`           | Seconds to wait per chunk. `None` = wait forever.                            |

### Choosing chunk_size vs num_chunks

Use chunk_size when you know the optimal batch size for your model (e.g. the
embedding model's max batch size). Use num_chunks when you want to control
parallelism directly and let the chunk sizes adjust to the dataset.

```
# Fixed chunk size — good when the model has a known optimal batch size
result = embed.chunked_call(args, chunk_size=64, concurrent_chunks=4)

# Fixed chunk count — good when you want exactly N parallel requests
result = embed.chunked_call(args, num_chunks=8, concurrent_chunks=4)
```

### Lower-level: defer_batch

If you need finer control over submission and result collection, use defer_batch
directly. It accepts a pyarrow.RecordBatch and returns an AsyncFunctionCallHandle:

```
import pyarrow as pa
from chalkdf import Remote

remote = Remote()
embed = remote.function("embed", output_type=pa.large_list(pa.float32()), is_async=True)

batch = pa.record_batch(
    [pa.array(["hello world", "another sentence"], type=pa.large_utf8())],
    schema=pa.schema([("text", pa.large_utf8())]),
)

handle = embed.defer_batch(batch)
result = handle.get(timeout=60)  # blocks until done
print(f"Got {len(result)} embeddings")
```

This is useful when you want to manage concurrency yourself, retry individual
batches, or interleave submission with other work.

### End-to-end example

A complete script that embeds a list of documents and writes the results to Parquet:

```
import pyarrow as pa
import pyarrow.parquet as pq
from chalkdf import Remote

remote = Remote()
embed = remote.function("embed", output_type=pa.large_list(pa.float32()), is_async=True)

# Load documents — in practice, read from a file or database.
documents = [f"Document {i}: content goes here." for i in range(500)]

args = pa.table({"text": pa.array(documents, type=pa.large_utf8())})

result = embed.chunked_call(
    args,
    chunk_size=50,
    concurrent_chunks=4,
    timeout=300,
)

# Combine input and output into a single table.
output = pa.table({
    "text": args.column("text"),
    "embedding": result.column("result"),
})

pq.write_table(output, "embeddings.parquet")
print(f"Wrote {len(output)} embeddings to embeddings.parquet")
```

chunked_call preserves row order — the output table's rows correspond 1:1
with the input table's rows, regardless of how chunks were split or in what
order they completed.





