1. End-to-End Model Tutorial

This tutorial walks through a complete machine learning workflow in Chalk: defining features, connecting to Snowflake data sources, creating point-in-time correct training datasets, training a model, registering it, and deploying it for inference.

We’ll build an event engagement prediction model that predicts whether a user is likely to engage with a notification.


Step 1: Define Your Features

First, define feature classes that represent the entities in your domain. Feature classes use the @features decorator and define typed attributes for each feature.

src/features.py
from datetime import datetime

from chalk import windowed, Windowed
from chalk.features import features, DataFrame, _
import chalk.functions as F


@features
class User:
    id: int

    # Account features
    email: str
    signup_date: datetime
    account_type: str  # one of: "free", "basic", "premium"

    # Windowed activity features - automatically creates versions for each window
    total_logins: Windowed[int] = windowed(
        "1d",
        "7d",
        "30d",
        expression=_.events[
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now
        ].count(),
        default=0
    )
    first_login_unix_seconds: Windowed[int | None] = windowed(
        "1d",
        "7d",
        "30d",
        "all",
        expression=_.events[
            _.ts_unix_seconds,
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now
        ].min(),
        default=None
    )

    # Non-windowed activity feature
    days_since_last_login: int | None = (
        (F.unix_seconds(_.chalk_now) - _.first_login_unix_seconds["all"]) / 86400
    )

    # Windowed billing features
    monthly_spend: float

    # Computed features
    account_age_days: int
    engagement_score: float

    # Has-many relationship to events
    events: "DataFrame[Event]"

    # Timestamp for feature time
    ts: datetime


@features
class Event:
    id: int
    user_id: User.id
    user: User
    type: str
    ts: datetime
    ts_unix_seconds: int = F.unix_seconds(_.ts)
    event_clicked: "EventClicked | None"
    event_was_clicked: bool = F.is_not_null(_.event_clicked.ts)


@features
class EventClicked:
    id: Event.id
    ts: datetime

The windowed() function creates multiple versions of a feature, one for each time window. You can reference specific windows using bracket notation:

  • User.total_logins["1d"] - logins in the last day
  • User.total_logins["7d"] - logins in the last 7 days
  • User.total_logins["30d"] - logins in the last 30 days

Step 2: Configure Snowflake Data Sources

Connect Chalk to your Snowflake data warehouse. First, configure the integration in the Chalk dashboard, then reference it in your code.

Dashboard Configuration

In the Chalk dashboard, navigate to Integrations > Data Sources and add a Snowflake integration with your credentials (account, warehouse, database, schema, role).

Define Sources in Code

src/datasources.py
from chalk.sql import SnowflakeSource

# Reference the integration configured in the dashboard
snowflake = SnowflakeSource(name="sf")

Create SQL File Resolvers

Create SQL files that map your Snowflake tables to Chalk features.

-- type: offline
-- resolves: User
-- source: sf

SELECT
    id,
    email,
    signup_date,
    account_type,
    monthly_spend
FROM users
-- type: offline
-- resolves: Event
-- source: sf

SELECT
    id,
    user_id,
    ts,
    type
FROM events
-- type: offline
-- resolves: EventClicked
-- source: sf

SELECT
    id,
    ts
FROM events_clicked

Create Python Resolvers

Create python resolvers to computed some simple derived features:

src/resolvers/computed.py
from datetime import datetime

from chalk import online, Now
from src.features import User


@online
def compute_account_age(
    signup_date: User.signup_date,
    now: Now
) -> User.account_age_days:
    return (now - signup_date).days


@online
def compute_engagement_score(
    logins: User.total_logins["30d"],      # Reference 30-day window
    days_inactive: User.days_since_last_login,
) -> User.engagement_score:
    # Simple engagement formula using 30-day windowed features
    activity_score = min(logins / 30, 1.0) * 0.6
    recency_score = max(0, 1 - (days_inactive / 30)) * 0.4

    score = (activity_score + recency_score) * 100
    return score

Step 3: Create a Point-in-Time Correct Training Dataset

Use offline_query to create a training dataset with point-in-time correctness. This ensures that feature values reflect what was known at each historical point, preventing data leakage.

from datetime import datetime, timedelta
from chalk.client import ChalkClient
from src.features import Event

client = ChalkClient()

# Create the dataset with point-in-time correctness
# Use bracket notation to select specific windows for windowed features
dataset = client.offline_query(
    input_sql="""
    SELECT 
        id as "event.id", 
        ts  as "__ts__"
    FROM "sf.prod.events"
    WHERE type='recommendation';
    """,
    output=[
        Event.id,
        Event.user.id,
        Event.user.account_type,
        Event.user.total_logins["1d"],      # 1-day window
        Event.user.total_logins["7d"],      # 7-day window
        Event.user.total_logins["30d"],     # 30-day window
        Event.user.days_since_last_login,
        Event.user.monthly_spend,
        Event.user.account_age_days,
        Event.user.engagement_score,
        Event.event_was_clicked
    ],
    dataset_name="event_recommendation_dataset",
    run_asynchronously=True,  # For large datasets
)

# Retrieve the data as a pandas DataFrame
df = dataset.get_data_as_pandas()
print(f"Dataset shape: {df.shape}")
print(df.head())

Step 4: Train Your Model

With your point-in-time correct dataset, train a model and save it as a pickle. The pickle will be uploaded to a volume and loaded by the handler at container startup.

