# Tutorial: AWS SageMaker with Chalk
source: https://docs.chalk.ai/docs/sagemaker-tutorial

## Build a Chalk feature pipeline for training and serving models with AWS SageMaker.

### Introduction

Chalk enables you to build feature pipelines for machine learning models. AWS
SageMaker is a popular service for building, training, and deploying models. Chalk
and SageMaker fit together naturally to create a streamlined machine learning architecture that uses the best aspects of
each system.

### Why use Chalk?

There are several advantages to using Chalk in your ML architecture alongside SageMaker.

- Use the same Python codebase for notebook iteration, training, and serving. Chalk enables you to maintain a
single Python codebase that integrates with many data sources, including Postgres, Databricks, Snowflake, Kafka, and
others. As you tweak your feature definitions, you can deploy your updates to separate
branches to try your new features without needing to wait for a long build step.
- Generate features when they're needed. Chalk computes features in response to queries. Other systems such as
SageMaker Feature Store require you to precompute features from batch data. We believe the precomputation style is
limiting because this style prevents users from relying on real-time data. For example, precomputation prevents users
from writing features reliant on recency (such as the number of failed logins in the past minute) or third-party APIs
(which may be costly to exhaustively pre-fetch).

### What you'll learn

In this tutorial, we'll take a deep dive into how you can use Chalk to serve features combined with SageMaker for model
training and serving. You will:

- Set up your feature pipeline with Chalk:Connect your data sources (in this tutorial, DynamoDB)Write feature definitions in PythonWrite resolvers defining how to generate those features from your data source
- Set up your SageMaker pipeline:Run Chalk offline queries and prepare datasets for training and evaluationTrain your modelEvaluate your model
- Deploy your model with SageMaker

We'll go through each of these steps using fraud detection as our example use case. The full code for this tutorial can
be found in GitHub.

### Set up Chalk

In this section, we'll set up your Chalk feature pipeline.

### Connect your data source

Chalk integrates with AWS, GCP, various SQL databases, Databricks, and more.
For this tutorial, we'll pull transaction data from AWS DynamoDB, but you can add as many data sources as you need.

To connect DynamoDB, enter your AWS access key into the Chalk dashboard. After adding AWS, define your DynamoDB data
source in Python and give it a name:

```
from chalk.sql import DynamoDBSource

DynamoDBSource(name="our_dynamo")
```

### Define features

Features are structured data you can use as inputs to your model. With Chalk, you define your features
in one place as Pydantic-style Python classes:

```
from datetime import date

from chalk import DataFrame, FeatureTime
from chalk.features import features, feature


@features
class Transaction:
    id: int
    amount: float
    confirmed_fraud: bool
    # Use strings to reference Customer because Customer is defined beneath this class
    customer_id: "Customer.id"
    customer: "Customer"
    transacted_at: FeatureTime


@features
class Customer:
    id: int
    name: str
    email: str
    dob: date
    income: int
    # :tags: pii
    ssn: str
    # Set max_staleness on fico because fico relies on a third-party API call.
    # We will read this value from a cache (as long as it was computed
    # sometime in the last day).
    fico: int = feature(max_staleness="1d")
    # The transactions, linked by the Customer.id type on the Transaction.customer_id field
    transactions: DataFrame[Transaction]
```

In this code, Transaction is the feature class and each of its properties is a feature.

### Define resolvers

Resolvers are Chalk's way of defining how to retrieve data and convert them into features.

Most of our features will be loaded from DynamoDB using SQL file resolvers. Here's how we retrieve basic
Transaction and Customer data:

```
-- type: online
-- resolves: transaction
-- source: our_dynamo
SELECT
  id,
  amount,
  confirmed_fraud,
  customer_id,
  created_at AS transacted_at
FROM
  txns;
```

```
-- type: online
-- resolves: customer
-- source: our_dynamo
SELECT
  id,
  name,
  email,
  dob,
  income,
  ssn
FROM
  customers;
```

Each column in the result must match the name of a feature. Use AS to rename columns as needed.

For features requiring computation, we use Python resolvers. Here's a resolver
for retrieving a customer's FICO score from an API:

```
from chalk import online

@online
def get_fico(name: Customer.name, ssn: Customer.ssn) -> Customer.fico:
    # Replace with your preferred provider.
    fico_score = FICOApi().get_score(name, ssn)
    return Customer(fico=fico_score)
```

This resolver depends on Customer.name and Customer.ssn and returns Customer.fico. When you deploy, Chalk uses
your resolver inputs and outputs to create a dependency graph to confirm your features can be generated safely.

The get_fico resolver will be invoked in response to Chalk queries. Resolvers do not run
exhaustively over your customer dataset, which helps you reduce costs related to unnecessary computation and API usage.
Our resolver documentation goes into greater detail about how you can schedule
resolvers and set up batch offline jobs. We also set max_staleness to 1 day on the fico
feature, so this feature will be read from the online store's cache if the value has already been computed in the past
day.

