Batch Backfilling

Simple Backfills from Bulk Sources

If you need to ingest historical feature data from bulk data sources (e.g. a data warehouse or S3), you can use Chalk’s support for “feature timestamping”, which lets you override the default observation timestamps on ingested feature values.

First, define a feature_time feature for the relevant feature set:

from chalk.features import features, FeatureTime

@features
class User:
    id: int
    ...
    backfilled_feature: str
    ...
    ts: FeatureTime

Note that this ts feature doesn’t need to correspond to a specific feature in your business domain. Chalk simply uses this feature_time to set the “observation time” of the other feature values that your resolvers compute. See the documentation on feature_time for more information.
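
For example, here is a minimal sketch of a resolver that constructs rows in Python and sets ts explicitly, so each row is recorded at that historical observation time rather than at ingestion time. The resolver name and values are illustrative assumptions, not part of the docs:

from datetime import datetime, timezone

from chalk import offline
from chalk.features import DataFrame

@offline
def backfill_users_sketch() -> DataFrame[User.id, User.backfilled_feature, User.ts]:
    # Each row's ts becomes its observation time in the offline store.
    return DataFrame([
        User(id=1, backfilled_feature="a", ts=datetime(2021, 6, 1, tzinfo=timezone.utc)),
        User(id=1, backfilled_feature="b", ts=datetime(2022, 6, 1, tzinfo=timezone.utc)),
    ])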

Now, define a resolver to ingest data from your data source:

from chalk import offline
from chalk.features import DataFrame

# `snowflake` is assumed to be a SQL source (e.g. a SnowflakeSource) defined elsewhere in your project.
@offline
def ingest_historical_data() -> DataFrame[User.id, User.backfilled_feature, User.ts]:
    return (
        snowflake
            .query_string(
                "SELECT id, backfilled_feature, updated_at FROM source_table",
                fields={"updated_at": User.ts},
            )
            .all()
    )

Chalk assumes that your timestamp column is in UTC unless otherwise specified. Note that you may return many values of backfilled_feature for the same id, but they should have distinct ts values.
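
For example, a result set like the following (the values are purely illustrative) backfills two observations of backfilled_feature for the same user, each at its own effective time:

| id | backfilled_feature | updated_at           |
| 1  | "bronze"           | 2021-01-01T00:00:00Z |
| 1  | "silver"           | 2022-06-01T00:00:00Z |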

Then, after running chalk apply, you can trigger this resolver to run once from the Chalk Dashboard or with the chalk trigger command:

chalk trigger --resolver your.module.path.ingest_historical_data

You must manually trigger the resolver because it has no cron schedule specified.

Once your resolver run completes, your data will be available in the offline store with effective times given by the values returned for your feature_time (in this example, updated_at from source_table). If the feature is marked as tolerating staleness (i.e. has a max_staleness) and etl_offline_to_online=True, then Chalk will also insert feature values into the online store when they are newer than the existing values.
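
For reference, both of those conditions are configured on the feature class. A minimal sketch looks like this (the 30-day staleness is an arbitrary example value):

from chalk.features import features, FeatureTime

@features(max_staleness="30d", etl_offline_to_online=True)
class User:
    id: int
    backfilled_feature: str
    ts: FeatureTime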


Re-ingesting Incremental Resolvers

Resolvers that use incremental ingestion don’t re-process data from before their “maximum observed timestamp” by default, even if the query is changed.

Chalk lets you reset the maximum observed timestamp of an incremental resolver to a specific point in time, or re-ingest all historical data.

This is done with the chalk migrate command. Suppose that you want to add a new column, favorite_color, to this existing batch SQL resolver:

from chalk import offline
from chalk.features import DataFrame

@offline(cron="0 * * * *")
def ingest_preferences() -> DataFrame[User.id, User.favorite_food, User.favorite_color]:
    return (
        snowflake
            .query_string("SELECT id, favorite_food, favorite_color, updated_at FROM preferences")
            # Only rows whose updated_at is newer than the resolver's maximum
            # observed timestamp are ingested on each scheduled run.
            .incremental(incremental_column="updated_at")
    )

If you have been running this resolver in production for a long time, simply adding favorite_color and running chalk apply will not ingest historical color preferences: the incremental timestamp lower bound (Chalk only ingests rows whose updated_at is newer than the maximum it has already observed) prevents the query from returning “old” rows that contain those historical favorite_color observations.

Instead, run a migration and override the default incremental behavior:

chalk migrate --resolver your.module.path.ingest_preferences --reset-incremental
# or
chalk migrate --resolver your.module.path.ingest_preferences --incremental-lower-bound 2022-01-01T00:00:00.000Z

The first variant will ingest all available historical data. The second will ingest data at or after the lower bound (the bound is inclusive), which you specify as an ISO 8601 timestamp.


Backfilling Python Resolvers

Chalk can also backfill feature data for resolvers that take arguments to compute feature values. This is useful for generating historically accurate training datasets with new resolvers that derive their values from features for which we already have historical observations.

Suppose we have the following feature class:

@features
class User:
    id: int
    name: str

    reversed_name: str # a feature we want to backfill

with the following newly added resolver:

@online
def reverse_name(name: User.name) -> User.reversed_name:
    return name[::-1]
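
Because the decorated resolver is still an ordinary Python function, a quick sanity check like the following should work (this is our illustration, not part of the backfill flow):

# Resolvers can be called directly, which makes them easy to sanity-check in tests.
assert reverse_name("chalk") == "klahc"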

Chalk can automatically compute this feature using historically observed values of User.name. To do this, you use the chalk CLI tool to plan and execute a “migration”:

chalk migrate --resolver your.module.path.reverse_name

In this example, Chalk will query the offline store to sample all combinations of:

| user.id | user.name | observed_at |

and then invoke your resolver. For each sampled tuple (id, name, observed_at), Chalk will write (id, reverse_name(name), observed_at) back to the offline store, and to the online store if the new feature is marked with a non-zero max_staleness and etl_offline_to_online=True.
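
After the migration completes, the backfilled values behave like any other historical observations. As a rough sketch (the ids, timestamps, and client configuration below are illustrative assumptions), you could then sample them into a training dataset with an offline query:

from datetime import datetime, timezone

from chalk.client import ChalkClient

client = ChalkClient()  # assumes credentials are configured in your environment
dataset = client.offline_query(
    input={User.id: [1, 2, 3]},  # example ids
    input_times=[datetime(2023, 1, 1, tzinfo=timezone.utc)] * 3,  # point-in-time of each sample
    output=[User.name, User.reversed_name],
)
# Inspect `dataset` to confirm that reversed_name was backfilled as expected.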