Notebooks Tutorial

Chalk enables data science and machine learning teams to build and deploy feature pipelines for machine learning. For data science workflows, Chalk can be used entirely in a notebook to iteratively build features and generate training data.

In this tutorial, we will use Chalk in a Jupyter notebook to explore a dataset of credit card authorizations and build out some features and resolvers.


Table of Contents

  1. Configure Data Sources
  2. Defining Features
  3. Defining Resolvers
  4. Computed Features
  5. Troubleshooting
  6. Summary

Configure Data Sources

The dataset we will be using for this tutorial is stored in tables in our Snowflake warehouse.

Chalk has built-in support for a number of SQL-like sources and can ingest data using SQL strings.

Adding a Snowflake source

Chalk provides a native integration with Snowflake as a SQL Source.

Before we can start ingesting data, we will initialize a SnowflakeSource. If you’re working on an existing project, it’s likely that this step has already been done for you.

notebook.ipynb
from chalk.sql import SnowflakeSource

snowflake = SnowflakeSource()

If you have multiple sources, you can initialize a SnowflakeSource by passing in the name of your Snowflake integration, which can be defined in the Chalk Dashboard.

notebook.ipynb
from chalk.sql import SnowflakeSource

snowflake = SnowflakeSource(name="snowflake-integration")

Defining Features

Once we have our data source set up, we can start defining features. We will start with the features we will ingest from our data source. In later sections, we will define derived features that we will use to train a fraud classification model.

Our dataset consists of the following tables:

  • cards
  • merchants
  • authorizations
  • cardholders

Chalk lets you define features directly in Python. To create a new FeatureSet, apply the @features decorator to a Python class with typed attributes.

In our notebook, we will now define the following feature sets:

notebook.ipynb
from datetime import datetime

from chalk.features import features, has_many, DataFrame, FeatureTime

@features
class Merchant:
    id: int
    name: str
    category: str
    country_code: str

@features
class Authorization:
    id: int
    amount_in_cents: int
    card_id: int
    merchant_id: int
    country_code: str
    status: str
    authorized_at: FeatureTime

    # Relationships
    card: "Card"

@features
class Card:
    id: int
    cardholder_id: int
    issued_at: datetime

    # Relationships
    authorizations: DataFrame[Authorization] = has_many(
        lambda: Authorization.card_id == Card.id
    )

@features
class CardHolder:
    id: int
    name: str
    address: str
    created_at: datetime

    # Relationships
    cards: DataFrame[Card] = has_many(lambda: CardHolder.id == Card.cardholder_id)

Here we have defined four feature sets: Merchant, Authorization, Card, and CardHolder. Each feature set contains root features, which are fetched directly from the data source; later on, we will add derived features on top of them. In the next section, we will look at how to resolve these features.

Primary Keys

Feature sets in Chalk need a primary key. By default, Chalk uses the id field as the primary key for the feature set. However, if you want to use a different field as the primary key, you can mark it with the Primary type annotation, as shown below.

notebook.ipynb
from chalk.features import Primary

@features
class Merchant:
  id: int
  merchant_id: Primary[int]
  name: str
  category: str
  country_code: str

Namespacing

Features are namespaced by their containing FeatureSet and by the name of the variable.

For example, for the Authorization feature set defined above, the corresponding features would be named as follows:

Feature Name                     Type
authorization.id                 Integer
authorization.amount_in_cents    Integer
authorization.card_id            Integer
authorization.merchant_id        Integer
authorization.country_code       String
authorization.status             String
authorization.authorized_at      FeatureTime

Relationships

We can also define relationships between feature sets using the has_one or has_many functions, whose argument is a function that returns the join condition between the two feature sets.

In the feature definitions above, we have defined a one-to-many relationship between Card and Authorization using the has_many function.

notebook.ipynb
@features
class Authorization:
  id: int
  amount_in_cents: int
  card_id: int
  merchant_id: int
  country_code: str
  status: str
  authorized_at: FeatureTime

  # Relationships
  # The join condition between `Card` and `Authorization` is defined
  # on `Card` below, so we don't need to repeat it here
  card: "Card"

@features
class Card:
  id: int
  cardholder_id: int
  issued_at: datetime

  # Relationships
  # The has-many relationship between `Card` and `Authorization`
  # specifying the join condition
  authorizations: DataFrame[Authorization] = has_many(
      lambda: Authorization.card_id == Card.id
  )

Feature Types

Chalk supports a number of different feature types, including scalars, collections, dataclasses, Pydantic models, and custom types. For a complete list of supported types, refer to Feature Types.
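For example, a feature set can mix scalar, optional, collection, and dataclass-typed features. The sketch below is purely illustrative; the Customer feature set and the BillingAddress dataclass are not part of this tutorial's dataset.

from dataclasses import dataclass
from typing import Optional

from chalk.features import features


@dataclass
class BillingAddress:
    street: str
    city: str
    country_code: str


@features
class Customer:
    id: int
    # Scalar feature
    name: str
    # Optional feature that may be missing for some rows
    email: Optional[str]
    # Collection feature
    phone_numbers: list[str]
    # Structured feature backed by a dataclass
    billing_address: BillingAddress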

Using feature time

By default, our features are timestamped with the execution time of their resolvers. Since we want to be able to run point-in-time correct backfills, we will use the FeatureTime type to override the default behavior and mark authorized_at (ingested from the created_at column in our warehouse) as the feature timestamp.

To learn more about how to use FeatureTime, refer to the section on Time.

notebook.ipynb
from chalk.features import FeatureTime

@features
class Authorization:
  # Root features
  id: int
  amount_in_cents: int
  card_id: int
  authorized_at: FeatureTime

  # Relationships
  card: "Card"


Defining Resolvers

Next we will define the resolvers for the features we have defined above. A resolver is a function that defines how features are fetched or derived.

To ingest data from Snowflake for the features we defined above, we will write resolvers as SQL strings that run against the Snowflake source we configured earlier.

It is important to make sure that the column names in our queries match the names of the features we defined above. For example, in the resolver definition below, we alias created_at to authorized_at for Authorization.

SQL Resolvers

We will use the %%resolver magic to define SQL resolvers in our notebook.

notebook.ipynb
%%resolver get_merchant_features
-- resolves: Merchant
-- source: snowflake
SELECT id,
       name,
       category,
       country_code
FROM merchant
notebook.ipynb
%%resolver get_cardholder_features
-- resolves: CardHolder
-- source: snowflake
SELECT id,
       name,
       address,
       created_at
FROM cardholder
notebook.ipynb
%%resolver get_authorization_features
-- resolves: Authorization
-- source: snowflake
SELECT id,
       amount_in_cents,
       card_id,
       merchant_id,
       status,
       country_code,
       created_at as authorized_at
FROM authorization
notebook.ipynb
%%resolver get_card_features
-- resolves: Card
-- source: snowflake
SELECT id,
       cardholder_id,
       issued_at
FROM card

Python resolvers

Alternatively, you can define resolvers as Python functions decorated with @offline (or @online, for online resolvers).
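For instance, the Merchant SQL resolver above could instead be written as a Python resolver that issues a SQL string against our Snowflake source. This is a minimal sketch assuming the query_string method on the SQL source and the column-to-feature mapping shown; in a notebook, the %%resolver magic above remains the simpler option.

from chalk import offline
from chalk.features import DataFrame


@offline
def get_merchant_features_py() -> DataFrame[Merchant]:
    # Map columns from the merchant table onto Merchant features
    return snowflake.query_string(
        "SELECT id, name, category, country_code FROM merchant",
        fields={
            "id": Merchant.id,
            "name": Merchant.name,
            "category": Merchant.category,
            "country_code": Merchant.country_code,
        },
    ).all()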

A note on namespaces

Resolvers can take in multiple features as input; however, all feature inputs to a single resolver must come from the same root namespace.

Requiring features from the same root namespace

@offline
def fn(
    authorization_amount: Authorization.amount_in_cents,
    card_id: Authorization.card_id,
) -> Authorization.some_feature:
    return ...

Here, we incorrectly request features from the root namespaces of Authorization and Card:

Requiring features from different root namespaces

@online
def fn(
    authorization_amount: Authorization.amount_in_cents,
    card_id: Card.id
) -> Authorization.some_feature:
    return ...

Computed Features

The ChalkClient provides the offline_query method to compute features from the offline store.

To validate that we are able to resolve features from our offline store, we can run an offline query to resolve the features defined on Merchant.

If inputs are given, the query will return rows corresponding to those inputs; otherwise, it will return a random sample of up to max_samples rows.

Offline queries return a Dataset instance, which can be converted to a DataFrame using the get_data_as_dataframe method (or to a Pandas DataFrame with get_data_as_pandas).

notebook.ipynb
from chalk.client import ChalkClient

# Create a Chalk client (assumes Chalk credentials are already configured
# in your environment)
client = ChalkClient()

dataset = client.offline_query(
    input={
        Merchant.id: [1, 2, 3]
    },
    output=[
        Merchant.id,
        Merchant.name,
        Merchant.category
    ],
    recompute_features=True,
).get_data_as_dataframe()

We get back the following DataFrame, validating that our resolvers are working as expected:

┌───────────────────────┬───────────────────────────┬─────────────────┐
│ merchant.category     ┆ merchant.name             ┆ merchant.id     │
│ ---                   ┆ ---                       ┆ ---             │
│ str                   ┆ str                       ┆ i64             │
╞═══════════════════════╪═══════════════════════════╪═════════════════╡
│ Gas Station           ┆ Tucker, Hull and Gallegos ┆ 1               │
│ E-commerce            ┆ Silva-Odonnell            ┆ 2               │
│ Grocery               ┆ Taylor-Davis              ┆ 3               │
└───────────────────────┴───────────────────────────┴─────────────────┘

Note that we set the recompute_features parameter to True to ensure that the features are recomputed by our resolvers. When it is set to False, output features are instead sampled from the offline store.
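If we omit the input, the query instead returns a random sample from the offline store. A minimal sketch, reusing the max_samples parameter mentioned above:

# Sample up to 100 merchants from the offline store instead of
# querying specific ids
sample = client.offline_query(
    output=[
        Merchant.id,
        Merchant.name,
        Merchant.category,
    ],
    max_samples=100,
).get_data_as_dataframe()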

Feature Definitions

Let’s expand on the feature sets we have defined and add the following computed features:

notebook.ipynb
@features
class Authorization:
  id: int
  amount_in_cents: int
  card_id: int
  merchant_id: int
  country_code: str
  status: str
  authorized_at: FeatureTime

  # The authorization amount (in cents) of the previous transaction
  previous_auth_amount_in_cents: int

  # Relationships
  card: "Card"

@features
class Card:
  id: int
  cardholder_id: int
  issued_at: datetime

  # The total number of transactions
  count_transactions_total: int

  # The total number of transactions in the last 7 days
  count_transactions_7d: int

  # The number of days since the card was created
  days_since_card_created: int

  # Days since first transaction
  days_since_first_transaction: int

  # Days since last transaction
  days_since_last_transaction: int

  # Relationships
  authorizations: DataFrame[Authorization] = has_many(
      lambda: Authorization.card_id == Card.id
  )

Next, we will define resolvers for these features.

notebook.ipynb
%%resolver get_prev_auth_amounts
-- resolves: Authorization
-- source: snowflake
WITH ordered AS (
    SELECT id,
           card_id,
           amount_in_cents,
           LAG(amount_in_cents, 1) OVER (
               PARTITION BY card_id
               ORDER BY created_at
           ) AS previous_auth_amount_in_cents
    FROM authorization
)
SELECT id,
       previous_auth_amount_in_cents
FROM ordered
WHERE previous_auth_amount_in_cents IS NOT NULL
notebook.ipynb
from chalk.features import after

@offline
def get_count_all_txns(
    txns: Card.authorizations[Authorization.id],
) -> Card.count_transactions_total:
    return txns.count()


@offline
def get_count_7d_txns(
    txns: Card.authorizations[Authorization.id, after(days_ago=7)]
) -> Card.count_transactions_7d:
    return txns.count()

Projections & Filters

Chalk supports time-windowed filtering and aggregation using the before and after functions. In the resolvers above, we use the after filter to restrict the transactions to those whose feature time (authorized_at) falls within the last seven days.

Additionally, since we don’t need to resolve all the features on Authorization to compute the counts, we can specify the features we need for this resolver using a projection. Projections allow us to scope down a DataFrame to only include the features we need. In this instance, we are using projections to only fetch the id field from Authorization.

In the resolvers above, we have combined filtering with a projection on the authorizations DataFrame. Refer to the section on composing projections and filters for more details.
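Filters are not limited to time windows; they can also be composed with filters on other feature values. The sketch below combines a projection, a value filter, and a time window. It assumes "approved" is a valid status value, and count_approved_transactions_7d is a hypothetical feature that is not defined in the feature sets above.

# Hypothetical feature: count of approved authorizations in the last 7 days.
# Combines a projection (Authorization.id), a value filter on status, and a
# time-window filter.
@offline
def get_count_approved_7d(
    txns: Card.authorizations[
        Authorization.id,
        Authorization.status == "approved",
        after(days_ago=7),
    ],
) -> Card.count_approved_transactions_7d:
    return txns.count()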

notebook.ipynb
from chalk import Now
from chalk.features import Features

@offline
def get_days_since_card_created(
    card_id: Card.id,
    issued_at: Card.issued_at,
    now: Now,
) -> Card.days_since_card_created:
    return (now - issued_at).days

@offline
def get_days_since_first_last_txn(
    txns: Card.authorizations[Authorization.authorized_at],
    now: Now,
) -> Features[
    Card.days_since_first_transaction,
    Card.days_since_last_transaction,
]:
    # Sort transactions by their feature time (authorized_at)
    sorted_txns = txns.sort(by=Authorization.authorized_at, descending=False)

    # Get the first and last transaction dates
    first_txn_date = sorted_txns.first(col=Authorization.authorized_at)
    last_txn_date = sorted_txns.last(col=Authorization.authorized_at)

    return Card(
        days_since_first_transaction=(now - first_txn_date).days,
        days_since_last_transaction=(now - last_txn_date).days,
    )

Time-dependent resolvers

In the resolvers above, we made use of Chalk's Now feature, which lets us express time dependence in our resolvers. This is useful for backfills, where we need to compute values that depend on something semantically equivalent to datetime.now() at a historical point in time.

In online queries, Now resolves to datetime.now(). In offline queries, we can use the input_times parameter to specify the times that Now should resolve to, allowing us to run backfills for many different historical points in time.

notebook.ipynb
from datetime import datetime, timedelta

# Example of running an offline query for multiple historical points in time
client.offline_query(
    input={Card.id: [1, 1, 1]},
    output=[Card.days_since_card_created],
    input_times=[
        datetime.now(),
        datetime.now() - timedelta(days=10),
        datetime.now() - timedelta(days=50),
    ],
)


# Output:
# ┌─────────┬──────────────────────────────┐
# │ card.id ┆ card.days_since_card_created │
# │ ---     ┆ ---                          │
# ╞═════════╪══════════════════════════════╡
# │ 1       ┆ 90                           │
# │ 1       ┆ 80                           │
# │ 1       ┆ 40                           │
# └─────────┴──────────────────────────────┘

Troubleshooting

Some queries that involve many operations can be hard to debug. You can supply store_plan_stages=True to store the intermediate outputs of every operation in the query. This will dramatically slow things down, so use it sparingly! The results are visible in the dashboard on the “Queries” page, as shown below.
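For example, we can re-run the earlier offline query with plan-stage storage enabled. This is a sketch assuming store_plan_stages is passed directly to offline_query:

# Store intermediate outputs for every operation in the query plan so they
# can be inspected in the dashboard
dataset = client.offline_query(
    input={Merchant.id: [1, 2, 3]},
    output=[Merchant.id, Merchant.name, Merchant.category],
    recompute_features=True,
    store_plan_stages=True,
)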

Query Plan

The Query Plan shows the operations that were executed to compute the query as well as the intermediate results at each stage. The numbers on the edges represent the number of rows of data that were passed from one stage to the next.

[Image: Chalk query plan]

Intermediate Results

You can examine the intermediate results at each stage of the query plan by clicking on a specific stage, and you can download the results as a Parquet file.

[Image: Chalk query intermediate results]


Summary

In this tutorial, we learned how to use Chalk in a notebook to define features and resolvers and to run offline queries.

To dive deeper into Chalk, check out our documentation on the topics listed below.