# Fan-Out Inference
source: https://docs.chalk.ai/docs/compute/fan-out-inference

## Orchestrate parallel embedding jobs across worker functions with shared volumes.

### What we're building

A two-function system for batch-embedding documents at scale:

- orchestrate — reads a manifest of file paths from a volume, fans out one
embed call per document, and collects the results.
- embed — reads a single file from the shared volume, calls an embedding model
through Chalk's OpenAI-compatible router, writes the embedding back to the volume,
and upserts it into Turbopuffer for vector search.

Both functions are deployed by running their Python scripts and communicate through a
shared Volume. The orchestrator doesn't need a GPU or heavy dependencies — it just
coordinates. The workers carry the inference libraries.

### The embed worker

Start with the leaf function. Each invocation receives a file path on a shared volume,
embeds its contents, and pushes the result to two places: back to the volume (so the
orchestrator can read it) and to Turbopuffer for indexing.

```
# embed_worker.py
import json
import os

import chalkcompute
from chalkcompute import Container, Image, Secret, Volume

worker_image = (
    Image.base("python:3.12-slim")
    .pip_install(["openai", "turbopuffer"])
)

worker_container = Container(
    image=worker_image,
    cpu="2",
    memory="4Gi",
    secrets=[
        Secret.from_env("OPENAI_API_KEY"),
        Secret.from_env("TURBOPUFFER_API_KEY"),
    ],
    volumes=[("embed-data", "/data")],
)
```

The Container definition pins the worker's resource limits and mounts the shared
volume at /data. Secrets are injected as environment variables — the function code
reads them with os.environ as usual, but they never appear in the image or source.

Now the function itself:

```
@chalkcompute.function(name="embed", container=worker_container)
def embed(file_path: str) -> str:
    """Embed a single document and push to Turbopuffer.

    Args:
        file_path: Path relative to the volume root, e.g. "docs/readme.md".

    Returns:
        The path where the embedding was written on the volume.
    """
    import openai
    import turbopuffer as tpuf

    # 1. Read the document from the shared volume.
    full_path = os.path.join("/data", file_path)
    with open(full_path) as f:
        text = f.read()

    # 2. Call the embedding model through Chalk's OpenAI-compatible router.
    #    The router handles rate limiting, retries, and cost tracking.
    client = openai.OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url="https://router.chalk.ai/v1",
    )
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    embedding = response.data[0].embedding

    # 3. Write the embedding to the volume so the orchestrator can access it.
    output_path = file_path.rsplit(".", 1)[0] + ".embedding.json"
    full_output = os.path.join("/data", output_path)
    os.makedirs(os.path.dirname(full_output), exist_ok=True)
    with open(full_output, "w") as f:
        json.dump({"file": file_path, "embedding": embedding}, f)

    # 4. Upsert into Turbopuffer for vector search.
    tpuf.api_key = os.environ["TURBOPUFFER_API_KEY"]
    ns = tpuf.Namespace("documents")
    ns.upsert(
        ids=[file_path],
        vectors=[embedding],
        attributes={"text": [text[:1000]]},  # store a preview
    )

    return output_path
```

Each worker instance is stateless. The volume provides the input; the function
writes its output back to the same volume and to Turbopuffer. Ten workers
processing ten files all write to different paths — no coordination needed.

### The orchestrator

The orchestrator reads a manifest file from the volume, fans out embed calls in
parallel, and waits for all of them to finish.

```
# orchestrator.py
import json
import os
import asyncio

import chalkcompute
from chalkcompute import Container, Image, Volume

orchestrator_image = (
    Image.base("python:3.12-slim")
    .pip_install(["chalkcompute"])
)

orchestrator_container = Container(
    image=orchestrator_image,
    cpu="1",
    memory="1Gi",
    volumes=[("embed-data", "/data")],
)
```

The orchestrator is deliberately lightweight — it doesn't need the OpenAI or
Turbopuffer SDKs. It only needs chalkcompute to call the remote embed function.

