What we're building

A single remote function that implements the retrieval half of RAG:

  1. Embed the user’s question using an embedding model via Chalk’s OpenAI-compatible router.
  2. Run a k-NN vector search against an OpenSearch index to find relevant documents.
  3. Return the top matches with their text, ready to be stuffed into an LLM prompt.

This is the pattern you’d use inside a resolver or as a building block for a larger agent — the function handles the “R” in RAG so the caller only deals with the final generation step.


Define the container

The function needs the OpenAI SDK (for embeddings) and the OpenSearch client. We also inject secrets for both services so credentials stay out of the code.

# rag.py
import chalkcompute
from chalkcompute import Container, Image, Secret

rag_container = Container(
    image=(
        Image.base("python:3.12-slim")
        .pip_install(["openai", "opensearch-py"])
    ),
    cpu="1",
    memory="2Gi",
    secrets=[
        Secret(name="OPENAI_API_KEY"),
        Secret(name="OPENSEARCH_HOST"),
        Secret(name="OPENSEARCH_USERNAME"),
        Secret(name="OPENSEARCH_PASSWORD"),
    ],
)

The retrieve function

The function takes a plain-text query, embeds it, and searches OpenSearch in one shot. No volume needed here — the document corpus already lives in the index.

@chalkcompute.function(name="retrieve", container=rag_container)
def retrieve(query: str, top_k: int = 5) -> list[dict]:
    """Embed a query and return the top-k matching documents from OpenSearch."""
    import os
    import openai
    from opensearchpy import OpenSearch

    # 1. Embed the query through Chalk's router.
    ai = openai.OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url="https://router.chalk.ai/v1",
    )
    response = ai.embeddings.create(
        model="text-embedding-3-small",
        input=query,
    )
    query_vector = response.data[0].embedding

    # 2. Connect to OpenSearch.
    client = OpenSearch(
        hosts=[os.environ["OPENSEARCH_HOST"]],
        http_auth=(
            os.environ["OPENSEARCH_USERNAME"],
            os.environ["OPENSEARCH_PASSWORD"],
        ),
        use_ssl=True,
        verify_certs=True,
    )

    # 3. Run a k-NN search against the document index.
    results = client.search(
        index="documents",
        body={
            "size": top_k,
            "query": {
                "knn": {
                    "embedding": {
                        "vector": query_vector,
                        "k": top_k,
                    }
                }
            },
            "_source": ["text", "title", "url"],
        },
    )

    # 4. Return the hits in a simple format.
    return [
        {
            "title": hit["_source"].get("title", ""),
            "text": hit["_source"].get("text", ""),
            "url": hit["_source"].get("url", ""),
            "score": hit["_score"],
        }
        for hit in results["hits"]["hits"]
    ]

The function is stateless — every call creates a fresh OpenSearch connection. For high-throughput use cases, you can cache the client in a module-level variable since each container instance is long-lived.


Deploy it

chalk compute deploy rag.py
# ✓ Container created successfully
# Container ID: b2c71e45-3da8-4f19-9b56-1e8d4fa20c73
# Name: rag-retriever
# Status: Running
# Pod Name: chalk-container-rag-retriever
# URL: https://b2c71e45-3da8-4f19-9b56-1e8d4fa20c73.compute.chalk.ai

Calling the function

From any Python process with chalkcompute installed:

import chalkcompute

retrieve = chalkcompute.function_ref("retrieve")
retrieve.wait_ready()

docs = retrieve("How do I set up streaming resolvers?", top_k=3)
for doc in docs:
    print(f"[{doc['score']:.3f}] {doc['title']}")
    print(f"  {doc['text'][:120]}...")
    print()
[0.891] Streaming Resolvers
  Streaming resolvers let you subscribe to a data source and compute features as events arrive, rather than polling on a ...

[0.847] Resolver Overview
  Resolvers are Python functions that compute feature values. They can query databases, call APIs, or run arbitrary logic...

[0.823] Kafka Integration
  Connect Chalk to a Kafka topic to power streaming resolvers. Events are delivered to your resolver function as they arr...

Request lifecycle

  Caller                 retrieve()              Chalk Router           OpenSearch
    │                       │                       │                       │
    │  retrieve("How do     │                       │                       │
    │   I set up streaming  │                       │                       │
    │   resolvers?")        │                       │                       │
    │──────────────────────▸│                       │                       │
    │                       │                       │                       │
    │                       │  embeddings.create()  │                       │
    │                       │──────────────────────▸│                       │
    │                       │                       │                       │
    │                       │    [1536-dim vector]  │                       │
    │                       │◂──────────────────────│                       │
    │                       │                       │                       │
    │                       │  k-NN search(vector, k=5)                     │
    │                       │──────────────────────────────────────────────▸│
    │                       │                       │                       │
    │                       │              [{title, text, url, score}, ...] │
    │                       │◂──────────────────────────────────────────────│
    │                       │                       │                       │
    │  [{title, text,       │                       │                       │
    │    url, score}, ...]  │                       │                       │
    │◂──────────────────────│                       │                       │
    │                       │                       │                       │

When plugged into generation, the caller adds one more hop — passing the retrieved documents into an LLM to produce the final answer:

  ask()                  retrieve()              Chalk Router           OpenSearch
    │                       │                       │                       │
    │  retrieve(question)   │                       │                       │
    │──────────────────────▸│        (as above)     │                       │
    │  [{docs}]             │                       │                       │
    │◂──────────────────────│                       │                       │
    │                       │                       │                       │
    │  chat.completions.create(                     │                       │
    │    model="gpt-4o",                            │                       │
    │    messages=[system: {docs}, user: question]) │                       │
    │──────────────────────────────────────────────▸│                       │
    │                                               │                       │
    │  "Streaming resolvers let you..."             │                       │
    │◂──────────────────────────────────────────────│                       │
    │                                                                       │

Plugging into generation

The retrieve function returns plain dicts, so wiring it into a generation step is straightforward — call retrieve, format the results into a prompt, and pass it to your LLM:

import chalkcompute
import openai

retrieve = chalkcompute.function_ref("retrieve")
retrieve.wait_ready()

def ask(question: str) -> str:
    # Retrieval
    docs = retrieve(question, top_k=5)
    context = "\n\n---\n\n".join(
        f"## {d['title']}\n{d['text']}" for d in docs
    )

    # Generation
    client = openai.OpenAI(
        base_url="https://router.chalk.ai/v1",
    )
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Answer the user's question using only the context below.\n\n"
                    f"{context}"
                ),
            },
            {"role": "user", "content": question},
        ],
    )
    return response.choices[0].message.content

print(ask("How do I set up streaming resolvers?"))

The retrieval runs on Chalk Compute (close to your OpenSearch cluster), while generation can run anywhere — your laptop, a notebook, or another remote function.