# Kafka Consumer
source: https://docs.chalk.ai/docs/compute/kafka-consumer

## Subscribe to a Kafka topic from a container and fan out processing to remote functions.

### What we're building

A long-running container that consumes messages from a Kafka topic and dispatches each
one to a remote worker function for processing. The consumer handles backpressure,
offset commits, and retries; the workers are stateless and scale independently.

This pattern is useful when you need to react to a stream of events — new orders,
log entries, user actions — and each event requires non-trivial compute like model
inference, enrichment, or writing to an external system.

### The worker function

Start with the leaf. Each Kafka message contains a JSON payload describing an event.
The worker parses it, does some work, and returns a result. For this example, we'll
classify support tickets and write the label back to a database.

```
# worker.py
import json
import os

import chalkcompute
from chalkcompute import Container, Image, Secret

worker_container = Container(
    image=(
        Image.base("python:3.12-slim")
        .pip_install(["openai", "psycopg2-binary"])
    ),
    cpu="1",
    memory="1Gi",
    min_instances=0,
    max_instances=10,
    max_concurrent_requests=8,
    secrets=[
        Secret.from_env("OPENAI_API_KEY"),
        Secret.from_env("DATABASE_URL"),
    ],
)


@chalkcompute.function(name="classify-ticket", container=worker_container)
def classify_ticket(message_json: str) -> str:
    """Classify a support ticket and persist the label."""
    import openai
    import psycopg2

    message = json.loads(message_json)
    ticket_id = message["ticket_id"]
    subject = message["subject"]
    body = message["body"]

    # 1. Classify via LLM.
    client = openai.OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url="https://router.chalk.ai/v1",
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "Classify the support ticket into exactly one category: "
                    "billing, bug, feature_request, account, other. "
                    "Respond with only the category name."
                ),
            },
            {"role": "user", "content": f"Subject: {subject}\n\n{body}"},
        ],
    )
    label = response.choices[0].message.content.strip().lower()

    # 2. Write the label back to Postgres.
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE tickets SET category = %s, classified_at = NOW() WHERE id = %s",
                (label, ticket_id),
            )
        conn.commit()
    finally:
        conn.close()

    return f"{ticket_id}:{label}"
```

Deploy the worker:

```
python worker.py
```

With max_instances=10 and max_concurrent_requests=8, up to 80 tickets can be
classified in parallel. Chalk scales the workers up when the queue backs up and
down to zero when it drains.

### The Kafka consumer

The consumer is a long-running container that reads from a topic and calls the
worker function for each message. It runs a simple consume loop — no framework
needed beyond confluent-kafka.

```
# consumer.py
import json
import os
import logging
import asyncio
from concurrent.futures import Future

import chalkcompute
from confluent_kafka import Consumer, KafkaError

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
log = logging.getLogger(__name__)

TOPIC = os.environ.get("KAFKA_TOPIC", "support-tickets")
GROUP_ID = os.environ.get("KAFKA_GROUP_ID", "ticket-classifier")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "20"))


def main() -> None:
    consumer = Consumer({
        "bootstrap.servers": os.environ["KAFKA_BROKERS"],
        "group.id": GROUP_ID,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "security.protocol": os.environ.get("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"),
        "sasl.mechanisms": "PLAIN",
        "sasl.username": os.environ["KAFKA_USERNAME"],
        "sasl.password": os.environ["KAFKA_PASSWORD"],
    })
    consumer.subscribe([TOPIC])

    # Get a reference to the worker function.
    classify = chalkcompute.function_ref("classify-ticket")
    classify.wait_ready()

    log.info("Consuming from %s (group: %s)", TOPIC, GROUP_ID)

    pending: list[tuple[str, Future]] = []

    try:
        while True:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                log.error("Consumer error: %s", msg.error())
                continue

            # Dispatch to the worker.
            value = msg.value().decode("utf-8")
            future = classify.async_call(value)
            pending.append((value, future))

            # Flush in batches to balance throughput and latency.
            if len(pending) >= BATCH_SIZE:
                _flush(pending)
                consumer.commit()
                pending = []

    except KeyboardInterrupt:
        log.info("Shutting down...")
    finally:
        if pending:
            _flush(pending)
            consumer.commit()
        consumer.close()


def _flush(pending: list[tuple[str, Future]]) -> None:
    """Wait for all in-flight worker calls to complete."""
    for value, future in pending:
        try:
            result = future.get(timeout=60)
            log.info("Classified: %s", result)
        except Exception:
            ticket = json.loads(value).get("ticket_id", "?")
            log.exception("Failed to classify ticket %s", ticket)

    log.info("Flushed batch of %d messages", len(pending))


if __name__ == "__main__":
    main()
```

