Tutorials
Build a Chalk feature pipeline for training and serving models with AWS SageMaker.
Chalk enables you to build feature pipelines for machine learning models. AWS SageMaker is a popular service for building, training, and deploying models. Chalk and SageMaker fit together naturally to create a streamlined machine learning architecture that uses the best aspects of each system.
There are several advantages to using Chalk in your ML architecture alongside SageMaker: Chalk makes it easy to connect your data sources and keeps your feature code in a single Python codebase, while SageMaker handles model training, evaluation, and hosting.
In this tutorial, we'll take a deep dive into how you can use Chalk to serve features, combined with SageMaker for model training and serving. You will: (1) set up a Chalk feature pipeline with data sources, feature classes, and resolvers; (2) build a SageMaker pipeline that creates a training dataset with Chalk, then trains and evaluates a model; and (3) deploy the model to a SageMaker endpoint and serve predictions through Chalk.
We'll go through each of these steps using fraud detection as our example use case. The full code for this tutorial can be found on GitHub.
In this section, we’ll set up your Chalk feature pipeline.
Chalk integrates with AWS, GCP, various SQL databases, Databricks, and more. For this tutorial, we’ll pull transaction data from AWS DynamoDB, but you can add as many data sources as you need.
To connect to DynamoDB, enter your AWS access key in the Chalk dashboard. After adding AWS, define your DynamoDB data source in Python and give it a name:
from chalk.sql import DynamoDBSource
DynamoDBSource(name="our_dynamo")
Features are structured data you can use as inputs to your model. With Chalk, you define your features in one place as Pydantic-style Python classes:
from datetime import date
from chalk import DataFrame, FeatureTime
from chalk.features import features, feature
@features
class Transaction:
id: int
amount: float
confirmed_fraud: bool
# Use strings to reference Customer because Customer is defined beneath this class
customer_id: "Customer.id"
customer: "Customer"
transacted_at: FeatureTime
@features
class Customer:
id: int
name: str
email: str
dob: date
income: int
# :tags: pii
ssn: str
# Set max_staleness on fico because fico relies on a third-party API call.
# We will read this value from a cache (as long as it was computed
# sometime in the last day).
fico: int = feature(max_staleness="1d")
# The transactions, linked by the Customer.id type on the Transaction.customer_id field
transactions: DataFrame[Transaction]
In this code, Transaction is the feature class and each of its properties is a feature.
Resolvers are Chalk's way of defining how to retrieve data and convert it into features.
Most of our features will be loaded from DynamoDB using SQL file resolvers. Here's how we retrieve basic Transaction and Customer data:
-- type: online
-- resolves: transaction
-- source: our_dynamo
SELECT
id,
amount,
confirmed_fraud,
customer_id,
created_at AS transacted_at
FROM
txns;
-- type: online
-- resolves: customer
-- source: our_dynamo
SELECT
id,
name,
email,
dob,
income,
ssn
FROM
customers;
Each column in the result must match the name of a feature. Use AS to rename columns as needed.
For features requiring computation, we use Python resolvers. Here’s a resolver for retrieving a customer’s FICO score from an API:
from chalk import online
@online
def get_fico(name: Customer.name, ssn: Customer.ssn) -> Customer.fico:
# Replace with your preferred provider.
fico_score = FICOApi().get_score(name, ssn)
return Customer(fico=fico_score)
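In this snippet, FICOApi stands in for whatever credit score provider you use. A hypothetical stub, just to make the example self-contained (the URL and response shape are placeholders):

import requests

class FICOApi:
    """Hypothetical client for a third-party credit score provider."""

    BASE_URL = "https://api.example-credit-bureau.com/v1/score"  # placeholder

    def get_score(self, name: str, ssn: str) -> int:
        response = requests.post(self.BASE_URL, json={"name": name, "ssn": ssn})
        response.raise_for_status()
        return int(response.json()["fico"])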
This resolver depends on Customer.name and Customer.ssn and returns Customer.fico. When you deploy, Chalk uses your resolver inputs and outputs to create a dependency graph to confirm your features can be generated safely.
The get_fico resolver will be invoked in response to Chalk queries. Resolvers do not run exhaustively over your customer dataset, which helps you reduce costs related to unnecessary computation and API usage. Our resolver documentation goes into greater detail about how you can schedule resolvers and set up batch offline jobs. We also set max_staleness to 1 day on the fico feature, so this feature will be read from the online store's cache if the value has already been computed in the past day.
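If a query ever needs a fresh value regardless of the cache, you can override staleness per query. A minimal sketch, assuming the same branch deployment we create below (the customer id here is arbitrary):

from chalk.client import ChalkClient

client = ChalkClient(branch="sagemaker_tutorial")
# Setting staleness to "0s" for customer.fico disallows cached values,
# which forces the get_fico resolver to run.
result = client.query(
    input={"customer.id": 42},
    output=["customer.fico"],
    staleness={"customer.fico": "0s"},
)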
With a combination of Python and SQL resolvers, we have now told Chalk how to retrieve each of our features.
At this point, you can deploy Chalk to a local branch and start writing queries.
To deploy to a non-production branch, pass --branch to chalk apply:
$ chalk apply --branch sagemaker_tutorial
✓ Found resolvers
✓ Deployed branch
You can run ad hoc queries on this branch from the CLI:
$ chalk query --branch sagemaker_tutorial \
--in transaction.id=1 \
--out transaction.confirmed_fraud
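You can run the same query from Python with ChalkClient. A minimal sketch, assuming your Chalk credentials are configured in the environment:

from chalk.client import ChalkClient

# Query the branch deployment we just created.
client = ChalkClient(branch="sagemaker_tutorial")
result = client.query(
    input={"transaction.id": 1},
    output=["transaction.confirmed_fraud"],
)
print(result.get_feature_value("transaction.confirmed_fraud"))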
Once you’re satisfied with your feature pipeline, you can move on to setting up SageMaker.
There are many ways to set up SageMaker. In this tutorial, we'll use SageMaker pipeline steps to compose our model training pipeline. Steps let us modularize the code, and we find them easier to work with than alternatives such as running the entire pipeline from a single, sprawling Jupyter notebook.
In this section, we’ll write steps for creating the dataset, training a candidate model, and evaluating the model.
In this step, we’ll use Chalk to generate a random sample of features from our dataset, split the dataset for training and testing using scikit-learn, write the results to S3, and return the S3 paths to be used in the next step.
Writing the split datasets to S3 provides a clear history of the data you used to train and evaluate your model. Additionally, it allows you to re-run later steps in your pipeline without re-executing the dataset generation step.
In the previous section, we set up our features and resolvers without going into great detail about how the code would be executed. Here, we’ll finally show how the code gets executed through a Chalk offline query:
from sagemaker.workflow.function_step import step
from src.feature_sets import Transaction
@step(name="create_dataset")
def create_dataset(
test_size: float, run_bucket: str, model_name: str
) -> tuple[str, str, str, str]:
from chalk.client import ChalkClient
from sklearn.model_selection import train_test_split
    client = ChalkClient(branch="sagemaker_tutorial")
    chalk_dataset = client.offline_query(
        recompute_features=True,
        output=[
            Transaction.amount,
            Transaction.customer.fico,
            # ... and more features
            # Include the target feature so that we can split it out below.
            Transaction.confirmed_fraud,
        ],
        # By giving this dataset a name, we will be able to view it in the Chalk
        # dashboard, download it locally, or share it with collaborators.
        name=model_name,
    )
# Converting a chalk_dataset to a pandas (or polars) dataframe is
# efficient because both are backed by Apache Arrow.
dataset = chalk_dataset.to_pandas()
    # Create training data by removing the target column. Wrapping the feature
    # reference in str() yields the column name used in the pandas DataFrame.
    X = dataset.drop(columns=[str(Transaction.confirmed_fraud)])
    # Pull the target from the dataset.
    y = dataset[str(Transaction.confirmed_fraud)]
    # Split data into train and test sets. We will use k-fold cross-validation
    # for hyperparameter tuning.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size
    )
xtrain_path = f"{run_bucket}/input/X_train.parquet"
xtest_path = f"{run_bucket}/input/X_test.parquet"
ytrain_path = f"{run_bucket}/input/y_train.parquet"
ytest_path = f"{run_bucket}/input/y_test.parquet"
dataset.to_parquet(f"{run_bucket}/raw_data/data.parquet")
X_train.to_parquet(xtrain_path)
y_train.to_parquet(ytrain_path)
X_test.to_parquet(xtest_path)
y_test.to_parquet(ytest_path)
return xtrain_path, xtest_path, ytrain_path, ytest_path
The full code for this step can be found in steps/dataset.py.
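Because we gave the dataset a name, you (or a collaborator) can pull it down later without re-running the query. A minimal sketch, using a hypothetical dataset name:

from chalk.client import ChalkClient

# Retrieve a previously materialized dataset by its name.
client = ChalkClient()
dataset = client.get_dataset(dataset_name="fraud-model-training-data")  # hypothetical
df = dataset.to_pandas()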
Our last two steps will retrieve our features from S3 and train and evaluate our model. These steps don’t depend on Chalk or any other data sources, so you can rapidly iterate here without being blocked on feature engineering if your training data does not require changes.
In steps/training.py, we load the training data from S3 and train a model using scikit-learn:
from sagemaker.workflow.function_step import step
PARAM_GRID = {
'xgb__n_estimators': [20, 50, 100, 200],
'xgb__learning_rate': [0.01, 0.1, 0.2],
'xgb__max_depth': [3, 5, 7, 9],
}
@step(
name="model-training",
instance_type="ml.m5.xlarge",
keep_alive_period_in_seconds=300,
)
def train(
xtrain_path: str,
ytrain_path: str,
num_rounds: int
):
from sklearn.pipeline import Pipeline
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import RandomizedSearchCV
# Read data files from S3.
X_train = pd.read_parquet(xtrain_path)
y_train = pd.read_parquet(ytrain_path)
    pipeline = Pipeline(
        steps=[
            ("impute", SimpleImputer()),
            ("scaler", StandardScaler()),
            # Named "xgb" to match the PARAM_GRID keys above; the estimator
            # itself is scikit-learn's GradientBoostingClassifier.
            ("xgb", GradientBoostingClassifier()),
        ]
    )
# Note: Fraud detection commonly experiences the imbalanced learning problem.
# In other words, fraud may be over or underrepresented in the dataset, leading
# to unreliable results in production. When training a model for production,
# the imbalance should be addressed, for example, by upsampling, downsampling,
# or model choice.
rsc = RandomizedSearchCV(
pipeline,
param_distributions=PARAM_GRID,
n_iter=num_rounds,
cv=3,
scoring="f1",
n_jobs=-1,
)
    # y_train is a single-column DataFrame; ravel it into the 1-D array sklearn expects.
    rsc.fit(X_train, y_train.values.ravel())
return rsc.best_estimator_
In steps/evaluate.py, we load our newly trained model and evaluate it against our test dataset:
from sagemaker.workflow.function_step import step
@step(
name="model-evaluation",
instance_type='ml.t3.medium',
keep_alive_period_in_seconds=300,
)
def evaluate(model, xtest_path: str, ytest_path: str, run_bucket: str) -> str:
import pandas as pd
from sklearn.metrics import (
accuracy_score,
f1_score,
precision_score,
recall_score,
)
import s3fs
import json
X_test = pd.read_parquet(xtest_path)
y_test = pd.read_parquet(ytest_path)
predictions = model.predict(X_test)
results = {
"accuracy": accuracy_score(y_test, predictions),
"f1": f1_score(y_test, predictions),
"precision": precision_score(y_test, predictions),
"recall": recall_score(y_test, predictions),
}
# Upload evaluation report to s3
s3_fs = s3fs.S3FileSystem()
eval_src_s3 = f"{run_bucket}/evaluation/evaluation.json"
    with s3_fs.open(eval_src_s3, "w") as file:
        file.write(json.dumps(results))
return eval_src_s3
Finally, let’s assemble our three steps into a SageMaker pipeline in chalk_sagemaker_pipeline.py:
from sagemaker.workflow.pipeline import Pipeline
from steps.dataset import create_dataset
from steps.training import train
from steps.evaluate import evaluate
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
ParameterFloat,
)
from uuid import uuid4
BUCKET_PREFIX = "s3://chalk-sagemaker-models/"
if __name__ == "__main__":
# Create Run Parameters
model_package_group = "chalk-sagemaker-xgb"
    run_bucket = f"{BUCKET_PREFIX}{model_package_group}/{uuid4()}/"
# Required F1 Threshold for model registration
f1_threshold = ParameterFloat(name="F1Threshold", default_value=0.8)
# Size of test split
test_size = ParameterFloat(name="TestSize", default_value=0.2)
# Number of estimators to evaluate
num_rounds = ParameterInteger(name="NumRounds", default_value=50)
run_bucket = ParameterString(name="RunBucket", default_value=run_bucket)
    model_package_group = ParameterString(name="ModelPackageGroup", default_value=model_package_group)
    delayed_data = create_dataset(test_size=test_size, run_bucket=run_bucket, model_name=model_package_group)
delayed_model = train(xtrain_path=delayed_data[0], ytrain_path=delayed_data[2], num_rounds=num_rounds)
delayed_evaluation = evaluate(model=delayed_model, xtest_path=delayed_data[1], ytest_path=delayed_data[3], run_bucket=run_bucket)
pipeline = Pipeline(
name="ChalkaiSagemakerPipeline",
steps=[delayed_evaluation],
parameters=[
f1_threshold,
test_size,
run_bucket,
model_package_group,
num_rounds,
]
)
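With the pipeline assembled, register it with SageMaker and start a run. A minimal sketch; the role ARN below is a placeholder for an execution role in your account with SageMaker and S3 permissions:

# Register (or update) the pipeline definition, then kick off an execution.
pipeline.upsert(role_arn="arn:aws:iam::123456789012:role/SageMakerExecutionRole")
execution = pipeline.start()
execution.wait()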
There are a number of different ways to deploy the model that you trained on SageMaker. We recommend deploying your model to an endpoint, which is documented here.
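If you trained a scikit-learn estimator as we did above, one option is the SageMaker Python SDK's SKLearnModel. The sketch below makes several assumptions: the model artifact path, role ARN, and inference script are placeholders you would supply from your own training run:

from sagemaker.sklearn import SKLearnModel

# Package the trained estimator (saved as model.tar.gz) behind a real-time endpoint.
model = SKLearnModel(
    model_data="s3://chalk-sagemaker-models/chalk-sagemaker-xgb/<run-id>/model.tar.gz",
    role="arn:aws:iam::123456789012:role/SageMakerExecutionRole",
    entry_point="inference.py",
    framework_version="1.2-1",
)
model.deploy(
    initial_instance_count=1,
    instance_type="ml.t3.medium",
    endpoint_name="chalk-sagemaker-xgb-endpoint",
)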
Once you've defined your SageMaker endpoint, you can run predictions against SageMaker from Chalk. This can be accomplished by encoding your input features and then using the F.sagemaker_predict underscore function. You can specify your target SageMaker endpoint in the underscore function's parameters.
import chalk.functions as F
from chalk.features import _
@features
class Transaction:
id: int
amount: float
confirmed_fraud: bool
# Use strings to reference Customer because Customer is defined beneath this class
customer_id: "Customer.id"
customer: "Customer"
transacted_at: FeatureTime
encoded_features: str
transaction_fraud_prediction_raw: bytes = F.sagemaker_predict(
F.string_to_bytes(_.encoded_features),
endpoint="chalk-sagemaker-xgb-endpoint",
)
@online
def get_encoded_features(amount: Transaction.amount, customer_fico: Transaction.customer.fico, ...) -> Transaction.encoded_features:
return f"{amount},{customer_fico},..."
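Downstream consumers will usually want the prediction as a typed feature rather than raw bytes. A sketch, assuming a hypothetical transaction_fraud_prediction: float feature on Transaction and an endpoint that returns a single probability as UTF-8 text:

@online
def parse_prediction(
    raw: Transaction.transaction_fraud_prediction_raw,
) -> Transaction.transaction_fraud_prediction:
    # Hypothetical: decode the endpoint's UTF-8 response into a float score.
    return float(raw.decode("utf-8"))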
In this tutorial, you got a look at how Chalk accelerates your feature engineering workflows by making it easy to connect your data sources and unifying your feature code into a single Python codebase. Chalk fits naturally into a SageMaker step for dataset generation and hands feature data off to the rest of your SageMaker pipeline for model training, evaluation, and serving.