With a combination of Python and SQL resolvers, we have now told Chalk how to retrieve each of our features.

### Deploy Chalk

At this point, you can deploy Chalk to a local branch and start writing queries.

To deploy to a non-production branch, pass --branch to chalk apply:

```
$ chalk apply --branch sagemaker_tutorial
✓ Found resolvers
✓ Deployed branch
```

You can run ad hoc queries on this branch from the CLI:

```
$ chalk query --branch sagemaker_tutorial  \
              --in     transaction.id=1 \
              --out    transaction.confirmed_fraud
```

Once you're satisfied with your feature pipeline, you can move on to setting up SageMaker.

### Set up SageMaker

There are many ways to set up SageMaker. In this tutorial, we'll use SageMaker
steps to compose our model training
pipeline. Steps allow us to modularize the code and we find them easier to work with than some alternatives, such as
running the pipeline directly from one super long Jupyter notebook.

In this section, we'll write steps for creating the dataset, training a candidate model, and evaluating the model.

### Create the dataset

In this step, we'll use Chalk to generate a random sample of features from our dataset, split the dataset for training
and testing using
scikit-learn, write
the results to S3, and return the S3 paths to be used in the next step.

Writing the split datasets to S3 provides a clear history of the data you used to train and evaluate your
model. Additionally, it allows you re-run later steps in your pipeline without re-executing the dataset generation
step.

In the previous section, we set up our features and resolvers without going into great detail about how the code would
be executed. Here, we'll finally show how the code gets executed through a Chalk offline query:

```
from sagemaker.workflow.function_step import step
from src.feature_sets import Transaction

@step(name="create_dataset")
def create_dataset(
    test_size: float, run_bucket: str, model_name: str
) -> tuple[str, str, str, str]:
    from chalk.client import ChalkClient
    from sklearn.model_selection import train_test_split

+    client = ChalkClient(branch="sagemaker_tutorial")
+
+    chalk_dataset = ChalkClient().offline_query(
+        recompute_features=True,
+        output=[
+            Transaction.amount,
+            Transaction.customer.fico,
+            # ... and more features
+        ],
+        # By giving this dataset a name, we will be able to view it in the Chalk
+        # dashboard, download it locally, or share it with collaborators.
+        name=model_name,
+    )
+
+    # Converting a chalk_dataset to a pandas (or polars) dataframe is
+    # efficient because both are backed by Apache Arrow.
+    dataset = chalk_dataset.to_pandas()

    # Create training data by removing the target column.
    X = dataset.drop(columns=[Transaction.confirmed_fraud])

    # Pull target from dataset.
    y = dataset[Transaction.confirmed_fraud]

    # Split data into train and test set. We will use k-fold cross-validation
    # for hyperparameter tuning
    X_train, X_test, y_train, y_test = train_test_split(
        X, y,test_size=test_size
    )

    xtrain_path = f"{run_bucket}/input/X_train.parquet"
    xtest_path = f"{run_bucket}/input/X_test.parquet"
    ytrain_path = f"{run_bucket}/input/y_train.parquet"
    ytest_path = f"{run_bucket}/input/y_test.parquet"

    dataset.to_parquet(f"{run_bucket}/raw_data/data.parquet")
    X_train.to_parquet(xtrain_path)
    y_train.to_parquet(ytrain_path)
    X_test.to_parquet(xtest_path)
    y_test.to_parquet(ytest_path)
    return xtrain_path, xtest_path, ytrain_path, ytest_path
```

The full code for this step can be found in
steps/dataset.py.

### Train and evaluate

Our last two steps will retrieve our features from S3 and train and evaluate our model. These steps don't depend on
Chalk or any other data sources, so you can rapidly iterate here without being blocked on feature engineering if your
training data does not require changes.

In steps/training.py, we load the training data from S3 and train a model using
scikit-learn:

```
from sagemaker.workflow.function_step import step

PARAM_GRID = {
    'xgb__n_estimators': [20, 50, 100, 200],
    'xgb__learning_rate': [0.01, 0.1, 0.2],
    'xgb__max_depth': [3, 5, 7, 9],
}

@step(
    name="model-training",
    instance_type="ml.m5.xlarge",
    keep_alive_period_in_seconds=300,
)
def train(
    xtrain_path: str,
    ytrain_path: str,
    num_rounds: int
):
    from sklearn.pipeline import Pipeline
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    from sklearn.impute import SimpleImputer
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import RandomizedSearchCV

    # Read data files from S3.
    X_train = pd.read_parquet(xtrain_path)
    y_train = pd.read_parquet(ytrain_path)

    pipeline = Pipeline(
        steps=[
            ("impute", (SimpleImputer())),
            ("scaler", StandardScaler()),
            ("xgb", GradientBoostingClassifier()),
        ]
    )
    # Note: Fraud detection commonly experiences the imbalanced learning problem.
    # In other words, fraud may be over or underrepresented in the dataset, leading
    # to unreliable results in production. When training a model for production,
    # the imbalance should be addressed, for example, by upsampling, downsampling,
    # or model choice.
    rsc = RandomizedSearchCV(
        pipeline,
        param_distributions=PARAM_GRID,
        n_iter=num_rounds,
        cv=3,
        scoring="f1",
        n_jobs=-1,
    )
    rsc.fit(X_train, y_train)

    return rsc.best_estimator_
```

