Chalk home page
Docs
API
CLI
  1. Features
  2. Windowed

Windowed features are features defined over time ranges. For example, you can use windowed features to count the number of login attempts made by a user over the past 10 minutes, or to track the largest purchase amount a cardholder has made in the past 30 days.


Feature definition

Here is an example of a windowed feature representing the number of failed logins in the last 10 minutes, 30 minutes, and 1 day:

@features
class User:
    id: int
    num_failed_logins: Windowed[int] = windowed(
        "10m", "30m", "1d",
        max_staleness="10m",
        default=0,
    )

Windowed features support much of the same functionality as normal features. They are most often used alongside max_staleness and etl_offline_to_online to allow the features to be sent to online store and offline store after each window period. Windowed features often use default to set a default value to return when there are no messages within a time period.

Querying by windowed time range

Windowed features resolved via underscore expressions can reference the current windowed time range using the _.chalk_window operator:

@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    total_spend: Windowed[float] = windowed(
        "30d", "60d", "90d",
        default=0,
        expression=_.transactions[_.amount, _.ts > _.chalk_window].sum(),
        materialization={"bucket_duration": "1d"},
    )

In this code, the windowed feature will return the sum of transaction amounts for the given user over the last 30, 60, and 90 days. _.ts > _.chalk_window is a boolean condition checking that the current transaction’s timestamp is greater than the start of the current window duration.

Nested windowed feature references

Windowed features can also be referenced by other windowed features in the same feature class using underscore expressions and the _.chalk_window operator. For example, we can compute the average transaction amount over different time windows:

@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    sum_transactions: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=_.transactions[_.amount, _.ts > _.chalk_window].sum(),
    )
    count_transactions: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=_.transactions[_.ts > _.chalk_window].count(),
    )
    avg_transaction_amount: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=_.sum_transactions[_.chalk_window] / _.count_transactions[_.chalk_window],
    )

Referencing windowed features

A windowed feature can be referenced in a query or a resolver in the following, equivalent ways. Each column below shows the possible syntax variants for a given time window.

# Note: The last value for each list is the time converted to seconds
User.num_failed_logins("10m")    User.num_failed_logins("1d")       User.num_failed_logins("1h30m")
User.num_failed_logins["10m"]    User.num_failed_logins["1d"]       User.num_failed_logins["1h30m"]
User.num_failed_logins_10m       User.num_failed_logins_1d          User.num_failed_logins_1h30m
User.num_failed_logins__10m__    User.num_failed_logins__1d__       User.num_failed_logins__1h30m__
User.num_failed_logins__600__    User.num_failed_logins__86400__    User.num_failed_logins__5400__

Windowed features can be inputs to resolvers:

@online
def account_under_attack(
    failed_logins_30m: User.num_failed_logins('30m'),
    failed_logins_1d: User.num_failed_logins('1d')
) -> ...:
    return failed_logins_30m > 10 or failed_logins_1d > 100

Materialized window aggregations

Windowed features are typically computed using either raw data or pre-aggregated data. Raw data has the most accuracy, but can be slow if you request longer time windows or large volumes of data. Some systems improve performance by serving features from pre-aggregated batch data. Pre-aggregated data mitigates the performance issue by reducing the number of data points needed, but prevents your application from accessing the newest data entering your system.

Chalk balances accuracy and performance by combining both approaches. We aggregate historical data while continuously updating as new data arrives. To have Chalk aggregate your data, pass materialization to your windowed feature and use bucket_duration to set the size of each aggregated time window:

@features
class User:
    id: int
    transactions: DataFrame[transactions]
    total_transaction_amount: Windowed[int] = windowed(
        "10d",
        "90d",
        materialization={"bucket_duration": "1d"},
        expression=_.transactions[_.amount].sum(),
    )

Because this code shows a bucket_duration of 1 day, Chalk will aggregate transaction data into 1 day buckets of timeseries data. Chalk will then use this timeseries data to serve total_transaction_amount for the past 30 days and 90 days. As new data arrives, the relevant timeseries buckets are modified to include the data. Over time, the oldest buckets are removed from your online store once they cannot be used by any time window.

