# RAG Pipeline
source: https://docs.chalk.ai/docs/compute/rag-pipeline

## Build a retrieval-augmented generation function that embeds a query and searches OpenSearch.

### What we're building

A single remote function that implements the retrieval half of RAG:

- Embed the user's question using an embedding model via Chalk's OpenAI-compatible router.
- Run a k-NN vector search against an OpenSearch index to find relevant documents.
- Return the top matches with their text, ready to be stuffed into an LLM prompt.

This is the pattern you'd use inside a resolver or as a building block for a
larger agent — the function handles the "R" in RAG so the caller only deals with
the final generation step.

### Define the container

The function needs the OpenAI SDK (for embeddings) and the OpenSearch client.
We also inject secrets for both services so credentials stay out of the code.

```
# rag.py
import chalkcompute
from chalkcompute import Container, Image, Secret

rag_container = Container(
    image=(
        Image.base("python:3.12-slim")
        .pip_install(["openai", "opensearch-py"])
    ),
    cpu="1",
    memory="2Gi",
    secrets=[
        Secret.from_env("OPENAI_API_KEY"),
        Secret.from_env("OPENSEARCH_HOST"),
        Secret.from_env("OPENSEARCH_USERNAME"),
        Secret.from_env("OPENSEARCH_PASSWORD"),
    ],
)
```
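
Because the function reads these four values from the environment at call time, a missing secret would otherwise surface as a bare KeyError partway through a request. Below is a minimal sketch of a fail-fast check, assuming the secrets are exposed as environment variables under the same names (as the use of os.environ in the retrieve function suggests). `require_env` is a hypothetical helper, not part of chalkcompute.

```python
import os
from typing import Mapping

# Names taken from the container definition above.
REQUIRED_SECRETS = (
    "OPENAI_API_KEY",
    "OPENSEARCH_HOST",
    "OPENSEARCH_USERNAME",
    "OPENSEARCH_PASSWORD",
)


def require_env(
    names: tuple[str, ...] = REQUIRED_SECRETS,
    env: Mapping[str, str] = os.environ,
) -> dict[str, str]:
    """Hypothetical helper: return the named variables or raise one clear error."""
    # Treat unset and empty values the same way.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required secrets: {', '.join(missing)}")
    return {name: env[name] for name in names}
```

Calling `require_env()` as the first statement of the function body would report every missing name at once instead of failing on the first lookup.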

### The retrieve function

The function takes a plain-text query, embeds it, and searches OpenSearch in one
shot. No volume needed here — the document corpus already lives in the index.

```
# rag.py (continued)
@chalkcompute.function(name="retrieve", container=rag_container)
def retrieve(query: str, top_k: int = 5) -> list[dict]:
    """Embed a query and return the top-k matching documents from OpenSearch."""
    import os
    import openai
    from opensearchpy import OpenSearch

    # 1. Embed the query through Chalk's router.
    ai = openai.OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url="https://router.chalk.ai/v1",
    )
    response = ai.embeddings.create(
        model="text-embedding-3-small",
        input=query,
    )
    query_vector = response.data[0].embedding

    # 2. Connect to OpenSearch.
    client = OpenSearch(
        hosts=[os.environ["OPENSEARCH_HOST"]],
        http_auth=(
            os.environ["OPENSEARCH_USERNAME"],
            os.environ["OPENSEARCH_PASSWORD"],
        ),
        use_ssl=True,
        verify_certs=True,
    )

    # 3. Run a k-NN search against the document index.
    results = client.search(
        index="documents",
        body={
            "size": top_k,
            "query": {
                "knn": {
                    "embedding": {
                        "vector": query_vector,
                        "k": top_k,
                    }
                }
            },
            "_source": ["text", "title", "url"],
        },
    )

    # 4. Return the hits in a simple format.
    return [
        {
            "title": hit["_source"].get("title", ""),
            "text": hit["_source"].get("text", ""),
            "url": hit["_source"].get("url", ""),
            "score": hit["_score"],
        }
        for hit in results["hits"]["hits"]
    ]
```
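
The query above presumes an index whose embedding field is a k-NN vector with the same dimensionality as the embedding model's output (1536 for text-embedding-3-small at its default size). The source does not show how the index was created, so the following is only a sketch of what a compatible definition could look like with the OpenSearch k-NN plugin. The field names mirror the ones used in retrieve; the field types for title, text, and url, and the choice to leave the k-NN method at its defaults, are assumptions.

```python
EMBEDDING_DIM = 1536  # default output size of text-embedding-3-small

# Assumed index definition; adjust to match how your corpus was actually indexed.
documents_index_body = {
    "settings": {"index": {"knn": True}},  # enable k-NN search on this index
    "mappings": {
        "properties": {
            # Must match the length of the vectors sent in the knn query.
            "embedding": {"type": "knn_vector", "dimension": EMBEDDING_DIM},
            "title": {"type": "text"},
            "text": {"type": "text"},
            "url": {"type": "keyword"},
        }
    },
}

# With an opensearch-py client in hand, the index could be created once with:
# client.indices.create(index="documents", body=documents_index_body)
```

A mismatch between the mapped dimension and the query vector's length is a common cause of failed k-NN requests, so it is worth checking first when a search errors out.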

The function is stateless: every call builds a new OpenAI client and opens a
fresh OpenSearch connection. For high-throughput use cases, you can cache the
clients in module-level variables, since each container instance is long-lived.
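
A minimal sketch of that caching for the OpenSearch client, using a lazily initialised module-level variable. It assumes module-level state in rag.py persists between calls, as the paragraph above implies. `get_client` and `_build_client` are hypothetical names, and the optional factory argument exists only so the caching logic can be exercised without a live cluster.

```python
import os
from typing import Any, Callable, Optional

_client: Optional[Any] = None  # one cached client per container process


def _build_client() -> Any:
    # Imported lazily, mirroring the imports inside the retrieve function.
    from opensearchpy import OpenSearch

    return OpenSearch(
        hosts=[os.environ["OPENSEARCH_HOST"]],
        http_auth=(
            os.environ["OPENSEARCH_USERNAME"],
            os.environ["OPENSEARCH_PASSWORD"],
        ),
        use_ssl=True,
        verify_certs=True,
    )


def get_client(factory: Callable[[], Any] = _build_client) -> Any:
    """Return the cached client, creating it on first use."""
    global _client
    if _client is None:
        _client = factory()
    return _client
```

Inside retrieve, the connection step would then reduce to `client = get_client()`. The sketch takes no lock, so two concurrent first calls could each build a client; the later assignment wins and the extra client is simply discarded.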

### Deploy it

```
python rag.py
```

### Calling the function

From any Python process with chalkcompute installed:

```
import chalkcompute

retrieve = chalkcompute.function_ref("retrieve")
retrieve.wait_ready()

docs = retrieve("How do I set up streaming resolvers?", top_k=3)
for doc in docs:
    print(f"[{doc['score']:.3f}] {doc['title']}")
    print(f"  {doc['text'][:120]}...")
    print()
```

Example output (titles and scores depend on the contents of your index):

```
[0.891] Streaming Resolvers
  Streaming resolvers let you subscribe to a data source and compute features as events arrive, rather than polling on a ...

[0.847] Resolver Overview
  Resolvers are Python functions that compute feature values. They can query databases, call APIs, or run arbitrary logic...

[0.823] Kafka Integration
  Connect Chalk to a Kafka topic to power streaming resolvers. Events are delivered to your resolver function as they arr...
```

### Request lifecycle

```
  Caller                 retrieve()              Chalk Router           OpenSearch
    │                       │                       │                       │
    │  retrieve("How do     │                       │                       │
    │   I set up streaming  │                       │                       │
    │   resolvers?")        │                       │                       │
    │──────────────────────▸│                       │                       │
    │                       │                       │                       │
    │                       │  embeddings.create()  │                       │
    │                       │──────────────────────▸│                       │
    │                       │                       │                       │
    │                       │    [1536-dim vector]  │                       │
    │                       │◂──────────────────────│                       │
    │                       │                       │                       │
    │                       │  k-NN search(vector, k=5)                     │
    │                       │──────────────────────────────────────────────▸│
    │                       │                       │                       │
    │                       │              [{title, text, url, score}, ...] │
    │                       │◂──────────────────────────────────────────────│
    │                       │                       │                       │
    │  [{title, text,       │                       │                       │
    │    url, score}, ...]  │                       │                       │
    │◂──────────────────────│                       │                       │
    │                       │                       │                       │
```

When plugged into generation, the caller adds one more hop — passing the
retrieved documents into an LLM to produce the final answer:

```
  ask()                  retrieve()              Chalk Router           OpenSearch
    │                       │                       │                       │
    │  retrieve(question)   │                       │                       │
    │──────────────────────▸│        (as above)     │                       │
    │  [{docs}]             │                       │                       │
    │◂──────────────────────│                       │                       │
    │                       │                       │                       │
    │  chat.completions.create(                     │                       │
    │    model="gpt-4o",                            │                       │
    │    messages=[system: {docs}, user: question]) │                       │
    │──────────────────────────────────────────────▸│                       │
    │                                               │                       │
    │  "Streaming resolvers let you..."             │                       │
    │◂──────────────────────────────────────────────│                       │
    │                                                                       │
```

### Plugging into generation

The retrieve function returns plain dicts, so wiring it into a generation step
is straightforward — call retrieve, format the results into a prompt, and pass
it to your LLM:

```
import chalkcompute
import openai

retrieve = chalkcompute.function_ref("retrieve")
retrieve.wait_ready()

def ask(question: str) -> str:
    # Retrieval
    docs = retrieve(question, top_k=5)
    context = "\n\n---\n\n".join(
        f"## {d['title']}\n{d['text']}" for d in docs
    )

    # Generation
    client = openai.OpenAI(
        # api_key is omitted, so the SDK reads OPENAI_API_KEY from the environment.
        base_url="https://router.chalk.ai/v1",
    )
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Answer the user's question using only the context below.\n\n"
                    f"{context}"
                ),
            },
            {"role": "user", "content": question},
        ],
    )
    # message.content is Optional in the SDK; fall back to an empty string.
    return response.choices[0].message.content or ""

print(ask("How do I set up streaming resolvers?"))
```
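
The context-formatting step is the part most likely to need tuning, for instance to cap prompt size or to carry source URLs through to the answer. Here is a sketch of it as a standalone pure function. `build_context` and the `max_chars` budget are hypothetical, and a character count is only a rough stand-in for a token limit.

```python
def build_context(docs: list[dict], max_chars: int = 6000) -> str:
    """Join retrieved docs into one context string without exceeding max_chars."""
    separator = "\n\n---\n\n"
    sections: list[str] = []
    used = 0
    for d in docs:  # hits come back ordered by score, best first
        section = f"## {d['title']}\nSource: {d['url']}\n{d['text']}"
        # Every section after the first also pays for one separator.
        cost = len(section) + (len(separator) if sections else 0)
        if used + cost > max_chars:
            break  # drop this and all lower-ranked documents
        sections.append(section)
        used += cost
    return separator.join(sections)
```

In ask, `context = build_context(docs)` would replace the inline join. Note that if even the top-ranked document exceeds the budget the result is an empty string, so a caller may want to handle that case explicitly.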

The retrieval runs on Chalk Compute (close to your OpenSearch cluster), while
generation can run anywhere — your laptop, a notebook, or another remote function.





