# End-to-End Model Tutorial
source: https://docs.chalk.ai/docs/model-tutorial

## Build a complete ML pipeline from features to inference

This tutorial walks through a complete machine learning workflow in Chalk: defining features, connecting to Snowflake data sources, creating point-in-time correct training datasets, training a model, registering it, and deploying it for inference.

We'll build an event engagement prediction model that predicts whether a user is likely to engage with a notification.

### Step 1: Define Your Features

First, define feature classes that represent the entities in your domain. Feature classes use the @features decorator and define typed attributes for each feature.

```
from datetime import datetime

from chalk import windowed, Windowed
from chalk.features import features, DataFrame, _
import chalk.functions as F


@features
class User:
    id: int

    # Account features
    email: str
    signup_date: datetime
    account_type: str  # one of: "free", "basic", "premium"

    # Windowed activity features - automatically creates versions for each window
    total_logins: Windowed[int] = windowed(
        "1d",
        "7d",
        "30d",
        expression=_.events[
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now
        ].count(),
        default=0
    )
    first_login_unix_seconds: Windowed[int | None] = windowed(
        "1d",
        "7d",
        "30d",
        "all",
        expression=_.events[
            _.ts_unix_seconds,
            _.type == "login",
            _.ts > _.chalk_window,
            _.ts <= _.chalk_now
        ].min(),
        default=None
    )

    # Non-windowed activity feature
    days_since_last_login: int | None = (
        (F.unix_seconds(_.chalk_now) - _.first_login_unix_seconds["all"]) / 86400
    )

    # Windowed billing features
    monthly_spend: float

    # Computed features
    account_age_days: int
    engagement_score: float

    # Has-many relationship to events
    events: "DataFrame[Event]"

    # Timestamp for feature time
    ts: datetime


@features
class Event:
    id: int
    user_id: User.id
    user: User
    type: str
    ts: datetime
    ts_unix_seconds: int = F.unix_seconds(_.ts)
    event_clicked: "EventClicked | None"
    event_was_clicked: bool = F.is_not_null(_.event_clicked.ts)


@features
class EventClicked:
    id: Event.id
    ts: datetime
```

The windowed() function creates multiple versions of a feature, one for each time window. You can reference specific windows using bracket notation:

- User.total_logins["1d"] - logins in the last day
- User.total_logins["7d"] - logins in the last 7 days
- User.total_logins["30d"] - logins in the last 30 days

### Step 2: Configure Snowflake Data Sources

Connect Chalk to your Snowflake data warehouse. First, configure the integration in the Chalk dashboard, then reference it in your code.

### Dashboard Configuration

In the Chalk dashboard, navigate to Integrations > Data Sources and add a Snowflake integration with your credentials (account, warehouse, database, schema, role).

### Define Sources in Code

```
from chalk.sql import SnowflakeSource

# Reference the integration configured in the dashboard
snowflake = SnowflakeSource(name="sf")
```

### Create SQL File Resolvers

Create SQL files that map your Snowflake tables to Chalk features.

```
-- type: offline
-- resolves: User
-- source: sf

SELECT
    id,
    email,
    signup_date,
    account_type,
    monthly_spend
FROM users
```

```
-- type: offline
-- resolves: Event
-- source: sf

SELECT
    id,
    user_id,
    ts,
    type
FROM events
```

```
-- type: offline
-- resolves: EventClicked
-- source: sf

SELECT
    id,
    ts
FROM events_clicked
```

### Create Python Resolvers

Create python resolvers to computed some simple derived features:

```
from datetime import datetime

from chalk import online, Now
from src.features import User


@online
def compute_account_age(
    signup_date: User.signup_date,
    now: Now
) -> User.account_age_days:
    return (now - signup_date).days


@online
def compute_engagement_score(
    logins: User.total_logins["30d"],      # Reference 30-day window
    days_inactive: User.days_since_last_login,
) -> User.engagement_score:
    # Simple engagement formula using 30-day windowed features
    activity_score = min(logins / 30, 1.0) * 0.6
    recency_score = max(0, 1 - (days_inactive / 30)) * 0.4

    score = (activity_score + recency_score) * 100
    return score
```

### Step 3: Create a Point-in-Time Correct Training Dataset

Use offline_query to create a training dataset with point-in-time correctness. This ensures that feature values reflect what was known at each historical point, preventing data leakage.

```
from datetime import datetime, timedelta
from chalk.client import ChalkClient
from src.features import Event

client = ChalkClient()

# Create the dataset with point-in-time correctness
# Use bracket notation to select specific windows for windowed features
dataset = client.offline_query(
    input_sql="""
    SELECT 
        id as "event.id", 
        ts  as "__ts__"
    FROM "sf.prod.events"
    WHERE type='recommendation';
    """,
    output=[
        Event.id,
        Event.user.id,
        Event.user.account_type,
        Event.user.total_logins["1d"],      # 1-day window
        Event.user.total_logins["7d"],      # 7-day window
        Event.user.total_logins["30d"],     # 30-day window
        Event.user.days_since_last_login,
        Event.user.monthly_spend,
        Event.user.account_age_days,
        Event.user.engagement_score,
        Event.event_was_clicked
    ],
    dataset_name="event_recommendation_dataset",
    run_asynchronously=True,  # For large datasets
)

# Retrieve the data as a pandas DataFrame
df = dataset.get_data_as_pandas()
print(f"Dataset shape: {df.shape}")
print(df.head())
```

### Step 4: Train Your Model

With your point-in-time correct dataset, train a model and save it as a pickle. The pickle will be uploaded to a volume and loaded by the handler at container startup.

