Compute
Orchestrate parallel embedding jobs across worker functions with shared volumes.
A two-function system for batch-embedding documents at scale:

- orchestrate — reads a manifest of file paths from a volume, fans out one embed call per document, and collects the results.
- embed — reads a single file from the shared volume, calls an embedding model through Chalk’s OpenAI-compatible router, writes the embedding back to the volume, and upserts it into Turbopuffer for vector search.

Both functions are deployed with chalk compute deploy and communicate through a
shared Volume. The orchestrator doesn’t need a GPU or heavy dependencies — it just
coordinates. The workers carry the inference libraries.
Start with the leaf function. Each invocation receives a file path on a shared volume, embeds its contents, and pushes the result to two places: back to the volume (so the orchestrator can read it) and to Turbopuffer for indexing.
# embed_worker.py
import json
import os
import chalkcompute
from chalkcompute import Container, Image, Secret, Volume
worker_image = (
    Image.base("python:3.12-slim")
    .pip_install(["openai", "turbopuffer"])
)

worker_container = Container(
    image=worker_image,
    cpu="2",
    memory="4Gi",
    secrets=[
        Secret(name="OPENAI_API_KEY"),
        Secret(name="TURBOPUFFER_API_KEY"),
    ],
    volumes={"embed-data": "/data"},
)

The Container definition pins the worker’s resource limits and mounts the shared
volume at /data. Secrets are injected as environment variables — the function code
reads them with os.environ as usual, but they never appear in the image or source.
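Inside the function body, reading an injected secret is ordinary environment-variable access. A minimal sketch — the variable is set by hand here purely for local illustration; in a deployed worker Chalk injects it:

```python
import os

# Locally, stand in for Chalk's secret injection. In a deployed
# worker this variable already exists; this line is a no-op there.
os.environ.setdefault("OPENAI_API_KEY", "sk-example")

# Fail fast with a clear message if the secret was never injected,
# rather than getting an opaque auth error from the API call later.
api_key = os.environ.get("OPENAI_API_KEY")
if api_key is None:
    raise RuntimeError("OPENAI_API_KEY is not set; check the Container's secrets list")
```

Reading secrets lazily inside the function (rather than at module import time) keeps the module importable in environments where the secret is absent.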
Now the function itself:
@chalkcompute.function(name="embed", container=worker_container)
def embed(file_path: str) -> str:
    """Embed a single document and push to Turbopuffer.

    Args:
        file_path: Path relative to the volume root, e.g. "docs/readme.md".

    Returns:
        The path where the embedding was written on the volume.
    """
    import openai
    import turbopuffer as tpuf

    # 1. Read the document from the shared volume.
    full_path = os.path.join("/data", file_path)
    with open(full_path) as f:
        text = f.read()

    # 2. Call the embedding model through Chalk's OpenAI-compatible router.
    #    The router handles rate limiting, retries, and cost tracking.
    client = openai.OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url="https://router.chalk.ai/v1",
    )
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    embedding = response.data[0].embedding

    # 3. Write the embedding to the volume so the orchestrator can access it.
    output_path = file_path.rsplit(".", 1)[0] + ".embedding.json"
    full_output = os.path.join("/data", output_path)
    os.makedirs(os.path.dirname(full_output), exist_ok=True)
    with open(full_output, "w") as f:
        json.dump({"file": file_path, "embedding": embedding}, f)

    # 4. Upsert into Turbopuffer for vector search.
    tpuf.api_key = os.environ["TURBOPUFFER_API_KEY"]
    ns = tpuf.Namespace("documents")
    ns.upsert(
        ids=[file_path],
        vectors=[embedding],
        attributes={"text": [text[:1000]]},  # store a preview
    )

    return output_path

Each worker instance is stateless. The volume provides the input; the function writes its output back to the same volume and to Turbopuffer. Ten workers processing ten files all write to different paths — no coordination needed.
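The no-coordination property rests on a deterministic path mapping: distinct input paths always yield distinct output paths. A quick check of the rsplit rule the worker uses:

```python
def output_path_for(file_path: str) -> str:
    # Mirrors the worker's rule: strip the final extension,
    # then append ".embedding.json".
    return file_path.rsplit(".", 1)[0] + ".embedding.json"

inputs = ["docs/readme.md", "docs/guide.md", "docs/api-ref.md"]
outputs = [output_path_for(p) for p in inputs]
print(outputs)
# → ['docs/readme.embedding.json', 'docs/guide.embedding.json', 'docs/api-ref.embedding.json']

# Distinct inputs map to distinct outputs, so parallel workers never collide.
assert len(set(outputs)) == len(outputs)
```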
The orchestrator reads a manifest file from the volume, fans out embed calls in
parallel, and waits for all of them to finish.
# orchestrator.py
import json
import os
import asyncio
import chalkcompute
from chalkcompute import Container, Image, Volume
orchestrator_image = (
    Image.base("python:3.12-slim")
    .pip_install(["chalkcompute"])
)

orchestrator_container = Container(
    image=orchestrator_image,
    cpu="1",
    memory="1Gi",
    volumes={"embed-data": "/data"},
)

The orchestrator is deliberately lightweight — it doesn’t need the OpenAI or
Turbopuffer SDKs. It only needs chalkcompute to call the remote embed function.
@chalkcompute.function(name="orchestrate", container=orchestrator_container)
def orchestrate(manifest_path: str) -> dict:
    """Fan out embedding jobs for every file listed in a manifest.

    The manifest is a JSON file on the shared volume containing a list of
    relative file paths, e.g. ["docs/readme.md", "docs/guide.md"].
    """
    # 1. Read the manifest from the shared volume.
    full_path = os.path.join("/data", manifest_path)
    with open(full_path) as f:
        file_paths = json.load(f)

    # 2. Get a reference to the embed worker by name.
    embed_fn = chalkcompute.function_ref("embed")

    # 3. Fan out: call embed() for every file in parallel.
    async def _fan_out():
        return await asyncio.gather(
            *(embed_fn.async_call(fp) for fp in file_paths)
        )

    results = asyncio.run(_fan_out())

    # 4. Summarize.
    return {
        "total": len(file_paths),
        "completed": len([r for r in results if r is not None]),
        "output_paths": list(results),
    }

The key line is chalkcompute.function_ref("embed") — this returns a handle to the
remote embed function by name, without importing it. The orchestrator and worker
can live in separate files, separate containers, even separate repos.
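For a large corpus, an unbounded gather can issue thousands of remote calls at once. One common refinement is to cap in-flight calls with an asyncio.Semaphore — sketched here with a stand-in coroutine, since function_ref and async_call are Chalk APIs:

```python
import asyncio

async def fan_out(paths, call, max_in_flight=8):
    # Cap the number of simultaneous remote calls.
    sem = asyncio.Semaphore(max_in_flight)

    async def one(path):
        async with sem:
            return await call(path)

    # Results come back in the same order as `paths`.
    return await asyncio.gather(*(one(p) for p in paths))

# Stand-in for embed_fn.async_call; swap in the real remote call.
async def fake_embed(path: str) -> str:
    await asyncio.sleep(0)
    return path.rsplit(".", 1)[0] + ".embedding.json"

results = asyncio.run(fan_out(["docs/readme.md", "docs/guide.md"], fake_embed))
print(results)
# → ['docs/readme.embedding.json', 'docs/guide.embedding.json']
```

Setting max_in_flight to roughly the worker pool’s total concurrency keeps the queue short without starving the workers.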
Put both files in the same directory and deploy them together:
chalk compute deploy embed_worker.py
# ✓ Container created successfully
# Container ID: e4a29f83-1bc5-4d7a-8e62-5c9f0ad31b78
# Name: embed-worker
# Status: Running
# Pod Name: chalk-container-embed-worker
# URL: https://e4a29f83-1bc5-4d7a-8e62-5c9f0ad31b78.compute.chalk.ai
chalk compute deploy orchestrator.py
# ✓ Container created successfully
# Container ID: f7b38d14-9ce2-4f5b-a371-8d6e2ca40f95
# Name: fan-out-orchestrator
# Status: Running
# Pod Name: chalk-container-fan-out-orchestrator
# URL: https://f7b38d14-9ce2-4f5b-a371-8d6e2ca40f95.compute.chalk.ai

Before running the orchestrator, upload your documents and a manifest:
import json
from pathlib import Path

from chalkcompute import Volume

vol = Volume(name="embed-data")

# Upload documents.
vol.put_file("docs/readme.md", Path("./corpus/readme.md").read_bytes())
vol.put_file("docs/guide.md", Path("./corpus/guide.md").read_bytes())
vol.put_file("docs/api-ref.md", Path("./corpus/api-ref.md").read_bytes())

# Upload a manifest listing every file to embed.
manifest = ["docs/readme.md", "docs/guide.md", "docs/api-ref.md"]
vol.put_file("manifest.json", json.dumps(manifest).encode())

Call the orchestrator — it reads the manifest and fans out to workers:
import chalkcompute
orchestrate = chalkcompute.function_ref("orchestrate")
orchestrate.wait_ready()
result = orchestrate("manifest.json")
print(result)
# {
# "total": 3,
# "completed": 3,
# "output_paths": [
# "docs/readme.embedding.json",
# "docs/guide.embedding.json",
# "docs/api-ref.embedding.json",
# ],
# }

Each embedding is now in two places: on the volume (for downstream pipelines) and in Turbopuffer (for vector search). The orchestrator finished as soon as all three workers returned.
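Downstream consumers only need to know the JSON shape the worker writes. A minimal sketch of parsing one embedding record — constructed inline here; in practice you would read the file from the volume:

```python
import json

# The worker writes one record of this shape per <name>.embedding.json file.
record = json.dumps({"file": "docs/readme.md", "embedding": [0.1, 0.2, 0.3]})

parsed = json.loads(record)
print(parsed["file"], len(parsed["embedding"]))
# → docs/readme.md 3
```

Keeping the source path inside the record means a consumer can join embeddings back to documents without relying on filename conventions.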
┌─────────────────┐
│ orchestrate │
manifest.json ────────────▸│ (1 CPU, 1Gi) │
on Volume │ │
└───────┬─────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ embed │ │ embed │ │ embed │
│ worker 0 │ │ worker 1 │ │ worker 2 │
│ (2 CPU,4Gi)│ │ (2 CPU,4Gi)│ │ (2 CPU,4Gi)│
└─────┬──────┘ └─────┬──────┘ └─────┬──────┘
│ │ │
┌─────────┼──────────────┼──────────────┼─────────┐
│ ▼ ▼ ▼ │
│ Shared Volume │
│ /data/docs/*.embedding.json │
└─────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────┐
│ Turbopuffer │
│ "documents" namespace │
└─────────────────────────────────────────┘
For larger corpora, adjust the number of concurrent workers by setting
max_instances on the worker container:
worker_container = Container(
    image=worker_image,
    cpu="2",
    memory="4Gi",
    secrets=[
        Secret(name="OPENAI_API_KEY"),
        Secret(name="TURBOPUFFER_API_KEY"),
    ],
    volumes={"embed-data": "/data"},
    min_instances=0,
    max_instances=20,
    max_concurrent_requests=4,
)

With this configuration, Chalk scales from zero to 20 worker replicas as the orchestrator’s fan-out grows. Each replica handles up to 4 concurrent embed calls. A corpus of 1,000 documents would be processed by up to 80 parallel embedding requests across 20 workers.
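The 80-request ceiling is just max_instances × max_concurrent_requests, and a rough lower bound on batching follows directly: the fan-out proceeds in about ceil(total / ceiling) waves, ignoring stragglers:

```python
import math

max_instances = 20
max_concurrent_requests = 4
total_docs = 1000

# Peak parallelism across the whole worker pool.
ceiling = max_instances * max_concurrent_requests
# Number of full "waves" needed to drain the corpus.
waves = math.ceil(total_docs / ceiling)
print(ceiling, waves)
# → 80 13
```

If a single embedding call takes around two seconds, this back-of-the-envelope estimate puts the whole corpus at roughly 13 waves × 2 s ≈ half a minute of wall-clock time.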