# Webhook Agent Orchestrator
source: https://docs.chalk.ai/docs/compute/webhook-agent-orchestrator

## Build a container that listens for webhooks, spawns coding agents, and tracks state in SQLite.

### What we're building

A single long-running container that:

- Exposes an HTTP endpoint for incoming webhooks (GitHub, Linear, Slack, etc.).
- On each webhook, creates an agent session via the Chalk Controller API.
- Tracks every agent's lifecycle — created, running, completed, failed — in a SQLite database persisted on a Volume.

This pattern is useful when you want a lightweight orchestration layer that turns
external events into autonomous coding sessions without running a full queue system.

### Secrets

Container secrets work like environment variables, but are managed through Chalk so
they never appear in your source code or image layers. Declare the keys you need
upfront — the container won't start if any required secret is missing.

```
from chalkcompute import Container, Image, Secret, Volume

container = Container(
    image=image,
    secrets=[
        Secret.from_env("OPENAI_API_KEY"),
        Secret.from_env("GITHUB_WEBHOOK_SECRET"),
        Secret.from_env("CONTROLLER_URL"),
        Secret.from_env("CONTROLLER_API_KEY"),
    ],
    # ...
)
```

Each Secret.from_env("...") references a secret stored in your Chalk project settings.
Secrets are named after the environment variable they inject — use the alias
parameter if you need to alias a secret to a different environment variable name.

### Project layout

```
webhook-orchestrator/
├── orchestrator.py          # FastAPI app: webhook handler + agent tracker
└── deploy.py                # Chalk container definition
```

We'll walk through each file, then deploy the whole thing with a single command.

### The orchestrator

The orchestrator is a small FastAPI app. It does three things: initializes a SQLite
database, accepts webhook POST requests, and calls the Chalk Controller to spawn agent
sessions.

### Database setup

We store agent state in SQLite on a mounted volume. This means the data survives
container restarts — if the orchestrator crashes at 3 AM, it comes back up and knows
exactly which agents are still running.

```
# orchestrator.py
import sqlite3
import os
import httpx
from datetime import datetime, timezone
from contextlib import contextmanager
from fastapi import FastAPI, Request, HTTPException

DB_PATH = os.environ.get("DB_PATH", "/data/agents.db")
CONTROLLER_URL = os.environ["CONTROLLER_URL"]
CONTROLLER_API_KEY = os.environ.get("CONTROLLER_API_KEY", "")

app = FastAPI()

def init_db() -> None:
    with _db() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS agents (
                id TEXT PRIMARY KEY,
                webhook_event TEXT NOT NULL,
                repo TEXT NOT NULL,
                prompt TEXT NOT NULL,
                status TEXT NOT NULL DEFAULT 'pending',
                session_id TEXT,
                task_id TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)

@contextmanager
def _db():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()
```

Nothing exotic here — a single agents table that tracks the mapping from webhook
event to controller session.

### Webhook handler

When a webhook arrives, we parse it, insert a row, and kick off an agent session
in the background. The endpoint returns 200 immediately so the webhook source
doesn't time out.

```
import uuid
import asyncio

@app.on_event("startup")
def on_startup() -> None:
    init_db()

@app.post("/webhook")
async def handle_webhook(request: Request) -> dict:
    payload = await request.json()

    # Extract the relevant fields — adapt this to your webhook source.
    repo = payload.get("repository", {}).get("full_name", "unknown/unknown")
    event_type = request.headers.get("X-GitHub-Event", "unknown")
    prompt = _build_prompt(event_type, payload)

    agent_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()

    with _db() as conn:
        conn.execute(
            "INSERT INTO agents (id, webhook_event, repo, prompt, status, created_at, updated_at)"
            " VALUES (?, ?, ?, ?, 'pending', ?, ?)",
            (agent_id, event_type, repo, prompt, now, now),
        )

    # Fire-and-forget: spawn the agent without blocking the response.
    asyncio.create_task(_spawn_agent(agent_id, repo, prompt))

    return {"agent_id": agent_id, "status": "pending"}
```

### Prompt construction

Different webhook events need different instructions. A new issue gets an
investigative prompt; a PR review comment gets a targeted fix prompt.

```
def _build_prompt(event_type: str, payload: dict) -> str:
    if event_type == "issues" and payload.get("action") == "opened":
        issue = payload["issue"]
        return (
            f"Investigate GitHub issue #{issue['number']}: {issue['title']}\n\n"
            f"{issue.get('body', 'No description provided.')}\n\n"
            "Diagnose the root cause and open a PR with a fix."
        )

    if event_type == "issue_comment":
        comment = payload["comment"]
        issue = payload["issue"]
        return (
            f"Respond to comment on #{issue['number']}: {issue['title']}\n\n"
            f"Comment by @{comment['user']['login']}:\n{comment['body']}\n\n"
            "Address the feedback and push an updated commit."
        )

    return f"Handle {event_type} event for this repository."
```

