Aggregations
Define features as aggregations of data over sliding time ranges.
Given a has-many relationship between two feature classes, Chalk can compute aggregations over the joined data either
for all possible values or filtered by specific windows of time. Create an Aggregation Feature using a
Chalk Expression or a Windowed Aggregation Feature using the windowed
function.
For example, use windowed features to count the number of login attempts made by a user over the past 10 minutes, or
to track the largest purchase amount a cardholder has made in the past 30 days.
Chalk supports a number of aggregations out of box. Chalk builtins are very performant as they’re optimized and also
run natively in C++ and Rust.
Aggregations automatically skip null or None
values.
The following table lists the supported aggregations along with some notes.
Function | Notes |
---|---|
sum | |
min | |
max | |
mean | Feature type must be float or float | None . None values are skipped, meaning they are not included in the mean calculation. |
count | |
std | Standard deviation. Requires at least 2 values. |
var | Variance. Same requirements as std . |
approx_count_distinct | An approximation of the cardinality of non-null data. |
For example, say you want to compute some aggregations over a document’s revisions. You could define the features and aggregations below.
from chalk.features import features, DataFrame, _
from datetime import datetime
@features
class Document:
id: int
revisions: DataFrame[Revision]
# these are aggregations over the revisions DataFrame
num_revisions: int = _.revisions[_.id].count()
max_revision_size: float = _.revisions[_.size_bytes].max()
# you can also filter the rows being aggregated
earliest_large_revision_ts: datetime = _.revisions[_.size_bytes > 100,000,000, _.timestamp].min()
@features
class Revision:
id: int
# this is a foreign key join between Document and Revision
document_id: Document.id
size_bytes: float
timestamp: datetime
In an aggregation, you can filter and sort by datetime features. Chalk enables you to define Windowed Aggregations, such that you can compute the same aggregation over multiple time windows in a single feature definition. Windowed features are typically computed using either raw data or pre-aggregated data. For larger datasets, some systems may pre-aggregate batch data to optimize performance at the cost of real-time accuracy. Chalk supports both modes of operation to achieve low latency and accuracy for real time aggregations over large datasets through materialized windowed aggregations.
Otherwise, for smaller datasets or lower throughput use cases, Chalk can compute windowed aggregations directly on the raw data.
Here is an example of a windowed
feature representing the number of
failed logins in the last 10 minutes, 30 minutes, and 1 day:
from chalk import Windowed, windowed
from chalk.features import DataFrame, _
from datetime import datetime
@features
class LoginAttempt:
id: int
user: "User.id"
status: str
at: datetime
@features
class User:
id: int
login_attempts: DataFrame[LoginAttempt]
num_failed_logins: Windowed[int] = windowed(
"10m", "30m", "1d",
max_staleness="10m",
expression=_.login_attempts[
_.status=="failed",
_.at > _.chalk_window
].count(),
default=0,
)
Windowed features support much of the same functionality as normal features. They are most often used alongside
max_staleness
and
etl_offline_to_online
.
Windowed features are typically resolved, either:
Windowed features resolved via expressions can reference the current windowed time range
using the _.chalk_window
operator:
from chalk import Windowed, windowed
from chalk.features import DataFrame, _
from datetime import datetime
@features
class Transaction:
id: int
user_id: "User.id"
amount: float
at: datetime
@features
class User:
id: int
transactions: DataFrame[Transaction]
total_spend: Windowed[float] = windowed(
"30d", "60d", "90d",
default=0,
expression=_.transactions[
_.amount,
_.at > _.chalk_window
].sum(),
materialization={"bucket_duration": "1d"},
)
In this code, the windowed feature will return the sum of transaction amounts for the given user over the last 30, 60,
and 90 days. _.ts > _.chalk_window
is a boolean condition checking that the current transaction’s timestamp is greater
than the start of the current window duration.
Windowed features can also be referenced by other windowed features in the same feature class using
expressions and the _.chalk_window
operator. For example, we can compute the average
transaction amount over different time windows:
@features
class Transaction:
id: int
user_id: "User.id"
amount: float
at: datetime
@features
class User:
id: int
transactions: DataFrame[Transaction]
sum_transactions: Windowed[float] = windowed(
"30d", "60d", "90d",
expression=_.transactions[
_.amount,
_.at > _.chalk_window
].sum(),
)
count_transactions: Windowed[float] = windowed(
"30d", "60d", "90d",
expression=_.transactions[
_.ts > _.chalk_window
].count(),
)
avg_transaction_amount: Windowed[float] = windowed(
"30d", "60d", "90d",
expression=(
_.sum_transactions[_.chalk_window] /
_.count_transactions[_.chalk_window]
)
)
A windowed feature can be referenced in a query or a resolver in Python using the window as defined in the feature definition, but Chalk will convert every windowed feature to a FQN (fully qualified name) that defines the window duration in seconds. So, when referencing the feature for a resolver, you can use the Python syntax. However, when referencing a feature in a query, you can use the Python syntax when querying with the Chalk Python SDK, otherwise the CLI will require the FQN syntax. If you do not specify a window when referencing a windowed feature, Chalk will return all windows.
Given the feature definition below, the following showcase the ways to reference the respective windows in either Python syntax or using the FQN.
from chalk.features import features
from chalk.streams import windowed, Windowed
@features
class User:
id: int
...
num_failed_logins: Windowed[int] = windowed(
"10m",
"1h30m",
"1d",
expression=...,
)
Feature Definition Window | Python Syntax | Fully Qualified Name |
---|---|---|
“10m” | User.num_failed_logins[“10m”] | User.num_failed_logins__600__ |
“1h30m” | User.num_failed_logins[“1h30m”] | User.num_failed_logins__5400__ |
“1d” | User.num_failed_logins[“1d”] | User.num_failed_logins__86400__ |
Windowed features can be inputs to resolvers:
@online
def account_under_attack(
failed_logins_30m: User.num_failed_logins["30m"],
failed_logins_1d: User.num_failed_logins["1d"]
) -> ...:
return failed_logins_30m > 10 or failed_logins_1d > 100