Mapping data from SQL sources to feature classes.
If you want to skip ahead, you can find the full source code for this tutorial on GitHub.
A primary source of data for many companies is a SQL database. Chalk can automatically ingest data from SQL databases and map it to feature classes.
In our example application, we have two databases: PostgreSQL and Snowflake. Our PostgreSQL database is the primary database used elsewhere in our codebase, and our Snowflake database is used for analytics, with tables populated from DBT views and batch jobs.
To configure our SQL sources in Chalk, we’ll create a datasources.py file that contains a SnowflakeSource and a PostgreSQLSource:
from chalk.sql import SnowflakeSource, PostgreSQLSource

# Singleton source objects for our two databases. Connection credentials are
# configured on the corresponding Chalk integrations rather than in code.
snowflake = SnowflakeSource()
postgres = PostgreSQLSource()
These singleton variables can be used to query data in Python SQL resolvers. They’re also necessary before we can write any .chalk.sql files, as we’ll do below.
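For example, a Python resolver can run a query through the postgres source directly. The sketch below is not part of the tutorial’s codebase: it assumes the User feature class defined earlier and the query_string API for mapping selected columns to features, so check the Python SQL resolver docs for the exact signature.
from chalk import online
from chalk.features import Features

# Assumes `postgres` from datasources.py and the User feature class are
# importable in this module.
@online
def get_user_info(uid: User.id) -> Features[User.name, User.email]:
    # The fields mapping ties each selected column to a feature on User.
    return postgres.query_string(
        "select full_name, email from users where id = :id",
        fields={"full_name": User.name, "email": User.email},
        args={"id": uid},
    ).first()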
Chalk’s preferred way to ingest data from SQL databases is to use SQL file resolvers. This allows us to write queries in the same language as our database, and to use the same tooling to test and debug them.
To create a SQL file resolver, we create a file in our project directory with the extension .chalk.sql. We can then write a SQL query in this file and add metadata to the top of the file to tell Chalk how to ingest the data.
From our User feature class, we may want to resolve the name and email attributes from a PostgreSQL table. To do this, we can write the following SQL file resolver:
-- The features given to us by the user.
-- resolves: user
-- source: postgres
select
  id,
  full_name as name,
  email
from users;
The resolves key tells Chalk which feature class the columns in the select statement should be mapped to. The target names of the query are then compared against the names of the attributes on the feature class. If the names match after stripping underscores and lower-casing, the select target is mapped to the feature. In the example above, we aliased the full_name column to name, so it will be mapped to the name attribute on the User feature class.
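For reference, a minimal User feature class consistent with this resolver is sketched below; the class defined earlier in the tutorial may include additional attributes.
from chalk.features import features

@features
class User:
    id: int
    name: str
    email: str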
Chalk validates your SQL file resolvers when you run chalk apply.
The source key tells Chalk which integration to use to connect to the database. Since we have only one PostgreSQL database, we can reference the source as postgres. If we had multiple PostgreSQL databases, we could use named integrations to reference different databases.
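As a hypothetical sketch, a second PostgreSQL database would be declared as a named integration in datasources.py, and the resolver header would then reference that integration by name in its source key (here, PG_ANALYTICS is an invented name that must match an integration configured in the Chalk dashboard).
# A second, named PostgreSQL integration alongside the default `postgres` source.
pg_analytics = PostgreSQLSource(name="PG_ANALYTICS")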
Other comments in the SQL file resolver are indexed by Chalk and can be searched in the Chalk dashboard.
Now that we’ve written a resolver, we can deploy our feature pipeline and query our data in realtime.
In testing, it can be helpful to deploy your feature pipeline to a branch, which allows you to test your changes without affecting the production feature pipeline. Branch deployments take only a few seconds to deploy.
$ chalk apply --branch tutorial
✓ Found resolvers
✓ Deployed branch
Now that we’ve deployed our feature pipeline, we can query our data in realtime. One of the easiest ways to do this is from the Chalk CLI.
$ chalk query --in user.id=1 --out user.name --out user.email
user.name "John Doe"
user.email "john@doe.com"
This query will fetch the name and email attributes from the User feature class for the user with id=1, hitting the PostgreSQL database directly.
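The same query can also be issued from the Python client. This is a sketch that assumes API credentials are already configured in your environment; the string-based feature references mirror the CLI arguments above.
from chalk.client import ChalkClient

client = ChalkClient()
result = client.query(
    input={"user.id": 1},
    output=["user.name", "user.email"],
)
print(result.get_feature_value("user.name"))  # "John Doe"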
Note that in the SQL file resolver we wrote, we didn’t include a where clause. However, Chalk automatically pushes down filters to the database when querying features, so the SQL that will execute against our PostgreSQL database will be:
select
  id,
  full_name as name,
  email
from users
where id = 1;
Chalk can also push down non-primary-key filters to SQL databases. For example, to fetch all transactions for a user, Chalk will modify the SQL resolver’s query to include a where clause:
select
  id,
  account_id,
  amount,
  status,
  date
from txns
where account_id = 38;
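For context, the unmodified resolver behind this query might look like the sketch below; the transaction feature class name and the txns table are assumptions based on the query above.
-- resolves: transaction
-- source: postgres
select
  id,
  account_id,
  amount,
  status,
  date
from txns;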
In addition to online data, we can also ingest data from SQL databases into Chalk’s offline store. Offline data won’t be queried in realtime, but can be used to train models and generate features.
For our Account feature class, we may want to ingest data from a Snowflake table. We can write a SQL file resolver to do this:
-- Incrementally ingest account data from Snowflake.
-- This comment will be searchable in the Chalk dashboard.
--
-- resolves: account
-- source: snowflake
-- type: offline
-- cron: 5m
-- incremental:
--   mode: row
--   lookback: 1h
select
  id,
  user_id,
  amount,
  updated_at
from accounts;
There are a few differences between this SQL file resolver and the one we wrote for the User feature class.
First, we’ve added a type key to the header. This tells Chalk that this resolver should be used to ingest data into the offline store. If we didn’t include this key, Chalk would assume that this resolver could be queried in realtime.
Second, we’ve added a cron key to the header. This tells Chalk to run this resolver on a schedule. In this case, we’re telling Chalk to run this resolver every 5 minutes.
Finally, we’ve added an incremental key to the header. This tells Chalk to only ingest new data from the database, and is helpful when you have an immutable events table. Also, notice the new updated_at column in the select statement. We’ll map that column to a FeatureTime attribute in our feature class:
from chalk.features import features, FeatureTime

@features
class Account:
    id: int
    user_id: int
    amount: float
    updated_at: FeatureTime
Features with overridden observation timestamps are inserted into the offline store with the timestamp that you specify. The observation timestamp works like an “effective as of” timestamp. When you sample historical data, you can specify the observation timestamp at which you want to sample a feature value. Then, Chalk will return the most-recent feature value that was observed before that timestamp. This method of sampling ensures temporal consistency in your feature values.
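For example, historical values can be sampled with an offline query. The sketch below assumes the offline query client’s input, input_times, and output parameters; the returned dataset can then be materialized as a DataFrame for training (see the client reference for the exact methods).
from datetime import datetime, timezone
from chalk.client import ChalkClient

client = ChalkClient()
# Sample each account's amount as it was observed on June 1st.
dataset = client.offline_query(
    input={"account.id": [1, 2, 3]},
    input_times=[datetime(2023, 6, 1, tzinfo=timezone.utc)] * 3,
    output=["account.amount"],
)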
While our offline data is useful for training models and generating features, we may also want to use these values for serving production queries.
However, data warehouses like Snowflake and BigQuery are optimized for analytics and are not well-suited for transactional queries.
We can have Chalk reverse-ETL our offline data into our online store by setting the max_staleness and etl_offline_to_online keyword arguments on our @features decorator:
@features(max_staleness="infinity", etl_offline_to_online=True)
class Account:
    id: int
    user_id: int
    amount: float
    updated_at: FeatureTime
The max_staleness keyword argument tells Chalk how stale a feature value can be before it should be refreshed. In this case, we’re telling Chalk that we’ll tolerate arbitrarily old feature values. However, we could also specify a max_staleness of 1h or 1d to tell Chalk not to serve feature values that are older than 1 hour or 1 day.
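For instance, to cap staleness at one day while still reverse-ETLing into the online store, the decorator would read:
@features(max_staleness="1d", etl_offline_to_online=True)
class Account:
    id: int
    user_id: int
    amount: float
    updated_at: FeatureTime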
The etl_offline_to_online keyword argument tells Chalk to reverse-ETL our offline data into our online store. By default, data only enters the online store when it’s queried in realtime; setting this keyword argument tells Chalk to also copy the data ingested by our offline resolvers into the online store.
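Once a scheduled run has ingested and reverse-ETLed account data, those values can be served from the online store like any other feature, for example:
$ chalk query --in account.id=1 --out account.amount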