### Spawning the agent

This is where the orchestrator calls the Chalk Controller. It creates a session
(which provisions a workspace), then submits a task with the generated prompt.
As each step completes, it updates the SQLite row.

```
async def _spawn_agent(agent_id: str, repo: str, prompt: str) -> None:
    try:
        async with httpx.AsyncClient(
            base_url=CONTROLLER_URL,
            headers={"Authorization": f"Bearer {CONTROLLER_API_KEY}"},
            timeout=120,
        ) as client:
            # Step 1: Create a session (provisions a workspace with the repo).
            resp = await client.post("/api/v1/sessions", json={
                "description": f"Webhook agent for {repo}",
                "repo": repo,
            })
            resp.raise_for_status()
            session = resp.json()
            session_id = session["id"]

            _update_agent(agent_id, status="session_created", session_id=session_id)

            # Step 2: Submit the task.
            resp = await client.post(f"/api/v1/sessions/{session_id}/tasks", json={
                "prompt": prompt,
                "model": "claude-sonnet-4-20250514",
            })
            resp.raise_for_status()
            task = resp.json()

            _update_agent(agent_id, status="running", task_id=task["id"])

    except Exception as e:
        _update_agent(agent_id, status="failed")
        raise

def _update_agent(agent_id: str, status: str, session_id: str | None = None, task_id: str | None = None) -> None:
    now = datetime.now(timezone.utc).isoformat()
    with _db() as conn:
        conn.execute(
            "UPDATE agents SET status = ?, session_id = COALESCE(?, session_id),"
            " task_id = COALESCE(?, task_id), updated_at = ? WHERE id = ?",
            (status, session_id, task_id, now, agent_id),
        )
```

### Status endpoint

A simple GET endpoint lets you check on all agents — useful for dashboards or
debugging.

```
@app.get("/agents")
def list_agents(status: str | None = None) -> list[dict]:
    with _db() as conn:
        if status:
            rows = conn.execute(
                "SELECT * FROM agents WHERE status = ? ORDER BY created_at DESC", (status,)
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM agents ORDER BY created_at DESC"
            ).fetchall()
    return [dict(row) for row in rows]

@app.get("/health")
def health() -> dict:
    return {"status": "ok"}
```

### The deploy script

Now we wire everything together with a Container that mounts a Volume for the
database, injects secrets, and starts the FastAPI server.

```
# deploy.py
from chalkcompute import Container, Image, Secret, Volume

vol = Volume(name="orchestrator-data")

image = (
    Image.base("python:3.12-slim")
    .pip_install(["fastapi", "uvicorn", "httpx"])
    .add_local_file("orchestrator.py", "/app/orchestrator.py")
)

container = Container(
    image=image,
    name="webhook-orchestrator",
    port=8000,
    secrets=[
        Secret.from_env("OPENAI_API_KEY"),
        Secret.from_env("GITHUB_WEBHOOK_SECRET"),
        Secret.from_env("CONTROLLER_URL"),
        Secret.from_env("CONTROLLER_API_KEY"),
    ],
    volumes=[("orchestrator-data", "/data")],
    entrypoint=[
        "uvicorn", "orchestrator:app",
        "--host", "0.0.0.0",
        "--port", "8000",
        "--app-dir", "/app",
    ],
).run()

print(f"Orchestrator URL: {container.info.web_url}")
print(f"Point your GitHub webhook to: {container.info.web_url}/webhook")
```

### Deploy it

```
python deploy.py
# Orchestrator URL: https://d81f42e9-6ab7-4c3d-9e15-2f0a8b73c4d1.compute.chalk.ai
# Point your GitHub webhook to: https://d81f42e9-6ab7-4c3d-9e15-2f0a8b73c4d1.compute.chalk.ai/webhook
```

Copy the container URL with /webhook appended into your GitHub repository's webhook settings
(Settings → Webhooks → Add webhook). Select the events you want to trigger agents
for — typically Issues and Issue comments.

### How it fits together

```
GitHub/Linear/Slack                 Chalk Compute
───────────────────                 ─────────────

  webhook POST ──────────────▸ ┌───────────────────┐
                               │  Orchestrator     │
                               │  (FastAPI)        │
                               │                   │
                               │  SQLite on Volume │──▸ /data/agents.db
                               └────────┬──────────┘
                                        │
                                        │ POST /api/v1/sessions
                                        │ POST /api/v1/sessions/:id/tasks
                                        ▼
                               ┌──────────────────┐
                               │ Chalk Controller │
                               │                  │
                               │  Provisions a    │
                               │  workspace,      │
                               │  runs the agent  │
                               └──────────────────┘
```

Each webhook creates one row in SQLite and one agent session in the controller.
The volume ensures the database survives restarts, and secrets keep credentials
out of your code.





