Compute
Subscribe to a Kafka topic from a container and fan out processing to remote functions.
A long-running container that consumes messages from a Kafka topic and dispatches each one to a remote worker function for processing. The consumer handles backpressure, offset commits, and retries; the workers are stateless and scale independently.
This pattern is useful when you need to react to a stream of events — new orders, log entries, user actions — and each event requires non-trivial compute like model inference, enrichment, or writing to an external system.
Start with the leaf. Each Kafka message contains a JSON payload describing an event. The worker parses it, does some work, and returns a result. For this example, we’ll classify support tickets and write the label back to a database.
# worker.py
import json
import os
import chalkcompute
from chalkcompute import Container, Image, Secret
# Container spec for the stateless worker: a slim Python base image with the
# OpenAI client and a Postgres driver installed at build time.
worker_container = Container(
    image=(
        Image.base("python:3.12-slim")
        .pip_install(["openai", "psycopg2-binary"])
    ),
    cpu="1",
    memory="1Gi",
    # Scale to zero when idle; burst up to 10 instances under load.
    min_instances=0,
    max_instances=10,
    # 8 concurrent requests per instance x 10 instances = up to 80 in-flight
    # classifications.
    max_concurrent_requests=8,
    # Exposed to the worker as environment variables (classify_ticket reads
    # both via os.environ).
    secrets=[
        Secret(name="OPENAI_API_KEY"),
        Secret(name="DATABASE_URL"),
    ],
)
@chalkcompute.function(name="classify-ticket", container=worker_container)
def classify_ticket(message_json: str) -> str:
    """Classify a support ticket and persist the label.

    Args:
        message_json: JSON-encoded event with "ticket_id", "subject", and
            "body" keys. A missing key raises KeyError; invalid JSON raises
            json.JSONDecodeError.

    Returns:
        "<ticket_id>:<label>", mainly useful for caller-side logging.
    """
    # Imported inside the function: these packages exist only in the worker
    # container image, not necessarily wherever this module is imported.
    import openai
    import psycopg2

    message = json.loads(message_json)
    ticket_id = message["ticket_id"]
    subject = message["subject"]
    body = message["body"]
    # 1. Classify via LLM (routed through Chalk's OpenAI-compatible proxy).
    client = openai.OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url="https://router.chalk.ai/v1",
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "Classify the support ticket into exactly one category: "
                    "billing, bug, feature_request, account, other. "
                    "Respond with only the category name."
                ),
            },
            {"role": "user", "content": f"Subject: {subject}\n\n{body}"},
        ],
    )
    # NOTE(review): assumes the model returns non-empty string content;
    # .strip() would raise AttributeError if content were None — confirm
    # this failure mode is acceptable (the consumer logs and skips failures).
    label = response.choices[0].message.content.strip().lower()
    # 2. Write the label back to Postgres (parameterized query; close the
    # connection even if the UPDATE fails).
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE tickets SET category = %s, classified_at = NOW() WHERE id = %s",
                (label, ticket_id),
            )
        conn.commit()
    finally:
        conn.close()
    return f"{ticket_id}:{label}"

Deploy the worker:
chalk compute deploy worker.py
# Deployed function "classify-ticket" on container ...

With max_instances=10 and max_concurrent_requests=8, up to 80 tickets can be
classified in parallel. Chalk scales the workers up when the queue backs up and
down to zero when it drains.
The consumer is a long-running container that reads from a topic and calls the
worker function for each message. It runs a simple consume loop — no framework
needed beyond confluent-kafka.
# consumer.py
import json
import os
import logging
import asyncio
from concurrent.futures import Future
import chalkcompute
from confluent_kafka import Consumer, KafkaError
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
log = logging.getLogger(__name__)

# Consumer configuration, overridable via the container environment
# (set in deploy_consumer.py's env= block).
TOPIC = os.environ.get("KAFKA_TOPIC", "support-tickets")  # topic to consume
GROUP_ID = os.environ.get("KAFKA_GROUP_ID", "ticket-classifier")  # consumer group
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "20"))  # in-flight calls per flush
def main() -> None:
    """Consume ticket events from Kafka and fan each out to the remote worker.

    Runs forever until interrupted; offsets are committed manually, once per
    completed batch, so auto-commit is disabled.
    """
    kafka_config = {
        "bootstrap.servers": os.environ["KAFKA_BROKERS"],
        "group.id": GROUP_ID,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,  # commit by hand after each batch flushes
        "security.protocol": os.environ.get("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"),
        "sasl.mechanisms": "PLAIN",
        "sasl.username": os.environ["KAFKA_USERNAME"],
        "sasl.password": os.environ["KAFKA_PASSWORD"],
    }
    consumer = Consumer(kafka_config)
    consumer.subscribe([TOPIC])

    # Resolve the remote worker function and block until it is callable.
    classify = chalkcompute.function_ref("classify-ticket")
    classify.wait_ready()
    log.info("Consuming from %s (group: %s)", TOPIC, GROUP_ID)

    pending: list[tuple[str, Future]] = []
    try:
        while True:
            record = consumer.poll(timeout=1.0)
            if record is None:
                continue
            err = record.error()
            if err:
                # End-of-partition is informational, not a failure.
                if err.code() != KafkaError._PARTITION_EOF:
                    log.error("Consumer error: %s", err)
                continue

            # Dispatch to the worker without blocking the consume loop.
            payload = record.value().decode("utf-8")
            pending.append((payload, classify.async_call(payload)))

            # Flush in batches to balance throughput and latency.
            if len(pending) >= BATCH_SIZE:
                _flush(pending)
                consumer.commit()
                pending = []
    except KeyboardInterrupt:
        log.info("Shutting down...")
    finally:
        # Drain whatever is still in flight before releasing the partitions.
        if pending:
            _flush(pending)
            consumer.commit()
        consumer.close()
