Overview

Chalk Functions let you define Python functions that run remotely on Chalk Compute. Decorate a function with @chalkcompute.function, and it becomes callable from any Python process — locally, from a notebook, or from another Chalk function. Arguments and results are serialized automatically; you write ordinary Python and Chalk handles scheduling, scaling, and delivery.

Functions support three return styles:

  • Single value — return a result and the caller receives it.
  • Async generator — yield results incrementally and the caller receives them as they’re produced.
  • Nested calls — a function can call other functions, composing pipelines without manual orchestration.

Defining a function

Use the @chalkcompute.function decorator:

import chalkcompute

@chalkcompute.function
def add(x: int, y: int) -> int:
    return x + y

Deploy by running the script — the decorator registers the function with Chalk. Call wait_ready() to block until the function is available:

add.wait_ready()
result = add(3, 5)
print(result)  # 8

Example: audio transcription

A more realistic example — transcribe an audio file from a URL using Whisper and stream back the transcript as segments are decoded:

import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install faster-whisper httpx",
    ),
    cpu=4,
    memory="8Gi",
)
async def transcribe(audio_uri: str):
    import httpx
    from faster_whisper import WhisperModel

    # Download the audio file.
    async with httpx.AsyncClient() as client:
        resp = await client.get(audio_uri)
        resp.raise_for_status()

    audio_path = "/tmp/audio"
    with open(audio_path, "wb") as f:
        f.write(resp.content)

    # Load model and transcribe.
    model = WhisperModel("base.en", compute_type="int8")
    segments, _info = model.transcribe(audio_path)

    # Yield each segment as it's decoded — the caller receives
    # these incrementally without waiting for the full transcript.
    for segment in segments:
        yield {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text,
        }

The caller consumes segments as they arrive:

transcribe.wait_ready()

for segment in transcribe("https://example.com/meeting-recording.wav"):
    print(f"[{segment['start']:.1f}s] {segment['text']}")

Output streams in real time:

[0.0s]  Welcome to the quarterly review.
[3.2s]  Let's start with the revenue numbers.
[6.1s]  Q3 came in at 4.2 million, up 18 percent.
...

Because `transcribe` is an async generator (it uses `yield`), the caller begins receiving segments as soon as the first one is decoded. For a 30-minute recording, the first segment may arrive in seconds — the caller doesn't wait for the full file to finish processing.
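The incremental-delivery behavior can be approximated locally with a plain asyncio generator. This is a sketch of the consumption pattern only — `fake_transcribe` is a stand-in, not the Chalk wire protocol:

```python
import asyncio
import time

async def fake_transcribe():
    # Stand-in for a remote streaming function: each segment becomes
    # available after a short delay, simulating decoding work.
    for i in range(3):
        await asyncio.sleep(0.05)
        yield {"start": i * 3.0, "text": f"segment {i}"}

async def main():
    arrivals = []
    t0 = time.monotonic()
    async for seg in fake_transcribe():
        # Each segment is usable as soon as it is yielded,
        # long before the generator is exhausted.
        arrivals.append((round(time.monotonic() - t0, 2), seg["text"]))
    return arrivals

arrivals = asyncio.run(main())
print(arrivals)
```

The timestamps show each item arriving roughly one delay apart, rather than all at once at the end.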


Nested function calls

A function can call other functions. This lets you compose multi-step pipelines where each step runs on its own infrastructure. The inner call goes through the same queue — it doesn’t require a direct connection to the inner function’s worker.

import chalkcompute

@chalkcompute.function
def translate(text: str, target_lang: str) -> str:
    from transformers import pipeline
    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-en-{target_lang}")
    return translator(text)[0]["translation_text"]


@chalkcompute.function
async def transcribe_and_translate(audio_uri: str, target_lang: str):
    """Transcribe audio, then translate each segment."""
    translate.wait_ready()

    async for segment in transcribe(audio_uri):
        translated = translate(segment["text"], target_lang)
        yield {
            **segment,
            "translated": translated,
        }

The caller sees a single stream of translated segments:

transcribe_and_translate.wait_ready()

for seg in transcribe_and_translate(
    "https://example.com/meeting.wav",
    target_lang="es",
):
    print(f"[{seg['start']:.1f}s] {seg['translated']}")

Output:

[0.0s]  Bienvenidos a la revisión trimestral.
[3.2s]  Comencemos con las cifras de ingresos.
[6.1s]  El tercer trimestre llegó a 4,2 millones, un 18 por ciento más.

Here transcribe_and_translate calls two other functions — transcribe (an async generator that yields segments) and translate (a single-value function that returns a translated string). Each runs on its own compute; Chalk routes the calls through the function queue automatically.
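The composition pattern itself is ordinary Python: an outer async generator drains an inner stream and enriches each item with a per-item call. A local sketch with stubbed-out stand-ins (nothing here is remote):

```python
import asyncio

async def transcribe_stub(audio_uri: str):
    # Stand-in for the remote transcribe generator.
    for text in ["hello", "world"]:
        yield {"text": text}

def translate_stub(text: str, target_lang: str) -> str:
    # Stand-in for the remote single-value translate function.
    return f"{text}-{target_lang}"

async def transcribe_and_translate(audio_uri: str, target_lang: str):
    # Consume the inner stream and enrich each item, mirroring
    # the shape of the Chalk example above.
    async for segment in transcribe_stub(audio_uri):
        yield {**segment, "translated": translate_stub(segment["text"], target_lang)}

async def main():
    return [seg async for seg in transcribe_and_translate("s3://audio", "es")]

results = asyncio.run(main())
print(results)
```

With Chalk, the only difference is that each stub is a deployed function and the calls are routed through the queue.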


Resource configuration

Functions accept resource hints that control the compute environment:

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install sentence-transformers",
    ),
    cpu=2,
    memory="4Gi",
    gpu="nvidia-l4",
    secrets=[chalkcompute.Secret("HF_TOKEN")],
    max_batching_size=25,
    max_buffer_duration=2000,
    retries=3,
)
def embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()

Parameter                  Description
name                       Custom function name. Defaults to the function’s __name__.
image                      Container image to deploy in. Defaults to Image.debian_slim().
cpu                        CPU resource request (e.g. 2, "500m").
memory                     Memory resource request (e.g. "4Gi", "512Mi").
gpu                        GPU resource request (e.g. "nvidia-l4", "nvidia-a100").
env                        Environment variables to pass to the function.
min_replicas               Minimum number of replicas.
max_replicas               Maximum number of replicas.
volumes                    List of (Volume(name), mount_path) tuples for persistent storage.
secrets                    List of Secret references to inject into the function’s environment.
serialization_format       Wire format for arguments and results. Currently only "pyarrow" is supported.
suspend_cooldown_period    Time in seconds until containers shut down after the last request.
options                    Arbitrary key-value options forwarded to the Chalk platform.
max_buffer_duration        Maximum time in milliseconds to buffer incoming items before invoking the handler. Defaults to 1000 ms when batching is enabled.
max_batching_size          Maximum number of items to accumulate before invoking the handler. Defaults to 10 when batching is enabled.
retries                    Retry policy for handler invocations. Pass an int for simple max-attempts with default exponential backoff, or a RetryPolicy for full control.
rate_limit_key             Name of an externally defined rate limit policy for controlling outbound concurrency.

Secrets

Inject secrets into a function using the secrets parameter. Secrets are resolved at deploy time and made available as environment variables inside the container:

import chalkcompute

@chalkcompute.function(
    secrets=[
        chalkcompute.Secret.from_env("OPENAI_API_KEY"),
        chalkcompute.Secret.from_integration("prod_postgres"),
        chalkcompute.Secret.from_local_env("MY_LOCAL_TOKEN"),
    ],
)
def call_api(prompt: str) -> str:
    import os
    import httpx
    key = os.environ["OPENAI_API_KEY"]
    # ...

See the Secrets section of the overview for the full Secret API.


Batching

When your handler is more efficient processing multiple items at once (e.g. GPU batch inference), enable batching with max_buffer_duration and/or max_batching_size. Chalk will accumulate incoming items and invoke the handler in chunks:

@chalkcompute.function(
    max_buffer_duration=1000,  # buffer up to 1000 ms
    max_batching_size=32,      # or up to 32 items
    gpu="nvidia-l4",
)
def batch_embed(texts: list[str]) -> list[list[float]]:
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(texts).tolist()

When batching is enabled, handler arguments are delivered as lists or a single pyarrow table. Defaults apply when either parameter is set:

Parameter              Default (when batching enabled)
max_buffer_duration    1000 ms
max_batching_size      10 items
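The flush rule — invoke the handler when either the size limit or the buffer duration is hit, whichever comes first — can be sketched in a few lines. This is an illustration of the semantics, not Chalk's implementation:

```python
import time

class Batcher:
    """Sketch of size-or-time batching. Flushes when max_batching_size
    items accumulate, or max_buffer_duration (ms) elapses since the
    first buffered item."""

    def __init__(self, handler, max_batching_size=10, max_buffer_duration=1000):
        self.handler = handler
        self.max_size = max_batching_size
        self.max_ms = max_buffer_duration
        self.buffer = []
        self.first_at = None

    def push(self, item, now_ms=None):
        now_ms = time.monotonic() * 1000 if now_ms is None else now_ms
        if not self.buffer:
            self.first_at = now_ms  # clock starts at the first buffered item
        self.buffer.append(item)
        if len(self.buffer) >= self.max_size or now_ms - self.first_at >= self.max_ms:
            self.flush()

    def flush(self):
        if self.buffer:
            self.handler(self.buffer)
            self.buffer = []

calls = []
b = Batcher(calls.append, max_batching_size=3, max_buffer_duration=1000)
for i in range(7):
    b.push(i, now_ms=0)  # all at the same instant: only size-based flushes fire
b.flush()
print(calls)  # [[0, 1, 2], [3, 4, 5], [6]]
```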

Retry policies

Functions can automatically retry on failure. Pass a simple integer for max-attempts with default exponential backoff, or use RetryPolicy for full control:

import chalkcompute
from chalkcompute import RetryPolicy

# Simple: retry up to 3 times with exponential backoff
@chalkcompute.function(retries=3)
def flaky_api(query: str) -> str:
    ...

# Exponential backoff with custom parameters
@chalkcompute.function(
    retries=RetryPolicy.exponential(
        attempts=5,
        initial=0.5,
        max_wait=30.0,
    ),
)
def resilient_call(payload: str) -> str:
    ...

# Fixed delay, only retry on specific exceptions
@chalkcompute.function(
    retries=RetryPolicy.linear(
        attempts=3,
        delay=2.0,
        retry_on=(TimeoutError, ConnectionError),
    ),
)
def network_call(url: str) -> str:
    ...

RetryPolicy.exponential()

Exponential backoff — wait times grow geometrically: initial, initial * multiplier, initial * multiplier², …, capped at max_wait:

Parameter    Type               Default       Description
attempts     int                3             Max retry attempts
initial      float              1.0           Base wait time in seconds
multiplier   float              2.0           Backoff multiplier
max_wait     float              60.0          Upper bound on wait time
jitter       bool               True          Add random jitter
retry_on     tuple[type, ...]   (Exception,)  Exception types that trigger retry
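The resulting wait schedule is easy to compute by hand. A deterministic sketch (jitter disabled; `exponential_waits` is a hypothetical helper, not part of the chalkcompute API):

```python
def exponential_waits(attempts=3, initial=1.0, multiplier=2.0, max_wait=60.0):
    # Wait before retry n (0-indexed): initial * multiplier**n, capped at max_wait.
    # The real policy adds random jitter by default; omitted here for determinism.
    return [min(initial * multiplier ** n, max_wait) for n in range(attempts)]

# The resilient_call example above: attempts=5, initial=0.5, max_wait=30.0
print(exponential_waits(attempts=5, initial=0.5, max_wait=4.0))
# -> [0.5, 1.0, 2.0, 4.0, 4.0]
```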

RetryPolicy.linear()

Fixed delay between retries:

Parameter    Type               Default       Description
attempts     int                3             Max retry attempts
delay        float              1.0           Fixed wait in seconds
jitter       bool               False         Add random jitter
retry_on     tuple[type, ...]   (Exception,)  Exception types that trigger retry

Exception filtering

By default, all Exception subclasses trigger a retry. Pass a tuple of exception classes to retry_on to restrict which failures are retried:

RetryPolicy.exponential(
    attempts=4,
    retry_on=(ConnectionError, TimeoutError, OSError),
)

Non-matching exceptions propagate immediately without consuming retry budget.
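The filtering semantics can be sketched as a plain retry loop — matching exceptions consume an attempt, anything else propagates unchanged. This is an illustration only, not Chalk's RetryPolicy internals:

```python
import functools

def retry(attempts=3, retry_on=(Exception,)):
    """Sketch of retry-with-filtering. Exceptions outside retry_on
    propagate immediately without consuming the retry budget."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return fn(*args, **kwargs)
                except retry_on:
                    if attempt == attempts - 1:
                        raise
                # a real policy would sleep (with backoff/jitter) here
        return wrapper
    return decorator

calls = {"n": 0}

@retry(attempts=4, retry_on=(ConnectionError,))
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

result = flaky()
print(result, calls["n"])  # "ok" after two retried ConnectionErrors
```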


Rate limiting

Use rate_limit_key to reference a rate limit policy defined as a resource in your project settings. This controls outbound concurrency from multiple sources calling the same downstream service:

@chalkcompute.function(
    rate_limit_key="provider-api-limit",
)
def call_provider(query: str) -> str:
    ...

Suspend cooldown

Set suspend_cooldown_period to control how long containers stay alive after the last request. This avoids cold starts for bursty workloads while still releasing resources during idle periods:

@chalkcompute.function(
    suspend_cooldown_period=300,  # keep alive for 5 minutes after last call
    gpu="nvidia-l4",
)
def infer(text: str) -> str:
    ...

Volumes

Attach persistent storage to a function with the volumes parameter. Each entry is a (Volume(...), mount_path) tuple. Inside the function, the volume is accessible as an ordinary filesystem path — read and write files normally:

import chalkcompute
from chalkcompute import Volume

@chalkcompute.function(
    volumes=[(Volume("model-cache"), "/models")],
    gpu="nvidia-l4",
)
def predict(text: str) -> list[float]:
    import os
    import json
    from sentence_transformers import SentenceTransformer

    cache_path = "/models/all-MiniLM-L6-v2"
    if os.path.exists(cache_path):
        model = SentenceTransformer(cache_path)
    else:
        model = SentenceTransformer("all-MiniLM-L6-v2")
        model.save(cache_path)

    return model.encode(text).tolist()

Volumes persist across invocations — data written by one call is available to subsequent calls. This is useful for caching large model weights, storing intermediate results, or sharing data between functions that mount the same volume.


Calling from a DataFrame

You can invoke a function on every row of a DataFrame using F.catalog_call() inside a .project() expression. This is useful for batch-processing columnar data — Chalk handles parallelism, batching, and delivery automatically.

For example, define a function that extracts named entities from text:

import chalkcompute

@chalkcompute.function(
    image=chalkcompute.Image.base("python:3.12-slim").run_commands(
        "pip install spacy && python -m spacy download en_core_web_sm",
    ),
    cpu=2,
    memory="4Gi",
)
def extract_entities(memo: str) -> str:
    import json
    import spacy
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(memo)
    return json.dumps([
        {"text": ent.text, "label": ent.label_}
        for ent in doc.ents
    ])

Then apply it across a parquet file of transaction memos:

from chalkcompute import DataFrame, F, _

results = (
    DataFrame.scan("transactions.parquet")
    .project({
        "transaction_id": _.transaction_id,
        "memo": _.memo,
        "entities": F.catalog_call("extract_entities", _.memo),
    })
)

Each row’s memo column is passed to extract_entities and the result is returned as a new entities column. The original DataFrame is not mutated — .project() returns only the columns you specify.
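Conceptually, the projection is a row-wise map that builds a new set of columns. A pure-Python sketch of what the expression above computes — `extract_entities_stub` is a local stand-in for the deployed function, and nothing here is remote or batched:

```python
def extract_entities_stub(memo: str) -> str:
    # Stand-in for the deployed extract_entities function.
    return memo.upper()

rows = [
    {"transaction_id": 1, "memo": "coffee at blue bottle"},
    {"transaction_id": 2, "memo": "flight to denver"},
]

# .project() returns only the columns you specify; the source rows
# are left untouched.
projected = [
    {
        "transaction_id": row["transaction_id"],
        "memo": row["memo"],
        "entities": extract_entities_stub(row["memo"]),
    }
    for row in rows
]
print(projected[0]["entities"])  # "COFFEE AT BLUE BOTTLE"
```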


Error handling

If a function raises an exception, the caller receives it as a RuntimeError with the original exception type and message:

@chalkcompute.function
def divide(a: float, b: float) -> float:
    return a / b

divide.wait_ready()

try:
    divide(1.0, 0.0)
except RuntimeError as e:
    print(e)  # ZeroDivisionError: division by zero

For async generators, an exception mid-stream terminates the iterator. Any items already yielded remain available to the caller — only subsequent iteration raises:

@chalkcompute.function
async def fragile_gen():
    yield "ok-1"
    yield "ok-2"
    raise ValueError("something went wrong")

for item in fragile_gen():
    print(item)
# ok-1
# ok-2
# RuntimeError: ValueError: something went wrong
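This error model — wrap the original exception's type and message in a RuntimeError, after delivering everything already yielded — can be reproduced locally with a small wrapper. A sketch, not the actual Chalk error path:

```python
def wrap_remote_errors(gen):
    """Sketch: re-raise any exception from the generator as a RuntimeError
    carrying the original type and message, mirroring the behavior above."""
    try:
        yield from gen
    except Exception as e:
        raise RuntimeError(f"{type(e).__name__}: {e}") from e

def fragile():
    yield "ok-1"
    yield "ok-2"
    raise ValueError("something went wrong")

received, error = [], None
try:
    for item in wrap_remote_errors(fragile()):
        received.append(item)
except RuntimeError as e:
    error = str(e)

print(received, error)
# ['ok-1', 'ok-2'] ValueError: something went wrong
```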

How it works

Under the hood, @chalkcompute.function uses the Function Queue infrastructure. When you call a function:

  1. Arguments are serialized and sent to the fnq-server via gRPC.
  2. The server assigns a UUID, enqueues the call to Redis, and begins polling a result stream.
  3. An fnq-consumer picks up the work item and forwards it to the function’s worker process.
  4. The worker executes the function and writes results (chunks for generators, or a single value) to the Redis stream.
  5. The server relays each result back to the caller over the gRPC stream.

For async generators, each yield becomes a separate chunk in the result stream — the caller receives it as soon as the server’s next poll picks it up. See the Function Queue page for protocol details.
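The five steps above can be simulated in-process with a queue and a dict of result streams standing in for Redis. A sketch of the correlation mechanics only — no gRPC, no real fnq components:

```python
import queue
import uuid

work_queue = queue.Queue()
result_streams = {}  # call_id -> list of chunks (stand-in for Redis streams)

def submit(fn, *args):
    call_id = str(uuid.uuid4())          # step 2: assign a UUID, enqueue
    result_streams[call_id] = []
    work_queue.put((call_id, fn, args))
    return call_id

def worker_step():
    call_id, fn, args = work_queue.get()  # step 3: consumer picks up the work item
    for chunk in fn(*args):               # step 4: each yield becomes a chunk
        result_streams[call_id].append(chunk)
    result_streams[call_id].append(None)  # end-of-stream marker

def numbers(n):
    for i in range(n):
        yield i * i

call_id = submit(numbers, 3)
worker_step()
chunks = result_streams[call_id][:-1]     # step 5: caller drains the stream
print(chunks)  # [0, 1, 4]
```

In the real system the caller polls the stream concurrently with the worker, so chunks arrive while the function is still running.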