What we're building

A long-running container that consumes messages from a Kafka topic and dispatches each one to a remote worker function for processing. The consumer handles backpressure, offset commits, and retries; the workers are stateless and scale independently.

This pattern is useful when you need to react to a stream of events — new orders, log entries, user actions — and each event requires non-trivial compute like model inference, enrichment, or writing to an external system.


The worker function

Start with the leaf. Each Kafka message contains a JSON payload describing an event. The worker parses it, does some work, and returns a result. For this example, we’ll classify support tickets and write the label back to a database.

# worker.py
import json
import os

import chalkcompute
from chalkcompute import Container, Image, Secret

# Container spec used by the classify-ticket function defined below.
worker_container = Container(
    # Slim Python base plus the two client libraries the worker imports
    # inside the function body.
    image=Image.base("python:3.12-slim").pip_install(["openai", "psycopg2-binary"]),
    # The worker function reads both of these names from os.environ.
    secrets=[Secret(name=key) for key in ("OPENAI_API_KEY", "DATABASE_URL")],
    cpu="1",
    memory="1Gi",
    # Instance bounds: from zero up to ten instances, eight concurrent
    # requests on each.
    min_instances=0,
    max_instances=10,
    max_concurrent_requests=8,
)


@chalkcompute.function(name="classify-ticket", container=worker_container)
def classify_ticket(message_json: str) -> str:
    """Classify a support ticket and persist the label.

    Args:
        message_json: A JSON object encoded as a string. It must contain the
            keys ``ticket_id``, ``subject`` and ``body``.

    Returns:
        ``"<ticket_id>:<label>"``, where ``label`` is always one of
        ``billing``, ``bug``, ``feature_request``, ``account`` or ``other``.

    Raises:
        json.JSONDecodeError: If ``message_json`` is not valid JSON.
        KeyError: If a required payload key or environment variable is
            missing.

    Errors raised by the OpenAI client or by psycopg2 are not caught here and
    propagate to the caller.
    """
    import openai
    import psycopg2

    # Must stay in sync with the category list in the system prompt below.
    allowed_labels = ("billing", "bug", "feature_request", "account", "other")

    message = json.loads(message_json)
    ticket_id = message["ticket_id"]
    subject = message["subject"]
    body = message["body"]

    # 1. Classify via LLM.
    client = openai.OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url="https://router.chalk.ai/v1",
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "Classify the support ticket into exactly one category: "
                    "billing, bug, feature_request, account, other. "
                    "Respond with only the category name."
                ),
            },
            {"role": "user", "content": f"Subject: {subject}\n\n{body}"},
        ],
    )

    # The reply is free text: it may be missing entirely, or wrapped in quotes
    # or trailing punctuation. Normalise it, and never persist a value outside
    # the known category set -- fall back to the catch-all "other" instead.
    raw_label = response.choices[0].message.content or ""
    label = raw_label.strip(" \t\r\n\"'`.").lower()
    if label not in allowed_labels:
        label = "other"

    # 2. Write the label back to Postgres.
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE tickets SET category = %s, classified_at = NOW() WHERE id = %s",
                (label, ticket_id),
            )
        conn.commit()
    finally:
        # Always release the connection, including when the UPDATE fails.
        conn.close()

    return f"{ticket_id}:{label}"

Deploy the worker:

chalk compute deploy worker.py
# Deployed function "classify-ticket" on container ...

With max_instances=10 and max_concurrent_requests=8, up to 80 tickets can be classified in parallel. Chalk scales the workers up when the queue backs up and down to zero when it drains.


The Kafka consumer

The consumer is a long-running container that reads from a topic and calls the worker function for each message. It runs a simple consume loop — no framework needed beyond confluent-kafka.

# consumer.py
import json
import os
import logging
import asyncio
from concurrent.futures import Future

import chalkcompute
from confluent_kafka import Consumer, KafkaError

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
log = logging.getLogger(__name__)

TOPIC = os.environ.get("KAFKA_TOPIC", "support-tickets")
GROUP_ID = os.environ.get("KAFKA_GROUP_ID", "ticket-classifier")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "20"))


def main() -> None:
    """Consume from Kafka, dispatch each message to the worker, commit in batches.

    Connection settings come from the environment: ``KAFKA_BROKERS``,
    ``KAFKA_USERNAME`` and ``KAFKA_PASSWORD`` are required, and
    ``KAFKA_SECURITY_PROTOCOL`` defaults to ``SASL_SSL``.

    Every message is handed to the ``classify-ticket`` function through
    ``async_call``. In-flight calls are awaited and offsets committed once
    ``BATCH_SIZE`` calls are pending, or as soon as a poll returns no message,
    so a partial batch does not stay uncommitted while the topic is quiet.

    Messages with no value, or with a value that is not valid UTF-8, are
    logged and skipped rather than allowed to terminate the loop.

    The loop runs until ``KeyboardInterrupt``. Outstanding calls are flushed
    and the consumer is closed on the way out.
    """
    consumer = Consumer({
        "bootstrap.servers": os.environ["KAFKA_BROKERS"],
        "group.id": GROUP_ID,
        "auto.offset.reset": "earliest",
        # Offsets are committed explicitly, after a batch has been flushed.
        "enable.auto.commit": False,
        "security.protocol": os.environ.get("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"),
        "sasl.mechanisms": "PLAIN",
        "sasl.username": os.environ["KAFKA_USERNAME"],
        "sasl.password": os.environ["KAFKA_PASSWORD"],
    })
    consumer.subscribe([TOPIC])

    # Get a reference to the worker function.
    classify = chalkcompute.function_ref("classify-ticket")
    classify.wait_ready()

    log.info("Consuming from %s (group: %s)", TOPIC, GROUP_ID)

    pending: list[tuple[str, Future]] = []

    def commit_pending() -> None:
        # Wait for the in-flight calls, then commit and start a new batch.
        _flush(pending)
        consumer.commit()
        pending.clear()

    try:
        while True:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                # Nothing arrived within the poll timeout. Use the idle time to
                # settle a partial batch instead of waiting for it to fill.
                if pending:
                    commit_pending()
                continue
            if msg.error():
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    log.error("Consumer error: %s", msg.error())
                continue

            raw = msg.value()
            if raw is None:
                # A null value has nothing to classify.
                log.warning(
                    "Skipping message with no value (partition %s, offset %s)",
                    msg.partition(),
                    msg.offset(),
                )
                continue
            try:
                value = raw.decode("utf-8")
            except UnicodeDecodeError:
                # Letting this propagate would stop the consumer, and the same
                # bytes would be read again on restart.
                log.error(
                    "Skipping undecodable message (partition %s, offset %s)",
                    msg.partition(),
                    msg.offset(),
                )
                continue

            # Dispatch to the worker.
            pending.append((value, classify.async_call(value)))

            # Flush in batches to balance throughput and latency.
            if len(pending) >= BATCH_SIZE:
                commit_pending()

    except KeyboardInterrupt:
        log.info("Shutting down...")
    finally:
        try:
            if pending:
                commit_pending()
        finally:
            # Close the consumer even if the final flush or commit raised.
            consumer.close()


def _ticket_id_for_log(value: str) -> str:
    """Return the payload's ``ticket_id`` for log output, or ``"?"`` if unavailable."""
    try:
        payload = json.loads(value)
    except ValueError:
        # Malformed JSON is a likely reason the worker call failed in the
        # first place; it must not raise again while the failure is reported.
        return "?"
    if not isinstance(payload, dict):
        return "?"
    return str(payload.get("ticket_id", "?"))


def _flush(pending: list[tuple[str, Future]]) -> None:
    """Wait for all in-flight worker calls to complete.

    Args:
        pending: ``(raw message value, future)`` pairs, one per dispatched
            call.

    Each future is awaited for at most 60 seconds. A call that fails or times
    out is logged with its traceback and skipped; nothing is re-raised, so the
    remaining futures are still awaited.
    """
    for value, future in pending:
        try:
            result = future.result(timeout=60)
        except Exception:
            log.exception("Failed to classify ticket %s", _ticket_id_for_log(value))
        else:
            log.info("Classified: %s", result)

    log.info("Flushed batch of %d messages", len(pending))


if __name__ == "__main__":
    main()

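The consumer commits offsets only after every call in the batch has finished. Note that `_flush` logs a failed worker call and moves on, so the offset of a failed message is still committed and that message is not redelivered. Messages that were dispatched but not yet committed when the consumer stops are redelivered on the next restart, so the worker must be safe to run more than once for the same ticket. If failed messages must be retried, publish them to a dead-letter topic or stop the consumer before committing the batch.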


Deploy the consumer

The consumer runs as a plain container (not a function). It needs Kafka credentials and chalkcompute to call the worker.

# deploy_consumer.py
from chalkcompute import Container, Image, Secret

# Build the image step by step: slim Python base, the consumer's two
# dependencies, then the consumer script itself.
image = Image.base("python:3.12-slim")
image = image.pip_install(["confluent-kafka", "chalkcompute"])
image = image.add_local_file("consumer.py", "/app/consumer.py")

# Define the consumer container and start it in one expression.
container = Container(
    name="ticket-consumer",
    image=image,
    entrypoint=["python", "/app/consumer.py"],
    cpu="1",
    memory="1Gi",
    # consumer.py reads these three names from os.environ.
    secrets=[
        Secret(name=key)
        for key in ("KAFKA_BROKERS", "KAFKA_USERNAME", "KAFKA_PASSWORD")
    ],
    # Non-secret settings; these match the defaults hard-coded in consumer.py.
    env=dict(
        KAFKA_TOPIC="support-tickets",
        KAFKA_GROUP_ID="ticket-classifier",
        BATCH_SIZE="20",
    ),
).run()

print(f"Consumer running: {container.info.web_url}")

Deploy the consumer container:

chalk compute deploy deploy_consumer.py
# Deployed container ...

How it fits together

  Kafka                  Consumer Container          classify-ticket workers
  (support-tickets)                                  (0–10 instances)
    │                         │                           │
    │  msg: {ticket_id,       │                           │
    │    subject, body}       │                           │
    │────────────────────────▸│                           │
    │  msg                    │                           │
    │────────────────────────▸│                           │
    │  msg                    │                           │
    │────────────────────────▸│                           │
    │  ...                    │                           │
    │────────────────────────▸│                           │
    │                         │                           │
    │                         │  async_call(msg_json) x N │
    │                         │──────────────────────────▸│──▸ OpenAI (classify)
    │                         │                           │──▸ Postgres (persist)
    │                         │                           │
    │                         │   results (batch)         │
    │                         │◂──────────────────────────│
    │                         │                           │
    │◂─────────────────────── │                           │
    │   commit offsets        │                           │
    │                         │                           │

The consumer polls messages, fans them out as async function calls, waits for the batch to complete, then commits. Workers scale independently — the consumer stays at one instance while workers expand to handle throughput spikes.


Scaling the consumer

For topics with many partitions, run multiple consumer instances in the same consumer group. Kafka will distribute partitions across them automatically:

# Same consumer container as before, now allowed to run between two and five
# instances in the same consumer group.
container = Container(
    name="ticket-consumer",
    image=image,
    entrypoint=["python", "/app/consumer.py"],
    min_instances=2,
    max_instances=5,
    cpu="1",
    memory="1Gi",
    env=dict(
        KAFKA_TOPIC="support-tickets",
        KAFKA_GROUP_ID="ticket-classifier",
        BATCH_SIZE="20",
    ),
    secrets=[
        Secret(name=key)
        for key in ("KAFKA_BROKERS", "KAFKA_USERNAME", "KAFKA_PASSWORD")
    ],
).run()

With min_instances=2, you always have two consumers sharing the partition load. Set max_instances to no more than your topic’s partition count — extra consumers beyond that will sit idle.