def _flush(pending: list[tuple[str, Future]]) -> None:
    """Wait for all in-flight worker calls to complete.

    Args:
        pending: Pairs of (raw message value, Future from async_call).

    Worker failures are logged and swallowed so one bad ticket does not take
    down the whole batch; the caller commits offsets regardless, so a failed
    ticket is not redelivered.
    """
    for value, future in pending:
        try:
            result = future.result(timeout=60)
            log.info("Classified: %s", result)
        except Exception:
            # The message itself may be why the worker failed (e.g. it is not
            # valid JSON, or not a JSON object), so extracting the ticket id
            # for the error log must also be guarded — otherwise a malformed
            # payload would raise inside this handler and crash the consumer.
            try:
                ticket = json.loads(value).get("ticket_id", "?")
            except (ValueError, AttributeError):
                ticket = "?"
            log.exception("Failed to classify ticket %s", ticket)
    log.info("Flushed batch of %d messages", len(pending))
if __name__ == "__main__":
    main()

The consumer commits offsets only after the entire batch has been processed, so if the consumer crashes or restarts mid-batch, the uncommitted messages are redelivered — no data loss from a consumer failure. (Note that individual worker failures within a batch are logged and skipped by _flush, and the batch's offsets are still committed.)
The consumer runs as a plain container (not a function). It needs Kafka credentials
and chalkcompute to call the worker.
# deploy_consumer.py
from chalkcompute import Container, Image, Secret
# Consumer image: slim Python plus the Kafka client and the chalkcompute SDK,
# with the consumer script copied into the image.
image = (
    Image.base("python:3.12-slim")
    .pip_install(["confluent-kafka", "chalkcompute"])
    .add_local_file("consumer.py", "/app/consumer.py")
)
# Long-running consumer container; .run() starts it immediately on deploy.
container = Container(
    image=image,
    name="ticket-consumer",
    cpu="1",
    memory="1Gi",
    # Kafka credentials, surfaced to the process as environment variables
    # (consumer.py reads them from os.environ).
    secrets=[
        Secret(name="KAFKA_BROKERS"),
        Secret(name="KAFKA_USERNAME"),
        Secret(name="KAFKA_PASSWORD"),
    ],
    # Non-secret configuration consumed by consumer.py.
    env={
        "KAFKA_TOPIC": "support-tickets",
        "KAFKA_GROUP_ID": "ticket-classifier",
        "BATCH_SIZE": "20",
    },
    entrypoint=["python", "/app/consumer.py"],
).run()
print(f"Consumer running: {container.info.web_url}")

chalk compute deploy deploy_consumer.py
# Deployed container ...

 Kafka                     Consumer Container          classify-ticket workers
 (support-tickets)                                     (0–10 instances)
│ │ │
│ msg: {ticket_id, │ │
│ subject, body} │ │
│────────────────────────▸│ │
│ msg │ │
│────────────────────────▸│ │
│ msg │ │
│────────────────────────▸│ │
│ ... │ │
│────────────────────────▸│ │
│ │ │
│ │ async_call(msg_json) x N │
│ │──────────────────────────▸│──▸ OpenAI (classify)
│ │ │──▸ Postgres (persist)
│ │ │
│ │ results (batch) │
│ │◂──────────────────────────│
│ │ │
│◂─────────────────────── │ │
│ commit offsets │ │
│ │ │
The consumer polls messages, fans them out as async function calls, waits for the batch to complete, then commits. Workers scale independently — the consumer stays at one instance while workers expand to handle throughput spikes.
For topics with many partitions, run multiple consumer instances in the same consumer group. Kafka will distribute partitions across them automatically:
# Scaled-out variant: multiple consumer instances in the same consumer group
# share the topic's partitions.
container = Container(
    image=image,
    name="ticket-consumer",
    cpu="1",
    memory="1Gi",
    # Always keep two consumers running; allow up to five under load.
    min_instances=2,
    max_instances=5,
    secrets=[
        Secret(name="KAFKA_BROKERS"),
        Secret(name="KAFKA_USERNAME"),
        Secret(name="KAFKA_PASSWORD"),
    ],
    env={
        "KAFKA_TOPIC": "support-tickets",
        "KAFKA_GROUP_ID": "ticket-classifier",
        "BATCH_SIZE": "20",
    },
    entrypoint=["python", "/app/consumer.py"],
).run()

With min_instances=2, you always have two consumers sharing the partition load.
Set max_instances to no more than your topic’s partition count — extra consumers
beyond that will sit idle.