```
@chalkcompute.function(name="orchestrate", container=orchestrator_container)
def orchestrate(manifest_path: str) -> dict:
    """Fan out embedding jobs for every file listed in a manifest.

    The manifest is a JSON file on the shared volume containing a list of
    relative file paths, e.g. ["docs/readme.md", "docs/guide.md"].
    """
    # 1. Read the manifest from the shared volume.
    full_path = os.path.join("/data", manifest_path)
    with open(full_path) as f:
        file_paths = json.load(f)

    # 2. Get a reference to the embed worker by name.
    embed_fn = chalkcompute.function_ref("embed")

    # 3. Fan out: call embed() for every file in parallel.
    futures = [embed_fn.async_call(fp) for fp in file_paths]
    results = asyncio.get_event_loop().run_until_complete(asyncio.gather(*futures))

    # 4. Summarize.
    return {
        "total": len(file_paths),
        "completed": len([r for r in results if r is not None]),
        "output_paths": list(results),
    }
```

The key line is chalkcompute.function_ref("embed") — this returns a handle to the
remote embed function by name, without importing it. The orchestrator and worker
can live in separate files, separate containers, even separate repos.

### Deploying both functions

Put both files in the same directory and deploy them together:

```
python embed_worker.py

python orchestrator.py
```

### Preparing the volume

Before running the orchestrator, upload your documents and a manifest:

```
from chalkcompute import Volume

vol = Volume(name="embed-data")

# Upload documents.
vol.put_file("docs/readme.md", open("./corpus/readme.md", "rb").read())
vol.put_file("docs/guide.md", open("./corpus/guide.md", "rb").read())
vol.put_file("docs/api-ref.md", open("./corpus/api-ref.md", "rb").read())

# Upload a manifest listing every file to embed.
import json
manifest = ["docs/readme.md", "docs/guide.md", "docs/api-ref.md"]
vol.put_file("manifest.json", json.dumps(manifest).encode())
```

### Running it

Call the orchestrator — it reads the manifest and fans out to workers:

```
import chalkcompute

orchestrate = chalkcompute.function_ref("orchestrate")
orchestrate.wait_ready()

result = orchestrate("manifest.json")
print(result)
# {
#   "total": 3,
#   "completed": 3,
#   "output_paths": [
#     "docs/readme.embedding.json",
#     "docs/guide.embedding.json",
#     "docs/api-ref.embedding.json",
#   ],
# }
```

Each embedding is now in two places: on the volume (for downstream pipelines)
and in Turbopuffer (for vector search). The orchestrator finished as soon as all
three workers returned.

### How it fits together

```
                            ┌─────────────────┐
                            │   orchestrate   │
 manifest.json ────────────▸│   (1 CPU, 1Gi)  │
    on Volume               │                 │
                            └───────┬─────────┘
                                    │
                       ┌────────────┼────────────┐
                       ▼            ▼            ▼
                ┌────────────┐ ┌────────────┐ ┌────────────┐
                │   embed    │ │   embed    │ │   embed    │
                │  worker 0  │ │  worker 1  │ │  worker 2  │
                │ (2 CPU,4Gi)│ │ (2 CPU,4Gi)│ │ (2 CPU,4Gi)│
                └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
                      │              │              │
            ┌─────────┼──────────────┼──────────────┼─────────┐
            │         ▼              ▼              ▼         │
            │              Shared Volume                      │
            │         /data/docs/*.embedding.json             │
            └─────────────────────────────────────────────────┘
                      │              │              │
                      ▼              ▼              ▼
               ┌─────────────────────────────────────────┐
               │            Turbopuffer                  │
               │         "documents" namespace           │
               └─────────────────────────────────────────┘
```

### Scaling up

For larger corpora, adjust the number of concurrent workers by setting
max_instances on the worker container:

```
worker_container = Container(
    image=worker_image,
    cpu="2",
    memory="4Gi",
    secrets=[
        Secret.from_env("OPENAI_API_KEY"),
        Secret.from_env("TURBOPUFFER_API_KEY"),
    ],
    volumes=[("embed-data", "/data")],
    min_instances=0,
    max_instances=20,
    max_concurrent_requests=4,
)
```

With this configuration, Chalk scales from zero to 20 worker replicas as the
orchestrator's fan-out grows. Each replica handles up to 4 concurrent embed calls.
A corpus of 1,000 documents would be processed by up to 80 parallel embedding
requests across 20 workers.





