What we're building

A two-function system for batch-embedding documents at scale:

  • orchestrate — reads a manifest of file paths from a volume, fans out one embed call per document, and collects the results.
  • embed — reads a single file from the shared volume, calls an embedding model through Chalk’s OpenAI-compatible router, writes the embedding back to the volume, and upserts it into Turbopuffer for vector search.

Both functions are deployed with chalk compute deploy and communicate through a shared Volume. The orchestrator doesn’t need a GPU or heavy dependencies — it just coordinates. The workers carry the inference libraries.


The embed worker

Start with the leaf function. Each invocation receives a file path on a shared volume, embeds its contents, and pushes the result to two places: back to the volume (so the orchestrator can read it) and to Turbopuffer for indexing.

# embed_worker.py
import json
import os

import chalkcompute
from chalkcompute import Container, Image, Secret, Volume

worker_image = (
    Image.base("python:3.12-slim")
    .pip_install(["openai", "turbopuffer"])
)

worker_container = Container(
    image=worker_image,
    cpu="2",
    memory="4Gi",
    secrets=[
        Secret(name="OPENAI_API_KEY"),
        Secret(name="TURBOPUFFER_API_KEY"),
    ],
    volumes={"embed-data": "/data"},
)

The Container definition pins the worker’s resource limits and mounts the shared volume at /data. Secrets are injected as environment variables — the function code reads them with os.environ as usual, but they never appear in the image or source.

Now the function itself:

@chalkcompute.function(name="embed", container=worker_container)
def embed(file_path: str) -> str:
    """Embed a single document and push to Turbopuffer.

    Args:
        file_path: Path relative to the volume root, e.g. "docs/readme.md".

    Returns:
        The path where the embedding was written on the volume.
    """
    import openai
    import turbopuffer as tpuf

    # 1. Read the document from the shared volume.
    full_path = os.path.join("/data", file_path)
    with open(full_path) as f:
        text = f.read()

    # 2. Call the embedding model through Chalk's OpenAI-compatible router.
    #    The router handles rate limiting, retries, and cost tracking.
    client = openai.OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url="https://router.chalk.ai/v1",
    )
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    embedding = response.data[0].embedding

    # 3. Write the embedding to the volume so the orchestrator can access it.
    output_path = file_path.rsplit(".", 1)[0] + ".embedding.json"
    full_output = os.path.join("/data", output_path)
    os.makedirs(os.path.dirname(full_output), exist_ok=True)
    with open(full_output, "w") as f:
        json.dump({"file": file_path, "embedding": embedding}, f)

    # 4. Upsert into Turbopuffer for vector search.
    tpuf.api_key = os.environ["TURBOPUFFER_API_KEY"]
    ns = tpuf.Namespace("documents")
    ns.upsert(
        ids=[file_path],
        vectors=[embedding],
        attributes={"text": [text[:1000]]},  # store a preview
    )

    return output_path

Each worker instance is stateless. The volume provides the input; the function writes its output back to the same volume and to Turbopuffer. Ten workers processing ten files all write to different paths — no coordination needed.


The orchestrator

The orchestrator reads a manifest file from the volume, fans out embed calls in parallel, and waits for all of them to finish.

# orchestrator.py
import json
import os
import asyncio

import chalkcompute
from chalkcompute import Container, Image, Volume

orchestrator_image = (
    Image.base("python:3.12-slim")
    .pip_install(["chalkcompute"])
)

orchestrator_container = Container(
    image=orchestrator_image,
    cpu="1",
    memory="1Gi",
    volumes={"embed-data": "/data"},
)

The orchestrator is deliberately lightweight — it doesn’t need the OpenAI or Turbopuffer SDKs. It only needs chalkcompute to call the remote embed function.

@chalkcompute.function(name="orchestrate", container=orchestrator_container)
def orchestrate(manifest_path: str) -> dict:
    """Fan out embedding jobs for every file listed in a manifest.

    The manifest is a JSON file on the shared volume containing a list of
    relative file paths, e.g. ["docs/readme.md", "docs/guide.md"].
    """
    # 1. Read the manifest from the shared volume.
    full_path = os.path.join("/data", manifest_path)
    with open(full_path) as f:
        file_paths = json.load(f)

    # 2. Get a reference to the embed worker by name.
    embed_fn = chalkcompute.function_ref("embed")

    # 3. Fan out: call embed() for every file in parallel.
    futures = [embed_fn.async_call(fp) for fp in file_paths]
    results = asyncio.get_event_loop().run_until_complete(asyncio.gather(*futures))

    # 4. Summarize.
    return {
        "total": len(file_paths),
        "completed": len([r for r in results if r is not None]),
        "output_paths": list(results),
    }

