Windowed Aggregations

Create a data aggregation over a time range using a Windowed Feature. For example, use windowed features to count the number of login attempts made by a user over the past 10 minutes, or to track the largest purchase amount a cardholder has made in the past 30 days.


Note: Chalk has first-class support for materializing aggregations. See the docs for details.

Set up a data aggregation using windowed features

Here is an example of a windowed feature representing the number of failed logins in the last 10 minutes, 30 minutes, and 1 day:

from chalk import Windowed, windowed
from chalk.features import DataFrame, features, _
import datetime as dt

@features
class LoginAttempt:
    id: int
    user: "User.id"
    status: str
    ts: dt.datetime

@features
class User:
    id: int
    login_attempts: DataFrame[LoginAttempt]
    num_failed_logins: Windowed[int] = windowed(
        "10m", "30m", "1d",
        max_staleness="10m",
        expression=_.login_attempts[_.status=="failed", _.ts > _.chalk_window].count(),
        default=0,
    )

Windowed features support much of the same functionality as normal features. They are most often used alongside max_staleness and etl_offline_to_online.

Windowed features are typically resolved, either:

Querying by windowed time range

Windowed features resolved via expressions can reference the current windowed time range using the _.chalk_window operator:

@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float
    ts: dt.datetime  # timestamp compared against _.chalk_window below

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    total_spend: Windowed[float] = windowed(
        "30d", "60d", "90d",
        default=0,
        expression=_.transactions[_.amount, _.ts > _.chalk_window].sum(),
        materialization={"bucket_duration": "1d"},
    )

In this code, the windowed feature returns the sum of transaction amounts for the given user over the last 30, 60, and 90 days. _.ts > _.chalk_window is a boolean condition that keeps only the transactions whose timestamp is later than the start of the current window.
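To make the window predicate concrete, here is a minimal plain-Python sketch of the same computation. It does not use Chalk: the helper total_spend and the sample data are hypothetical, and the sketch assumes that _.chalk_window evaluates to the start of the window (the query time minus the window duration), as described above.

```python
import datetime as dt

def total_spend(transactions, now, window):
    """Sum the amounts of transactions whose timestamp is later than the window start."""
    # Assumption: this value plays the role of _.chalk_window for the given window.
    window_start = now - window
    return sum(amount for ts, amount in transactions if ts > window_start)

now = dt.datetime(2024, 6, 1)

# Hypothetical (timestamp, amount) pairs for a single user.
transactions = [
    (now - dt.timedelta(days=10), 20.0),
    (now - dt.timedelta(days=45), 30.0),
    (now - dt.timedelta(days=80), 50.0),
    (now - dt.timedelta(days=120), 70.0),  # older than every window, never counted
]

# One value per window, mirroring the "30d", "60d", "90d" declaration.
spend = {
    days: total_spend(transactions, now, dt.timedelta(days=days))
    for days in (30, 60, 90)
}
print(spend)  # {30: 20.0, 60: 50.0, 90: 100.0}
```

Each wider window includes everything the narrower windows include, so the totals can only stay the same or grow as the window gets longer.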

Nested windowed feature references

Windowed features can also be referenced by other windowed features in the same feature class using expressions and the _.chalk_window operator. For example, we can compute the average transaction amount over different time windows:

@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float
    ts: dt.datetime  # timestamp compared against _.chalk_window below

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    sum_transactions: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=_.transactions[_.amount, _.ts > _.chalk_window].sum(),
    )
    count_transactions: Windowed[int] = windowed(
        "30d", "60d", "90d",
        expression=_.transactions[_.ts > _.chalk_window].count(),
    )
    avg_transaction_amount: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=_.sum_transactions[_.chalk_window] / _.count_transactions[_.chalk_window],
    )
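The per-window pairing in this example can be sketched in plain Python as follows. This is an illustration only, not Chalk code: the function name and sample values are hypothetical, and returning None for an empty window is a choice made for this sketch, since the text above does not say how a zero count is handled.

```python
def avg_transaction_amount(sum_by_window, count_by_window):
    """Divide each window's sum by the count for the same window."""
    averages = {}
    for window, total in sum_by_window.items():
        count = count_by_window[window]
        # Assumption for this sketch: an empty window has no average.
        averages[window] = total / count if count else None
    return averages

# Hypothetical values of the two underlying windowed features.
sum_transactions = {"30d": 20.0, "60d": 50.0, "90d": 100.0}
count_transactions = {"30d": 1, "60d": 2, "90d": 3}

averages = avg_transaction_amount(sum_transactions, count_transactions)
print(averages["60d"])  # 25.0
```

The point of the sketch is that the 30d average uses only the 30d sum and the 30d count, and likewise for the other windows. Windows are never mixed.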

Referencing windowed features

A windowed feature can be referenced in a query or a resolver in any of the following equivalent ways. Each column below shows the possible syntax variants for a given time window.

# Note: The last row of each column uses the window duration converted to seconds
User.num_failed_logins("10m")    User.num_failed_logins("1d")       User.num_failed_logins("1h30m")
User.num_failed_logins["10m"]    User.num_failed_logins["1d"]       User.num_failed_logins["1h30m"]
User.num_failed_logins_10m       User.num_failed_logins_1d          User.num_failed_logins_1h30m
User.num_failed_logins__10m__    User.num_failed_logins__1d__       User.num_failed_logins__1h30m__
User.num_failed_logins__600__    User.num_failed_logins__86400__    User.num_failed_logins__5400__
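The seconds-based names in the last row follow from simple arithmetic on the duration string. The sketch below shows that conversion in plain Python. The helper duration_to_seconds is hypothetical and not part of Chalk, and the set of unit suffixes it accepts (s, m, h, d) is an assumption based on the durations shown on this page.

```python
import re

# Assumed unit suffixes; only "m", "h", and "d" appear in the examples above.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

def duration_to_seconds(duration: str) -> int:
    """Convert a duration string such as "1h30m" to a number of seconds."""
    parts = re.findall(r"(\d+)([smhd])", duration)
    # Reject strings with leftover characters that the pattern did not consume.
    if not parts or "".join(n + u for n, u in parts) != duration:
        raise ValueError(f"unrecognized duration: {duration!r}")
    return sum(int(n) * _UNIT_SECONDS[u] for n, u in parts)

for window in ("10m", "1d", "1h30m"):
    print(f"num_failed_logins__{duration_to_seconds(window)}__")
# num_failed_logins__600__
# num_failed_logins__86400__
# num_failed_logins__5400__
```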

Windowed features can be inputs to resolvers:

from chalk import online

@online
def account_under_attack(
    failed_logins_30m: User.num_failed_logins('30m'),
    failed_logins_1d: User.num_failed_logins('1d')
) -> ...:
    return failed_logins_30m > 10 or failed_logins_1d > 100

Grouped window aggregations

Similar to SQL GROUP BY clauses, you can group your windowed feature by one or more other features with group_by_windowed.

Example

In this example, our goal is to create a feature representing a given user’s historical spend at a specific merchant. This feature can be used as input into our model to determine whether a given transaction is fraudulent.

First, we define a Transaction FeatureSet with a has-one relationship to the User FeatureSet. Within the User FeatureSet, we track a grouped windowed feature representing the user’s total spend at each merchant over the past 30 days and 90 days.

from chalk.features import DataFrame, features, _
from chalk.streams import group_by_windowed

@features
class Transaction:
    id: int
    merchant_id: str
    user_id: "User.id"
    user: "User"
    amount: float

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    spend_by_merchant: DataFrame = group_by_windowed(
        "30d",
        "90d",
        expression=_.transactions.group_by(_.merchant_id).agg(_.amount.sum()),
    )

In this example, expression groups all the given user’s transactions by each transaction’s merchant_id value, then aggregates each group by the sum of the transactions’ amount values.
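The grouping and summing can be sketched in plain Python as follows. This is not Chalk code: the function and sample data are hypothetical, and the sketch assumes each transaction carries a timestamp that decides whether it falls inside the window, even though the Transaction class above does not declare one.

```python
import datetime as dt
from collections import defaultdict

def spend_by_merchant(transactions, now, window):
    """Group in-window transactions by merchant_id and sum their amounts."""
    window_start = now - window
    totals = defaultdict(float)
    for ts, merchant_id, amount in transactions:
        if ts > window_start:  # keep only transactions inside the window
            totals[merchant_id] += amount
    return dict(totals)

now = dt.datetime(2024, 6, 1)

# Hypothetical (timestamp, merchant_id, amount) rows for a single user.
transactions = [
    (now - dt.timedelta(days=5), "m1", 10.0),
    (now - dt.timedelta(days=20), "m2", 5.0),
    (now - dt.timedelta(days=40), "m1", 7.5),
    (now - dt.timedelta(days=100), "m2", 99.0),  # outside both windows
]

spend_30d = spend_by_merchant(transactions, now, dt.timedelta(days=30))
spend_90d = spend_by_merchant(transactions, now, dt.timedelta(days=90))
print(spend_30d)  # {'m1': 10.0, 'm2': 5.0}
print(spend_90d)  # {'m1': 17.5, 'm2': 5.0}
```

In this sketch, reading the spend at one merchant is a dictionary lookup such as spend_90d["m1"], which is loosely analogous to selecting a single group from the grouped feature.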

The User class uses materialization to aggregate associated transaction data as a performance optimization.

Finally, to query for spend at a specific merchant, we can access one of the groups:

from chalk.features import features, _, DataFrame
from chalk.streams import group_by_windowed, Windowed, windowed

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    most_common_merchant_id: str
    spend_by_merchant: DataFrame = group_by_windowed(
        "30d",
        "90d",
        expression=_.transactions.group_by(_.merchant_id).agg(_.amount.sum()),
    )
    spend_at_most_common_merchant: Windowed[float] = windowed(
        "30d",
        "90d",
        expression=_.spend_by_merchant.group(merchant_id=_.most_common_merchant_id),
    )

Windowed streaming

To learn more about using windowed features with streaming data sources, see our documentation on windowed streaming.