Overview

The Function Queue is an asynchronous execution bridge between gRPC callers and Python function workers. It decouples the call site from the execution site using Redis as an intermediary, enabling callers to submit function invocations and stream back results — including multi-chunk generator outputs — without holding a direct connection to the worker.

The system consists of two cooperating processes:

  • fnq-server — a gRPC frontend that accepts CallFunction RPCs, assigns each call a UUID, enqueues it to a Redis list, and polls a Redis stream for results to relay back to the caller.
  • fnq-consumer — a queue consumer that pops work items, forwards them to a backend RemoteCallService (the actual Python worker), and writes responses back to the Redis stream.
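In outline, one iteration of the consumer loop looks like this. The sketch below is illustrative, not the consumer's actual code: `pop_and_forward` is our name for it, `conn` stands in for a redis-py client, and `call_backend` stands in for the gRPC CallFunction stub.

```python
import json

def pop_and_forward(conn, queue_key, call_backend):
    """One consumer iteration: pop a work item, forward it to the backend,
    and write each response chunk to the call's result stream.

    `conn` is any redis-py-compatible client (rpop/xadd); `call_backend`
    takes the call payload and yields response payload bytes.
    """
    raw = conn.rpop(queue_key)
    if raw is None:
        return False  # queue is empty

    item = json.loads(raw)
    stream_key = f"gen:{item['call_id']}:stream"
    seq = 1
    for payload in call_backend(item["envelope"]["call"]):
        conn.xadd(stream_key, {"type": "chunk", "seq": str(seq),
                               "data": payload, "final": "0"})
        seq += 1
    # Backend stream finished — write the terminal entry.
    conn.xadd(stream_key, {"type": "end", "seq": str(seq),
                           "data": "", "final": "1"})
    return True
```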

Architecture

┌──────────┐         gRPC (bidi stream)         ┌──────────────┐
│  Caller  │ ─────────────────────────────────▸ │  fnq-server  │
│          │ ◂───────────────────────────────── │              │
└──────────┘   CallFunctionRequest/Response     └──────┬───────┘
                                                       │
                                          LPUSH        │       XREAD
                                     (enqueue call)    │  (poll for results)
                                                       │
                                                ┌──────▾───────┐
                                                │    Redis     │
                                                │              │
                                                │  List:       │
                                                │   function_  │
                                                │   queue      │
                                                │              │
                                                │  Stream:     │
                                                │   gen:{id}:  │
                                                │   stream     │
                                                └──────┬───────┘
                                                       │
                                          RPOP         │       XADD
                                      (pop work)       │  (write results)
                                                       │
                                                ┌──────▾───────┐
                                                │ fnq-consumer │
                                                └──────┬───────┘
                                                       │
                                          gRPC         │
                                     (bidi stream)     │
                                                       │
                                                ┌──────▾───────┐
                                                │   Backend    │
                                                │  (Python     │
                                                │   worker)    │
                                                └──────────────┘

Sequence Flow

A complete request lifecycle looks like this:

Caller              fnq-server              Redis                fnq-consumer             Backend
  │                     │                     │                       │                      │
  │ CallFunctionRequest │                     │                       │                      │
  │────────────────────▸│                     │                       │                      │
  │                     │ LPUSH envelope      │                       │                      │
  │                     │────────────────────▸│                       │                      │
  │                     │ XADD sentinel       │                       │                      │
  │                     │ EXPIRE 24h          │                       │                      │
  │                     │────────────────────▸│                       │                      │
  │                     │                     │                       │                      │
  │                     │                     │ RPOP                  │                      │
  │                     │                     │◂──────────────────────│                      │
  │                     │                     │                       │                      │
  │                     │                     │                       │ CallFunctionRequest  │
  │                     │                     │                       │─────────────────────▸│
  │                     │                     │                       │                      │
  │                     │                     │                       │ CallFunctionResponse │
  │                     │                     │                       │◂─────────────────────│
  │                     │                     │ XADD chunk seq=1      │                      │
  │                     │                     │◂──────────────────────│                      │
  │                     │                     │                       │                      │
  │                     │ XREAD               │                       │ (stream ends)        │
  │                     │◂────────────────────│                       │                      │
  │ CallFunctionResponse│                     │ XADD end final=1      │                      │
  │◂────────────────────│                     │◂──────────────────────│                      │
  │                     │ XREAD               │                       │                      │
  │                     │◂────────────────────│                       │                      │
  │    (stream ends)    │                     │                       │                      │
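The submit path above (LPUSH envelope, sentinel XADD, EXPIRE) can be sketched as a single function. This is illustrative, not the server's actual code; in particular, the sentinel entry's field values are an assumption, since the diagram only shows that a sentinel is written before the TTL is set.

```python
import json
import uuid

RESULT_TTL_SECONDS = 24 * 60 * 60  # the 24-hour stream TTL from the diagram

def enqueue_call(conn, queue_key, name, feather_stream_b64, otel_headers=None):
    """Assign a call_id, enqueue the envelope, and pre-create the result stream.

    `conn` is any redis-py-compatible client (lpush/xadd/expire).
    """
    call_id = str(uuid.uuid4())
    envelope = {
        "call_id": call_id,
        "envelope": {
            "call": {"name": name, "feather_stream": feather_stream_b64},
            "metadata": {"otel_headers": otel_headers or {}},
        },
    }
    conn.lpush(queue_key, json.dumps(envelope))

    stream_key = f"gen:{call_id}:stream"
    # XADD a sentinel so the stream key exists, then EXPIRE so streams for
    # abandoned calls don't leak. (Sentinel field values are assumed here.)
    conn.xadd(stream_key, {"type": "sentinel", "seq": "0", "data": "", "final": "0"})
    conn.expire(stream_key, RESULT_TTL_SECONDS)
    return call_id
```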

Result Stream Protocol

Each function invocation gets a dedicated Redis stream at gen:{call_id}:stream with a 24-hour TTL. The worker writes entries to this stream as results become available, and the server polls it to relay results back to the caller.

Every stream entry has four fields:

Field   Description
type    One of chunk, end, or error
seq     Monotonically increasing integer
data    Payload bytes (Arrow IPC, JSON, or raw)
final   "0" or "1" — marks the terminal entry

The protocol supports three outcome shapes.

Single result

The function returns a single value. The worker writes one chunk entry followed by end:

XADD gen:{id}:stream * type chunk seq 1 data <bytes> final 0
XADD gen:{id}:stream * type end   seq 2 data ""      final 1

Generator (async)

The function is an async generator that yields multiple values. Each yield becomes a chunk entry, and the stream terminates with end when the generator is exhausted:

XADD gen:{id}:stream * type chunk seq 1 data <bytes> final 0
XADD gen:{id}:stream * type chunk seq 2 data <bytes> final 0
XADD gen:{id}:stream * type chunk seq 3 data <bytes> final 0
XADD gen:{id}:stream * type end   seq 4 data ""      final 1

Exception

The function raises an exception. The worker writes a single error entry with a JSON-serialized exception payload:

XADD gen:{id}:stream * type error seq 1 data '{"exc_type":"ValueError","message":"...","traceback":"..."}' final 1
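All three shapes can be produced by one small helper that maps a call outcome to an ordered list of entry field mappings. A sketch (the helper name is ours, and the traceback field is omitted for brevity):

```python
import json

def entries_for_outcome(chunks=None, error=None):
    """Build the ordered XADD field mappings for one call outcome.

    Pass `chunks` (a list of payloads; length 1 for a single result) on
    success, or `error` (an exception instance) on failure.
    """
    seq = 1
    if error is not None:
        # One terminal error entry with a JSON-serialized exception payload.
        return [{"type": "error", "seq": str(seq),
                 "data": json.dumps({"exc_type": type(error).__name__,
                                     "message": str(error)}),
                 "final": "1"}]

    entries = []
    for payload in chunks or []:
        entries.append({"type": "chunk", "seq": str(seq),
                        "data": payload, "final": "0"})
        seq += 1
    # Terminal end entry closes the stream.
    entries.append({"type": "end", "seq": str(seq), "data": "", "final": "1"})
    return entries
```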

Async Generator Support

The result stream protocol is designed around async generators as the primary abstraction. A single return value is just a generator that yields once. This means callers can start processing partial results before the function finishes — useful for streaming inference, pagination, or any computation that produces output incrementally.

Here is a Python example showing how a worker would produce results for an async generator, and how a consumer reads them:

Worker side

import asyncio
import json
import redis.asyncio as redis

async def execute_and_stream(
    conn: redis.Redis,
    call_id: str,
    func,
    args,
):
    """Execute an async generator and write each yield to the result stream."""
    stream_key = f"gen:{call_id}:stream"
    seq = 1

    try:
        async for chunk in func(*args):
            await conn.xadd(stream_key, {
                "type": "chunk",
                "seq": str(seq),
                "data": chunk,
                "final": "0",
            })
            seq += 1

        # Generator exhausted — write terminal entry.
        await conn.xadd(stream_key, {
            "type": "end",
            "seq": str(seq),
            "data": b"",
            "final": "1",
        })

    except Exception as exc:
        # Write exception so the caller unblocks.
        await conn.xadd(stream_key, {
            "type": "error",
            "seq": str(seq),
            "data": json.dumps({
                "exc_type": type(exc).__name__,
                "message": str(exc),
            }),
            "final": "1",
        })

Caller side

async def read_results(conn: redis.Redis, call_id: str):
    """Read chunks from the result stream as they arrive."""
    stream_key = f"gen:{call_id}:stream"
    last_id = "0-0"

    while True:
        entries = await conn.xread(
            {stream_key: last_id}, count=64, block=5000
        )

        for _, messages in entries:
            for msg_id, fields in messages:
                last_id = msg_id
                entry_type = fields[b"type"].decode()

                if entry_type == "chunk":
                    yield fields[b"data"]
                elif entry_type == "end":
                    return
                elif entry_type == "error":
                    exc = json.loads(fields[b"data"])
                    raise RuntimeError(
                        f"{exc['exc_type']}: {exc['message']}"
                    )

End-to-end example

async def generate_squares(n: int):
    """An async generator that yields squares one at a time."""
    for i in range(n):
        await asyncio.sleep(0.1)  # simulate work
        yield str(i * i).encode()


async def main():
    conn = redis.Redis()
    call_id = "example-call-123"

    # Worker writes results as they're produced.
    worker = asyncio.create_task(
        execute_and_stream(conn, call_id, generate_squares, (5,))
    )

    # Caller reads results as they arrive.
    async for chunk in read_results(conn, call_id):
        print(f"received: {chunk.decode()}")

    await worker
    # Output:
    #   received: 0
    #   received: 1
    #   received: 4
    #   received: 9
    #   received: 16

asyncio.run(main())

The caller begins receiving chunks as soon as the first `yield` occurs — it does not wait for the generator to finish. This makes the protocol suitable for streaming LLM token output, incremental ETL results, or any long-running computation that can report progress.


Queue Envelope Format

Each function call is serialized as JSON and pushed onto a Redis list:

{
  "call_id": "a1b2c3d4-...",
  "envelope": {
    "call": {
      "name": "extract_links",
      "feather_stream": "<base64-encoded Arrow IPC bytes>"
    },
    "metadata": {
      "otel_headers": {
        "traceparent": "00-abc123-def456-01"
      }
    }
  }
}

The call_id tells the worker which Redis stream key (gen:{call_id}:stream) to write results to. The metadata.otel_headers field carries OpenTelemetry context so distributed traces span the full call path.
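Decoding on the consumer side is the mirror image. A minimal sketch, assuming feather_stream is base64-encoded as shown above:

```python
import base64
import json

def decode_work_item(raw):
    """Parse a queue item into (stream_key, function name, argument bytes,
    OTel headers). Field names follow the envelope layout above."""
    item = json.loads(raw)
    call = item["envelope"]["call"]
    metadata = item["envelope"].get("metadata", {})
    return (
        f"gen:{item['call_id']}:stream",
        call["name"],
        base64.b64decode(call["feather_stream"]),
        metadata.get("otel_headers", {}),
    )
```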


Configuration

Both binaries read configuration from environment variables with the prefix CHALK_FNQ_:

Variable                     Default                  Used by    Description
CHALK_FNQ_REDIS_URL          redis://localhost:6379   both       Redis connection URL
CHALK_FNQ_QUEUE_KEY          function_queue           both       Redis list key for the work queue
CHALK_FNQ_LISTEN_ADDR        [::]:50051               server     gRPC listen address
CHALK_FNQ_BACKEND_ENDPOINT   http://localhost:8888    consumer   Backend RemoteCallService endpoint

The consumer proxy selects TLS or plaintext based on the URL scheme — https:// enables TLS with system CA roots, http:// uses plaintext.
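The variables and the scheme-based TLS choice can be sketched as a small loader (illustrative; the real binaries' parsing may differ):

```python
import os

DEFAULTS = {
    "REDIS_URL": "redis://localhost:6379",
    "QUEUE_KEY": "function_queue",
    "LISTEN_ADDR": "[::]:50051",
    "BACKEND_ENDPOINT": "http://localhost:8888",
}

def load_config(env=os.environ):
    """Resolve each CHALK_FNQ_* variable, falling back to its default."""
    cfg = {k: env.get(f"CHALK_FNQ_{k}", v) for k, v in DEFAULTS.items()}
    # TLS is selected purely by the backend URL scheme.
    cfg["BACKEND_TLS"] = cfg["BACKEND_ENDPOINT"].startswith("https://")
    return cfg
```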


Deployment

A typical deployment runs three components:

  1. Redis — an in-cluster Redis instance (no persistence needed).
  2. fnq-server — the gRPC frontend, exposed as a Kubernetes Service.
  3. fnq-consumer — one or more consumer replicas polling the queue.

# Deploy to a namespace
kubectl apply -f k8s/redis.yaml -f k8s/function-queue.yaml -n <namespace>

The consumer can be scaled independently of the server. Adding more consumer replicas increases throughput since they compete on the same Redis list (RPOP is atomic).
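The scaling claim rests on the pop being atomic: two consumers can never receive the same item. The in-process simulation below illustrates that property, with CPython's atomic list.pop() standing in for Redis's RPOP (an analogy, not the actual deployment):

```python
import threading

def run_competing_consumers(items, n_consumers=4):
    """Simulate consumer replicas competing on one queue. Each item must be
    delivered to exactly one consumer, as with RPOP on a shared list."""
    queue = list(items)
    seen, lock = [], threading.Lock()

    def consume():
        while True:
            try:
                item = queue.pop()  # atomic under the GIL, like RPOP
            except IndexError:
                return  # queue drained
            with lock:
                seen.append(item)

    threads = [threading.Thread(target=consume) for _ in range(n_consumers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen
```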


Proto Definition

The gRPC interface is defined in chalk.runtime.v1.remote_python_call:

service RemoteCallService {
  rpc CallFunction(stream CallFunctionRequest) returns (stream CallFunctionResponse) {}
}

message CallFunctionRequest {
  string name = 1;
  bytes feather_stream = 2;
}

message CallFunctionResponse {
  bytes feather_stream = 1;
}

The feather_stream field carries arguments and results in Apache Arrow IPC format. The RPC is a bidirectional stream: the caller can send multiple requests on a single connection, and the server streams back responses as they arrive from each call's result stream.