Resolvers
Introspect and override feature resolution timestamps.
By default, features are timestamped with the execution time of their resolver. However, you can override this behavior by providing timestamps from your data source. This functionality can be helpful when working with an event store or a timestamped API.
The first step is to add a `FeatureTime` attribute to your feature set. By default, if you have a feature named `ts` with a type of `datetime.datetime`, it will be treated as a feature time.
```python
from datetime import datetime

from chalk.features import features

@features
class User:
    ts: datetime
```
However, if you wish to use a different name, you can use the `FeatureTime` annotation:
```python
from chalk.features import features, FeatureTime

@features
class User:
    timestamp: FeatureTime
```
The class `FeatureTime` is a `datetime.datetime` with a `typing.Annotated` annotation applied.
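For intuition, you can think of `FeatureTime` as roughly the following alias; the marker value shown here is illustrative, not Chalk's actual internal annotation:

```python
from datetime import datetime
from typing import Annotated

# A sketch of the shape of FeatureTime: a datetime carrying an
# annotation that Chalk recognizes as the observation timestamp.
# The "feature_time" marker string is illustrative only.
FeatureTime = Annotated[datetime, "feature_time"]
```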
Using this feature, you can introspect the time at which features were created in your resolvers, and return feature data with custom timestamps.
To pull the feature time, require the `ts` field from the user in the arguments of your function. This value will be set to the most recent observation time among all arguments to your function.
```python
from chalk import offline

@offline
def fn(name: User.name, ts: User.ts) -> ...:
    ...
```
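For example, a resolver might fold the observation time into a derived value. The `User.display_name` feature below is hypothetical and exists only to illustrate reading `ts`:

```python
from chalk import offline

@offline
def name_as_of(name: User.name, ts: User.ts) -> User.display_name:
    # ts is the most recent observation time among this resolver's inputs.
    # User.display_name is a hypothetical feature used for illustration.
    return f"{name} (as of {ts.date().isoformat()})"
```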
You can also provide the time at which a set of features was created by supplying a value for the `ts` field in your resolver. Building on the above example:
```python
from datetime import datetime

from chalk import offline
from chalk.features import Features

@offline
def fn(...) -> Features[User.name, User.ts]:
    return User(
        name="Maryam Mirzakhani",
        ts=datetime(2014, 8, 12),
    )
```
Features with overridden observation timestamps are treated specially when inserted into the online store. In particular, Chalk always checks for existing “newer” feature values in the online store before inserting historically dated feature values. This means that you can safely ingest large quantities of backdated features without accidentally ingesting stale data into the online store.
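Conceptually, the write path behaves like the following guard; this is a simplified sketch of the rule described above, not Chalk's actual implementation:

```python
def maybe_insert_online(store, key, value, observed_at):
    # Simplified sketch: a backdated value only lands in the online
    # store if no newer observation already exists for this key.
    existing = store.get(key)
    if existing is None or existing.observed_at < observed_at:
        store.put(key, value, observed_at)
```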
Additionally, once features are inserted into the online store, Chalk tracks the source observation timestamps when these feature values are returned as part of online queries. Chalk uses these source timestamps to compute the “feature staleness” metric. Staleness in this context is defined as “query time - observation time”.
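For example, a value observed at 12:00:00 and served in a query at 12:00:45 has a staleness of 45 seconds:

```python
from datetime import datetime, timezone

# Staleness per the definition above: query time - observation time.
observed_at = datetime(2022, 2, 1, 12, 0, 0, tzinfo=timezone.utc)
queried_at = datetime(2022, 2, 1, 12, 0, 45, tzinfo=timezone.utc)
staleness = queried_at - observed_at  # timedelta(seconds=45)
```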
Features with overridden observation timestamps are inserted into the offline store with the timestamp that you specify. The observation timestamp works like an “effective as of” timestamp, so if you insert a row like this:
| id | feature | value | timestamp            |
|----|---------|-------|----------------------|
| 1  | age     | 7     | 2022-02-01T00:00:00Z |
into an offline store that already contained these observations:
| id | feature | value | timestamp            |
|----|---------|-------|----------------------|
| 1  | age     | 6     | 2022-01-01T00:00:00Z |
| 1  | age     | 8     | 2022-03-01T00:00:00Z |
then the observation will be interleaved “in between” the existing observations, and you would see the following query results:
| id | feature | query time    | output |
|----|---------|---------------|--------|
| 1  | age     | <= 2022-02-01 | 7      |
| 1  | age     | <= 2022-03-02 | 8      |
| 1  | age     | <= 2022-01-02 | 6      |
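These as-of reads correspond to offline queries with different input times. A sketch of how you might issue them, using the `input_times` parameter shown later in this section:

```python
from datetime import datetime, timezone

from chalk.client import ChalkClient

# Each input time replays the query "as of" that moment, so the
# backdated observation (age=7 at 2022-02-01) is returned for the
# first timestamp.
ChalkClient().offline_query(
    input={User.id: [1, 1, 1]},
    output=[User.age],
    input_times=[
        datetime(2022, 2, 1, tzinfo=timezone.utc),
        datetime(2022, 3, 2, tzinfo=timezone.utc),
        datetime(2022, 1, 2, tzinfo=timezone.utc),
    ],
)
```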
Chalk supports resolvers that are explicitly time-dependent. This is useful for performing backfills that compute values which depend on something semantically similar to `datetime.now()`. You can express this time-dependency by declaring a dependency on a special feature called `Now`:
```python
from chalk import online
from chalk.features import Now

@online
def get_age_in_years(birthday: User.birthday, now: Now) -> User.age_in_years:
    # timedelta has no .years attribute, so approximate with 365-day years
    return (now - birthday).days // 365
```
In an online query (i.e. with `ChalkClient().query`), `Now` is `datetime.now()`. In offline query contexts, `now` will be set to the appropriate `input_time` value for the computation. This allows you to backfill a feature for a single entity at many different historical time points:
```python
from datetime import datetime, timedelta

from chalk.client import ChalkClient

ChalkClient().offline_query(
    input={User.id: [1, 1, 1]},
    output=[User.age_in_years],
    input_times=[
        datetime.now() - timedelta(days=365 * 10),
        datetime.now() - timedelta(days=365),
        datetime.now() - timedelta(days=0),
    ],
)

# output:
# | id | age_in_years |
# | 1  | <age> - 10   |
# | 1  | <age> - 1    |
# | 1  | <age> - 0    |
```
`Now` can be used in `DataFrame` resolvers as well in order to compute bulk values:
```python
import polars as pl

from chalk import online
from chalk.features import DataFrame, Now

@online
def batch_get_age_in_years(df: DataFrame[User.id, User.birthday, Now]) -> DataFrame[User.id, User.age_in_years]:
    return (
        df.to_polars()
        .select(
            pl.col(str(User.id)),
            # age = now - birthday, approximated as whole 365-day years
            ((pl.col(str(Now)) - pl.col(str(User.birthday))).dt.total_days() // 365)
            .alias(str(User.age_in_years)),
        )
    )
```
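To sanity-check the column arithmetic outside of Chalk, the same expression can be run on a plain polars frame; the column names below are stand-ins for the stringified feature names:

```python
from datetime import datetime

import polars as pl

# Stand-in column names; in a real resolver these would be
# str(User.id), str(User.birthday), and str(Now).
df = pl.DataFrame(
    {
        "user.id": [1, 2],
        "user.birthday": [datetime(1990, 6, 1), datetime(2000, 1, 15)],
        "now": [datetime(2024, 1, 1), datetime(2024, 1, 1)],
    }
)
out = df.select(
    pl.col("user.id"),
    ((pl.col("now") - pl.col("user.birthday")).dt.total_days() // 365)
    .alias("user.age_in_years"),
)
print(out)  # ages: 33 and 23, using whole 365-day years
```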