Tutorial: AWS SageMaker with Chalk

Introduction

Chalk enables you to build feature pipelines for machine learning models. AWS SageMaker is a popular service for building, training, and deploying models. Chalk and SageMaker fit together naturally to create a streamlined machine learning architecture that uses the best aspects of each system.

Why use Chalk?

There are several advantages to using Chalk in your ML architecture alongside SageMaker.

  1. Use the same Python codebase for notebook iteration, training, and serving. Chalk enables you to maintain a single Python codebase that integrates with many data sources, including Postgres, Databricks, Snowflake, Kafka, and others. As you tweak your feature definitions, you can deploy your updates to separate branches to try your new features without needing to wait for a long build step.
  2. Generate features when they’re needed. Chalk computes features in response to queries. Other systems such as SageMaker Feature Store require you to precompute features from batch data. We believe the precomputation style is limiting because this style prevents users from relying on real-time data. For example, precomputation prevents users from writing features reliant on recency (such as the number of failed logins in the past minute) or third-party APIs (which may be costly to exhaustively pre-fetch).
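
To make the recency point concrete, here is a minimal sketch in plain Python (it does not use Chalk). The function name, the event tuples, and the timestamps are all hypothetical; the only thing it aims to show is that a count over "the past minute" depends on when it is computed, so a value produced by an earlier batch job can miss recent events.

```python
from datetime import datetime, timedelta, timezone


def failed_logins_last_minute(login_events, now):
    """Count failed logins in the 60 seconds leading up to `now`.

    `login_events` is a hypothetical list of (timestamp, succeeded) tuples.
    """
    window_start = now - timedelta(minutes=1)
    return sum(
        1
        for ts, succeeded in login_events
        if not succeeded and window_start < ts <= now
    )


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
events = [
    (now - timedelta(seconds=5), False),
    (now - timedelta(seconds=30), False),
    (now - timedelta(seconds=45), True),
    (now - timedelta(minutes=10), False),
]

# A batch job that ran an hour ago could not have seen the recent failures.
precomputed_at = now - timedelta(hours=1)
print(failed_logins_last_minute(events, precomputed_at))  # 0
print(failed_logins_last_minute(events, now))  # 2
```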

What you'll learn

In this tutorial, we’ll walk through how to use Chalk to serve features alongside SageMaker for model training and serving. You will:

  • Set up your feature pipeline with Chalk:
    • Connect your data sources (in this tutorial, DynamoDB)
    • Write feature definitions in Python
    • Write resolvers defining how to generate those features from your data source
  • Set up your SageMaker pipeline:
    • Run Chalk offline queries and prepare datasets for training and evaluation
    • Train your model
    • Evaluate your model
  • Deploy your model with SageMaker

We’ll go through each of these steps using fraud detection as our example use case. The full code for this tutorial can be found on GitHub.


Set up Chalk

In this section, we’ll set up your Chalk feature pipeline.

Connect your data source

Chalk integrates with AWS, GCP, various SQL databases, Databricks, and more. For this tutorial, we’ll pull transaction data from AWS DynamoDB, but you can add as many data sources as you need.

To connect DynamoDB, enter your AWS access key into the Chalk dashboard. After adding AWS, define your DynamoDB data source in Python and give it a name:

src/datasources.py
from chalk.sql import DynamoDBSource

DynamoDBSource(name="our_dynamo")

Define features

Features are structured data you can use as inputs to your model. With Chalk, you define your features in one place as Pydantic-style Python classes:

from datetime import date

from chalk import DataFrame, FeatureTime
from chalk.features import features, feature


@features
class Transaction:
    id: int
    amount: float
    confirmed_fraud: bool
    # Use strings to reference Customer because Customer is defined beneath this class
    customer_id: "Customer.id"
    customer: "Customer"
    transacted_at: FeatureTime


@features
class Customer:
    id: int
    name: str
    email: str
    dob: date
    income: int
    # :tags: pii
    ssn: str
    # Set max_staleness on fico because fico relies on a third-party API call.
    # We will read this value from a cache (as long as it was computed
    # sometime in the last day).
    fico: int = feature(max_staleness="1d")
    # The transactions, linked by the Customer.id type on the Transaction.customer_id field
    transactions: DataFrame[Transaction]

In this code, Transaction and Customer are feature classes, and each of their properties is a feature.

Define resolvers

Resolvers are Chalk’s way of defining how to retrieve data and convert it into features.

Most of our features will be loaded from DynamoDB using SQL file resolvers. Here’s how we retrieve basic Transaction and Customer data:

src/resolvers/transactions.chalk.sql
-- type: online
-- resolves: transaction
-- source: our_dynamo
SELECT
  id,
  amount,
  confirmed_fraud,
  customer_id,
  created_at AS transacted_at
FROM
  txns;

src/resolvers/customers.chalk.sql
-- type: online
-- resolves: customer
-- source: our_dynamo
SELECT
  id,
  name,
  email,
  dob,
  income,
  ssn
FROM
  customers;

Each column in the result must match the name of a feature. Use AS to rename columns as needed.
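
The effect of the AS rename can be checked outside Chalk. The sketch below uses Python's built-in sqlite3 module as a stand-in for the real data source; the in-memory table and the sample row are made up for illustration. It compares the result's column names with the Transaction feature names.

```python
import sqlite3

# Hypothetical stand-in for the txns table; Chalk itself is not involved here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE txns (id INTEGER, amount REAL, confirmed_fraud INTEGER, "
    "customer_id INTEGER, created_at TEXT)"
)
conn.execute("INSERT INTO txns VALUES (1, 42.5, 0, 7, '2024-01-01T12:00:00')")

cursor = conn.execute(
    "SELECT id, amount, confirmed_fraud, customer_id, "
    "created_at AS transacted_at FROM txns"
)
# cursor.description holds one entry per result column; item 0 is its name.
result_columns = [col[0] for col in cursor.description]

feature_names = {
    "id", "amount", "confirmed_fraud", "customer_id", "customer", "transacted_at",
}
unmatched = [c for c in result_columns if c not in feature_names]

print(result_columns)
print(unmatched)  # []
```

Without the alias, the result would contain created_at, which is not a feature on Transaction.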

For features requiring computation, we use Python resolvers. Here’s a resolver for retrieving a customer’s FICO score from an API:

src/models.py
from chalk import online

from src.feature_sets import Customer


@online
def get_fico(name: Customer.name, ssn: Customer.ssn) -> Customer.fico:
    # FICOApi is a placeholder client; replace it with your preferred provider.
    fico_score = FICOApi().get_score(name, ssn)
    return Customer(fico=fico_score)

This resolver depends on Customer.name and Customer.ssn and returns Customer.fico. When you deploy, Chalk uses your resolver inputs and outputs to create a dependency graph to confirm your features can be generated safely.
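
The idea of a dependency graph built from resolver inputs and outputs can be sketched with the standard library. This is not Chalk's implementation: the resolver table, the `plan` function, and the feature name strings are hypothetical, and the sketch only covers the two checks described above (every requested feature has a producer, and the resolvers can be ordered).

```python
from graphlib import TopologicalSorter

# Hypothetical resolver table: name -> (input features, output features).
resolvers = {
    "customers_sql": (set(), {"customer.name", "customer.ssn"}),
    "get_fico": ({"customer.name", "customer.ssn"}, {"customer.fico"}),
}


def plan(resolvers, requested):
    """Return resolver names in an order that can produce `requested`."""
    producer = {
        out: name for name, (_, outs) in resolvers.items() for out in outs
    }
    if requested not in producer:
        raise LookupError(f"no resolver produces {requested!r}")

    graph = {}  # resolver name -> names of resolvers it depends on
    pending = [requested]
    while pending:
        name = producer[pending.pop()]
        if name in graph:
            continue
        inputs = resolvers[name][0]
        unknown = sorted(inputs - set(producer))
        if unknown:
            raise LookupError(f"{name} needs {unknown}, which nothing produces")
        graph[name] = {producer[i] for i in inputs}
        pending.extend(inputs)

    # static_order() lists dependencies first and raises CycleError on cycles.
    return list(TopologicalSorter(graph).static_order())


print(plan(resolvers, "customer.fico"))  # ['customers_sql', 'get_fico']
```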

The get_fico resolver will be invoked in response to Chalk queries. Resolvers do not run exhaustively over your customer dataset, which helps you reduce costs related to unnecessary computation and API usage. Our resolver documentation goes into greater detail about how you can schedule resolvers and set up batch offline jobs. We also set max_staleness to 1 day on the fico feature, so this feature will be read from the online store's cache if the value has already been computed in the past day.
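
The caching behavior can be pictured with a small stand-alone sketch. This is only an illustration of the staleness rule, not how Chalk's online store is implemented; `StalenessCache` and `fake_fico_api` are hypothetical names, and the scores are made up.

```python
from datetime import datetime, timedelta, timezone


class StalenessCache:
    """Serve a cached value while it is younger than max_staleness."""

    def __init__(self, max_staleness, compute):
        self.max_staleness = max_staleness
        self.compute = compute
        self.compute_calls = 0
        self._store = {}  # key -> (value, computed_at)

    def get(self, key, now):
        cached = self._store.get(key)
        if cached is not None and now - cached[1] <= self.max_staleness:
            return cached[0]  # fresh enough: skip the expensive call
        value = self.compute(key)
        self.compute_calls += 1
        self._store[key] = (value, now)
        return value


def fake_fico_api(customer_id):
    # Stand-in for a costly third-party call; returns a made-up score.
    return 700 + customer_id


cache = StalenessCache(timedelta(days=1), fake_fico_api)
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)

cache.get(7, t0)                        # miss: calls the API
cache.get(7, t0 + timedelta(hours=6))   # hit: value is 6 hours old
cache.get(7, t0 + timedelta(days=2))    # stale: calls the API again
print(cache.compute_calls)  # 2
```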

With a combination of Python and SQL resolvers, we have now told Chalk how to retrieve each of our features.

Deploy Chalk

At this point, you can deploy Chalk to a local branch and start writing queries.

To deploy to a non-production branch, pass --branch to chalk apply:

$ chalk apply --branch sagemaker_tutorial
✓ Found resolvers
✓ Deployed branch

You can run ad hoc queries on this branch from the CLI:

$ chalk query --branch sagemaker_tutorial  \
              --in     transaction.id=1 \
              --out    transaction.confirmed_fraud

Once you’re satisfied with your feature pipeline, you can move on to setting up SageMaker.

Set up SageMaker

There are many ways to set up SageMaker. In this tutorial, we’ll use SageMaker steps to compose our model training pipeline. Steps allow us to modularize the code and we find them easier to work with than some alternatives, such as running the pipeline directly from one super long Jupyter notebook.

In this section, we’ll write steps for creating the dataset, training a candidate model, and evaluating the model.

Create the dataset

In this step, we’ll use Chalk to generate a random sample of features from our dataset, split the dataset for training and testing using scikit-learn, write the results to S3, and return the S3 paths to be used in the next step.

Writing the split datasets to S3 provides a clear history of the data you used to train and evaluate your model. Additionally, it allows you to re-run later steps in your pipeline without re-executing the dataset generation step.

In the previous section, we set up our features and resolvers without going into great detail about how the code would be executed. Here, we’ll finally show how the code gets executed through a Chalk offline query:

steps/dataset.py
from sagemaker.workflow.function_step import step
from src.feature_sets import Transaction


@step(name="create_dataset")
def create_dataset(
    test_size: float, run_bucket: str, model_name: str
) -> tuple[str, str, str, str]:
    from chalk.client import ChalkClient
    from sklearn.model_selection import train_test_split

    # Query the branch we deployed earlier.
    client = ChalkClient(branch="sagemaker_tutorial")

    chalk_dataset = client.offline_query(
        recompute_features=True,
        output=[
            Transaction.amount,
            Transaction.customer.fico,
            # The target must be in the output so we can split it off below.
            Transaction.confirmed_fraud,
            # ... and more features
        ],
        # By giving this dataset a name, we will be able to view it in the Chalk
        # dashboard, download it locally, or share it with collaborators.
        name=model_name,
    )

    # Converting a chalk_dataset to a pandas (or polars) dataframe is
    # efficient because both are backed by Apache Arrow.
    dataset = chalk_dataset.to_pandas()

    # Assumption: columns are named by fully qualified feature name,
    # e.g. "transaction.confirmed_fraud".
    target = str(Transaction.confirmed_fraud)

    # Create training data by removing the target column.
    X = dataset.drop(columns=[target])

    # Pull target from dataset. Keep it as a one-column DataFrame because
    # a pandas Series has no to_parquet method.
    y = dataset[[target]]

    # Split data into train and test set. We will use k-fold cross-validation
    # for hyperparameter tuning.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size
    )

    xtrain_path = f"{run_bucket}/input/X_train.parquet"
    xtest_path = f"{run_bucket}/input/X_test.parquet"
    ytrain_path = f"{run_bucket}/input/y_train.parquet"
    ytest_path = f"{run_bucket}/input/y_test.parquet"

    dataset.to_parquet(f"{run_bucket}/raw_data/data.parquet")
    X_train.to_parquet(xtrain_path)
    y_train.to_parquet(ytrain_path)
    X_test.to_parquet(xtest_path)
    y_test.to_parquet(ytest_path)
    return xtrain_path, xtest_path, ytrain_path, ytest_path

The full code for this step can be found in steps/dataset.py.
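
One thing to watch with a purely random split on fraud data is that a rare positive class can end up unevenly divided between train and test. scikit-learn's train_test_split accepts a stratify argument for this. The sketch below shows the underlying idea in plain Python: `stratified_split` is a hypothetical helper, not part of the tutorial's code, and the labels are synthetic.

```python
import random
from collections import defaultdict


def stratified_split(labels, test_size, seed=0):
    """Return (train_idx, test_idx) with a similar label mix in each part."""
    by_label = defaultdict(list)
    for idx, label in enumerate(labels):
        by_label[label].append(idx)

    rng = random.Random(seed)
    train_idx, test_idx = [], []
    for indices in by_label.values():
        rng.shuffle(indices)
        # Take the same fraction of every class for the test set.
        n_test = round(len(indices) * test_size)
        test_idx.extend(indices[:n_test])
        train_idx.extend(indices[n_test:])
    return sorted(train_idx), sorted(test_idx)


# Synthetic labels: 10 fraudulent transactions out of 100.
labels = [True] * 10 + [False] * 90
train_idx, test_idx = stratified_split(labels, test_size=0.2)

print(len(train_idx), len(test_idx))  # 80 20
print(sum(labels[i] for i in test_idx))  # 2
```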

Train and evaluate

Our last two steps will retrieve our features from S3 and train and evaluate our model. These steps don’t depend on Chalk or any other data sources, so you can rapidly iterate here without being blocked on feature engineering if your training data does not require changes.

In steps/training.py, we load the training data from S3 and train a model using scikit-learn:

steps/training.py
from sagemaker.workflow.function_step import step

PARAM_GRID = {
    'xgb__n_estimators': [20, 50, 100, 200],
    'xgb__learning_rate': [0.01, 0.1, 0.2],
    'xgb__max_depth': [3, 5, 7, 9],
}

@step(
    name="model-training",
    instance_type="ml.m5.xlarge",
    keep_alive_period_in_seconds=300,
)
def train(
    xtrain_path: str,
    ytrain_path: str,
    num_rounds: int
):
    from sklearn.pipeline import Pipeline
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    from sklearn.impute import SimpleImputer
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import RandomizedSearchCV

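    # Read data files from S3.
    X_train = pd.read_parquet(xtrain_path)
    # read_parquet returns a DataFrame; squeeze the single target column
    # into the 1-D Series that scikit-learn expects for y.
    y_train = pd.read_parquet(ytrain_path).squeeze("columns")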

    pipeline = Pipeline(
        steps=[
            ("impute", SimpleImputer()),
            ("scaler", StandardScaler()),
            ("xgb", GradientBoostingClassifier()),
        ]
    )
    # Note: Fraud detection commonly experiences the imbalanced learning problem.
    # In other words, fraud may be over or underrepresented in the dataset, leading
    # to unreliable results in production. When training a model for production,
    # the imbalance should be addressed, for example, by upsampling, downsampling,
    # or model choice.
    rsc = RandomizedSearchCV(
        pipeline,
        param_distributions=PARAM_GRID,
        n_iter=num_rounds,
        cv=3,
        scoring="f1",
        n_jobs=-1,
    )
    rsc.fit(X_train, y_train)

    return rsc.best_estimator_
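
When every entry in the search space is a list, as in PARAM_GRID, randomized search samples distinct combinations without replacement. The plain-Python sketch below mimics that behavior to show how num_rounds relates to the size of the grid; `sample_candidates` is a hypothetical helper, not scikit-learn's implementation. The grid has 4 × 3 × 4 = 48 combinations, so asking for more than 48 cannot yield more candidates.

```python
import itertools
import random

PARAM_GRID = {
    "xgb__n_estimators": [20, 50, 100, 200],
    "xgb__learning_rate": [0.01, 0.1, 0.2],
    "xgb__max_depth": [3, 5, 7, 9],
}


def sample_candidates(grid, n_iter, seed=0):
    """Pick up to n_iter distinct parameter combinations from a list-valued grid."""
    keys = sorted(grid)
    all_combos = [
        dict(zip(keys, values))
        for values in itertools.product(*(grid[k] for k in keys))
    ]
    rng = random.Random(seed)
    # Sampling without replacement: never more candidates than the grid holds.
    return rng.sample(all_combos, min(n_iter, len(all_combos)))


print(len(sample_candidates(PARAM_GRID, 10)))  # 10
print(len(sample_candidates(PARAM_GRID, 50)))  # 48
```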

In steps/evaluate.py, we load our newly trained model and evaluate it against our test dataset:

steps/evaluate.py
from sagemaker.workflow.function_step import step

@step(
    name="model-evaluation",
    instance_type='ml.t3.medium',
    keep_alive_period_in_seconds=300,
)
def evaluate(model, xtest_path: str, ytest_path: str, run_bucket: str) -> str:
    import pandas as pd
    from sklearn.metrics import (
        accuracy_score,
        f1_score,
        precision_score,
        recall_score,
    )
    import s3fs
    import json

    X_test = pd.read_parquet(xtest_path)
    y_test = pd.read_parquet(ytest_path)

    predictions = model.predict(X_test)

    results = {
        "accuracy": accuracy_score(y_test, predictions),
        "f1": f1_score(y_test, predictions),
        "precision": precision_score(y_test, predictions),
        "recall": recall_score(y_test, predictions),
    }

    # Upload evaluation report to s3
    s3_fs = s3fs.S3FileSystem()
    eval_src_s3 = f"{run_bucket}/evaluation/evaluation.json"

    # json.dumps returns a str, so open the file in text mode.
    with s3_fs.open(eval_src_s3, "w") as file:
        file.write(json.dumps(results))

    return eval_src_s3
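
For readers who want to see what the four numbers in the report measure, here is a hand-rolled sketch in plain Python. The labels and predictions are synthetic and `binary_report` is a hypothetical helper; in the step above, scikit-learn computes these values. The second call shows why F1 is a more useful score than accuracy when fraud is rare.

```python
def binary_report(y_true, y_pred):
    """Compute accuracy, precision, recall, and F1 for 0/1 labels."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)

    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = (
        2 * precision * recall / (precision + recall)
        if precision + recall
        else 0.0
    )
    return {
        "accuracy": (tp + tn) / len(pairs),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


y_true = [1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
y_pred = [1, 1, 1, 0, 1, 0, 0, 0, 0, 0]
print(binary_report(y_true, y_pred))
# {'accuracy': 0.8, 'precision': 0.75, 'recall': 0.75, 'f1': 0.75}

# A model that never predicts fraud still gets 60% accuracy here, but F1 is 0.
print(binary_report(y_true, [0] * 10))
# {'accuracy': 0.6, 'precision': 0.0, 'recall': 0.0, 'f1': 0.0}
```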

Assemble the steps into a SageMaker pipeline

Finally, let’s assemble our three steps into a SageMaker pipeline in chalk_sagemaker_pipeline.py:

chalk_sagemaker_pipeline.py
from uuid import uuid4

from sagemaker.workflow.parameters import (
    ParameterFloat,
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline

from steps.dataset import create_dataset
from steps.evaluate import evaluate
from steps.training import train

BUCKET_PREFIX = "s3://chalk-sagemaker-models"

if __name__ == "__main__":
    # Create run parameters
    model_package_group_name = "chalk-sagemaker-xgb"
    # No trailing slash: the steps append "/input/...", "/evaluation/...", etc.
    default_run_bucket = f"{BUCKET_PREFIX}/{model_package_group_name}/{uuid4()}"

    # Required F1 threshold for model registration
    f1_threshold = ParameterFloat(name="F1Threshold", default_value=0.8)

    # Size of test split
    test_size = ParameterFloat(name="TestSize", default_value=0.2)

    # Number of hyperparameter settings to sample during the search
    num_rounds = ParameterInteger(name="NumRounds", default_value=50)
    run_bucket = ParameterString(name="RunBucket", default_value=default_run_bucket)
    model_package_group = ParameterString(
        name="ModelPackageGroup", default_value=model_package_group_name
    )

    delayed_data = create_dataset(
        test_size=test_size,
        run_bucket=run_bucket,
        # create_dataset requires a dataset name. Reusing the model package
        # group name here is a choice made for this example.
        model_name=model_package_group,
    )
    delayed_model = train(
        xtrain_path=delayed_data[0],
        ytrain_path=delayed_data[2],
        num_rounds=num_rounds,
    )
    delayed_evaluation = evaluate(
        model=delayed_model,
        xtest_path=delayed_data[1],
        ytest_path=delayed_data[3],
        run_bucket=run_bucket,
    )

    pipeline = Pipeline(
        name="ChalkaiSagemakerPipeline",
        steps=[delayed_evaluation],
        parameters=[
            f1_threshold,
            test_size,
            run_bucket,
            model_package_group,
            num_rounds,
        ],
    )
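
The pipeline declares an F1Threshold parameter for model registration, but none of the three steps shown here reads it. As a rough sketch of the kind of check a follow-on step could apply to the evaluation report, assuming the report is the JSON written by the evaluation step: `meets_f1_threshold` is a hypothetical helper, and the metric values below are made up for illustration.

```python
import json


def meets_f1_threshold(report_json: str, threshold: float) -> bool:
    """Return True when the report's F1 score reaches the threshold."""
    report = json.loads(report_json)
    return report["f1"] >= threshold


# Made-up report with the same keys that the evaluation step writes.
report_json = json.dumps(
    {"accuracy": 0.97, "f1": 0.83, "precision": 0.85, "recall": 0.81}
)

print(meets_f1_threshold(report_json, 0.8))  # True
print(meets_f1_threshold(report_json, 0.9))  # False
```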

Deploy your model with SageMaker

There are several ways to deploy the model that you trained on SageMaker. We recommend deploying your model to an endpoint, as described in the SageMaker documentation.

Efficient SageMaker inference with Chalk underscores

Once you’ve defined your SageMaker endpoint, you can run predictions against SageMaker from Chalk. To do this, encode your input features and pass them to the F.sagemaker_predict underscore function. You specify the target SageMaker endpoint in the function’s parameters.

import chalk.functions as F
from chalk import FeatureTime, online
from chalk.features import _, features

@features
class Transaction:
    id: int
    amount: float
    confirmed_fraud: bool
    # Use strings to reference Customer because Customer is defined beneath this class
    customer_id: "Customer.id"
    customer: "Customer"
    transacted_at: FeatureTime

    encoded_features: str
    transaction_fraud_prediction_raw: bytes = F.sagemaker_predict(
      F.string_to_bytes(_.encoded_features),
      endpoint="chalk-sagemaker-xgb-endpoint",
    )

@online
def get_encoded_features(amount: Transaction.amount, customer_fico: Transaction.customer.fico, ...) -> Transaction.encoded_features:
    return f"{amount},{customer_fico},..."
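
The encoding step is where missing values and byte conversion tend to cause trouble, so here is a small stand-alone sketch of both directions in plain Python. It assumes the endpoint accepts one CSV row and replies with a single number as UTF-8 text. That format depends on your model container, so treat it as an assumption. `encode_features` and `decode_prediction` are hypothetical helpers, and the values are made up.

```python
def encode_features(values):
    """Join feature values into one CSV row; None becomes an empty field."""
    return ",".join("" if v is None else str(v) for v in values)


def decode_prediction(raw: bytes) -> float:
    """Parse a response body holding one number as UTF-8 text (assumed format)."""
    return float(raw.decode("utf-8").strip())


# Hypothetical feature values: amount, FICO score, and one missing feature.
payload = encode_features([125.5, 710, None]).encode("utf-8")
print(payload)  # b'125.5,710,'

print(decode_prediction(b"0.87\n"))  # 0.87
```

The column order must match the order the model saw during training, since a CSV row carries no column names.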

Conclusion

In this tutorial, you got a look at how Chalk accelerates your feature engineering workflows by making it easy to connect your data sources and unifying your feature code into a single Python codebase. Chalk fits naturally into a SageMaker step for dataset generation and hands feature data off to the rest of your SageMaker pipeline for model training, evaluation, and serving.