Tutorials
Build a feature pipeline for fraud detection.
Chalk helps you build out feature pipelines for training and serving machine learning models.
The building blocks of Chalk are features. Each piece of data in your system, whether a column in a database or a value passed in at inference, is a feature. For example, a user’s age and whether they are an adult might be features in your system:
from chalk.features import features

@features
class User:
    id: int
    age: int
    is_adult: bool
Features are computed by resolvers. A resolver is a function that takes features as arguments and outputs new features. For example, a resolver might take a user’s age and output a boolean indicating whether they are 18 or older.
from chalk.features import online

@online
def is_adult(age: User.age) -> User.is_adult:
    return age >= 18
The focus on data instead of pipelines may be unfamiliar at first. Traditional orchestration platforms like Airflow or Dagster explicitly compose functions which produce data into a DAG of tasks. With Chalk, the DAG of resolvers is defined implicitly by the features they produce. This architecture makes it easy to build out feature pipelines that are reusable and composable. Chalk handles tracking your features for temporal consistency, running your resolvers in parallel, and horizontally scaling your feature pipelines.
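To make the idea of an implicit DAG concrete, here is a minimal plain-Python sketch. It is a toy illustration, not how Chalk is implemented: the `resolver` decorator, the `RESOLVERS` registry, the `resolve` function, and the `user.birth_year` feature are all hypothetical names made up for this example. Each function only declares which feature it produces and which features it needs, and the execution order falls out of those declarations.

```python
# Toy registry: output feature name -> (input feature names, function).
# Hypothetical names throughout; this is not Chalk's API.
RESOLVERS = {}

def resolver(inputs, output):
    """Register a function as the producer of `output`."""
    def register(fn):
        RESOLVERS[output] = (inputs, fn)
        return fn
    return register

@resolver(inputs=["user.birth_year"], output="user.age")
def compute_age(birth_year):
    # A fixed reference year keeps the example deterministic.
    return 2024 - birth_year

@resolver(inputs=["user.age"], output="user.is_adult")
def compute_is_adult(age):
    return age >= 18

def resolve(feature, known):
    """Return the value of `feature`, running upstream resolvers as needed."""
    if feature in known:
        return known[feature]
    inputs, fn = RESOLVERS[feature]
    args = [resolve(name, known) for name in inputs]
    known[feature] = fn(*args)
    return known[feature]

# Nobody wired compute_age to compute_is_adult explicitly;
# the dependency is implied by the features each one declares.
print(resolve("user.is_adult", {"user.birth_year": 1990}))  # True
```

Asking for `user.is_adult` pulls in `user.age` automatically, which is the property the paragraph above describes, without any explicit task graph.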
This tutorial walks you through building a feature pipeline for a fraud detection model and covers the full feature development lifecycle.
Before you get started, make sure you have the Chalk CLI installed.
If you want to skip ahead, you can find the full source code for this tutorial on GitHub.
We’ll start by modeling the features we want for the users in our system.
We’ll start simple with three scalar features: user.id, user.name, and user.email.
First, we’ll create a new file called models.py where we’ll define a User class decorated with @features.
from chalk.features import features

@features
class User:
    id: int
    # The name the user provided to us at signup.
    # :owner: identity@chalk.ai
    # :tags: pii
    name: str
    # :tags: pii
    email: str
Note that at this point, we haven’t defined how to compute these features. We are only thinking about the data that we would like to have.
There are a few things to note here. First, all our feature classes need to have a unique id field. By default, this is the field named id. However, if you want to use a different feature name as the primary key, you can specify it by describing the primary key feature with the Primary type.
from chalk.features import features, Primary

@features
class User:
    # `user_id` replaces the default `id` field as the primary key.
    user_id: Primary[int]
    name: str
    email: str
In our features above, we’ve added some comments and annotations. These are optional, but can be useful for documentation and for setting alerting policies. For example, you may wish to send PagerDuty alerts to different teams based on the owner of the related feature.
Any of the comments and tags from the code also show up in the Chalk dashboard, and are indexed for search.
For example, we’ve added a pii tag to the name and email fields. This means that these fields will be treated as personally identifiable information and will be subject to additional restrictions.
Next up, we’ll define a feature class related to our users. We’ll call this class Account, and it will represent a bank account that a user owns.
from chalk.features import features

@features
class Account:
    id: int
    # The name of the owner of the account.
    title: str
    # The id of the user that owns this account.
    user_id: int
    # The balance of the account, in dollars.
    balance: float
This should look much like what we did for the User class. However, we may want to link these two classes together. We can do this by adding a user field to the Account class.
@features
class Account:
    id: int
    # Typing `user_id` as `User.id` declares the join to `User`.
    user_id: User.id
    balance: float
    # The user that owns this account.
    user: User
This denotes that each account has one user, and that the Account.user_id and the User.id are equal and of type int, as described by Account.user_id.
Once we’ve defined the relationship on one side of the join, we can define the inverse relationship on the other side without needing to specify the predicate again.
@features
class User:
    id: int
    name: str
    email: str
    # The account that this user owns.
    account: "Account"
The final feature entity that we’ll define in this tutorial is for transactions. Each account has many transactions, and each transaction is linked to a single account. We’ll define the Transaction class and link it to our Account class as follows:
from enum import Enum

from chalk.features import features, DataFrame

class TransactionStatus(str, Enum):
    PENDING = "pending"
    CLEARED = "cleared"
    FAILED = "failed"

@features
class Transaction:
    id: int
    # The id of the account that this transaction belongs to, set to a join.
    # We refer to features and feature classes defined further down in the file
    # using quotation marks, so Chalk will recognize that it is a valid
    # feature reference to be processed later.
    account_id: "Account.id"
    # The amount of the transaction, in dollars.
    amount: float
    # The status of the transaction, defined as an enum above.
    status: TransactionStatus
    # Because we define the join condition between
    # `Transaction` and `Account` below, we don't
    # need to repeat it here.
    account: "Account"

@features
class User:
    id: int
    name: str
    email: str
    # The account that this user owns.
    account: "Account"

@features
class Account:
    id: int
    user_id: User.id
    balance: float
    user: User
    transactions: DataFrame[Transaction]
This is the first time we’re seeing the DataFrame type. A Chalk DataFrame models tabular data in much the same way that pandas does. However, there are some key differences that allow the Chalk DataFrame to increase type safety and performance.
Like pandas, Chalk’s DataFrame is a two-dimensional data structure with rows and columns. You can perform operations like filtering, grouping, and aggregating on a DataFrame. However, there are two main differences:
DataFrame is lazy and can be backed by multiple data sources, where a pandas.DataFrame executes eagerly in memory.
DataFrame[...] can be used to represent a type of data with pre-defined filters.
You can read more about the Chalk DataFrame in the docs and API Reference.
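The difference between lazy and eager evaluation can be sketched in a few lines of plain Python. The `LazyFrame` class below is a hypothetical stand-in written for this illustration; it is not Chalk’s DataFrame and does not reflect its API. It only shows the general idea that filters can be recorded first and applied later, once the result is actually needed.

```python
class LazyFrame:
    """Toy lazy table: filters are recorded, not applied, until collect()."""

    def __init__(self, rows, filters=()):
        self._rows = rows
        self._filters = tuple(filters)

    def filter(self, predicate):
        # Returns a new frame with one more recorded filter; no rows are read yet.
        return LazyFrame(self._rows, self._filters + (predicate,))

    def collect(self):
        # Apply every recorded filter in a single pass over the rows.
        return [row for row in self._rows if all(f(row) for f in self._filters)]

# Made-up sample data for the example.
transactions = [
    {"id": 1, "amount": 12.5, "status": "cleared"},
    {"id": 2, "amount": 900.0, "status": "pending"},
    {"id": 3, "amount": 40.0, "status": "cleared"},
]

cleared_large = (
    LazyFrame(transactions)
    .filter(lambda t: t["status"] == "cleared")
    .filter(lambda t: t["amount"] > 20)
)

# Work happens only here, when the rows are requested.
print([t["id"] for t in cleared_large.collect()])  # [3]
```

Because nothing runs until `collect()`, a lazy structure is free to combine filters or hand them to a backing data source, which is the kind of flexibility an eager in-memory table does not have.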
You might also notice that we’ve used an Enum feature here. Chalk supports many feature types, including Enum, lists and sets, and @dataclass classes.
A primary source of data for many companies is a SQL database. Chalk can automatically ingest data from SQL databases and map it to feature classes.
In our example application, we have two databases: PostgreSQL and Snowflake. Our PostgreSQL database is the primary database used in our codebase, and our Snowflake database is used for analytics, with tables populated from DBT views and batch jobs.
To configure our SQL sources in Chalk, we’ll create a datasources.py file that contains a SnowflakeSource and a PostgreSQLSource:
from chalk.sql import SnowflakeSource, PostgreSQLSource
snowflake = SnowflakeSource()
postgres = PostgreSQLSource()
These singleton variables can be used to query data in Python SQL resolvers. They’re also necessary before we can write any .chalk.sql files, as we’ll do below.
Chalk’s preferred way to ingest data from SQL databases is to use SQL file resolvers. This allows us to write queries in the same language as our database, and to use the same tooling to test and debug them.
To create a SQL file resolver, we create a file in our project directory with the extension .chalk.sql. We can then write a SQL query in this file, and add metadata to the top of the file to tell Chalk how to ingest the data.
From our User feature class, we may want to resolve the name and email attributes from a PostgreSQL table. To do this, we can write the following SQL file resolver:
-- The features given to us by the user.
-- resolves: user
-- source: postgres
select
    id,
    full_name as name,
    email
from users;
The resolves key tells Chalk which feature class the columns in the select statement should be mapped to. Then, the target names of the query are compared against the names of the attributes on the feature class. If the names match after stripping underscores and lower-casing, the select target is mapped to the feature. In the example above, we aliased the full_name column to name, so it will be mapped to the name attribute on the User feature class.
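The matching rule described above (strip underscores, lower-case, then compare) can be sketched as follows. This is a minimal illustration of the rule as stated, not Chalk’s actual implementation; `normalize` and `map_columns` are hypothetical helper names.

```python
def normalize(name: str) -> str:
    """Strip underscores and lower-case, per the rule described above."""
    return name.replace("_", "").lower()

def map_columns(select_targets, feature_names):
    """Map each select target to the feature whose normalized name matches.

    Targets without a matching feature are left out of the result.
    """
    by_key = {normalize(feature): feature for feature in feature_names}
    return {
        target: by_key[normalize(target)]
        for target in select_targets
        if normalize(target) in by_key
    }

user_features = ["id", "name", "email"]

# With the alias, every select target lines up with a feature.
print(map_columns(["id", "name", "email"], user_features))

# Without the alias, `full_name` normalizes to "fullname", which matches nothing.
print(map_columns(["id", "full_name", "email"], user_features))
```

The second call shows why the `as name` alias matters in the resolver: under this rule, `full_name` and `name` are different names even after normalization.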
Chalk validates your SQL file resolvers when you run chalk apply.
The source key tells Chalk which integration to use to connect to the database. Since we have only one PostgreSQL database, we can reference the source as postgres. If we had multiple PostgreSQL databases, we could use named integrations to reference different databases.
Other comments in the SQL file resolver are indexed by Chalk and can be searched in the Chalk dashboard.
Now that we’ve written a resolver, we can deploy our feature pipeline and query our data in realtime.
In testing, it can be helpful to deploy your feature pipeline to a branch, which allows you to test your changes without affecting the production feature pipeline. Branch deployments take only a few seconds to deploy.
$ chalk apply --branch tutorial
✓ Found resolvers
✓ Deployed branch
Now that we’ve deployed our feature pipeline, we can query our data in realtime. One of the easiest ways to do this is from the Chalk CLI.
$ chalk query --in user.id=1 --out user.name --out user.email
user.name "John Doe"
email "john@doe.com"
This query will fetch the name and email attributes from the User feature class for the user with id=1, hitting the PostgreSQL database directly.
Note that in the SQL file resolver that we wrote, we didn’t include a where clause. However, Chalk automatically pushes down filters to the database when querying features. So, the SQL that will execute against our PostgreSQL database will be:
select
    id,
    full_name as name,
    email
from users
where id = 1;
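The effect of pushing a filter down can be tried locally with a small sketch. Two substitutions are made here and should be read as assumptions: the example uses an in-memory SQLite database instead of PostgreSQL, and it wraps the resolver query in a subquery rather than rewriting it in place as Chalk is described as doing. The `query_one` helper and the second sample row are made up for the illustration.

```python
import sqlite3

# The resolver query, unchanged and without a where clause.
RESOLVER_SQL = "select id, full_name as name, email from users"

def query_one(conn, user_id):
    """Fetch one user by wrapping the resolver query with a primary-key filter."""
    sql = f"select * from ({RESOLVER_SQL}) where id = ?"
    return conn.execute(sql, (user_id,)).fetchone()

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table users (id integer primary key, full_name text, email text)"
)
conn.executemany(
    "insert into users values (?, ?, ?)",
    [
        (1, "John Doe", "john@doe.com"),
        (2, "Jane Roe", "jane@roe.com"),  # made-up second row
    ],
)

print(query_one(conn, 1))  # (1, 'John Doe', 'john@doe.com')
```

The point is that the database does the filtering, so only the requested row leaves the database even though the resolver itself selects from the whole table.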
Chalk can also push down non-primary key filters to SQL databases. For example, to fetch all transactions for a user, Chalk will modify the SQL-resolver query to include a where clause:
select
    id,
    account_id,
    amount,
    status,
    date
from txns
where account_id = 38;
In addition to online data, we can also ingest data from SQL databases into Chalk’s offline store. Offline data won’t be queried in realtime, but can be used to train models and generate features.
For our Account feature class, we may want to ingest data from a Snowflake table. We can write a SQL file resolver to do this:
-- Incrementally ingest account data from Snowflake.
-- This comment will be searchable in the Chalk dashboard.
--
-- resolves: account
-- source: snowflake
-- type: offline
-- cron: 5m
-- incremental:
--   mode: row
--   lookback: 1h
select
    id,
    user_id,
    amount,
    updated_at
from accounts;
There are a few differences between this SQL file resolver and the one we wrote for the User feature class.
First, we’ve added a type key to the header. This tells Chalk that this resolver should be used to ingest data into the offline store. If we didn’t include this key, Chalk would assume that this resolver could be queried in realtime.
Second, we’ve added a cron key to the header. This tells Chalk to run this resolver on a schedule. In this case, we’re telling Chalk to run this resolver every 5 minutes.
Finally, we’ve added an incremental key to the header. This tells Chalk to only ingest new data from the database, and is helpful when you have an immutable events table. Also, notice the new updated_at column in the select statement. We’ll map that column to a FeatureTime attribute in our feature class:
from chalk.features import feature, features, FeatureTime

@features
class Account:
    id: int
    user_id: int
    amount: float
    updated_at: FeatureTime
Features with overridden observation timestamps are inserted into the offline store with the timestamp that you specify. The observation timestamp works like an “effective as of” timestamp. When you sample historical data, you can specify the observation timestamp at which you want to sample a feature value. Then, Chalk will return the most recent feature value that was observed before that timestamp. This method of sampling ensures temporal consistency in your feature values.
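An “effective as of” lookup can be sketched with a sorted list and a binary search. This is an illustration of the sampling rule described above, not Chalk’s implementation. The `value_as_of` helper and the sample balances are made up, and treating “before” as strictly before is an assumption about the boundary case.

```python
from bisect import bisect_left
from datetime import datetime

def value_as_of(observations, ts):
    """Return the latest value observed strictly before `ts`, or None.

    `observations` is a list of (observed_at, value) sorted by observed_at.
    """
    times = [observed_at for observed_at, _ in observations]
    # bisect_left gives the number of observations with observed_at < ts.
    count_before = bisect_left(times, ts)
    if count_before == 0:
        return None
    return observations[count_before - 1][1]

# Made-up history of one account's amount.
history = [
    (datetime(2023, 1, 1), 100.0),
    (datetime(2023, 2, 1), 250.0),
    (datetime(2023, 3, 1), 75.0),
]

print(value_as_of(history, datetime(2023, 2, 15)))  # 250.0
```

Sampling at February 15 returns the February 1 value and ignores the March 1 value, so a training example never sees data from its own future.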
While our offline data is useful for training models and generating features, we may also want to use these values for serving production queries.
However, data warehouses like Snowflake and BigQuery are optimized for analytics and are not well-suited for transactional queries.
We can have Chalk reverse-ETL our offline data into our online store by setting the max_staleness and etl_offline_to_online keyword arguments on our @features decorator:
@features(max_staleness="infinity", etl_offline_to_online=True)
class Account:
    id: int
    user_id: int
    amount: float
    updated_at: FeatureTime
The max_staleness keyword argument tells Chalk how stale a feature value can be before it should be refreshed. In this case, we’re telling Chalk that we’ll tolerate arbitrarily old feature values. However, we could also specify a max_staleness of 1h or 1d to tell Chalk not to serve feature values that are older than 1 hour or 1 day.
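A staleness check of this kind boils down to comparing a value’s age against a limit. The sketch below is a plain-Python illustration, not Chalk’s code: `is_fresh` is a hypothetical helper, `None` stands in for "infinity", and counting a value that is exactly at the limit as fresh is an assumption.

```python
from datetime import datetime, timedelta

def is_fresh(observed_at, now, max_staleness):
    """Return True if a value observed at `observed_at` may still be served.

    `max_staleness` is a timedelta, or None to tolerate arbitrarily old values.
    """
    if max_staleness is None:
        return True
    return now - observed_at <= max_staleness

observed_at = datetime(2024, 1, 1, 12, 0)

# 30 minutes old with a 1h limit: still fresh.
print(is_fresh(observed_at, datetime(2024, 1, 1, 12, 30), timedelta(hours=1)))

# 2 hours old with a 1h limit: too stale, so the value should be recomputed.
print(is_fresh(observed_at, datetime(2024, 1, 1, 14, 0), timedelta(hours=1)))

# No limit: any age is acceptable.
print(is_fresh(observed_at, datetime(2030, 1, 1), None))
```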
The etl_offline_to_online keyword argument tells Chalk to reverse-ETL our offline data into our online store. By default, data only enters the online store when it’s queried in realtime. With this keyword argument set, values ingested into the offline store are also copied into the online store.
So far, we’ve mapped SQL tables into feature classes. But there’s a lot more we can do with Chalk. In this step, we’ll add features to our Account and User feature classes that are gathered from API calls and computed downstream of other features.
We’ve noticed that some fraudsters try to link stolen accounts to our platform and attempt to transfer money through our system. To detect this behavior, we want to compute a similarity score between the user’s name and the account’s title.
We’ll start by adding this new feature, account_name_match, to our User feature class.
@features
class User:
    id: int
    name: str
    email: str
    account: "Account"
    # The similarity between the user's name and the account's title.
    account_name_match: float
Next, we’ll define a resolver that computes this feature. We’ll use Jaccard similarity to compute the similarity score.
from src.models import User
from chalk import online

@online
def account_name_match(
    title: User.account.title,
    name: User.name,
) -> User.account_name_match:
    """Docstrings show up in the Chalk dashboard"""
    intersection = set(title) & set(name)
    union = set(title) | set(name)
    return len(intersection) / len(union)
The @online decorator tells Chalk that this resolver should be called in realtime when the User.account_name_match feature is requested. Our feature dependencies are declared in the function signature as User.account.title and User.name.
Chalk will automatically retrieve User.account_id and User.name from our user.chalk.sql resolver. Then, using this account id, Chalk will retrieve Account.title from the online store, where it has been cached from our cron run of the balance.chalk.sql resolver.
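Jaccard similarity is the size of the intersection of two sets divided by the size of their union. The resolver above applies it to the sets of characters in each string. The standalone sketch below shows the same formula outside of Chalk, with two additions that are assumptions of this example rather than part of the tutorial: a guard for the case where both inputs are empty (where the union is empty and the division would fail), and a word-level comparison for contrast. The `jaccard` helper name is hypothetical.

```python
def jaccard(a, b):
    """Jaccard similarity of two iterables, treated as sets.

    Returns 0.0 when both inputs are empty; that convention is a choice
    made for this sketch.
    """
    set_a, set_b = set(a), set(b)
    union = set_a | set_b
    if not union:
        return 0.0
    return len(set_a & set_b) / len(union)

# Character-level, as in the resolver: strings are sets of characters.
print(jaccard("John Coltrane", "John Coltrane"))  # 1.0
print(jaccard("John Coltrane", "Zyx"))            # 0.0

# Word-level: one shared word out of three distinct words.
print(jaccard("john coltrane".split(), "john smith".split()))
```

Character-level comparison is forgiving of reordered names but can score unrelated names highly when they share common letters; word-level comparison is stricter. Which one suits a fraud signal is a modeling decision.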
Resolvers are callable functions, so we can test them like any other Python function. Let’s test our new resolver by writing a unit test:
from src.resolvers import account_name_match

def test_names_match():
    """Resolvers can be unit tested exactly as you would expect.
    Here, the `account_name_match` resolver should return 1.0
    because the `title` and `name` are identical.
    """
    assert 1 == account_name_match(
        title="John Coltrane",
        name="John Coltrane",
    )

def test_names_completely_different():
    """The `account_name_match` resolver should return 0
    because the `title` and `name` don't share any characters.
    """
    assert 0 == account_name_match(
        title="John Coltrane",
        name="Zyx",
    )
You can read more about testing resolvers in the API docs.
Any Python function can be used as a resolver. This means that we can call APIs to compute features. Let’s add a feature that computes the user’s FICO score from our credit scoring vendor, Experian.
As before, we’ll first add the features that we want to compute:
from chalk.features import feature, features

@features
class User:
    id: int
    name: str
    email: str
    account_name_match: float
    # The fraud score, as provided by a third-party vendor.
    fico_score: int = feature(min=300, max=850, strict=True)
    # Tags from our credit scoring vendor.
    credit_score_tags: list[str]
We are adding strict validation to our fico_score feature to ensure that we only store and utilize valid FICO scores.
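The intent of a strict range check can be pictured with a small plain-Python function. This sketch only illustrates the idea of rejecting out-of-range values before they are stored; it does not describe how Chalk enforces or reports a failed validation. The `validate_fico` helper is hypothetical, and treating both bounds as inclusive is an assumption.

```python
FICO_MIN, FICO_MAX = 300, 850

def validate_fico(score: int) -> int:
    """Return `score` unchanged if it is a plausible FICO score, else raise."""
    if not FICO_MIN <= score <= FICO_MAX:
        raise ValueError(
            f"fico_score {score} is outside [{FICO_MIN}, {FICO_MAX}]"
        )
    return score

print(validate_fico(720))  # 720

try:
    validate_fico(900)
except ValueError as exc:
    # An out-of-range value is rejected instead of being stored.
    print(exc)
```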
Now, we can write a resolver to fetch the user’s FICO score from Experian.
from src.models import User
from src.mocks import experian
from chalk.features import online, Features

@online
def get_fraud_score(
    name: User.name,
    email: User.email,
) -> Features[User.fico_score, User.credit_score_tags]:
    response = experian.get_credit_score(name, email)
    # We don't need to provide all the features for
    # the `User` class, only the ones that we want to update.
    return User(
        fico_score=response['fico_score'],
        credit_score_tags=response['tags'],
    )
Here, we are returning two features of the user, User.fico_score and User.credit_score_tags. We use the Features type to indicate which features we expect to return. Also note that we are initializing the User class with only the features that we want to update. This partial initialization is the primary difference between Python’s @dataclass and Chalk’s @features.
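For contrast, here is what happens when the same partial initialization is attempted with a standard-library dataclass. The `PlainUser` class and `try_partial_init` helper are made up for this comparison; the example uses only Python’s `dataclasses` module and no Chalk code.

```python
from dataclasses import dataclass

@dataclass
class PlainUser:
    id: int
    name: str
    fico_score: int

def try_partial_init():
    """Attempt to set only one field; return the error message, if any."""
    try:
        PlainUser(fico_score=700)
    except TypeError as exc:
        return str(exc)
    return None

# A plain dataclass requires every field without a default,
# so this prints a "missing ... required positional arguments" error.
print(try_partial_init())
```

A dataclass could be made to accept this by giving every field a default, but then an unset field would be indistinguishable from one set to its default value.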
Finally, we’ll want to deploy our new resolvers. As before, we can check our work by using a branch deployment:
$ chalk apply --branch tutorial
✓ Found resolvers
✓ Deployed branch
We can then query our new features:
$ chalk query --branch tutorial \
    --in user.id=1 \
    --out user.account_name_match
Now that we’ve written some features and resolvers and deployed them to Chalk, we’re ready to integrate Chalk into our production decisioning systems.
As a sanity check, it can be helpful to use the Chalk CLI to query a well-known input and ensure that we get the expected output.
We can use the chalk query command, passing in the id of a user, and the names of the features we want to resolve:
$ chalk query --in user.id=1 \
--out user.name \
--out user.email \
--out user.account.balance
Results
user.name "John Doe"
email "john@doe.com"
user.account.balance 2032.91
Once we’re satisfied that our features and resolvers are working as expected, we can use a client library to query Chalk from our application.
In this first example, we’ll use the ChalkClient in the chalkpy package to query Chalk from our application:
from src.models import User
from chalk.client import ChalkClient

# Create a new Chalk client. By default, this will
# pick up the login credentials generated after running
# `chalk login`.
client = ChalkClient()
client.query(
    input=User(id=1234),
    output=[
        User.id,
        User.name,
        User.fico_score,
        User.account.balance,
    ],
)
We use the same feature definitions for querying our data as we used for defining our features and resolvers.
Chalk has API client libraries in several languages, including Python, Go, TypeScript, and Elixir.
All API clients can operate on the string names of features. However, in a production system, you may have many hundreds or thousands of features, and want to avoid hard-coding the names of each feature in your code.
To help with this, Chalk can codegen a library of strongly-typed feature names for you.
For example, say the service that calls into Chalk is written in Go. We can generate a Go library of feature names with the following command:
$ chalk codegen go --out ./clients/go/client.go --package=client
✓ Found resolvers
✓ Wrote features to file './clients/go/client.go'
✓ Please do not change the generated code.
This generates a file clients/go/client.go that looks like this:
package client

/**************************************
 Code generated by Chalk. DO NOT EDIT.
 > chalk codegen go --out ./clients/go/client.go --package client
**************************************/

import (
	"github.com/chalk-ai/chalk-go"
	"time"
)

var InitFeaturesErr error

type Account struct {
	Id        *int64
	Title     *string
	UserId    *int64
	Balance   *float64
	User      *User
	UpdatedAt *time.Time
}

type User struct {
	Id               *int64
	Name             *string
	Email            *string
	Account          *Account
	AccountNameMatch *float64
	FicoScore        *int64
	CreditScoreTags  *[]any
}

var Features struct {
	Account *Account
	User    *User
}

func init() {
	InitFeaturesErr = chalk.InitFeatures(&Features)
}
We can then use this library to query Chalk:
import (
	"fmt"

	"github.com/chalk-ai/chalk-go"
)

// Create a new Chalk client.
client, err := chalk.NewClient()

// Create an empty struct to hold the results.
user := User{}

// Query Chalk, and add the results to the struct.
_, err = client.OnlineQuery(
	chalk.OnlineQueryParams{}.
		WithInput(Features.User.Id, 1234).
		WithOutputs(
			Features.User.Id,
			Features.User.Name,
			Features.User.FicoScore,
			Features.User.Account.Balance,
		),
	&user,
)

// Now, you can access the properties of the
// user for which there was a matching `output`.
fmt.Println(user.Account.Balance)
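Comparing the generated Go struct with the Python feature classes, the Go field names look like PascalCase renderings of the snake_case feature names (for example, account_name_match becomes AccountNameMatch). The sketch below reproduces that mapping in plain Python. It is an observation drawn from the example output, not a documented naming rule, and `go_field`, `feature_names`, and `UserFields` are hypothetical names.

```python
def go_field(feature_name: str) -> str:
    """Render a snake_case feature name as a PascalCase Go field name."""
    return "".join(part.capitalize() for part in feature_name.split("_"))

def feature_names(cls):
    """List the annotated attributes of a class, in declaration order."""
    return list(cls.__annotations__)

class UserFields:
    # A plain class standing in for the tutorial's User feature class.
    id: int
    name: str
    account_name_match: float
    fico_score: int

for name in feature_names(UserFields):
    print(name, "->", go_field(name))
```

Generating names this way, rather than typing them by hand in each service, keeps callers in step with the feature definitions as they change.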
If your calling service is written in Python, but you don’t want to take a dependency on the repository that contains your Chalk features, you can generate your Python features into a separate repository:
$ chalk codegen python --out ./clients/python/client.py
You can see the generated code in clients/python/client.py.
If you are generating Python into a subdirectory of your Chalk project, be sure to add an entry to your .chalkignore containing the directory of your generated code (in the above example, clients/). Otherwise, Chalk will find duplicate definitions of your features.