Compute
Asynchronous remote function execution over Redis with streaming results.
The Function Queue is an asynchronous execution bridge between gRPC callers and Python function workers. It decouples the call site from the execution site using Redis as an intermediary, enabling callers to submit function invocations and stream back results — including multi-chunk generator outputs — without holding a direct connection to the worker.
The system consists of two cooperating processes:

- fnq-server: a gRPC server that accepts CallFunction RPCs, assigns each call a UUID, enqueues it to a Redis list, and polls a Redis stream for results to relay back to the caller.
- fnq-consumer: pops calls from the Redis list, forwards each one to the backend's RemoteCallService (the actual Python worker), and writes responses back to the Redis stream.

┌──────────┐       gRPC (bidi stream)        ┌──────────────┐
│  Caller  │ ──────────────────────────────▸ │  fnq-server  │
│          │ ◂────────────────────────────── │              │
└──────────┘  CallFunctionRequest/Response   └──────┬───────┘
                                                    │
                                              LPUSH │ XREAD
                                     (enqueue call) │ (poll for results)
                                                    │
                                             ┌──────▾───────┐
                                             │    Redis     │
                                             │              │
                                             │  List:       │
                                             │   function_  │
                                             │   queue      │
                                             │              │
                                             │  Stream:     │
                                             │   gen:{id}:  │
                                             │   stream     │
                                             └──────┬───────┘
                                                    │
                                               RPOP │ XADD
                                         (pop work) │ (write results)
                                                    │
                                             ┌──────▾───────┐
                                             │ fnq-consumer │
                                             └──────┬───────┘
                                                    │
                                               gRPC │
                                      (bidi stream) │
                                                    │
                                             ┌──────▾───────┐
                                             │   Backend    │
                                             │   (Python    │
                                             │    worker)   │
                                             └──────────────┘
A complete request lifecycle looks like this:
Caller           fnq-server               Redis               fnq-consumer              Backend
│ │ │ │ │
│ CallFunctionRequest │ │ │ │
│────────────────────▸│ │ │ │
│ │ LPUSH envelope │ │ │
│ │────────────────────▸│ │ │
│ │ XADD sentinel │ │ │
│ │ EXPIRE 24h │ │ │
│ │────────────────────▸│ │ │
│ │ │ │ │
│ │ │ RPOP │ │
│ │ │◂──────────────────────│ │
│ │ │ │ │
│ │ │ │ CallFunctionRequest │
│ │ │ │─────────────────────▸│
│ │ │ │ │
│ │ │ │ CallFunctionResponse │
│ │ │ │◂─────────────────────│
│ │ │ XADD chunk seq=1 │ │
│ │ │◂──────────────────────│ │
│ │ │ │ │
│ │ XREAD │ │ (stream ends) │
│ │◂────────────────────│ │ │
│ CallFunctionResponse│ │ XADD end final=1 │ │
│◂────────────────────│ │◂──────────────────────│ │
│ │ XREAD │ │ │
│ │◂────────────────────│ │ │
│ (stream ends) │ │ │ │
Each function invocation gets a dedicated Redis stream at gen:{call_id}:stream with a
24-hour TTL. The worker writes entries to this stream as results become available, and
the server polls it to relay results back to the caller.
Every stream entry has four fields:
| Field | Description |
|---|---|
| type | One of chunk, end, or error |
| seq | Monotonically increasing integer |
| data | Payload bytes (Arrow IPC, JSON, or raw) |
| final | "0" or "1" — marks the terminal entry |
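As a sketch of how a reader of the stream might decode these four fields before acting on them (the parse_entry helper is illustrative, not part of the system):

```python
def parse_entry(fields: dict) -> tuple:
    """Decode one raw stream entry into (type, seq, data, final)."""
    entry_type = fields[b"type"].decode()
    if entry_type not in ("chunk", "end", "error"):
        raise ValueError(f"unknown entry type: {entry_type}")
    return (
        entry_type,
        int(fields[b"seq"]),   # int() accepts ASCII bytes directly
        fields[b"data"],
        fields[b"final"] == b"1",
    )


# Redis returns field names and values as bytes.
entry = parse_entry({b"type": b"chunk", b"seq": b"1", b"data": b"abc", b"final": b"0"})
print(entry)
```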
The protocol supports three outcome shapes.
The function returns a single value. The worker writes one chunk entry followed by
end:
XADD gen:{id}:stream * type chunk seq 1 data <bytes> final 0
XADD gen:{id}:stream * type end seq 2 data "" final 1
The function is an async generator that yields multiple values. Each yield becomes a
chunk entry, and the stream terminates with end when the generator is exhausted:
XADD gen:{id}:stream * type chunk seq 1 data <bytes> final 0
XADD gen:{id}:stream * type chunk seq 2 data <bytes> final 0
XADD gen:{id}:stream * type chunk seq 3 data <bytes> final 0
XADD gen:{id}:stream * type end seq 4 data "" final 1
The function raises an exception. The worker writes a single error entry with a
JSON-serialized exception payload:
XADD gen:{id}:stream * type error seq 1 data '{"exc_type":"ValueError","message":"...","traceback":"..."}' final 1
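A worker can build this error payload with the standard json and traceback modules. A minimal sketch (serialize_exception is an illustrative helper name, not part of the worker API):

```python
import json
import traceback


def serialize_exception(exc: BaseException) -> str:
    """Build the JSON error payload written to the result stream."""
    return json.dumps({
        "exc_type": type(exc).__name__,
        "message": str(exc),
        "traceback": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    })


try:
    raise ValueError("bad input")
except ValueError as exc:
    payload = serialize_exception(exc)
    print(payload)
```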
The result stream protocol is designed around async generators as the primary abstraction. A single return value is just a generator that yields once. This means callers can start processing partial results before the function finishes — useful for streaming inference, pagination, or any computation that produces output incrementally.
Here is a Python example showing how a worker would produce results for an async generator, and how a consumer reads them:
import asyncio
import json

import redis.asyncio as redis


async def execute_and_stream(
    conn: redis.Redis,
    call_id: str,
    func,
    args,
):
    """Execute an async generator and write each yield to the result stream."""
    stream_key = f"gen:{call_id}:stream"
    seq = 1
    try:
        async for chunk in func(*args):
            await conn.xadd(stream_key, {
                "type": "chunk",
                "seq": str(seq),
                "data": chunk,
                "final": "0",
            })
            seq += 1
        # Generator exhausted — write terminal entry.
        await conn.xadd(stream_key, {
            "type": "end",
            "seq": str(seq),
            "data": b"",
            "final": "1",
        })
    except Exception as exc:
        # Write exception so the caller unblocks.
        await conn.xadd(stream_key, {
            "type": "error",
            "seq": str(seq),
            "data": json.dumps({
                "exc_type": type(exc).__name__,
                "message": str(exc),
            }),
            "final": "1",
        })


async def read_results(conn: redis.Redis, call_id: str):
    """Read chunks from the result stream as they arrive."""
    stream_key = f"gen:{call_id}:stream"
    last_id = "0-0"
    while True:
        entries = await conn.xread(
            {stream_key: last_id}, count=64, block=5000
        )
        for _, messages in entries:
            for msg_id, fields in messages:
                last_id = msg_id
                entry_type = fields[b"type"].decode()
                if entry_type == "chunk":
                    yield fields[b"data"]
                elif entry_type == "end":
                    return
                elif entry_type == "error":
                    exc = json.loads(fields[b"data"])
                    raise RuntimeError(
                        f"{exc['exc_type']}: {exc['message']}"
                    )


async def generate_squares(n: int):
    """An async generator that yields squares one at a time."""
    for i in range(n):
        await asyncio.sleep(0.1)  # simulate work
        yield str(i * i).encode()


async def main():
    conn = redis.Redis()
    call_id = "example-call-123"

    # Worker writes results as they're produced.
    worker = asyncio.create_task(
        execute_and_stream(conn, call_id, generate_squares, (5,))
    )

    # Caller reads results as they arrive.
    async for chunk in read_results(conn, call_id):
        print(f"received: {chunk.decode()}")
    await worker


asyncio.run(main())
# Output:
# received: 0
# received: 1
# received: 4
# received: 9
# received: 16

The caller begins receiving chunks as soon as the first `yield` occurs — it does not wait for the generator to finish. This makes the protocol suitable for streaming LLM token output, incremental ETL results, or any long-running computation that can report progress.
Each function call is serialized as JSON and pushed onto a Redis list:
{
  "call_id": "a1b2c3d4-...",
  "envelope": {
    "call": {
      "name": "extract_links",
      "feather_stream": "<base64-encoded Arrow IPC bytes>"
    },
    "metadata": {
      "otel_headers": {
        "traceparent": "00-abc123-def456-01"
      }
    }
  }
}

The call_id tells the worker which Redis stream key (gen:{call_id}:stream) to write results to. The metadata.otel_headers field carries OpenTelemetry context so distributed traces span the full call path.
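As an illustration, the enqueue step could build this envelope as follows. This is a sketch, not the actual fnq-server code: build_envelope is a hypothetical helper, and the LPUSH itself is shown as a comment because it needs a live Redis connection.

```python
import base64
import json
import uuid


def build_envelope(name: str, feather_stream: bytes, otel_headers: dict) -> str:
    """Serialize one function call for LPUSH onto the work queue."""
    return json.dumps({
        "call_id": str(uuid.uuid4()),
        "envelope": {
            "call": {
                "name": name,
                # Arrow IPC bytes are base64-encoded so the envelope is valid JSON.
                "feather_stream": base64.b64encode(feather_stream).decode(),
            },
            "metadata": {"otel_headers": otel_headers},
        },
    })


payload = build_envelope(
    "extract_links",
    b"<arrow ipc bytes>",
    {"traceparent": "00-abc123-def456-01"},
)
# The server would then enqueue it:
#   await conn.lpush("function_queue", payload)
```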
Both binaries read configuration from environment variables with the prefix CHALK_FNQ_:
| Variable | Default | Used by | Description |
|---|---|---|---|
| CHALK_FNQ_REDIS_URL | redis://localhost:6379 | both | Redis connection URL |
| CHALK_FNQ_QUEUE_KEY | function_queue | both | Redis list key for the work queue |
| CHALK_FNQ_LISTEN_ADDR | [::]:50051 | server | gRPC listen address |
| CHALK_FNQ_BACKEND_ENDPOINT | http://localhost:8888 | consumer | Backend RemoteCallService endpoint |
The consumer proxy selects TLS or plaintext based on the URL scheme — https:// enables
TLS with system CA roots, http:// uses plaintext.
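A sketch of how a binary might resolve this configuration, assuming plain environment-variable lookups with the documented defaults (the helper names fnq_setting and backend_uses_tls are illustrative):

```python
import os
from urllib.parse import urlparse

DEFAULTS = {
    "REDIS_URL": "redis://localhost:6379",
    "QUEUE_KEY": "function_queue",
    "LISTEN_ADDR": "[::]:50051",
    "BACKEND_ENDPOINT": "http://localhost:8888",
}


def fnq_setting(name: str) -> str:
    """Read CHALK_FNQ_<name> from the environment, falling back to the default."""
    return os.environ.get(f"CHALK_FNQ_{name}", DEFAULTS[name])


def backend_uses_tls(endpoint: str) -> bool:
    """https:// enables TLS with system CA roots; http:// is plaintext."""
    return urlparse(endpoint).scheme == "https"


print(fnq_setting("QUEUE_KEY"))
print(backend_uses_tls(fnq_setting("BACKEND_ENDPOINT")))
```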
A typical deployment runs three components:
# Deploy to a namespace
kubectl apply -f k8s/redis.yaml -f k8s/function-queue.yaml -n <namespace>

The consumer can be scaled independently of the server. Adding more consumer replicas increases throughput since they compete on the same Redis list (RPOP is atomic).
The gRPC interface is defined in chalk.runtime.v1.remote_python_call:
service RemoteCallService {
  rpc CallFunction(stream CallFunctionRequest) returns (stream CallFunctionResponse) {}
}

message CallFunctionRequest {
  string name = 1;
  bytes feather_stream = 2;
}

message CallFunctionResponse {
  bytes feather_stream = 1;
}

The feather_stream field carries arguments and results in Apache Arrow IPC format. The RPC is a bidirectional stream: the caller can send multiple requests on a single connection, and the server streams back results as they arrive from each call's result stream.