Time

By default, features are timestamped with the execution time of their resolver. However, you can override this behavior by providing timestamps from your data source. This can be helpful when working with an event store or a timestamped API.

The first step is to add a FeatureTime attribute to your feature set.

By default, if you have a feature named ts with a type of datetime.datetime, it will be treated as a feature time.

from datetime import datetime

from chalk.features import features

@features
class User:
    ts: datetime

However, if you wish to use a different name, you can use the FeatureTime annotation:

from chalk.features import features, FeatureTime

@features
class User:
    timestamp: FeatureTime

The class FeatureTime is a datetime.datetime with a typing.Annotated annotation applied.
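
For intuition, such an alias behaves roughly like the following sketch (illustrative only; FeatureTimeSketch is a hypothetical name, and the metadata Chalk actually attaches is an internal detail):

from datetime import datetime
from typing import Annotated

# Hypothetical illustration: an Annotated alias is still a datetime at runtime,
# but carries extra metadata that a framework can inspect to find the feature time.
FeatureTimeSketch = Annotated[datetime, "feature-time marker"]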

Using this feature, you can introspect the time at which features were created within resolvers, and return feature data with custom timestamps.


Using feature time

To pull the feature time, include the user's ts field among the arguments of your resolver. This value will be set to the most recent observation time of all arguments to your function.

@offline
def fn(name: User.name, ts: User.ts) -> ...:
    ...
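
For example, a resolver can use the observation time of its inputs directly. The sketch below assumes ts is a timezone-aware UTC datetime, and User.name_is_fresh is a hypothetical feature that would need to be declared on the feature class:

from datetime import datetime, timezone

from chalk import offline

@offline
def name_is_fresh(name: User.name, ts: User.ts) -> User.name_is_fresh:
    # True if the name feature was observed within the last 30 days.
    return (datetime.now(timezone.utc) - ts).days < 30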

Overriding feature time

You can also provide the time that a set of features was created by supplying a value for the ts field in your resolver.

Building on the above example:

@offline
def fn(...) -> Features[User.name, User.ts]:
    return User(
        name="Maryam Mirzakhani",
        ts=datetime(2014, 8, 12)
    )

Interaction with the online store

Features with overridden observation timestamps are treated specially when inserted into the online store. In particular, Chalk will always check for existing “newer” feature values in the online store before inserting historically dated feature values. This means that you can safely ingest large quantities of backdated features without accidentally ingesting stale data into the online store.

Additionally, once features are inserted into the online store, Chalk tracks their source observation timestamps. When these feature values are returned as part of online queries, Chalk uses the source timestamps to compute the “feature staleness” metric. Staleness in this context is defined as “query time - observation time”.
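
As a concrete illustration of that definition (the timestamps here are made up):

from datetime import datetime, timezone

# Staleness is the gap between when a value is served and when it was observed.
query_time = datetime(2022, 2, 3, 12, 0, tzinfo=timezone.utc)
observation_time = datetime(2022, 2, 1, 0, 0, tzinfo=timezone.utc)
staleness = query_time - observation_time  # timedelta(days=2, hours=12)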


Interaction with the offline store

Features with overridden observation timestamps are inserted into the offline store with the timestamp that you specify. The observation timestamp works like an “effective as of” timestamp, so if you insert something like this:

| id | feature | value | timestamp            |
|----|---------|-------|----------------------|
| 1  | age     | 7     | 2022-02-01T00:00:00Z |

into an offline store that already contained these observations:

| id | feature | value | timestamp            |
|----|---------|-------|----------------------|
| 1  | age     | 6     | 2022-01-01T00:00:00Z |
| 1  | age     | 8     | 2022-03-01T00:00:00Z |

then the observation will be interleaved “in between” the existing observations, and you would see the following query results:

| query (point-in-time)     | result |
|---------------------------|--------|
| id=1, age, <= 2022-02-01  | 7      |
| id=1, age, <= 2022-03-02  | 8      |
| id=1, age, <= 2022-01-02  | 6      |
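
A point-in-time query like the following could reproduce these results (a sketch based on the tables above, assuming the age feature is declared as User.age):

from datetime import datetime, timezone

from chalk.client import ChalkClient

# Each row is resolved against the most recent observation at or before its input time.
ChalkClient().offline_query(
    input={User.id: [1, 1, 1]},
    input_times=[
        datetime(2022, 2, 1, tzinfo=timezone.utc),
        datetime(2022, 3, 2, tzinfo=timezone.utc),
        datetime(2022, 1, 2, tzinfo=timezone.utc),
    ],
    output=[User.age],
)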

Now: Explicitly time-dependent resolvers

Chalk supports resolvers that are explicitly time-dependent. This is useful for performing backfills which compute values that depend on timestamps that are semantically similar to datetime.now().

You can express time-dependency by declaring a dependency on a special feature called Now, which gets converted into a datetime within resolvers:

from chalk import online
from chalk.features import Now

@online
def get_age_in_years(birthday: User.birthday, now: Now) -> User.age_in_years:
    # Approximate age in whole years from the number of days elapsed.
    return (now - birthday).days // 365

In online queries (i.e. with ChalkClient().query), Now is set to datetime.now() unless a value is explicitly provided as an input. In offline queries, now is set to the corresponding input_times value for each row. This allows you to backfill a feature for a single entity at many different historical time points:

from datetime import datetime, timedelta
from chalk.client import ChalkClient

ChalkClient().offline_query(input={User.id: [1, 1, 1]}, output=[User.age_in_years], input_times=[
    datetime.now() - timedelta(days=365 * 10),
    datetime.now() - timedelta(days=365),
    datetime.now() - timedelta(days=0),
])

# output:
#
# | id | age_in_years |
# |----|--------------|
# | 1  | <age> - 10   |
# | 1  | <age> - 1    |
# | 1  | <age> - 0    |

Now can also be used in DataFrame resolvers to compute values in bulk:

import polars as pl

from chalk import online
from chalk.features import DataFrame, Now

@online
def batch_get_age_in_years(df: DataFrame[User.id, User.birthday, Now]) -> DataFrame[User.id, User.age_in_years]:
    return df.to_polars().select(
        pl.col(str(User.id)),
        # Age in whole years, approximated from the duration between now and each birthday.
        ((pl.col(str(Now)) - pl.col(str(User.birthday))).dt.total_days() // 365)
        .alias(str(User.age_in_years)),
    )