The key line is chalkcompute.function_ref("embed") — this returns a handle to the remote embed function by name, without importing it. The orchestrator and worker can live in separate files, separate containers, even separate repos.


Deploying both functions

Put both files in the same directory and deploy them together:

chalk compute deploy embed_worker.py
# ✓ Container created successfully
# Container ID: e4a29f83-1bc5-4d7a-8e62-5c9f0ad31b78
# Name: embed-worker
# Status: Running
# Pod Name: chalk-container-embed-worker
# URL: https://e4a29f83-1bc5-4d7a-8e62-5c9f0ad31b78.compute.chalk.ai

chalk compute deploy orchestrator.py
# ✓ Container created successfully
# Container ID: f7b38d14-9ce2-4f5b-a371-8d6e2ca40f95
# Name: fan-out-orchestrator
# Status: Running
# Pod Name: chalk-container-fan-out-orchestrator
# URL: https://f7b38d14-9ce2-4f5b-a371-8d6e2ca40f95.compute.chalk.ai

Preparing the volume

Before running the orchestrator, upload your documents and a manifest:

from chalkcompute import Volume

vol = Volume(name="embed-data")

# Upload documents.
vol.put_file("docs/readme.md", open("./corpus/readme.md", "rb").read())
vol.put_file("docs/guide.md", open("./corpus/guide.md", "rb").read())
vol.put_file("docs/api-ref.md", open("./corpus/api-ref.md", "rb").read())

# Upload a manifest listing every file to embed.
import json
manifest = ["docs/readme.md", "docs/guide.md", "docs/api-ref.md"]
vol.put_file("manifest.json", json.dumps(manifest).encode())

Running it

Call the orchestrator — it reads the manifest and fans out to workers:

import chalkcompute

orchestrate = chalkcompute.function_ref("orchestrate")
orchestrate.wait_ready()

result = orchestrate("manifest.json")
print(result)
# {
#   "total": 3,
#   "completed": 3,
#   "output_paths": [
#     "docs/readme.embedding.json",
#     "docs/guide.embedding.json",
#     "docs/api-ref.embedding.json",
#   ],
# }

Each embedding is now in two places: on the volume (for downstream pipelines) and in Turbopuffer (for vector search). The orchestrator finished as soon as all three workers returned.


How it fits together

                            ┌─────────────────┐
                            │   orchestrate   │
 manifest.json ────────────▸│   (1 CPU, 1Gi)  │
    on Volume               │                 │
                            └───────┬─────────┘
                                    │
                       ┌────────────┼────────────┐
                       ▼            ▼            ▼
                ┌────────────┐ ┌────────────┐ ┌────────────┐
                │   embed    │ │   embed    │ │   embed    │
                │  worker 0  │ │  worker 1  │ │  worker 2  │
                │ (2 CPU,4Gi)│ │ (2 CPU,4Gi)│ │ (2 CPU,4Gi)│
                └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
                      │              │              │
            ┌─────────┼──────────────┼──────────────┼─────────┐
            │         ▼              ▼              ▼         │
            │              Shared Volume                      │
            │         /data/docs/*.embedding.json             │
            └─────────────────────────────────────────────────┘
                      │              │              │
                      ▼              ▼              ▼
               ┌─────────────────────────────────────────┐
               │            Turbopuffer                  │
               │         "documents" namespace           │
               └─────────────────────────────────────────┘

Scaling up

For larger corpora, adjust the number of concurrent workers by setting max_instances on the worker container:

worker_container = Container(
    image=worker_image,
    cpu="2",
    memory="4Gi",
    secrets=[
        Secret(name="OPENAI_API_KEY"),
        Secret(name="TURBOPUFFER_API_KEY"),
    ],
    volumes={"embed-data": "/data"},
    min_instances=0,
    max_instances=20,
    max_concurrent_requests=4,
)

With this configuration, Chalk scales from zero to 20 worker replicas as the orchestrator’s fan-out grows. Each replica handles up to 4 concurrent embed calls. A corpus of 1,000 documents would be processed by up to 80 parallel embedding requests across 20 workers.