The number of buckets is determined by your longest time window divided by your bucket duration. For example, if your time window is 90 days and your bucket duration is 1 day, you would have 90 buckets. If your bucket duration is set to 1 minute, you would instead have 129,600 buckets.

Buckets are aligned starting from the Unix epoch, ignoring leap seconds. To serve windowed feature queries, Chalk uses all buckets containing any overlap with the requested time window.

Managing aggregations

Use chalk aggregate backfill to backfill aggregation for a windowed feature. This command is useful if you change your feature’s time windows or bucket_duration values.

To view existing aggregations, use chalk aggregate list.

Each of these commands outputs a table of your aggregations:

 Series  Namespace    Group                Agg     Bucket  Retention  Aggregation  Dependent Features
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
 1       transaction  user_id merchant_id  amount  1d      30d        sum          user.txn_sum_by_merchant merchant.txn_sum_by_user
 1       transaction  user_id merchant_id  amount  1d      30d        count        user.txn_count_by_merchant
 2       transaction  user_id              amount  1d      30d        sum          user.txn_sum

The series column shows the unique ID of the timeseries data underlying our aggregation system. Each unique combination of namespace, group (see group_by_windowed), and agg (the feature to aggregate) columns represents a separate timeseries. When possible, Chalk will use the same timeseries data to serve multiple features.

Bucket shows the current bucket size. Retention shows the maximum time window of any feature that depends on the given timeseries. Dependent features list the features that are served by the given timeseries.

Continuous Backfills

Rather than streaming new data to keep the timeseries buckets up to date, you may instead opt to use continuous backfills. First, choose a schedule to determine when and how frequently backfills should occur. In the following example, the total_transaction_amount features will be backfilled every day.

@features
class User:
    id: int
    transactions: DataFrame[transactions]
    total_transaction_amount: Windowed[int] = windowed(
        "10d",
        "90d",
        materialization={
            "bucket_duration": "1d",
            "backfill_schedule": "0 0 * * *",
            "continuous_buffer_duration": "36h",
        },
        expression=_.transactions[_.amount].sum(),
    )

To ensure that the computed aggregate uses the most recent data, you can set a continuous_buffer_duration. This tells Chalk to include the past 36 hours of data as computed directly from your online resolvers. The rest of the window is computed from the pre-aggregated buckets which are backfilled every day.


Grouped materialized window aggregations

Similar to SQL GROUP BY clauses, you can group your windowed feature by one or more other features with group_by_windowed.

Example

In this example, our goal is to create a feature representing a given user’s historical spend at a specific merchant. This feature can be used as input into our model to determine whether a given transaction is fraudulent.

First, we define a Transaction FeatureSet with a has-one relationship to the User FeatureSet. Within the User FeatureSet, we track a grouped windowed feature representing the user’s total spend at each merchant over the past 30 days and 90 days.

from chalk.features import DataFrame, features, _
from chalk.streams import group_by_windowed

@features
class Transaction:
    id: int
    merchant_id: str
    user_id: "User.id"
    user: "User"
    amount: float

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    spend_by_merchant: DataFrame = group_by_windowed(
        "30d",
        "90d",
        materialization={"bucket_duration": "1d"},
        expression=_.transactions.group_by(_.merchant_id).agg(_.amount.sum()),
    )

In this example, expression groups all the given user’s transactions by each transaction’s merchant_id value, then aggregates each group by the sum of the transactions’ amount values.

The User class uses materialization to aggregate associated transaction data as a performance optimization.

Finally, to query for spend at a specific merchant, we can access one of the groups:

from chalk.features import features, _, DataFrame
from chalk.streams import group_by_windowed, Windowed, windowed

@features
class User:
  id: int
  transactions: DataFrame[Transaction]
  most_common_merchant_id: str
  spend_by_merchant: DataFrame = group_by_windowed(
      "30d",
      "90d",
      materialization={"bucket_duration": "1d"},
      expression=_.transactions.group_by(_.merchant_id).agg(_.amount.sum()),
  )
  spend_at_most_common_merchant: Windowed[float] = windowed(
      "30d",
      "90d",
      expression=_.spend_by_merchant.group(merchant_id=_.most_common_merchant_id)
  )

Windowed streaming

To learn more about using windowed features with streaming data sources, see our documentation on windowed streaming.