Time
Chalk makes it easy to batch ingest historical feature data from bulk data sources. When ingesting from a bulk source (such as a data warehouse or S3), you can override feature time with timestamps from the data source. Chalk uses feature time to determine when a feature value should be included in point-in-time queries, for consistency with production.
First, define a `FeatureTime` feature for the relevant feature class:
```python
from chalk.features import features, FeatureTime

@features
class User:
    id: int
    ...
    backfilled_feature: str
    ...
    ts: FeatureTime
```
Next, define a resolver to ingest data from your data source. Here’s an example of a SQL file resolver for ingesting data from Snowflake:
```sql
-- type: offline
-- resolves: user
-- source: snowflake
SELECT id, backfilled_feature, updated_at AS ts FROM source_table
```
Chalk assumes your timestamp column is in UTC. You may return many values of `backfilled_feature` for the same `id`, but they should have different `FeatureTime` values.
Then, after running `chalk apply`, you can trigger this resolver to run once using the Chalk Dashboard or the `chalk trigger` command:
```bash
chalk trigger --resolver your.module.path.ingest_historical_data
```
You must manually trigger the resolver because it has no cron schedule specified.
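If you wanted this ingestion to run on a schedule instead, you could add a `cron` key to the resolver's comment header, as in this sketch (the daily schedule shown here is only an illustrative choice):

```sql
-- type: offline
-- resolves: user
-- source: snowflake
-- cron: "0 0 * * *"
SELECT id, backfilled_feature, updated_at AS ts FROM source_table
```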
Once your resolver run completes, your data will be available in the offline store with effective times specified by the values returned for the `FeatureTime` (in this example, `updated_at` from `source_table`). If the feature tolerates staleness and `etl_offline_to_online=True`, then Chalk will also insert feature values into the online store if they are newer than existing values.
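As a sketch, a feature class configured for that online backfill might look like the following; `max_staleness` and `etl_offline_to_online` are the relevant settings, and the one-day staleness window is only an illustrative choice:

```python
from chalk.features import FeatureTime, feature, features

# etl_offline_to_online applies to every feature in the class;
# the one-day max_staleness below is an illustrative choice.
@features(etl_offline_to_online=True)
class User:
    id: int
    backfilled_feature: str = feature(max_staleness="1d")
    ts: FeatureTime
```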
Resolvers that use incremental ingestion don’t re-process data from before their “max observed timestamp” by default, even if the query is changed.
Chalk lets you reset the maximum observed timestamp of incremental resolvers to a specific timestamp, or re-ingest all historical data.
Chalk uses offline queries to perform this operation. Suppose you want to add a new column, `favorite_color`, to this existing batch SQL resolver:
```sql
-- type: offline
-- resolves: user
-- source: snowflake
-- cron: "0 * * * *"
-- incremental:
--   incremental_column: updated_at
SELECT id, favorite_food, favorite_color, updated_at FROM preferences
```
If you have been running this resolver in production for a long time, then simply adding `favorite_color` and running `chalk apply` will not ingest historical color preferences, because the incremental timestamp lower bound will prevent the query from returning "old" rows which include these historical `favorite_color` observations.
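Conceptually, each incremental run behaves as if a timestamp predicate were appended to your query. The following is only an illustrative sketch, not the exact SQL Chalk generates:

```sql
-- Illustrative: the lower bound is the resolver's max observed
-- timestamp from prior runs, so "old" rows containing historical
-- favorite_color observations are filtered out.
SELECT id, favorite_food, favorite_color, updated_at
FROM preferences
WHERE updated_at >= :max_observed_timestamp
```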
Instead, run an offline query to obtain a dataset of the feature values you want to ingest, and then call `.ingest` to store the values online or offline.
```python
from chalk.client import ChalkClient

# `User` is the feature class defined above.
dataset = ChalkClient().offline_query(
    input={
        User.id: list(range(1000)),
    },
    output=[
        User.favorite_color,
    ],
)

# Recompute the feature with the updated resolver, then persist the
# results to the offline store.
dataset.recompute(features=['user.favorite_color'])
dataset.ingest(store_offline=True)
```
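If you also want the backfilled values available for online serving, `.ingest` can write to the online store as well (for example, via a `store_online=True` flag alongside `store_offline`, assuming your client version exposes both), subject to the staleness configuration described above.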
Chalk can also backfill feature data for resolvers that take arguments to compute feature values. This is useful for generating historically accurate training datasets with new resolvers that are derived from features for which we already have historical observations.
Suppose we have the following feature class:
```python
from chalk.features import features

@features
class User:
    id: int
    name: str
    reversed_name: str  # a feature we want to backfill
```
with the following newly added resolver:
```python
from chalk import online

@online
def reverse_name(name: User.name) -> User.reversed_name:
    return name[::-1]
```
Chalk can automatically compute this feature using historically observed values of `User.name`. To do so, you can either use an offline query with `recompute_features=True` and ingest the computed feature values as we did above (see the sketch at the end of this section), or use `chalk trigger` to trigger a resolver run:
```bash
chalk trigger --resolver your.module.path.reverse_name --lower-bound 2024-05-05T12:00:00+00:00 --persist-offline=True
```
In this example, Chalk will query the offline store to sample all combinations of `user.id`, `user.name`, and the corresponding `FeatureTime` for which the `FeatureTime` is later than the ISO 8601 timestamp passed as `--lower-bound`.
For each sample, Chalk will invoke your resolver and write the results back to the offline store. Chalk will also update the online store if the new feature is marked with a non-zero `max_staleness` and `etl_offline_to_online=True`.
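For completeness, the offline-query alternative mentioned above might look like the following sketch. `recompute_features=True` asks Chalk to run resolvers rather than sample stored values, and the thousand sampled ids are an illustrative choice:

```python
from chalk.client import ChalkClient

# Alternative to `chalk trigger`: recompute reversed_name from
# historically observed User.name values, then persist the results.
dataset = ChalkClient().offline_query(
    input={
        User.id: list(range(1000)),  # illustrative sample of ids
    },
    output=[
        User.reversed_name,
    ],
    recompute_features=True,
)
dataset.ingest(store_offline=True)
```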