```
import joblib
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split

# Column names from the offline_query dataset in Step 3
feature_columns = [
    'event.user.total_logins__30d__',
    'event.user.days_since_last_login',
    'event.user.monthly_spend',
    'event.user.account_age_days',
    'event.user.engagement_score',
]

# Use the dataset from Step 3
X = df[feature_columns]
y = df["event.event_was_clicked"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y,
)

model = GradientBoostingClassifier(
    n_estimators=100,
    max_depth=5,
    learning_rate=0.1,
    random_state=42,
)
model.fit(X_train, y_train)

y_pred_proba = model.predict_proba(X_test)[:, 1]
y_pred = model.predict(X_test)

metrics = {
    "auc_roc": float(roc_auc_score(y_test, y_pred_proba)),
    "precision": float(precision_score(y_test, y_pred)),
    "recall": float(recall_score(y_test, y_pred)),
}
print(f"Model metrics: {metrics}")

joblib.dump(model, "model.pkl")
```

Next, create a handler that loads the pickle on startup and runs inference. The handler receives input features as PyArrow arrays and returns a PyArrow array of predictions.

```
import joblib
import numpy as np
import pyarrow as pa

input_features = [
    "login_count_30d",
    "days_since_last_login",
    "monthly_spend",
    "account_age_days",
    "engagement_score",
]

model = None


def on_startup():
    global model
    model = joblib.load("/app/artifacts/model.pkl")


def handler(event: dict[str, pa.Array], context: dict) -> pa.Array:
    X = np.column_stack([event[name].to_numpy() for name in input_features])
    probabilities = model.predict_proba(X)[:, 1]
    return pa.array(probabilities, type=pa.float64())
```

The handler's on_startup function runs once when the container starts, loading the pickle so
the model is ready before any inference requests arrive.

### Step 5: Register the Model

Build a container image with the handler, register the model version, and upload the trained pickle to a volume.

```
import os

import pyarrow as pa
from chalkcompute import Image

from chalk.client import ChalkClient
from chalk.client.model_image import chalk_handler_volume_name, upload_model_to_volume

input_features = [
    "login_count_30d",
    "days_since_last_login",
    "monthly_spend",
    "account_age_days",
    "engagement_score",
]

DIR = os.path.dirname(__file__)

client = ChalkClient()

image = (
    Image.debian_slim("3.11")
    .pip_install([
        "chalk-remote-call-python",
        "pyarrow",
        "scikit-learn>=1.4",
        "numpy>=1.26",
        "joblib",
    ])
    .add_local_file(os.path.join(DIR, "src/handler.py"), "/app/handler.py", strategy="copy")
    .env({"PYTHONPATH": "/app"})
    .workdir("/app")
    .entrypoint([
        "chalk-remote-call",
        "--handler",
        "handler.handler",
        "--on-startup",
        "handler.on_startup",
        "--port",
        "8080",
    ])
)

result = client.register_model_version(
    name="interaction-model",
    input_schema={name: pa.float64() for name in input_features},
    output_schema={"interaction_probability": pa.float64()},
    model_image=image,
)

print(f"Registered model: {result.model_name} v{result.model_version}")

volume_name = chalk_handler_volume_name("interaction-model", result.model_version)
upload_model_to_volume(
    volume_name=volume_name,
    model_filename="model.pkl",
    model_file_path=os.path.join(DIR, "model.pkl"),
    chalk_client=client,
)
print(f"Uploaded model pickle to volume: {volume_name}")
```

### Step 6: Deploy the Model for Inference

Deploy the registered model version to a scaling group so it can serve real-time predictions.

```
from chalk.client import ChalkClient
from chalk.scalinggroup import AutoScalingSpec, ScalingGroupResourceRequest

client = ChalkClient()

# Use the version returned from registration, or look it up
model_version = 1

client.deploy_model_version_to_scaling_group(
    name="interaction-model-sg",
    model_name="interaction-model",
    model_version=model_version,
    handler="handler.handler",
    scaling=AutoScalingSpec(
        min_replicas=1,
        max_replicas=2,
        target_cpu_utilization_percentage=70,
    ),
    resources=ScalingGroupResourceRequest(cpu="1000m", memory="1Gi"),
)

print(f"Deployed interaction-model v{model_version} to scaling group interaction-model-sg")
```

### Step 7: Wire Up Predictions with catalog_call

Add an interaction_probability feature to your Event class that calls your deployed model using F.catalog_call. The first argument is model.<scaling-group-name>, and the remaining arguments are the input features in the same order as your input_schema.

```
from chalk.features import features, _
import chalk.functions as F

@features
class Event:
    # ... existing features ...

    interaction_probability: float = F.catalog_call(
        "model.interaction-model-sg",
        _.user.total_logins["30d"],
        _.user.days_since_last_login,
        _.user.monthly_spend,
        _.user.account_age_days,
        _.user.engagement_score,
    )
```

Deploy to make the feature available:

```
chalk apply
```

Now you can query predictions through the Chalk API:

 To get good performance, you'll want to add an online source for your features (like a postgres
or a stream). 

```
from chalk.client import ChalkClient
from src.features import Event

client = ChalkClient()

result = client.query(
    input={Event.id: 12345},
    output=[Event.interaction_probability],
)

print(f"Interaction probability: {result.get_feature_value(Event.interaction_probability)}")
```

### Next Steps

- Monitor your model: View model performance metrics and feature distributions in the Chalk dashboard
- A/B test models: Use feature versions to run multiple model versions simultaneously
- Retrain on schedule: Set up periodic retraining using scheduled queries to refresh your training data
- Add more features: Expand your feature set with windowed aggregations and has-many relationships

For more details on specific topics:

- Model Registry - Managing model versions and aliases
- Datasets - Working with offline query results
- Temporal Consistency - Understanding point-in-time correctness
- Snowflake Integration - Configuring Snowflake data sources




