Workflows
Chalk makes it easy to batch ingest historical feature data from bulk data sources (e.g. a data warehouse or S3). To do so, you can use Chalk’s support for “feature timestamping”, which lets you override the default timestamps on ingested feature values.
First, define a `feature_time` feature for the relevant feature set:
```python
from chalk.features import features, FeatureTime

@features
class User:
    id: int
    ...
    backfilled_feature: str
    ...
    ts: FeatureTime
```
Note that this `ts` feature doesn’t need to correspond to a specific feature in your business domain. Chalk simply uses this `feature_time` to set the “observation time” of the other features that your resolvers compute. Please see the documentation on `feature_time` for more information on this topic.
Now, define a resolver to ingest data from your data source:
```python
@offline
def ingest_historical_data() -> DataFrame[User.id, User.backfilled_feature, User.ts]:
    # `snowflake` is a SQL source (for example, a Snowflake source) configured
    # elsewhere in your project. The `fields` mapping assigns the `updated_at`
    # column to the `User.ts` feature-time feature.
    return (
        snowflake
        .query_string(
            "SELECT id, backfilled_feature, updated_at FROM source_table",
            fields={"updated_at": User.ts},
        )
        .all()
    )
```
Chalk assumes that your timestamp column is UTC, unless otherwise specified. Note: you may return many values of `backfilled_feature` for the same `id`, but they should have different `ts` values.
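For example, the ingest query might legitimately return several rows for the same `id`, as long as each row carries a distinct timestamp (illustrative values only):

| id | backfilled_feature | updated_at |
| --- | --- | --- |
| 1 | "signed_up" | 2022-01-01T00:00:00Z |
| 1 | "upgraded" | 2022-06-01T00:00:00Z |
| 2 | "signed_up" | 2022-03-15T00:00:00Z |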
Then, after running `chalk apply`, you can trigger this resolver to run one time using the Chalk Dashboard, or the `chalk trigger` command:

```bash
chalk trigger --resolver your.module.path.ingest_historical_data
```
You must manually trigger the resolver because it has no cron schedule specified.
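If you instead want the ingest to run on a recurring schedule rather than via a manual trigger, you could attach a cron expression to the `@offline` decorator, as the incremental example later in this section does. A minimal sketch:

```python
# Sketch only: the same resolver as above, scheduled to run hourly instead of
# being triggered manually with `chalk trigger`.
@offline(cron="0 * * * *")
def ingest_historical_data() -> DataFrame[User.id, User.backfilled_feature, User.ts]:
    ...
```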
Once your resolver run completes, your data will be available in the offline store with effective times specified by the values returned for your `feature_time` (in this example, `updated_at` from `source_table`). If the feature is marked as tolerating staleness, and `etl_offline_to_online=True`, then Chalk will also insert feature values into the online store if they are newer than existing values.
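As a rough sketch (the exact keyword arguments are described in the features documentation), marking the backfilled feature with a staleness tolerance and online ETL might look like this:

```python
from chalk.features import FeatureTime, feature, features

@features
class User:
    id: int
    # Sketch: tolerate staleness of up to 30 days and copy offline observations
    # into the online store when they are newer than the existing online values.
    backfilled_feature: str = feature(max_staleness="30d", etl_offline_to_online=True)
    ts: FeatureTime
```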
Resolvers that use incremental ingestion don’t re-process data from before their “maximum observed timestamp” by default, even if the query changes. Chalk lets you reset the maximum observed timestamp of an incremental resolver to a specific timestamp, or re-ingest all historical data.
You can perform this operation with the `chalk migrate` command. Suppose that you want to add a new column, `favorite_color`, to this existing batch SQL resolver:
```python
@offline(cron="0 * * * *")
def ingest_preferences() -> DataFrame[User.id, User.favorite_food, User.favorite_color]:
    return (
        snowflake
        .query_string("SELECT id, favorite_food, favorite_color, updated_at FROM preferences")
        .incremental(incremental_column="updated_at")
    )
```
If you have been running this resolver in production for a long time, then simply adding `favorite_color` and running `chalk apply` will not ingest historical color preferences, because the incremental timestamp lower bound will prevent the query from returning “old” rows which include these historical `favorite_color` observations.
Instead, run a migration and override the default incremental behavior:
```bash
chalk migrate --resolver your.module.path.ingest_preferences --reset-incremental
# or
chalk migrate --resolver your.module.path.ingest_preferences --incremental-lower-bound 2022-01-01T00:00:00.000Z
```
The first variant will ingest all available historical data. The second will ingest data at or after the lower bound, which you specify as an ISO 8601 timestamp.
Chalk can also backfill feature data for resolvers that take arguments to compute feature values. This is useful for generating historically accurate training datasets from new resolvers that derive their outputs from features for which historical observations already exist.
Suppose we have the following feature class:
```python
@features
class User:
    id: int
    name: str
    reversed_name: str  # a feature we want to backfill
```
with the following newly added resolver:
```python
@online
def reverse_name(name: User.name) -> User.reversed_name:
    return name[::-1]
```
Chalk can automatically compute this feature using historically observed values of `User.name`. To do this, you use the `chalk` CLI tool to plan and execute a “migration”:

```bash
chalk migrate --resolver your.module.path.reverse_name
```
In this example, Chalk will query the offline store to sample all combinations of `user.id`, `user.name`, and `observed_at`, and then invoke your resolver. For each sampled tuple `(id, name, observed_at)`, Chalk will write `(id, reverse_name(name), observed_at)` back to the offline store, and to the online store if the new feature is marked with a non-zero `max_staleness` and `etl_offline_to_online=True`.
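Concretely, with a made-up sample observation, the round trip looks like this:

```python
# Hypothetical sampled row from the offline store:
#   (id=1, name="alice", observed_at=2023-01-01T00:00:00Z)
# Chalk invokes reverse_name("alice") and writes back:
#   (id=1, reversed_name="ecila", observed_at=2023-01-01T00:00:00Z)
assert "alice"[::-1] == "ecila"
```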