Build a retrieval-augmented generation function that embeds a query and searches OpenSearch.
A single remote function that implements the retrieval half of RAG:
This is the pattern you’d use inside a resolver or as a building block for a larger agent — the function handles the “R” in RAG so the caller only deals with the final generation step.
The function needs the OpenAI SDK (for embeddings) and the OpenSearch client. We also inject secrets for both services so credentials stay out of the code.
# rag.py
import chalkcompute
from chalkcompute import Container, Image, Secret
rag_container = Container(
    image=(
        Image.base("python:3.12-slim")
        .pip_install(["openai", "opensearch-py"])
    ),
    cpu="1",
    memory="2Gi",
    secrets=[
        Secret(name="OPENAI_API_KEY"),
        Secret(name="OPENSEARCH_HOST"),
        Secret(name="OPENSEARCH_USERNAME"),
        Secret(name="OPENSEARCH_PASSWORD"),
    ],
)

The function takes a plain-text query, embeds it, and searches OpenSearch in one shot. No volume needed here — the document corpus already lives in the index.
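If the index does not exist yet, here is a minimal sketch of a mapping that supports this kind of search. The knn_vector field type and the index.knn setting come from the OpenSearch k-NN plugin; the hnsw/cosinesimil method options and the helper name create_documents_index are illustrative assumptions, not requirements:

```python
# Sketch of an index mapping for vector search. The knn_vector type and
# index.knn setting are OpenSearch k-NN plugin features; the hnsw and
# cosinesimil choices here are illustrative.
index_body = {
    "settings": {"index": {"knn": True}},
    "mappings": {
        "properties": {
            "embedding": {
                "type": "knn_vector",
                "dimension": 1536,  # must match the embedding model's output size
                "method": {"name": "hnsw", "space_type": "cosinesimil"},
            },
            "title": {"type": "text"},
            "text": {"type": "text"},
            "url": {"type": "keyword"},
        }
    },
}


def create_documents_index(client):
    """Create the index if missing (client: an opensearchpy.OpenSearch)."""
    if not client.indices.exists(index="documents"):
        client.indices.create(index="documents", body=index_body)
```

text-embedding-3-small produces 1536-dimensional vectors, so the dimension in the mapping must match.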
@chalkcompute.function(name="retrieve", container=rag_container)
def retrieve(query: str, top_k: int = 5) -> list[dict]:
    """Embed a query and return the top-k matching documents from OpenSearch."""
    import os

    import openai
    from opensearchpy import OpenSearch

    # 1. Embed the query through Chalk's router.
    ai = openai.OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        base_url="https://router.chalk.ai/v1",
    )
    response = ai.embeddings.create(
        model="text-embedding-3-small",
        input=query,
    )
    query_vector = response.data[0].embedding

    # 2. Connect to OpenSearch.
    client = OpenSearch(
        hosts=[os.environ["OPENSEARCH_HOST"]],
        http_auth=(
            os.environ["OPENSEARCH_USERNAME"],
            os.environ["OPENSEARCH_PASSWORD"],
        ),
        use_ssl=True,
        verify_certs=True,
    )

    # 3. Run a k-NN search against the document index.
    results = client.search(
        index="documents",
        body={
            "size": top_k,
            "query": {
                "knn": {
                    "embedding": {
                        "vector": query_vector,
                        "k": top_k,
                    }
                }
            },
            "_source": ["text", "title", "url"],
        },
    )

    # 4. Return the hits in a simple format.
    return [
        {
            "title": hit["_source"].get("title", ""),
            "text": hit["_source"].get("text", ""),
            "url": hit["_source"].get("url", ""),
            "score": hit["_score"],
        }
        for hit in results["hits"]["hits"]
    ]

The function is stateless — every call creates a fresh OpenSearch connection. For high-throughput use cases, you can cache the client in a module-level variable since each container instance is long-lived.
chalk compute deploy rag.py
# ✓ Container created successfully
# Container ID: b2c71e45-3da8-4f19-9b56-1e8d4fa20c73
# Name: rag-retriever
# Status: Running
# Pod Name: chalk-container-rag-retriever
# URL: https://b2c71e45-3da8-4f19-9b56-1e8d4fa20c73.compute.chalk.ai

From any Python process with chalkcompute installed:
import chalkcompute
retrieve = chalkcompute.function_ref("retrieve")
retrieve.wait_ready()
docs = retrieve("How do I set up streaming resolvers?", top_k=3)
for doc in docs:
    print(f"[{doc['score']:.3f}] {doc['title']}")
    print(f"    {doc['text'][:120]}...")
    print()

[0.891] Streaming Resolvers
    Streaming resolvers let you subscribe to a data source and compute features as events arrive, rather than polling on a ...
[0.847] Resolver Overview
    Resolvers are Python functions that compute feature values. They can query databases, call APIs, or run arbitrary logic...
[0.823] Kafka Integration
    Connect Chalk to a Kafka topic to power streaming resolvers. Events are delivered to your resolver function as they arr...
Caller                retrieve()             Chalk Router             OpenSearch
   │                       │                       │                       │
   │  retrieve("How do     │                       │                       │
   │  I set up streaming   │                       │                       │
   │  resolvers?")         │                       │                       │
   │──────────────────────▸│                       │                       │
   │                       │                       │                       │
   │                       │  embeddings.create()  │                       │
   │                       │──────────────────────▸│                       │
   │                       │                       │                       │
   │                       │  [1536-dim vector]    │                       │
   │                       │◂──────────────────────│                       │
   │                       │                       │                       │
   │                       │  k-NN search(vector, k=5)                     │
   │                       │──────────────────────────────────────────────▸│
   │                       │                       │                       │
   │                       │  [{title, text, url, score}, ...]             │
   │                       │◂──────────────────────────────────────────────│
   │                       │                       │                       │
   │  [{title, text,       │                       │                       │
   │  url, score}, ...]    │                       │                       │
   │◂──────────────────────│                       │                       │
   │                       │                       │                       │
When plugged into generation, the caller adds one more hop — passing the retrieved documents into an LLM to produce the final answer:
 ask()                retrieve()             Chalk Router             OpenSearch
   │                       │                       │                       │
   │  retrieve(question)   │                       │                       │
   │──────────────────────▸│      (as above)       │                       │
   │  [{docs}]             │                       │                       │
   │◂──────────────────────│                       │                       │
   │                       │                       │                       │
   │  chat.completions.create(                     │                       │
   │    model="gpt-4o",                            │                       │
   │    messages=[system: {docs}, user: question]) │                       │
   │──────────────────────────────────────────────▸│                       │
   │                                               │                       │
   │  "Streaming resolvers let you..."             │                       │
   │◂──────────────────────────────────────────────│                       │
   │                                               │
The retrieve function returns plain dicts, so wiring it into a generation step is straightforward — call retrieve, format the results into a prompt, and pass the prompt to your LLM:
import chalkcompute
import openai
retrieve = chalkcompute.function_ref("retrieve")
retrieve.wait_ready()
def ask(question: str) -> str:
    # Retrieval
    docs = retrieve(question, top_k=5)
    context = "\n\n---\n\n".join(
        f"## {d['title']}\n{d['text']}" for d in docs
    )

    # Generation: OpenAI() reads OPENAI_API_KEY from the environment.
    client = openai.OpenAI(
        base_url="https://router.chalk.ai/v1",
    )
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Answer the user's question using only the context below.\n\n"
                    f"{context}"
                ),
            },
            {"role": "user", "content": question},
        ],
    )
    return response.choices[0].message.content


print(ask("How do I set up streaming resolvers?"))

The retrieval runs on Chalk Compute (close to your OpenSearch cluster), while generation can run anywhere — your laptop, a notebook, or another remote function.