train_model.py
import joblib
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split

# Column names from the offline_query dataset in Step 3
feature_columns = [
    'event.user.total_logins__30d__',
    'event.user.days_since_last_login',
    'event.user.monthly_spend',
    'event.user.account_age_days',
    'event.user.engagement_score',
]

# Use the dataset from Step 3
X = df[feature_columns]
y = df["event.event_was_clicked"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y,
)

model = GradientBoostingClassifier(
    n_estimators=100,
    max_depth=5,
    learning_rate=0.1,
    random_state=42,
)
model.fit(X_train, y_train)

y_pred_proba = model.predict_proba(X_test)[:, 1]
y_pred = model.predict(X_test)

metrics = {
    "auc_roc": float(roc_auc_score(y_test, y_pred_proba)),
    "precision": float(precision_score(y_test, y_pred)),
    "recall": float(recall_score(y_test, y_pred)),
}
print(f"Model metrics: {metrics}")

joblib.dump(model, "model.pkl")

Next, create a handler that loads the pickle on startup and runs inference. The handler receives input features as PyArrow arrays and returns a PyArrow array of predictions.

src/handler.py
import joblib
import numpy as np
import pyarrow as pa

input_features = [
    "login_count_30d",
    "days_since_last_login",
    "monthly_spend",
    "account_age_days",
    "engagement_score",
]

model = None


def on_startup():
    global model
    model = joblib.load("/app/artifacts/model.pkl")


def handler(event: dict[str, pa.Array], context: dict) -> pa.Array:
    X = np.column_stack([event[name].to_numpy() for name in input_features])
    probabilities = model.predict_proba(X)[:, 1]
    return pa.array(probabilities, type=pa.float64())

The handler's `on_startup` function runs once when the container starts, loading the pickle so the model is ready before any inference requests arrive.


Step 5: Register the Model

Build a container image with the handler, register the model version, and upload the trained pickle to a volume.

register_model.py
import os

import pyarrow as pa
from chalkcompute import Image

from chalk.client import ChalkClient
from chalk.client.model_image import chalk_handler_volume_name, upload_model_to_volume

input_features = [
    "login_count_30d",
    "days_since_last_login",
    "monthly_spend",
    "account_age_days",
    "engagement_score",
]

DIR = os.path.dirname(__file__)

client = ChalkClient()

image = (
    Image.debian_slim("3.11")
    .pip_install([
        "chalk-remote-call-python",
        "pyarrow",
        "scikit-learn>=1.4",
        "numpy>=1.26",
        "joblib",
    ])
    .add_local_file(os.path.join(DIR, "src/handler.py"), "/app/handler.py", strategy="copy")
    .env({"PYTHONPATH": "/app"})
    .workdir("/app")
    .entrypoint([
        "chalk-remote-call",
        "--handler",
        "handler.handler",
        "--on-startup",
        "handler.on_startup",
        "--port",
        "8080",
    ])
)

result = client.register_model_version(
    name="interaction-model",
    input_schema={name: pa.float64() for name in input_features},
    output_schema={"interaction_probability": pa.float64()},
    model_image=image,
)

print(f"Registered model: {result.model_name} v{result.model_version}")

volume_name = chalk_handler_volume_name("interaction-model", result.model_version)
upload_model_to_volume(
    volume_name=volume_name,
    model_filename="model.pkl",
    model_file_path=os.path.join(DIR, "model.pkl"),
    chalk_client=client,
)
print(f"Uploaded model pickle to volume: {volume_name}")

Step 6: Deploy the Model for Inference

Deploy the registered model version to a scaling group so it can serve real-time predictions.

deploy_model.py
from chalk.client import ChalkClient
from chalk.scalinggroup import AutoScalingSpec, ScalingGroupResourceRequest

client = ChalkClient()

# Use the version returned from registration, or look it up
model_version = 1

client.deploy_model_version_to_scaling_group(
    name="interaction-model-sg",
    model_name="interaction-model",
    model_version=model_version,
    handler="handler.handler",
    scaling=AutoScalingSpec(
        min_replicas=1,
        max_replicas=2,
        target_cpu_utilization_percentage=70,
    ),
    resources=ScalingGroupResourceRequest(cpu="1000m", memory="1Gi"),
)

print(f"Deployed interaction-model v{model_version} to scaling group interaction-model-sg")

Step 7: Wire Up Predictions with catalog_call

Add an interaction_probability feature to your Event class that calls your deployed model using F.catalog_call. The first argument is model.<scaling-group-name>, and the remaining arguments are the input features in the same order as your input_schema.

src/features.py
from chalk.features import features, _
import chalk.functions as F

@features
class Event:
    # ... existing features ...

    interaction_probability: float = F.catalog_call(
        "model.interaction-model-sg",
        _.user.total_logins["30d"],
        _.user.days_since_last_login,
        _.user.monthly_spend,
        _.user.account_age_days,
        _.user.engagement_score,
    )

Deploy to make the feature available:

chalk apply

Now you can query predictions through the Chalk API:

To get good performance, you'll want to add an online source for your features (like a postgres or a stream).

from chalk.client import ChalkClient
from src.features import Event

client = ChalkClient()

result = client.query(
    input={Event.id: 12345},
    output=[Event.interaction_probability],
)

print(f"Interaction probability: {result.get_feature_value(Event.interaction_probability)}")

Next Steps

For more details on specific topics: