Features
Define features as aggregations of data over sliding time ranges.
Windowed features are features defined over time ranges. For example, you can use windowed features to count the number of login attempts made by a user over the past 10 minutes, or to track the largest purchase amount a cardholder has made in the past 30 days.
Here is an example of a windowed
feature representing the number of
failed logins in the last 10 minutes, 30 minutes, and 1 day:
@features
class User:
id: int
num_failed_logins: Windowed[int] = windowed(
"10m", "30m", "1d",
max_staleness="10m",
default=0,
)
Windowed features support much of the same functionality as normal features. They are most often used alongside
max_staleness
and
etl_offline_to_online
to allow the features to be sent to online store and
offline store after each window period. Windowed features often use default
to set a
default value to return when there are no messages within a time period.
Windowed features resolved via underscore expressions can reference the current windowed time range
using the _.chalk_window
operator:
@features
class Transaction:
id: int
user_id: "User.id"
amount: float
@features
class User:
id: int
transactions: DataFrame[Transaction]
total_spend: Windowed[float] = windowed(
"30d", "60d", "90d",
default=0,
expression=_.transactions[_.amount, _.ts > _.chalk_window].sum(),
materialization={"bucket_duration": "1d"},
)
In this code, the windowed feature will return the sum of transaction amounts for the given user over the last 30, 60,
and 90 days. _.ts > _.chalk_window
is a boolean condition checking that the current transaction’s timestamp is greater
than the start of the current window duration.
Windowed features can also be referenced by other windowed features in the same feature class using
underscore expressions and the _.chalk_window
operator. For example, we can compute the average
transaction amount over different time windows:
@features
class Transaction:
id: int
user_id: "User.id"
amount: float
@features
class User:
id: int
transactions: DataFrame[Transaction]
sum_transactions: Windowed[float] = windowed(
"30d", "60d", "90d",
expression=_.transactions[_.amount, _.ts > _.chalk_window].sum(),
)
count_transactions: Windowed[float] = windowed(
"30d", "60d", "90d",
expression=_.transactions[_.ts > _.chalk_window].count(),
)
avg_transaction_amount: Windowed[float] = windowed(
"30d", "60d", "90d",
expression=_.sum_transactions[_.chalk_window] / _.count_transactions[_.chalk_window],
)
A windowed feature can be referenced in a query or a resolver in the following, equivalent ways. Each column below shows the possible syntax variants for a given time window.
# Note: The last value for each list is the time converted to seconds
User.num_failed_logins("10m") User.num_failed_logins("1d") User.num_failed_logins("1h30m")
User.num_failed_logins["10m"] User.num_failed_logins["1d"] User.num_failed_logins["1h30m"]
User.num_failed_logins_10m User.num_failed_logins_1d User.num_failed_logins_1h30m
User.num_failed_logins__10m__ User.num_failed_logins__1d__ User.num_failed_logins__1h30m__
User.num_failed_logins__600__ User.num_failed_logins__86400__ User.num_failed_logins__5400__
Windowed features can be inputs to resolvers:
@online
def account_under_attack(
failed_logins_30m: User.num_failed_logins('30m'),
failed_logins_1d: User.num_failed_logins('1d')
) -> ...:
return failed_logins_30m > 10 or failed_logins_1d > 100
Windowed features are typically computed using either raw data or pre-aggregated data. Raw data has the most accuracy, but can be slow if you request longer time windows or large volumes of data. Some systems improve performance by serving features from pre-aggregated batch data. Pre-aggregated data mitigates the performance issue by reducing the number of data points needed, but prevents your application from accessing the newest data entering your system.
Chalk balances accuracy and performance by combining both approaches. We aggregate historical data while continuously
updating as new data arrives. To have Chalk aggregate your data, pass
materialization
to your windowed feature and use bucket_duration
to set the size
of each aggregated time window:
@features
class User:
id: int
transactions: DataFrame[transactions]
total_transaction_amount: Windowed[int] = windowed(
"10d",
"90d",
materialization={"bucket_duration": "1d"},
expression=_.transactions[_.amount].sum(),
)
Because this code shows a bucket_duration
of 1 day, Chalk will aggregate transaction data into 1 day buckets of
timeseries data. Chalk will then use this timeseries data to serve total_transaction_amount
for the past 30 days and
90 days. As new data arrives, the relevant timeseries buckets are modified to include the data. Over time, the oldest
buckets are removed from your online store once they cannot be used by any time window.
The number of buckets is determined by your longest time window divided by your bucket duration. For example, if your time window is 90 days and your bucket duration is 1 day, you would have 90 buckets. If your bucket duration is set to 1 minute, you would instead have 129,600 buckets.
Buckets are aligned starting from the Unix epoch, ignoring leap seconds. To serve windowed feature queries, Chalk uses all buckets containing any overlap with the requested time window.
Use chalk aggregate backfill
to backfill aggregation for a windowed feature. This command
is useful if you change your feature’s time windows or bucket_duration
values.
To view existing aggregations, use chalk aggregate list
.
Each of these commands outputs a table of your aggregations:
Series Namespace Group Agg Bucket Retention Aggregation Dependent Features
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
1 transaction user_id merchant_id amount 1d 30d sum user.txn_sum_by_merchant merchant.txn_sum_by_user
1 transaction user_id merchant_id amount 1d 30d count user.txn_count_by_merchant
2 transaction user_id amount 1d 30d sum user.txn_sum
The series column shows the unique ID of the timeseries data underlying our aggregation system. Each unique combination
of namespace, group (see group_by_windowed
), and agg (the feature to aggregate) columns
represents a separate timeseries. When possible, Chalk will use the same timeseries data to serve multiple features.
Bucket shows the current bucket size. Retention shows the maximum time window of any feature that depends on the given timeseries. Dependent features list the features that are served by the given timeseries.
Rather than streaming new data to keep the timeseries buckets up to date, you may instead opt to use continuous backfills. First, choose a schedule to determine when and how frequently backfills should occur. In the following example, the total_transaction_amount
features will be backfilled every day.
@features
class User:
id: int
transactions: DataFrame[transactions]
total_transaction_amount: Windowed[int] = windowed(
"10d",
"90d",
materialization={
"bucket_duration": "1d",
"backfill_schedule": "0 0 * * *",
"continuous_buffer_duration": "36h",
},
expression=_.transactions[_.amount].sum(),
)
To ensure that the computed aggregate uses the most recent data, you can set a continuous_buffer_duration
. This tells Chalk to include the past 36 hours of data as computed directly from your online resolvers. The rest of the window is computed from the pre-aggregated buckets which are backfilled every day.
Similar to SQL GROUP BY
clauses, you can group your windowed feature by one or more other features with
group_by_windowed
.
In this example, our goal is to create a feature representing a given user’s historical spend at a specific merchant. This feature can be used as input into our model to determine whether a given transaction is fraudulent.
First, we define a Transaction
FeatureSet with a has-one relationship to the User
FeatureSet. Within the User
FeatureSet, we track a grouped windowed feature representing the user’s total spend at each merchant over the past
30 days and 90 days.
from chalk.features import DataFrame, features, _
from chalk.streams import group_by_windowed
@features
class Transaction:
id: int
merchant_id: str
user_id: "User.id"
user: "User"
amount: float
@features
class User:
id: int
transactions: DataFrame[Transaction]
spend_by_merchant: DataFrame = group_by_windowed(
"30d",
"90d",
materialization={"bucket_duration": "1d"},
expression=_.transactions.group_by(_.merchant_id).agg(_.amount.sum()),
)
In this example, expression
groups all the given user’s transactions
by each transaction’s merchant_id
value,
then aggregates each group by the sum of the transactions’ amount
values.
The User
class uses materialization to aggregate
associated transaction data as a performance optimization.
Finally, to query for spend at a specific merchant, we can access one of the groups:
from chalk.features import features, _, DataFrame
from chalk.streams import group_by_windowed, Windowed, windowed
@features
class User:
id: int
transactions: DataFrame[Transaction]
most_common_merchant_id: str
spend_by_merchant: DataFrame = group_by_windowed(
"30d",
"90d",
materialization={"bucket_duration": "1d"},
expression=_.transactions.group_by(_.merchant_id).agg(_.amount.sum()),
)
spend_at_most_common_merchant: Windowed[float] = windowed(
"30d",
"90d",
expression=_.spend_by_merchant.group(merchant_id=_.most_common_merchant_id)
)
To learn more about using windowed features with streaming data sources, see our documentation on windowed streaming.