Features
Define features as aggregations of data over sliding time ranges.
Create a data aggregation over a time range using a Windowed Feature. For example, use windowed features to count the number of login attempts made by a user over the past 10 minutes, or to track the largest purchase amount a cardholder has made in the past 30 days.
Note: Chalk has first-class support for materializing aggregations; see the docs!
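As a rough illustration of what "an aggregation over a time range" means, the following plain-Python sketch computes the second example above: the largest purchase in the past 30 days. It is not Chalk code and makes no claim about how Chalk evaluates windows. The function name largest_purchase and the (timestamp, amount) tuple layout are hypothetical.

```python
from datetime import datetime, timedelta


def largest_purchase(purchases, now, window=timedelta(days=30)):
    """Return the largest amount whose timestamp falls in (now - window, now], or None."""
    start = now - window
    in_window = [amount for ts, amount in purchases if start < ts <= now]
    # max() of an empty window has no value, so fall back to None
    return max(in_window, default=None)


now = datetime(2024, 6, 30)
purchases = [
    (datetime(2024, 5, 1), 900.0),  # 60 days ago: outside the 30-day window
    (datetime(2024, 6, 10), 120.0),
    (datetime(2024, 6, 25), 340.0),
]
print(largest_purchase(purchases, now))  # 340.0
```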
Here is an example of a windowed feature representing the number of failed logins in the last 10 minutes, 30 minutes, and 1 day:
from chalk import Windowed, windowed
from chalk.features import DataFrame, _, features
import datetime as dt

@features
class LoginAttempt:
    id: int
    user_id: "User.id"
    status: str
    ts: dt.datetime

@features
class User:
    id: int
    # Has-many relationship, joined through LoginAttempt.user_id
    login_attempts: DataFrame[LoginAttempt]
    # One value per window: failed attempts whose ts falls inside that window
    num_failed_logins: Windowed[int] = windowed(
        "10m", "30m", "1d",
        max_staleness="10m",
        expression=_.login_attempts[_.status == "failed", _.ts > _.chalk_window].count(),
        default=0,
    )
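For intuition, here is a plain-Python sketch of the values num_failed_logins holds at one point in time: one count per window, each keeping only failed attempts whose timestamp is later than the window's start (the role _.ts > _.chalk_window plays in the expression). This is an illustrative model of the semantics, not Chalk's implementation; WINDOWS and num_failed_logins here are hypothetical names.

```python
from datetime import datetime, timedelta

# Hypothetical mapping that mirrors the "10m", "30m", "1d" windows above
WINDOWS = {
    "10m": timedelta(minutes=10),
    "30m": timedelta(minutes=30),
    "1d": timedelta(days=1),
}


def num_failed_logins(attempts, now):
    """Count failed (status, ts) attempts per window; window_start < ts mirrors _.ts > _.chalk_window."""
    counts = {}
    for name, duration in WINDOWS.items():
        window_start = now - duration
        counts[name] = sum(
            1 for status, ts in attempts
            if status == "failed" and window_start < ts <= now
        )
    return counts


now = datetime(2024, 1, 1, 12, 0)
attempts = [
    ("failed", now - timedelta(minutes=5)),
    ("success", now - timedelta(minutes=7)),
    ("failed", now - timedelta(minutes=20)),
    ("failed", now - timedelta(hours=3)),
    ("failed", now - timedelta(days=2)),
]
print(num_failed_logins(attempts, now))  # {'10m': 1, '30m': 2, '1d': 3}
```

A window with no matching attempts yields 0 here, which lines up with default=0 in the feature definition.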
Windowed features support much of the same functionality as normal features. They are most often used alongside max_staleness and etl_offline_to_online.
Windowed features are typically resolved either with expressions or with streaming resolvers.
Windowed features resolved via expressions can reference the current window's time range using the _.chalk_window operator:
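from chalk import Windowed, windowed
from chalk.features import DataFrame, _, features
import datetime as dt

@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float
    ts: dt.datetime  # timestamp referenced by _.ts in the expression below

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    total_spend: Windowed[float] = windowed(
        "30d", "60d", "90d",
        default=0,
        expression=_.transactions[_.amount, _.ts > _.chalk_window].sum(),
        # Aggregate into 1-day buckets as a performance optimization
        materialization={"bucket_duration": "1d"},
    )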
In this code, total_spend returns the sum of the given user's transaction amounts over the last 30, 60, and 90 days. The condition _.ts > _.chalk_window keeps only transactions whose timestamp is later than the start of the window being computed.
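The materialization={"bucket_duration": "1d"} setting suggests data is pre-aggregated into one-day buckets. The sketch below shows that general technique in plain Python: per-day partial sums are computed once, and each window total then sums whole buckets. It assumes an N-day window covers the N most recent calendar-day buckets, including today; Chalk's actual bucket alignment and storage are not described on this page and may differ. All names are hypothetical.

```python
from collections import defaultdict
from datetime import date, timedelta


def daily_buckets(transactions):
    """Pre-aggregate (day, amount) pairs into one partial sum per calendar day."""
    buckets = defaultdict(float)
    for day, amount in transactions:
        buckets[day] += amount
    return buckets


def total_spend(buckets, today, windows=(30, 60, 90)):
    """Sum whole-day buckets for each N-day window ending on (and including) today."""
    result = {}
    for n in windows:
        first_day = today - timedelta(days=n - 1)
        result[f"{n}d"] = sum(
            partial for day, partial in buckets.items() if first_day <= day <= today
        )
    return result


today = date(2024, 6, 30)
txns = [
    (date(2024, 6, 30), 10.0),
    (date(2024, 6, 30), 5.0),
    (date(2024, 5, 15), 20.0),
    (date(2024, 4, 10), 40.0),
    (date(2024, 1, 1), 100.0),  # older than 90 days: in no window
]
buckets = daily_buckets(txns)
print(total_spend(buckets, today))  # {'30d': 15.0, '60d': 35.0, '90d': 75.0}
```

The trade-off of bucketing is that window edges snap to bucket boundaries, in exchange for working with one value per day rather than one per transaction.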
Windowed features can also be referenced by other windowed features in the same feature class using expressions and the _.chalk_window operator. For example, we can compute the average transaction amount over different time windows:
from chalk import Windowed, windowed
from chalk.features import DataFrame, _, features
import datetime as dt

@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float
    ts: dt.datetime

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    sum_transactions: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=_.transactions[_.amount, _.ts > _.chalk_window].sum(),
    )
    count_transactions: Windowed[int] = windowed(
        "30d", "60d", "90d",
        expression=_.transactions[_.ts > _.chalk_window].count(),
    )
    # Reads the two features above at the same window that is being computed
    avg_transaction_amount: Windowed[float] = windowed(
        "30d", "60d", "90d",
        expression=_.sum_transactions[_.chalk_window] / _.count_transactions[_.chalk_window],
    )
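The division in avg_transaction_amount is evaluated per window. The plain-Python sketch below mirrors that pairing of sums and counts, with one assumption made explicit: a window with zero transactions returns None instead of raising ZeroDivisionError. How Chalk itself treats a zero denominator is not covered on this page. The function name and data are hypothetical.

```python
def avg_transaction_amount(sums, counts):
    """Divide each window's sum by the same window's count; None when the count is 0."""
    return {
        window: (sums[window] / counts[window]) if counts[window] else None
        for window in sums
    }


sums = {"30d": 0.0, "60d": 150.0, "90d": 400.0}
counts = {"30d": 0, "60d": 3, "90d": 8}
print(avg_transaction_amount(sums, counts))  # {'30d': None, '60d': 50.0, '90d': 50.0}
```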
A windowed feature can be referenced in a query or a resolver in any of the following equivalent ways. Each column shows the syntax variants for one time window, and the last row gives the window length in seconds (600 for 10m, 86400 for 1d, 5400 for 1h30m). The 1h30m column only illustrates compound durations: num_failed_logins above declares 10m, 30m, and 1d windows.

10m window                       1d window                          1h30m window
User.num_failed_logins("10m")    User.num_failed_logins("1d")       User.num_failed_logins("1h30m")
User.num_failed_logins["10m"]    User.num_failed_logins["1d"]       User.num_failed_logins["1h30m"]
User.num_failed_logins_10m       User.num_failed_logins_1d          User.num_failed_logins_1h30m
User.num_failed_logins__10m__    User.num_failed_logins__1d__       User.num_failed_logins__1h30m__
User.num_failed_logins__600__    User.num_failed_logins__86400__    User.num_failed_logins__5400__
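The seconds-based row follows mechanically from the window string. The hypothetical helpers below (not part of Chalk's API) convert a duration such as "1h30m" to seconds and generate the three attribute-style spellings from the table. Handling only the s, m, h, and d units is an assumption of this sketch; Chalk may accept others.

```python
import re

# Hypothetical unit table for this sketch; only s, m, h, d are handled
_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def duration_to_seconds(duration):
    """Convert strings like '10m', '1d', or '1h30m' to a whole number of seconds."""
    parts = re.findall(r"(\d+)([smhd])", duration)
    # Reject strings with leftover characters, e.g. '10x' or '10m5'
    if not parts or "".join(num + unit for num, unit in parts) != duration:
        raise ValueError(f"unrecognized duration: {duration!r}")
    return sum(int(num) * _UNITS[unit] for num, unit in parts)


def attribute_variants(feature, duration):
    """Build the attribute-style names from the table for one window."""
    return [
        f"{feature}_{duration}",
        f"{feature}__{duration}__",
        f"{feature}__{duration_to_seconds(duration)}__",
    ]


print(duration_to_seconds("1h30m"))  # 5400
print(attribute_variants("num_failed_logins", "1d"))
# ['num_failed_logins_1d', 'num_failed_logins__1d__', 'num_failed_logins__86400__']
```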
Windowed features can be inputs to resolvers:
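from chalk import online

@online
def account_under_attack(
    failed_logins_30m: User.num_failed_logins("30m"),
    failed_logins_1d: User.num_failed_logins("1d"),
) -> ...:  # "..." stands in for the output feature, which this example does not define
    return failed_logins_30m > 10 or failed_logins_1d > 100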
Similar to SQL GROUP BY clauses, you can group your windowed feature by one or more other features with group_by_windowed.
In this example, our goal is to create a feature representing a given user’s historical spend at a specific merchant. This feature can be used as input into our model to determine whether a given transaction is fraudulent.
First, we define a Transaction FeatureSet with a has-one relationship to the User FeatureSet. Within the User FeatureSet, we track a grouped windowed feature representing the user’s total spend at each merchant over the past 30 days and 90 days.
from chalk.features import DataFrame, features, _
from chalk.streams import group_by_windowed

@features
class Transaction:
    id: int
    merchant_id: str
    user_id: "User.id"
    user: "User"
    amount: float

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    spend_by_merchant: DataFrame = group_by_windowed(
        "30d",
        "90d",
        # Group the user's transactions by merchant, then sum amounts per group
        expression=_.transactions.group_by(_.merchant_id).agg(_.amount.sum()),
        materialization={"bucket_duration": "1d"},
    )
In this example, expression groups all of the given user’s transactions by each transaction’s merchant_id value, then aggregates each group by summing the transactions’ amount values.
The User class uses materialization to aggregate associated transaction data as a performance optimization.
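The group-then-aggregate logic can be modeled in plain Python as the equivalent of SQL GROUP BY merchant_id with SUM(amount), evaluated separately for each window. This is not Chalk's engine; the function name and tuple layout are hypothetical, and each transaction is assumed to carry a timestamp even though the Transaction class above does not declare one.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def spend_by_merchant(transactions, now, windows):
    """For each window, group in-window (merchant_id, amount, ts) rows by merchant and sum amounts."""
    result = {}
    for name, duration in windows.items():
        start = now - duration
        totals = defaultdict(float)
        for merchant_id, amount, ts in transactions:
            if start < ts <= now:
                totals[merchant_id] += amount
        result[name] = dict(totals)
    return result


now = datetime(2024, 6, 30)
windows = {"30d": timedelta(days=30), "90d": timedelta(days=90)}
transactions = [
    ("m_coffee", 4.5, now - timedelta(days=2)),
    ("m_coffee", 5.5, now - timedelta(days=10)),
    ("m_books", 30.0, now - timedelta(days=45)),
    ("m_coffee", 6.0, now - timedelta(days=100)),  # outside both windows
]
print(spend_by_merchant(transactions, now, windows))
# {'30d': {'m_coffee': 10.0}, '90d': {'m_coffee': 10.0, 'm_books': 30.0}}
```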
Finally, to query for spend at a specific merchant, we can access one of the groups:
from chalk.features import DataFrame, features, _
from chalk.streams import Windowed, group_by_windowed, windowed

@features
class User:
    id: int
    # Transaction is the feature class defined in the previous example
    transactions: DataFrame[Transaction]
    most_common_merchant_id: str
    spend_by_merchant: DataFrame = group_by_windowed(
        "30d",
        "90d",
        expression=_.transactions.group_by(_.merchant_id).agg(_.amount.sum()),
        materialization={"bucket_duration": "1d"},
    )
    # Select the group whose merchant_id matches this user's most common merchant
    spend_at_most_common_merchant: Windowed[float] = windowed(
        "30d",
        "90d",
        expression=_.spend_by_merchant.group(merchant_id=_.most_common_merchant_id),
    )
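Selecting one group amounts to a per-window lookup by key. In the plain-Python sketch below, a merchant with no transactions in a window falls back to 0.0; that default is an assumption of this sketch, since the windowed feature above does not declare one and this page does not say what Chalk returns for a missing group. The function name is hypothetical.

```python
def spend_at_merchant(spend_by_window, merchant_id):
    """Pick one merchant's total from each window's grouped result, defaulting to 0.0."""
    return {
        window: groups.get(merchant_id, 0.0)
        for window, groups in spend_by_window.items()
    }


grouped = {"30d": {"m_coffee": 10.0}, "90d": {"m_coffee": 10.0, "m_books": 30.0}}
print(spend_at_merchant(grouped, "m_books"))  # {'30d': 0.0, '90d': 30.0}
```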
To learn more about using windowed features with streaming data sources, see our documentation on windowed streaming.