Tutorials
Work through an example using Chalk in a Jupyter notebook
Chalk enables data science and machine learning teams to build and deploy feature pipelines for machine learning. For data science workflows, Chalk can be used entirely in a notebook to iteratively build features and generate training data.
In this tutorial, we will use Chalk in a Jupyter notebook to explore a dataset of credit card authorizations and build out some features and resolvers.
The dataset we will be using for this tutorial is stored in tables in our Snowflake warehouse.
Chalk has built-in support for a number of SQL-like sources and can ingest data using SQL strings.
Chalk provides a native integration with Snowflake as a SQL Source.
Before we can start ingesting data, we will initialize a `SnowflakeSource`. If you're working on an existing project, it's likely that this step has already been done for you.
```python
from chalk.sql import SnowflakeSource

snowflake = SnowflakeSource()
```
If you have multiple sources, you can initialize a `SnowflakeSource` by passing in the name of your Snowflake integration, which can be defined in the Chalk Dashboard.
```python
from chalk.sql import SnowflakeSource

snowflake = SnowflakeSource(name="snowflake-integration")
```
Once we have our data source set up, we can start defining features. We will start by defining the features we will ingest from our data source. In later sections, we will define derived features that we will use to train a fraud classification model.
Our dataset consists of the following tables:

- `card`
- `merchant`
- `authorization`
- `cardholder`
Chalk lets you define features directly in Python. To create a new `FeatureSet`, apply the `@features` decorator to a Python class with typed attributes.
In our notebook, we will now define the following feature classes:
```python
from datetime import datetime

from chalk.features import features, has_many, DataFrame, FeatureTime

@features
class Merchant:
    id: int
    name: str
    category: str
    country_code: str

@features
class Authorization:
    id: int
    amount_in_cents: int
    card_id: int
    merchant_id: int
    country_code: str
    status: str
    authorized_at: FeatureTime
    # Relationships
    card: "Card"

@features
class Card:
    id: int
    cardholder_id: int
    issued_at: datetime
    # Relationships
    authorizations: DataFrame[Authorization] = has_many(
        lambda: Authorization.card_id == Card.id
    )

@features
class CardHolder:
    id: int
    name: str
    address: str
    created_at: datetime
    # Relationships
    cards: DataFrame[Card] = has_many(lambda: CardHolder.id == Card.cardholder_id)
```
Here we have defined four feature classes: `Merchant`, `Authorization`, `Card`, and `CardHolder`. In these feature sets, we have defined the root features, i.e. the features fetched directly from the data source; we will add derived features in a later section. In the next section, we will look at how to resolve these features.
Feature classes in Chalk need a unique id field. By default, Chalk uses the `id` field as the primary key for the feature class. However, if you want to use a different field as the primary key, you can specify it with the `Primary` type annotation, as shown below.
```python
from chalk.features import features, Primary

@features
class Merchant:
    # `merchant_id` is marked as the primary key, taking precedence
    # over the `id` field
    id: int
    merchant_id: Primary[int]
    name: str
    category: str
    country_code: str
```
Features are namespaced by their containing `FeatureSet` and by the name of the variable. For example, as defined above, `Authorization` would be the containing `FeatureSet` and its corresponding features would be named as follows:
| Feature Name | Type |
|---|---|
| authorization.id | Integer |
| authorization.amount_in_cents | Integer |
| authorization.card_id | Integer |
| authorization.merchant_id | Integer |
| authorization.country_code | String |
| authorization.status | String |
| authorization.authorized_at | FeatureTime |
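If you ever want to double-check a feature's fully qualified name from your notebook, feature references should stringify to it. A quick sanity check, not part of the tutorial itself:

```python
# Feature references stringify to their namespaced names, so we can
# inspect them directly in a notebook cell
assert str(Authorization.id) == "authorization.id"
assert str(Authorization.amount_in_cents) == "authorization.amount_in_cents"
```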
We can also define relationships between feature classes using the `has_one` or `has_many` functions, whose first argument is a function returning the join condition between the two classes. In the feature definitions above, we defined a one-to-many relationship between `Card` and `Authorization` using the `has_many` function.
```python
@features
class Authorization:
    id: int
    amount_in_cents: int
    card_id: int
    merchant_id: int
    country_code: str
    status: str
    authorized_at: FeatureTime
    # Relationships
    # The join condition between `Card` and `Authorization` is defined
    # on `Card` below, so we don't need to repeat it here
    card: "Card"

@features
class Card:
    id: int
    cardholder_id: int
    issued_at: datetime
    # Relationships
    # The has-many relationship between `Card` and `Authorization`,
    # specifying the join condition
    authorizations: DataFrame[Authorization] = has_many(
        lambda: Authorization.card_id == Card.id
    )
```
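Although we rely on the join declared on `Card` here, the one-to-one side could also be declared explicitly with `has_one`. A minimal sketch:

```python
from chalk.features import features, has_one

@features
class Authorization:
    id: int
    card_id: int
    # Explicitly declaring the one-to-one side of the relationship,
    # instead of relying on the join defined on `Card`
    card: "Card" = has_one(lambda: Authorization.card_id == Card.id)
```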
Chalk supports a number of different feature types, including scalars, collections, dataclasses, Pydantic models, and custom types. For a complete list, refer to Feature Types.
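For instance, here is a sketch of a feature class mixing a few of these types; the class and fields below are illustrative and not part of our dataset:

```python
from dataclasses import dataclass
from typing import Optional

from chalk.features import features

@dataclass
class Address:
    street: str
    city: str

@features
class ExampleFeatures:
    id: int
    # Optional scalar feature
    nickname: Optional[str]
    # Collection of scalars
    recent_country_codes: list[str]
    # Dataclass used as a structured feature value
    billing_address: Address
```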
By default, our features are timestamped with the execution time of their resolvers. Since we want to run point-in-time-correct backfills, we will use the `FeatureTime` type to override the default behavior and explicitly use the `authorized_at` field. To learn more about `FeatureTime`, refer to our time documentation.
```python
from chalk.features import FeatureTime

@features
class Authorization:
    # Root features
    id: int
    amount_in_cents: int
    card_id: int
    # FeatureTime replaces a plain datetime annotation, so each row is
    # timestamped with its own authorized_at value
    authorized_at: FeatureTime
    # Relationships
    card: "Card"
```
Next, we will define resolvers for the features above. A resolver is a function that defines how features are fetched or derived. To ingest data from Snowflake, we will define resolvers using SQL strings against the Snowflake source we configured earlier, and we will use the `%%resolver` cell magic to define them in our notebook. It is important to make sure that the names of the columns we select match the names of the features we defined. For example, in the resolver definitions below, we alias `created_at` to `authorized_at` for `Authorization`.
```sql
%%resolver get_merchant_features
-- resolves: Merchant
-- source: snowflake
SELECT
    id,
    name,
    category,
    country_code
FROM merchant
```

```sql
%%resolver get_cardholder_features
-- resolves: CardHolder
-- source: snowflake
SELECT
    id,
    name,
    address,
    created_at
FROM cardholder
```

```sql
%%resolver get_authorization_features
-- resolves: Authorization
-- source: snowflake
SELECT
    id,
    amount_in_cents,
    card_id,
    merchant_id,
    status,
    country_code,
    created_at AS authorized_at
FROM authorization
```

```sql
%%resolver get_card_features
-- resolves: Card
-- source: snowflake
SELECT
    id,
    cardholder_id,
    issued_at
FROM card
```
Alternatively, you can define resolvers as Python functions using the `@offline` decorator.
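For instance, here is a sketch of a Python resolver that ingests `Merchant` rows using the `query_string` method on the Snowflake source we defined earlier; the exact `fields` mapping below is our assumption, so check the signature in your Chalk version:

```python
from chalk import offline
from chalk.features import DataFrame

@offline
def get_merchants_py() -> DataFrame[Merchant]:
    # Equivalent to the %%resolver cell above, but written in Python
    return snowflake.query_string(
        """
        SELECT id, name, category, country_code
        FROM merchant
        """,
        # Map each selected column to the feature it resolves
        fields={
            "id": Merchant.id,
            "name": Merchant.name,
            "category": Merchant.category,
            "country_code": Merchant.country_code,
        },
    ).all()
```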
A note on namespaces
Resolvers can take multiple features as input; however, all feature dependencies of a single resolver must come from the same root namespace.
Requiring features from the same root namespace
```python
@offline
def fn(
    authorization_amount: Authorization.amount_in_cents,
    card_id: Authorization.card_id,
) -> Authorization.some_feature:
    return ...
```
Here, we incorrectly request features from the root namespaces of both `Authorization` and `Card`:
Requiring features from different root namespaces
```python
@online
def fn(
    authorization_amount: Authorization.amount_in_cents,
    card_id: Card.id,
) -> Authorization.some_feature:
    return ...
```
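If a resolver needs data from another namespace, it can instead traverse a relationship. A sketch using the `card` has-one relationship defined above (`some_feature` remains illustrative):

```python
@offline
def fn(
    authorization_amount: Authorization.amount_in_cents,
    # Traversing the has-one relationship pulls a Card feature
    # into the Authorization namespace
    card_issued_at: Authorization.card.issued_at,
) -> Authorization.some_feature:
    return ...
```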
The `ChalkClient` provides the `offline_query` method to compute features from the offline store. To validate that we can resolve features from our offline store, let's run an offline query for the features defined on `Merchant`. If inputs are given, the query returns rows corresponding to those inputs; otherwise, it returns a random sample whose size is bounded by the `max_samples` parameter. Offline queries return a `Dataset` instance, which can be converted to a Pandas DataFrame with the `get_data_as_pandas` method (or to a Chalk `DataFrame` with `get_data_as_dataframe`, as below).
```python
from chalk.client import ChalkClient

client = ChalkClient()

df = client.offline_query(
    input={Merchant.id: [1, 2, 3]},
    output=[
        Merchant.id,
        Merchant.name,
        Merchant.category,
    ],
    recompute_features=True,
).get_data_as_dataframe()
```
We get back the following `DataFrame`, validating that our resolvers are working as expected:
```text
┌───────────────────────┬───────────────────────────┬─────────────────┐
│ merchant.category     ┆ merchant.name             ┆ merchant.id     │
│ ---                   ┆ ---                       ┆ ---             │
│ str                   ┆ str                       ┆ i64             │
╞═══════════════════════╪═══════════════════════════╪═════════════════╡
│ Gas Station           ┆ Tucker, Hull and Gallegos ┆ 1               │
│ E-commerce            ┆ Silva-Odonnell            ┆ 2               │
│ Grocery               ┆ Taylor-Davis              ┆ 3               │
└───────────────────────┴───────────────────────────┴─────────────────┘
```
Note that we set the `recompute_features` parameter to `True` to ensure that the features are recomputed by our resolvers. When it is set to `False`, output features are instead sampled from the offline store.
Let’s expand on the feature classes we have defined and add the following computed features:
```python
@features
class Authorization:
    id: int
    amount_in_cents: int
    card_id: int
    merchant_id: int
    country_code: str
    status: str
    authorized_at: FeatureTime
    # The authorization amount (in cents) of the previous transaction
    previous_auth_amount_in_cents: int
    # Relationships
    card: "Card"

@features
class Card:
    id: int
    cardholder_id: int
    issued_at: datetime
    # The total number of transactions
    count_transactions_total: int
    # The total number of transactions in the last 7 days
    count_transactions_7d: int
    # The number of days since the card was created
    days_since_card_created: int
    # Days since first transaction
    days_since_first_transaction: int
    # Days since last transaction
    days_since_last_transaction: int
    # Relationships
    authorizations: DataFrame[Authorization] = has_many(
        lambda: Authorization.card_id == Card.id
    )
```
Next, we will define resolvers for these features.
```sql
%%resolver get_prev_auth_amounts
-- resolves: Authorization
-- source: snowflake
WITH ordered AS (
    SELECT
        id,
        card_id,
        amount_in_cents,
        LAG(amount_in_cents, 1) OVER (
            PARTITION BY card_id
            ORDER BY created_at
        ) AS previous_auth_amount_in_cents
    FROM authorization
)
SELECT
    id,
    previous_auth_amount_in_cents
FROM ordered
WHERE previous_auth_amount_in_cents IS NOT NULL
```
```python
from chalk import offline
from chalk.features import after

@offline
def get_count_all_txns(
    txns: Card.authorizations[Authorization.id],
) -> Card.count_transactions_total:
    return txns.count()

@offline
def get_count_7d_txns(
    txns: Card.authorizations[Authorization.id, after(days_ago=7)],
) -> Card.count_transactions_7d:
    return txns.count()
```
Chalk has support for time windows using the `before` and `after` functions. In the resolvers above, we use the `after` operator to filter the transactions by their `authorized_at` timestamp (the `FeatureTime` feature on `Authorization`).
Additionally, since we don't need to resolve all of the features on `Authorization` to compute these counts, we can specify the features we need with a projection. Projections allow us to scope down a `DataFrame` to include only the features we need; in this instance, we fetch only the `id` field from `Authorization`. In the resolvers above, we have combined filtering with a projection on the `authorizations` `DataFrame`. Refer to the section on composing projections and filters for more details.
```python
from chalk import Now, offline
from chalk.features import Features

@offline
def get_days_since_card_created(
    card_id: Card.id,
    issued_at: Card.issued_at,
    now: Now,
) -> Card.days_since_card_created:
    return (now - issued_at).days

@offline
def get_days_since_first_last_txn(
    txns: Card.authorizations[Authorization.authorized_at],
    now: Now,
) -> Features[
    Card.days_since_first_transaction,
    Card.days_since_last_transaction,
]:
    # Sort transactions by their authorization time
    sorted_txns = txns.sort(by=Authorization.authorized_at, descending=False)
    # Get first and last transaction dates
    first_txn_date = sorted_txns.first(col=Authorization.authorized_at)
    last_txn_date = sorted_txns.last(col=Authorization.authorized_at)
    return Card(
        days_since_first_transaction=(now - first_txn_date).days,
        days_since_last_transaction=(now - last_txn_date).days,
    )
```
In the resolvers above, we made use of the Chalk feature `Now`, which allows us to express time dependency in our resolvers. This is useful for performing backfills that compute values depending on something semantically similar to `datetime.now()`. In online queries, `Now` resolves to `datetime.now()`. In offline queries, we can use the `input_times` parameter to specify the times that `Now` should resolve to, allowing us to run backfills for many different historical points in time.
```python
from datetime import datetime, timedelta

# Example of running an offline query for multiple historical points in time
client.offline_query(
    input={Card.id: [1, 1, 1]},
    output=[Card.days_since_card_created],
    input_times=[
        datetime.now(),
        datetime.now() - timedelta(days=10),
        datetime.now() - timedelta(days=50),
    ],
)

# Output:
# ┌─────────┬──────────────────────────────┐
# │ card.id ┆ card.days_since_card_created │
# │ ---     ┆ ---                          │
# ╞═════════╪══════════════════════════════╡
# │ 1       ┆ 90                           │
# │ 1       ┆ 80                           │
# │ 1       ┆ 40                           │
# └─────────┴──────────────────────────────┘
```
Some queries that involve multiple operations might need additional debugging. You can supply `store_plan_stages=True` to store the intermediate outputs of every operation in the query plan. This will dramatically slow down query execution, so use it wisely!
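For example, a sketch of our earlier `Merchant` query with plan stages stored:

```python
# Store the intermediate output of every stage in the query plan
client.offline_query(
    input={Merchant.id: [1, 2, 3]},
    output=[Merchant.id, Merchant.name, Merchant.category],
    recompute_features=True,
    store_plan_stages=True,
)
```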
These results are visible in the dashboard under the “Queries” page as shown below.
The Query Plan shows the operations that were executed to compute the query as well as the intermediate results at each stage. The numbers on the edges represent the number of rows of data that were passed from one stage to the next.
You can examine the intermediate results at each stage of the query plan by clicking on a specific stage, and you can download the results as a Parquet file.
In this tutorial, we learned how to use Chalk in a notebook to define features and resolvers and to run offline queries. To dive deeper into Chalk, check out our documentation on the topics listed below.