Compute
Define and call remote functions that run on Chalk Compute.
Chalk Functions let you define Python functions that run remotely on Chalk Compute.
Decorate a function with @chalkcompute.function, and it becomes callable from any
Python process — locally, from a notebook, or from another Chalk function. Arguments
and results are serialized automatically; you write ordinary Python and Chalk handles
scheduling, scaling, and delivery.
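The automatic serialization can be pictured with a local stand-in: a decorator that round-trips arguments and results through pickle, roughly the way a remote boundary would. This is a sketch, not Chalk's implementation (Chalk's actual wire format is pyarrow):

```python
import functools
import pickle

def remote(fn):
    """Sketch: mimic a remote boundary by serializing args and results."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        payload = pickle.dumps((args, kwargs))      # "wire" request
        args, kwargs = pickle.loads(payload)        # worker deserializes
        result = fn(*args, **kwargs)                # ordinary Python runs
        return pickle.loads(pickle.dumps(result))   # "wire" response
    return wrapper

@remote
def add(x: int, y: int) -> int:
    return x + y

print(add(3, 5))  # → 8
```

The point of the sketch: the function body never touches the serialization layer, which is why ordinary Python works unchanged.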
Functions support several return styles: a plain function returns a single value, while a generator can yield results incrementally and the caller receives them as they're produced.

Use the @chalkcompute.function decorator:

```python
import chalkcompute

@chalkcompute.function
def add(x: int, y: int) -> int:
    return x + y
```

Deploy by running the script — the decorator registers the function with Chalk. Call wait_ready() to block until the function is available:

```python
add.wait_ready()
result = add(3, 5)
print(result)  # 8
```

A more realistic example — transcribe an audio file from a URL using Whisper and stream back the transcript as segments are decoded:
```python
import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install faster-whisper httpx",
    ),
    cpu=4,
    memory="8Gi",
)
async def transcribe(audio_uri: str):
    import httpx
    from faster_whisper import WhisperModel

    # Download the audio file.
    async with httpx.AsyncClient() as client:
        resp = await client.get(audio_uri)
        resp.raise_for_status()
    audio_path = "/tmp/audio"
    with open(audio_path, "wb") as f:
        f.write(resp.content)

    # Load model and transcribe.
    model = WhisperModel("base.en", compute_type="int8")
    segments, _info = model.transcribe(audio_path)

    # Yield each segment as it's decoded — the caller receives
    # these incrementally without waiting for the full transcript.
    for segment in segments:
        yield {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text,
        }
```

The caller consumes segments as they arrive:

```python
transcribe.wait_ready()
for segment in transcribe("https://example.com/meeting-recording.wav"):
    print(f"[{segment['start']:.1f}s] {segment['text']}")
```

Output streams in real time:

```
[0.0s] Welcome to the quarterly review.
[3.2s] Let's start with the revenue numbers.
[6.1s] Q3 came in at 4.2 million, up 18 percent.
...
```
Because `transcribe` is an async generator (it uses `yield`), the caller begins receiving segments as soon as the first one is decoded. For a 30-minute recording, the first segment may arrive in seconds — the caller doesn't wait for the full file to finish processing.
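That incremental delivery is ordinary async-generator semantics, which you can demonstrate without Chalk at all. Here fake_transcribe is a local stand-in for the remote function:

```python
import asyncio

async def fake_transcribe():
    """Local stand-in: yields segments as they become available."""
    for i, text in enumerate(["Welcome.", "First point.", "Second point."]):
        await asyncio.sleep(0)  # simulate decoding work between segments
        yield {"start": float(i), "text": text}

async def main():
    received = []
    async for segment in fake_transcribe():
        received.append(segment["text"])  # consumed as soon as it's yielded
    return received

print(asyncio.run(main()))  # → ['Welcome.', 'First point.', 'Second point.']
```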
A function can call other functions. This lets you compose multi-step pipelines where each step runs on its own infrastructure. The inner call goes through the same queue — it doesn’t require a direct connection to the inner function’s worker.
```python
import chalkcompute

@chalkcompute.function
def translate(text: str, target_lang: str) -> str:
    from transformers import pipeline

    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-en-{target_lang}")
    return translator(text)[0]["translation_text"]

@chalkcompute.function
async def transcribe_and_translate(audio_uri: str, target_lang: str):
    """Transcribe audio, then translate each segment."""
    translate.wait_ready()
    async for segment in transcribe(audio_uri):
        translated = translate(segment["text"], target_lang)
        yield {
            **segment,
            "translated": translated,
        }
```

The caller sees a single stream of translated segments:

```python
transcribe_and_translate.wait_ready()
for seg in transcribe_and_translate(
    "https://example.com/meeting.wav",
    target_lang="es",
):
    print(f"[{seg['start']:.1f}s] {seg['translated']}")
```

```
[0.0s] Bienvenidos a la revisión trimestral.
[3.2s] Comencemos con las cifras de ingresos.
[6.1s] El tercer trimestre llegó a 4,2 millones, un 18 por ciento más.
```
Here transcribe_and_translate calls two other functions — transcribe (an async
generator that yields segments) and translate (a single-value function that returns a
translated string). Each runs on its own compute; Chalk routes the calls through the
function queue automatically.
Functions accept resource hints that control the compute environment:
```python
@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install sentence-transformers",
    ),
    cpu=2,
    memory="4Gi",
    gpu="nvidia-l4",
    secrets=[chalkcompute.Secret("HF_TOKEN")],
    max_batching_size=25,
    max_buffer_duration=2000,
    retries=3,
)
def embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()
```

| Parameter | Description |
|---|---|
| name | Custom function name. Defaults to the function's `__name__`. |
| image | Container image to deploy in. Defaults to Image.debian_slim(). |
| cpu | CPU resource request (e.g. 2, "500m"). |
| memory | Memory resource request (e.g. "4Gi", "512Mi"). |
| gpu | GPU resource request (e.g. "nvidia-l4", "nvidia-a100"). |
| env | Environment variables to pass to the function. |
| min_replicas | Minimum number of replicas. |
| max_replicas | Maximum number of replicas. |
| volumes | List of (Volume(name), mount_path) tuples for persistent storage. |
| secrets | List of Secret references to inject into the function's environment. |
| serialization_format | Wire format for arguments and results. Currently only "pyarrow" is supported. |
| suspend_cooldown_period | Time in seconds until containers shut down after the last request. |
| options | Arbitrary key-value options forwarded to the Chalk platform. |
| max_buffer_duration | Maximum time in milliseconds to buffer incoming items before invoking the handler. Defaults to 1000 ms when batching is enabled. |
| max_batching_size | Maximum number of items to accumulate before invoking the handler. Defaults to 10 when batching is enabled. |
| retries | Retry policy for handler invocations. Pass an int for simple max-attempts with default exponential backoff, or a RetryPolicy for full control. |
| rate_limit_key | Name of an externally defined rate limit policy for controlling outbound concurrency. |
Inject secrets into a function using the secrets parameter. Secrets are
resolved at deploy time and made available as environment variables inside
the container:
```python
import chalkcompute

@chalkcompute.function(
    secrets=[
        chalkcompute.Secret.from_env("OPENAI_API_KEY"),
        chalkcompute.Secret.from_integration("prod_postgres"),
        chalkcompute.Secret.from_local_env("MY_LOCAL_TOKEN"),
    ],
)
def call_api(prompt: str) -> str:
    import os
    import httpx

    key = os.environ["OPENAI_API_KEY"]
    # ...
```

See the Secrets section of the overview for the full Secret API.
When your handler is more efficient processing multiple items at once (e.g.
GPU batch inference), enable batching with max_buffer_duration and/or
max_batching_size. Chalk will accumulate incoming items and invoke the
handler in chunks:
```python
@chalkcompute.function(
    max_buffer_duration=1000,  # buffer up to 1000 ms
    max_batching_size=32,      # or up to 32 items
    gpu="nvidia-l4",
)
def batch_embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()
```

When batching is enabled, handler arguments are delivered as lists or a single pyarrow table. Defaults apply when either parameter is set:

| Parameter | Default (when batching enabled) |
|---|---|
| max_buffer_duration | 1000 ms |
| max_batching_size | 10 items |
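The flush rule (whichever limit is hit first, size or age) can be sketched in plain Python. BatchBuffer here is illustrative only, not the platform's batcher:

```python
import time

class BatchBuffer:
    """Accumulate items; flush when the size or age limit is hit (illustrative)."""
    def __init__(self, handler, max_batching_size=10, max_buffer_duration_ms=1000):
        self.handler = handler
        self.max_size = max_batching_size
        self.max_age = max_buffer_duration_ms / 1000.0
        self.items = []
        self.first_at = None

    def add(self, item, now=None):
        now = time.monotonic() if now is None else now
        if not self.items:
            self.first_at = now  # clock starts at the first buffered item
        self.items.append(item)
        if len(self.items) >= self.max_size or now - self.first_at >= self.max_age:
            self.flush()

    def flush(self):
        if self.items:
            self.handler(self.items)  # handler receives a list of items
            self.items = []

batches = []
buf = BatchBuffer(batches.append, max_batching_size=3, max_buffer_duration_ms=1000)
for i in range(7):
    buf.add(i, now=0.0)  # same timestamp, so flushes happen on size only
buf.flush()              # drain the remainder
print(batches)  # → [[0, 1, 2], [3, 4, 5], [6]]
```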
Functions can automatically retry on failure. Pass a simple integer for
max-attempts with default exponential backoff, or use RetryPolicy for full
control:
```python
import chalkcompute
from chalkcompute import RetryPolicy

# Simple: retry up to 3 times with exponential backoff
@chalkcompute.function(retries=3)
def flaky_api(query: str) -> str:
    ...

# Exponential backoff with custom parameters
@chalkcompute.function(
    retries=RetryPolicy.exponential(
        attempts=5,
        initial=0.5,
        max_wait=30.0,
    ),
)
def resilient_call(payload: str) -> str:
    ...

# Fixed delay, only retry on specific exceptions
@chalkcompute.function(
    retries=RetryPolicy.linear(
        attempts=3,
        delay=2.0,
        retry_on=(TimeoutError, ConnectionError),
    ),
)
def network_call(url: str) -> str:
    ...
```

Exponential backoff — wait times grow geometrically: initial, initial * multiplier, initial * multiplier², …, capped at max_wait:

| Parameter | Type | Default | Description |
|---|---|---|---|
| attempts | int | 3 | Max retry attempts |
| initial | float | 1.0 | Base wait time in seconds |
| multiplier | float | 2.0 | Backoff multiplier |
| max_wait | float | 60.0 | Upper bound on wait time |
| jitter | bool | True | Add random jitter |
| retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry |
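The wait schedule these parameters describe can be computed directly. A plain-Python sketch, not the RetryPolicy implementation:

```python
import random

def backoff_waits(attempts=3, initial=1.0, multiplier=2.0, max_wait=60.0, jitter=False):
    """Wait before retry n: initial * multiplier**n, capped at max_wait."""
    waits = []
    for n in range(attempts):
        wait = min(initial * (multiplier ** n), max_wait)
        if jitter:
            wait *= random.uniform(0.5, 1.5)  # illustrative jitter only
        waits.append(wait)
    return waits

# Matches the resilient_call policy above (jitter off for a readable sequence):
print(backoff_waits(attempts=5, initial=0.5, max_wait=4.0))
# → [0.5, 1.0, 2.0, 4.0, 4.0]
```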
Fixed delay between retries:
| Parameter | Type | Default | Description |
|---|---|---|---|
| attempts | int | 3 | Max retry attempts |
| delay | float | 1.0 | Fixed wait in seconds |
| jitter | bool | False | Add random jitter |
| retry_on | tuple[type, ...] | (Exception,) | Exception types that trigger retry |
By default, all Exception subclasses trigger a retry. Pass a tuple of
exception classes to retry_on to restrict which failures are retried:
```python
RetryPolicy.exponential(
    attempts=4,
    retry_on=(ConnectionError, TimeoutError, OSError),
)
```

Non-matching exceptions propagate immediately without consuming retry budget.
Use rate_limit_key to reference a rate limit policy defined as a
resource in your project settings. This controls outbound concurrency
from multiple sources calling the same downstream service:
```python
@chalkcompute.function(
    rate_limit_key="provider-api-limit",
)
def call_provider(query: str) -> str:
    ...
```

Set suspend_cooldown_period to control how long containers stay alive after
the last request. This avoids cold starts for bursty workloads while still
releasing resources during idle periods:
```python
@chalkcompute.function(
    suspend_cooldown_period=300,  # keep alive for 5 minutes after last call
    gpu="nvidia-l4",
)
def infer(text: str) -> str:
    ...
```

Attach persistent storage to a function with the volumes parameter. Each
entry is a (Volume(...), mount_path) tuple. Inside the function, the volume
is accessible as an ordinary filesystem path — read and write files normally:
```python
import chalkcompute
from chalkcompute import Volume

@chalkcompute.function(
    volumes=[(Volume("model-cache"), "/models")],
    gpu="nvidia-l4",
)
def predict(text: str) -> list[float]:
    import os
    from sentence_transformers import SentenceTransformer

    cache_path = "/models/all-MiniLM-L6-v2"
    if os.path.exists(cache_path):
        model = SentenceTransformer(cache_path)
    else:
        model = SentenceTransformer("all-MiniLM-L6-v2")
        model.save(cache_path)
    return model.encode(text).tolist()
```

Volumes persist across invocations — data written by one call is available to subsequent calls. This is useful for caching large model weights, storing intermediate results, or sharing data between functions that mount the same volume.
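The cache-on-volume pattern in predict can be exercised locally. In this sketch a temporary directory stands in for the mounted volume, and load_or_compute is an illustrative helper, not a Chalk API:

```python
import json
import os
import tempfile

def load_or_compute(cache_dir, key, compute):
    """Reuse a cached result from the mounted path if present (illustrative)."""
    path = os.path.join(cache_dir, key + ".json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)  # cache hit: skip the expensive compute
    result = compute()
    with open(path, "w") as f:
        json.dump(result, f)     # persist for subsequent invocations
    return result

with tempfile.TemporaryDirectory() as mount:  # stands in for "/models"
    calls = []
    compute = lambda: calls.append(1) or [0.1, 0.2]  # tracks invocations
    first = load_or_compute(mount, "weights", compute)
    second = load_or_compute(mount, "weights", compute)  # served from cache
    print(first == second, len(calls))  # → True 1
```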
You can invoke a function on every row of a DataFrame using
F.catalog_call() inside a .project() expression. This is useful for
batch-processing columnar data — Chalk handles parallelism, batching, and
delivery automatically.
For example, define a function that extracts named entities from text:
```python
import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install spacy && python -m spacy download en_core_web_sm",
    ),
    cpu=2,
    memory="4Gi",
)
def extract_entities(memo: str) -> str:
    import json
    import spacy

    nlp = spacy.load("en_core_web_sm")
    doc = nlp(memo)
    return json.dumps([
        {"text": ent.text, "label": ent.label_}
        for ent in doc.ents
    ])
```

Then apply it across a parquet file of transaction memos:
```python
from chalkcompute import DataFrame, F, _

results = (
    DataFrame.scan("transactions.parquet")
    .project({
        "transaction_id": _.transaction_id,
        "memo": _.memo,
        "entities": F.catalog_call("extract_entities", _.memo),
    })
)
```

Each row's memo column is passed to extract_entities and the result is returned as a new entities column. The original DataFrame is not mutated — .project() returns only the columns you specify.
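Conceptually, the projection builds each output row from the expressions you list, invoking the function once per row. A plain-Python sketch, where extract_upper is a local stand-in for the remote function:

```python
import json

def extract_upper(memo: str) -> str:
    # Local stand-in for a remote function: capitalized tokens as a JSON string.
    return json.dumps([w for w in memo.split() if w[:1].isupper()])

rows = [
    {"transaction_id": 1, "memo": "Payment to Acme Corp"},
    {"transaction_id": 2, "memo": "refund from Globex"},
]

# "Project": emit only the requested columns, applying the function per row.
projected = [
    {"transaction_id": r["transaction_id"],
     "memo": r["memo"],
     "entities": extract_upper(r["memo"])}
    for r in rows
]
print(projected[0]["entities"])  # → ["Payment", "Acme", "Corp"]
```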
If a function raises an exception, the caller receives it as a RuntimeError with the
original exception type and message:
```python
@chalkcompute.function
def divide(a: float, b: float) -> float:
    return a / b

divide.wait_ready()
try:
    divide(1.0, 0.0)
except RuntimeError as e:
    print(e)  # ZeroDivisionError: division by zero
```

For async generators, an exception mid-stream terminates the iterator. Any items already yielded are still available to the caller — only subsequent iteration raises:
```python
@chalkcompute.function
async def fragile_gen():
    yield "ok-1"
    yield "ok-2"
    raise ValueError("something went wrong")

fragile_gen.wait_ready()
for item in fragile_gen():
    print(item)
# ok-1
# ok-2
# RuntimeError: ValueError: something went wrong
```

Under the hood, @chalkcompute.function uses the Function Queue infrastructure: each call is routed through the queue to a worker, and results stream back to the caller. For async generators, each yield becomes a separate chunk in the result stream — the caller receives it as soon as the server's next poll picks it up. See the Function Queue page for protocol details.
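The yield-to-chunk delivery can be mimicked locally with a queue between a producing worker and a polling consumer. A sketch of the idea, not the actual protocol:

```python
import queue
import threading

def producer(q):
    """Worker side: each yielded value becomes one chunk on the stream."""
    def gen():
        yield "chunk-1"
        yield "chunk-2"
    for item in gen():
        q.put(item)
    q.put(None)  # end-of-stream marker

def consume(q):
    """Caller side: poll the queue and surface chunks as they arrive."""
    while True:
        item = q.get()
        if item is None:
            return
        yield item

q = queue.Queue()
threading.Thread(target=producer, args=(q,)).start()
received = list(consume(q))
print(received)  # → ['chunk-1', 'chunk-2']
```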