In
steps/evaluate.py,
we load our newly trained model and evaluate it against our test dataset:

```
from sagemaker.workflow.function_step import step

@step(
    name="model-evaluation",
    instance_type='ml.t3.medium',
    keep_alive_period_in_seconds=300,
)
def evaluate(model, xtest_path: str, ytest_path: str, run_bucket: str) -> str:
    import pandas as pd
    from sklearn.metrics import (
        accuracy_score,
        f1_score,
        precision_score,
        recall_score,
    )
    import s3fs
    import json

    X_test = pd.read_parquet(xtest_path)
    y_test = pd.read_parquet(ytest_path)

    predictions = model.predict(X_test)

    results = {
        "accuracy": accuracy_score(y_test, predictions),
        "f1": f1_score(y_test, predictions),
        "precision": precision_score(y_test, predictions),
        "recall": recall_score(y_test, predictions),
    }

    # Upload evaluation report to s3
    s3_fs = s3fs.S3FileSystem()
    eval_src_s3 = f"{run_bucket}/evaluation/evaluation.json"

    with s3_fs.open(eval_src_s3, "wb") as file:
        file.write(json.dumps(results))

    return eval_src_s3
```

### Assemble the steps into a SageMaker pipeline

Finally, let's assemble our three steps into a SageMaker pipeline in
chalk_sagemaker_pipeline.py:

```
from sagemaker.workflow.pipeline import Pipeline
from steps.dataset import create_dataset
from steps.training import train
from steps.evaluate import evaluate
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
from uuid import uuid4

BUCKET_PREFIX = "s3://chalk-sagemaker-models/"

if __name__ == "__main__":
    # Create Run Parameters
    model_package_group = "chalk-sagemaker-xgb"
    run_bucket = f"s3://chalk-sagemaker-models/{model_package_group}/{uuid4()}/"

    # Required F1 Threshold for model registration
    f1_threshold = ParameterFloat(name="F1Threshold", default_value=0.8)

    # Size of test split
    test_size = ParameterFloat(name="TestSize", default_value=0.2)

    # Number of estimators to evaluate
    num_rounds = ParameterInteger(name="NumRounds", default_value=50)
    run_bucket = ParameterString(name="RunBucket", default_value=run_bucket)
    model_package_group = ParameterString(name="ModelPackageGroup", default_value="chalk-sagemaker-xgb")

+    delayed_data = create_dataset(test_size=test_size, run_bucket=run_bucket)
+    delayed_model = train(xtrain_path=delayed_data[0], ytrain_path=delayed_data[2], num_rounds=num_rounds)
+    delayed_evaluation = evaluate(model=delayed_model, xtest_path=delayed_data[1], ytest_path=delayed_data[3], run_bucket=run_bucket)
+
+    pipeline = Pipeline(
+        name="ChalkaiSagemakerPipeline",
+        steps=[delayed_evaluation],
+        parameters=[
+            f1_threshold,
+            test_size,
+            run_bucket,
+            model_package_group,
+            num_rounds,
+        ]
+    )
```

### Deploy your model with SageMaker

There are a number of different ways to deploy the model that you trained on SageMaker. We recommend deploying your
model to an endpoint, which is documented
here.

### Efficient SageMaker inference with Chalk underscores

Once you've defined your SageMaker endpoint, you can run predictions against SageMaker from Chalk. This
can be accomplished by encoding your input features and then using the F.sagemaker_predict underscore.
function. You can specify your target SageMaker endpoint in the underscore function's parameters.

```
import chalk.functions as F

@features
class Transaction:
    id: int
    amount: float
    confirmed_fraud: bool
    # Use strings to reference Customer because Customer is defined beneath this class
    customer_id: "Customer.id"
    customer: "Customer"
    transacted_at: FeatureTime

    encoded_features: str
    transaction_fraud_prediction_raw: bytes = F.sagemaker_predict(
      F.string_to_bytes(_.encoded_features),
      endpoint="chalk-sagemaker-xgb-endpoint",
    )

@online
def get_encoded_features(amount: Transaction.amount, customer_fico: Transaction.customer.fico, ...) -> Transaction.encoded_features:
    return f"{amount},{customer_fico},..."
```

### Conclusion

In this tutorial, you got a look at how Chalk accelerates your feature engineering workflows by making it easy to
connect your data sources and unifying your feature code into a single Python codebase. Chalk fits naturally into a
SageMaker step for dataset generation and hands feature data off to the rest of your SageMaker pipeline for model
training, evaluation, and serving.