The consumer commits offsets only after the entire batch has been processed. If a
worker fails, the message will be redelivered on the next consumer restart — no
data loss.

### Deploy the consumer

The consumer runs as a plain container (not a function). It needs Kafka credentials
and chalkcompute to call the worker.

```
# deploy_consumer.py
from chalkcompute import Container, Image, Secret

image = (
    Image.base("python:3.12-slim")
    .pip_install(["confluent-kafka", "chalkcompute"])
    .add_local_file("consumer.py", "/app/consumer.py")
)

container = Container(
    image=image,
    name="ticket-consumer",
    cpu="1",
    memory="1Gi",
    secrets=[
        Secret.from_env("KAFKA_BROKERS"),
        Secret.from_env("KAFKA_USERNAME"),
        Secret.from_env("KAFKA_PASSWORD"),
    ],
    env={
        "KAFKA_TOPIC": "support-tickets",
        "KAFKA_GROUP_ID": "ticket-classifier",
        "BATCH_SIZE": "20",
    },
    entrypoint=["python", "/app/consumer.py"],
).run()

print(f"Consumer running: {container.info.web_url}")
```

```
python deploy_consumer.py
# Consumer running: https://xxxx.compute.chalk.ai
```

### How it fits together

```
  Kafka                  Consumer Container          classify-ticket workers
  (support-tickets)                                  (0–10 instances)
    │                         │                           │
    │  msg: {ticket_id,       │                           │
    │    subject, body}       │                           │
    │────────────────────────▸│                           │
    │  msg                    │                           │
    │────────────────────────▸│                           │
    │  msg                    │                           │
    │────────────────────────▸│                           │
    │  ...                    │                           │
    │────────────────────────▸│                           │
    │                         │                           │
    │                         │  async_call(msg_json) x N │
    │                         │──────────────────────────▸│──▸ OpenAI (classify)
    │                         │                           │──▸ Postgres (persist)
    │                         │                           │
    │                         │   results (batch)         │
    │                         │◂──────────────────────────│
    │                         │                           │
    │◂─────────────────────── │                           │
    │   commit offsets        │                           │
    │                         │                           │
```

The consumer polls messages, fans them out as async function calls, waits for the
batch to complete, then commits. Workers scale independently — the consumer stays
at one instance while workers expand to handle throughput spikes.

### Scaling the consumer

For topics with many partitions, run multiple consumer instances in the same
consumer group. Kafka will distribute partitions across them automatically:

```
container = Container(
    image=image,
    name="ticket-consumer",
    cpu="1",
    memory="1Gi",
    min_instances=2,
    max_instances=5,
    secrets=[
        Secret.from_env("KAFKA_BROKERS"),
        Secret.from_env("KAFKA_USERNAME"),
        Secret.from_env("KAFKA_PASSWORD"),
    ],
    env={
        "KAFKA_TOPIC": "support-tickets",
        "KAFKA_GROUP_ID": "ticket-classifier",
        "BATCH_SIZE": "20",
    },
    entrypoint=["python", "/app/consumer.py"],
).run()
```

With min_instances=2, you always have two consumers sharing the partition load.
Set max_instances to no more than your topic's partition count — extra consumers
beyond that will sit idle.





