Features define the data that you want to compute and store. Your features are defined as Python classes that look like dataclasses. For example:
@features
class User:
    id: int
    name: str
    email: str
    credit_card: CreditCard = has_one(
        lambda: CreditCard.user_id == User.id
    )
    logins: DataFrame[Login] = has_many(
        lambda: Login.user_id == User.id
    )
Features can be nested, as with credit_card and logins above.
In this section, we'll dive into API components that make up the
building blocks of features.
The individual or team responsible for these features. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Added metadata for features for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.
Setting etl_offline_to_online on a feature class assigns it to all features on the class which do not explicitly specify etl_offline_to_online.
When a feature is expensive or slow to compute, you may wish to cache its value.
Chalk uses the terminology "maximum staleness" to describe how recently a feature
value needs to have been computed to be returned without re-running a resolver.
Assigning a max_staleness to the feature class assigns it to all features on the class which do not explicitly specify a max_staleness value of their own.
@features(
    owner="andy@chalk.ai",
    max_staleness="30m",
    etl_offline_to_online=True,
    tags="user-group",
)
class User:
    id: str
    # Comments here appear in the web!
    # :tags: pii
    name: str | None
    # :owner: userteam@mycompany.com
    location: LatLng
Add metadata and configuration to a feature.
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature. Read more at Owner
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team. Read more at Tags
The maximum version for a feature. Versioned features can be referred to with the @ operator:
@features
class User:
    id: str
    score: int = feature(version=2)

str(User.score @ 2)
"user.score@2"
See more at Versioning
The default version for a feature. When you reference a versioned feature without the @ operator, you reference the default_version. Set to 1 by default.
@features
class User:
    id: str
    score: int = feature(version=2, default_version=2)

str(User.score)
"user.score"
See more at Default versions
When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Read more at Caching
When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.
Read more at Reverse ETL
If True and this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.
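For instance, a minimal sketch combining strict with the min and max bounds (the feature class and bounds here are illustrative):
from chalk.features import features, feature

@features
class User:
    id: str
    # Values outside [300, 850] are treated as failed rather than persisted.
    fico_score: int = feature(min=300, max=850, strict=True)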
A list of validations to apply to this feature. Generally, max, min, max_length, and min_length are more convenient, but the strict parameter applies to all of those parameters. If you want to mix strict and non-strict validations, you can use this parameter.
The default value of the feature if it otherwise can't be computed.
If you don't need to specify other metadata, you can also assign a default in the same way you would assign a default to a dataclass:
from chalk.features import features

@features
class User:
    num_purchases: int = 0
The type of the input feature, given by _TRich.
from chalk.features import Primary

@features
class User:
    uid: Primary[int]

    # Uses a default value of 0 when one cannot be computed.
    num_purchases: int = 0

    # Description of the name feature.
    # :owner: fraud@company.com
    # :tags: fraud, credit
    name: str = feature(
        max_staleness="10m",
        etl_offline_to_online=True
    )

    score: int = feature(
        version=2, default_version=2
    )
Specify a feature that represents a one-to-many relationship.
The join condition between @features classes.
This argument is callable to allow for forward references to members of this class and the joined class.
from chalk.features import DataFrame, features

@features
class Card:
    id: str
    balance: float

@features
class User:
    id: str
    cards: DataFrame[Card] = has_many(
        lambda: User.id == Card.user_id
    )
The after function can be used with DataFrame to compute windowed features.
after filters a DataFrame relative to the current time in context. This time could be in the past if you're using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.
The parameters to after take many keyword arguments describing the time relative to the present.
The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.
from chalk.features import DataFrame, features

@features
class Card:
    ...

@features
class User:
    cards: DataFrame[Card]

User.cards[after(hours_ago=1, minutes_ago=30)]
The before function can be used with DataFrame to compute windowed features.
before filters a DataFrame relative to the current time in context. This time could be in the past if you're using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.
The parameters to before take many keyword arguments describing the time relative to the present.
The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.
from chalk.features import DataFrame, features

@features
class Card:
    ...

@features
class User:
    cards: DataFrame[Card]

User.cards[before(hours_ago=1, minutes_ago=30)]
Marks a feature as the primary feature for a feature class.
Features named id on feature classes without an explicit primary feature are declared primary keys by default, and don't need to be marked with Primary.
If you have a primary key feature with a name other than id, you can use this marker to indicate the primary key.
@features
class User:
    uid: Primary[int]
Specify explicit data validation for a feature.
The feature() function can also specify these validations, but this class allows you to specify both strict and non-strict validations at the same time.
from chalk.features import features, feature, Validation

@features
class User:
    fico_score: int = feature(
        validations=[
            Validation(min=300, max=850, strict=True),
            Validation(min=300, max=320, strict=False),
            Validation(min=840, max=850, strict=False),
        ]
    )

    # If only one set of validations were needed,
    # you can use the feature function instead:
    first_name: str = feature(
        min_length=2, max_length=64, strict=True
    )
Decorator to create an online resolver.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.
Cron can sample all examples, a subset of all examples, or a custom provided set of examples.
Read more at Scheduling
Like tags, when can filter when a resolver is eligible to run. Unlike tags, when can use feature values, so that you can write resolvers like:
@online(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
    ...
Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
A ResolverProtocol which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.
Read more at Unit Tests
@online
def name_match(
    name: User.full_name,
    account_name: User.bank_account.title
) -> User.account_name_match_score:
    if name.lower() == account_name.lower():
        return 1.
    return 0.
Decorator to create an offline resolver.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.
Cron can sample all examples, a subset of all examples, or a custom provided set of examples.
Read more at Scheduling
Like tags, when can filter when a resolver is eligible to run. Unlike tags, when can use feature values, so that you can write resolvers like:
@offline(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
    ...
Allows you to specify an individual or team who is responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
A ResolverProtocol which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.
Read more at Unit Tests
@offline(cron="1h")
def get_fraud_score(
    email: User.email,
    name: User.name,
) -> User.fraud_score:
    return socure.get_sigma_score(email, name)
Run an online or offline resolver on a schedule.
This class lets you add a filter or sample function to your cron schedule for a resolver. See the overloaded signatures for more information.
The period of the cron job. Can be either a crontab ("0 * * * *") or a Duration ("2h").
Optionally, a function to filter down the arguments to consider.
See Filtering examples for more information.
Explicitly provide the sample function for the cron job.
See Custom examples for more information.
Using a filter
def fltr(v: Account.active):
    return v

@online(cron=Cron(schedule="1d", filter=fltr))
def fn(balance: Account.balance) -> ...:
    ...

Using a sample function
def s() -> DataFrame[User.id]:
    return DataFrame.read_csv(...)

@offline(cron=Cron(schedule="1d", sample=s))
def fn(balance: User.account.balance) -> ...:
    ...
Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- [`None`](https://docs.python.org/3/library/constants.html#None) (default) - candidate to run in every environment
- [`str`](https://docs.python.org/3/library/stdtypes.html#str) - run only in this environment
- `list[str]` - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
The arguments to pass to the decorated function. If one of the arguments is a DataFrame with a filter or projection applied, the resolver will only be called with the filtered or projected data. Read more at https://docs.chalk.ai/docs/unit-tests#data-frame-inputs
The result of calling the decorated function with args. Useful for unit-testing.
Read more at Unit Tests
@online
def get_num_bedrooms(
    rooms: Home.rooms[Room.name == 'bedroom']
) -> Home.num_bedrooms:
    return len(rooms)

rooms = [
    Room(id=1, name="bedroom"),
    Room(id=2, name="kitchen"),
    Room(id=3, name="bedroom"),
]
assert get_num_bedrooms(rooms) == 2
The type of machine to use.
You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.
Chalk includes a DataFrame class that models tabular data in much the same way that pandas does. However, there are some key differences that allow the Chalk DataFrame to increase type safety and performance.
Like pandas, Chalk's DataFrame is a two-dimensional data structure with rows and columns. You can perform operations like filtering, grouping, and aggregating on a DataFrame. However, there are two main differences:
- DataFrame is lazy and can be backed by multiple data sources, whereas a pandas.DataFrame executes eagerly in memory.
- DataFrame[...] can be used to represent a type of data with pre-defined filters.
Unlike pandas, the implementation of a Chalk DataFrame is lazy, and can be executed against many different backend sources of data. For example, in unit tests, a DataFrame uses an implementation backed by polars. But if your DataFrame was returned from a SQL source, filters and aggregations may be pushed down to the database for efficient execution.
Each column of a Chalk DataFrame is typed by a Feature type. For example, you might have a resolver returning a DataFrame containing user ids and names:
@features
class User:
    id: int
    name: str
    email: str

@online
def get_users() -> DataFrame[User.id, User.name]:
    return DataFrame([
        User(id=1, name="Alice"),
        User(id=2, name="Bob")
    ])
Construct a Chalk DataFrame.
The data. Can be an existing pandas.DataFrame, polars.DataFrame or polars.LazyFrame, a sequence of feature instances, or a dict mapping a feature to a sequence of values.
The strategy to use to handle missing values.
A feature value is "missing" if it is an ellipsis (...), or it is None and the feature is not annotated as Optional[...].
The available strategies are:
- 'error': Raise a TypeError if any missing values are found. Do not attempt to replace missing values with the default value for the feature.
- 'default_or_error': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, raise a TypeError.
- 'default_or_allow': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, leave it as None. This is the default strategy.
- 'allow': Allow missing values to be stored in the DataFrame. This option may result in non-nullable features being assigned None values.
Row-wise construction
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
Column-wise construction
df = DataFrame({
    User.id: [1, 2],
    User.first: ["Sam", "Iris"],
    User.last: ["Wu", "Xi"]
})
Construction from polars.DataFrame
import polars
df = DataFrame(polars.DataFrame({
    "user.id": [1, 2],
    "user.first": ["Sam", "Iris"],
    "user.last": ["Wu", "Xi"]
}))
Aggregate the DataFrame by the specified columns.
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
Compute a histogram with fixed width bins.
The column to compute the histogram on. If not supplied, the DataFrame is assumed to contain a single column.
DataFrame({
    Taco.price: list(range(100, 200)),
}).histogram_list(nbins=4, base=100)
[25, 25, 25, 25]
Group based on a time value (date or datetime).
The groups are defined by a time-based window, and optionally, other columns in the DataFrame. The "width" of the window is defined by the period parameter, and the spacing between the windows is defined by the every parameter. Note that if the every parameter is smaller than the period parameter, then the windows will overlap, and a single row may be assigned to multiple groups.
As an example, consider the following DataFrame:
val: a b c d e f g h
─────────●─────────●─────────●─────────●───────▶
time: A B C D
┌─────────┐
1 │ a b │ 1: [a, b]
└────┬────┴────┐
2 ◀───▶│ b c │ 2: [b, c]
every└────┬────┴────┐
3 ◀────────▶│ c d e │ 3: [c, d, e]
period └────┬────┴────┐
4 │d e f│ 4: [d, e, f]
└────┬────┴────┐
5 │ f │ 5: [f]
└────┬────┴────┐
6 │ │
└────┬────┴────┐
7 │ g h │ 7: [g, h]
└────┬────┴────┐
8 │g h │ 8: [g, h]
└─────────┘
In the above example, the sixth time bucket is empty, and will not be included in the resulting DataFrame.
from chalk import DataFrame, op
from datetime import datetime

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
        User.ts: [datetime(2020, 1, 1), datetime(2020, 1, 1), datetime(2020, 1, 3)],
    },
).group_by_hopping(
    index=User.ts,
    group={User.id: User.id},
    agg={User.val: op.median(User.val)},
    period="1d",
)
╭─────────┬──────────┬──────────╮
│ User.id │ User.ts │ User.val │
╞═════════╪══════════╪══════════╡
│ 1 │ 2020-1-1 │ 3 │
├─────────┼──────────┼──────────┤
│ 3 │ 2020-1-3 │ 10 │
╰─────────┴──────────┴──────────╯
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

# Set the fraud score to 0 for all users
df.with_column(User.fraud_score, 0)

# Concatenation of first & last as full_name
df.with_column(
    User.full_name, op.concat(User.first, User.last)
)

# Alias a column name
df.with_column(
    User.first_name, User.first
)
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

# Set the fraud score to 0 for all users
df.with_columns({User.fraud_score: 0})

# Concatenation of first & last as full_name
df.with_columns({
    User.full_name: op.concat(User.first, User.last)
})

# Alias a column name
df.with_columns({
    User.first_name: User.first
})
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).std()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 4.5826 │
╰─────────┴──────────╯
df = DataFrame({
    User.a: [1, 2, 3],
    User.b: [3, 2, 1],
})
df.sort(User.a)
a b
-----------
0 1 3
1 2 2
2 3 1
Get the underlying DataFrame as a polars.LazyFrame.
The underlying polars.LazyFrame.
Get the underlying DataFrame as a pyarrow.Table.
The underlying pyarrow.Table. This format is the canonical representation of the data in Chalk.
Get the underlying DataFrame as a pandas.DataFrame.
prefixed
Whether to prefix the column names with the feature namespace (i.e. user.name if prefixed=True, name if prefixed=False).
The data formatted as a pandas.DataFrame.
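A short sketch of the prefixed option, assuming the conversion is exposed as to_pandas() and using illustrative feature names:
df = DataFrame({
    User.id: [1, 2],
    User.first: ["Sam", "Iris"],
})
# Columns are namespaced by default: "user.id", "user.first"
df.to_pandas()
# Columns are bare feature names: "id", "first"
df.to_pandas(prefixed=False)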
Operations for aggregations in DataFrame.
The class methods on this class are used to create aggregations for use in DataFrame.group_by.
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.sum(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 4.5 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
        User.active: [True, True, False],
    }
).group_by(
    group={User.id: User.id},
    agg={
        User.val: op.product(User.val),
        User.active: op.product(User.active),
    }
)
╭─────────┬──────────┬─────────────╮
│ User.id │ User.val │ User.active │
╞═════════╪══════════╪═════════════╡
│ 1 │ 2 │ 1 │
├─────────┼──────────┼─────────────┤
│ 3 │ 10 │ 0 │
╰─────────┴──────────┴─────────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.max(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 4 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.min(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 0.5 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.mean(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 6.5 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.count(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 2 │
├─────────┼──────────┤
│ 3 │ 1 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

DataFrame(
    [
        User(id=1, val='a'),
        User(id=1, val='b'),
        User(id=3, val='c'),
        User(id=3, val='d'),
    ]
).group_by(
    group={User.id: User.id},
    agg={User.val: op.concat(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ "ab" │
├─────────┼──────────┤
│ 3 │ "cd" │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=True,
).group_by(
    group={User.id: User.id},
    agg={User.val: op.last(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 1 │
├─────────┼──────────┤
│ 3 │ 5 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=False
).group_by(
    group={User.id: User.id},
    agg={User.val: op.first(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 1 │
├─────────┼──────────┤
│ 3 │ 5 │
╰─────────┴──────────╯
Filter the aggregation so that it applies only to rows where all of the filters in f are true. If no rows match the filter, the aggregation for the column will be null, and the resulting feature type must be a nullable type.
The aggregation, allowing you to continue to chain methods.
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.sum(User.val).where(User.val > 5)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ null │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
Create a Snowflake data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a PostgreSQL data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a MySQL data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a BigQuery data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a Redshift data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a CloudSQL data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Testing SQL source.
If you have only one SQLiteInMemorySource integration, there's no need to provide a distinguishing name.
The SQL source for use in Chalk resolvers.
source = SQLiteInMemorySource(name="RISK")
Create a Databricks data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Incremental settings for Chalk SQL queries.
In "row"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only return "new" results, by adding a clause that looks like:
"WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>"
In "group"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter" mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
. This will incrementalize your query using the timestamp
of the latest row that has been ingested.
Chalk will also include another query parameter named "chalk_last_execution_timestamp"
that can be used instead.
This will incrementalize your query using the last time the query was executed.
incremental_timestamp:
If incremental_timestamp is "feature_time", we will incrementalize your query using the timestamp of the latest row that has been ingested. This is the default.
If incremental_timestamp is "resolver_execution_time", we will incrementalize your query using the last time the query was executed instead.
The timestamp to set as the lower bound
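As a sketch of "parameter" mode (the data source, Payment feature class, and payments table below are hypothetical, and the fields= mapping is illustrative), the raw query references the parameter directly and the incremental settings only select the mode:
from chalk import offline, DataFrame
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@offline(cron="1h")
def ingest_recent_payments() -> DataFrame[Payment.id, Payment.user_id, Payment.amount]:
    # Chalk substitutes :chalk_incremental_timestamp at run time.
    return pg.query_string(
        """
        SELECT id, user_id, amount FROM payments
        WHERE updated_at >= :chalk_incremental_timestamp
        """,
        fields={
            "id": Payment.id,
            "user_id": Payment.user_id,
            "amount": Payment.amount,
        },
    ).incremental(mode="parameter")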
Run a query from a SQL file.
This method allows you to query the SQL file within a Python resolver. However, Chalk can also infer resolvers from SQL files. See SQL File Resolvers for more information.
The path to the SQL file, relative to the caller's file or to the directory that your chalk.yaml file lives in.
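A minimal sketch of calling query_sql_file from a Python resolver (the data source, SQL file path, and column-to-feature mapping are illustrative):
from chalk import offline, DataFrame
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@offline
def load_users() -> DataFrame[User.id, User.name, User.email]:
    # "load_users.sql" is assumed to live next to this resolver's file.
    return pg.query_sql_file(
        path="load_users.sql",
        fields={"id": User.id, "name": User.name, "email": User.email},
    ).all()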
Execute a query to a pa.Table.
A function that, given the list of column names returned from the database, produces a mapping of columns to converters. Columns not in the resulting dictionary should be ignored.
The executor will attempt to avoid the use of SQLAlchemy and use more efficient methods to retrieve data.
A pa.Table containing the results.
Execute a query to a pa.Table.
A function that, given the list of column names returned from the database, produces a mapping of columns to converters. Columns not in the resulting dictionary should be ignored.
A pa.Table containing the results.
Automatically ingest a table.
from chalk.sql import PostgreSQLSource
from chalk.features import features

PostgreSQLSource().with_table(
    name="users",
    features=User,
).with_table(
    name="accounts",
    features=Account,
    # Override one of the column mappings.
    column_to_feature={
        "acct_id": Account.id,
    },
)
Return at most one result or raise an exception.
Returns None if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.
A query that can be returned from a resolver.
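For example, a single-row lookup returned directly from an online resolver might look like this sketch (the data source, users table, and args= parameter are assumptions):
from chalk import online
from chalk.features import Features
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@online
def get_user_email(uid: User.id) -> Features[User.email]:
    # Returns at most one row; None if no matching row exists.
    return pg.query_string(
        "SELECT email FROM users WHERE id = :user_id",
        fields={"email": User.email},
        args={"user_id": uid},
    ).one_or_none()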
Return the results represented by this Query as a DataFrame.
A query that can be returned from a resolver.
Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.
previous_latest_row_timestamp will be set to the start of the query execution, or if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.
In "row" mode:
incremental_column MUST be set.
Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:
WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>
In "group" mode:
incremental_column MUST be set.
Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", and (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter"
mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
.
Defaults to "row"
, which indicates that only rows newer than the last observed row should be
considered. When set to "group"
, Chalk will only ingest features from groups which are newer
than the last observation time. This requires that the query is grouped by a primary key.
This should reference a timestamp column in your underlying table, typically something
like "updated_at"
, "created_at"
, "event_time"
, etc.
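Putting the "row" mode pieces together, a scheduled ingestion resolver might look like the following sketch (the data source, Payment feature class, and column names are illustrative):
from chalk import offline, DataFrame
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@offline(cron="15m")
def ingest_payments() -> DataFrame[Payment.id, Payment.user_id, Payment.amount]:
    # Chalk adds a WHERE clause on updated_at using the last observed timestamp.
    return pg.query_string(
        "SELECT id, user_id, amount FROM payments",
        fields={
            "id": Payment.id,
            "user_id": Payment.user_id,
            "amount": Payment.amount,
        },
    ).incremental(incremental_column="updated_at", mode="row")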
Materialize the query.
Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.
Note: this requires the usage of the fields={...} argument when used in conjunction with query_string or query_sql_file.
Return at most one result or raise an exception.
Returns None if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.
A query that can be returned from a resolver.
Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.
previous_latest_row_timestamp will be set to the start of the query execution, or if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.
In "row" mode:
incremental_column MUST be set.
Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:
WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>
In "group" mode:
incremental_column MUST be set.
Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", and (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter"
mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
.
This should reference a timestamp column in your underlying table, typically something
like "updated_at"
, "created_at"
, "event_time"
, etc.
Defaults to "row"
, which indicates that only rows newer than the last observed row should be
considered. When set to "group"
, Chalk will only ingest features from groups which are newer
than the last observation time. This requires that the query is grouped by a primary key.
Decorator to create a stream resolver.
This parameter is defined when the streaming resolver returns a windowed feature.
Tumbling windows are fixed-size, contiguous and non-overlapping time intervals. You can think of tumbling windows as adjacently arranged bins of equal width.
Tumbling windows are most often used alongside max_staleness to allow the features to be sent to the online store and offline store after each window period.
Continuous windows, unlike tumbling window, are overlapping and exact. When you request the value of a continuous window feature, Chalk looks at all the messages received in the window and computes the value on-demand.
See more at Window Modes
A callable that will interpret an input prior to the invocation of the resolver. Parse functions can serve many functions, including pre-parsing bytes, skipping unrelated messages, or supporting rekeying.
See more at Parsing
A mapping from input BaseModel attribute to Chalk feature attribute to support continuous streaming re-keying. This parameter is required for continuous resolvers.
Features that are included here do not have to be explicitly returned in the stream resolver: the feature will automatically be set to the key value used for aggregation.
See more at Keys
An optional string specifying an input attribute as the timestamp used for windowed aggregations. See more at Custom Event Timestamping
A callable function! You can unit-test stream resolvers as you would unit-test any other code.
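As a sketch of the parse hook described above (the message models, topic name, and Payment feature class are all hypothetical), a parse function can drop messages the resolver should never see by returning None:
import pydantic
from typing import Optional
from chalk.features import features, Features
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="payments")

@features(max_staleness="1d")
class Payment:
    id: str
    amount: float

class Envelope(pydantic.BaseModel):
    kind: str
    payment_id: str
    amount: float

class PaymentMessage(pydantic.BaseModel):
    payment_id: str
    amount: float

def parse_payment(e: Envelope) -> Optional[PaymentMessage]:
    # Returning None skips messages that aren't payments.
    if e.kind != "payment":
        return None
    return PaymentMessage(payment_id=e.payment_id, amount=e.amount)

@stream(source=source, parse=parse_payment)
def resolve_payment(m: PaymentMessage) -> Features[Payment.id, Payment.amount]:
    return Payment(id=m.payment_id, amount=m.amount)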
Decorator to create a sink. Read more at Sinks
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
The individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
A callable function! You can unit-test sinks as you would unit test any other code. Read more at Unit Tests
@sink
def process_updates(
    uid: User.id,
    email: User.email,
    phone: User.phone,
):
    user_service.update(
        uid=uid,
        email=email,
        phone=phone
    )

process_updates(123, "sam@chalk.ai", "555-555-5555")
Declare a windowed feature.
@features
class User:
    failed_logins: Windowed[int] = windowed("10m", "24h")
Create a windowed feature.
See more at Windowed Features
The size of the buckets for the window function.
Buckets are specified as strings in the format "1d", "2h", "1h30m", etc.
You may also choose to specify the buckets using the days, hours, and minutes parameters instead. The buckets parameter is helpful if you want to use multiple units to express the bucket size, like "1h30m".
Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".
Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".
Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver.
Read more at Caching
Feature versions allow you to manage a feature as its definition changes over time.
The version keyword argument allows you to specify the maximum number of versions available for this feature.
See more at Versioning
When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.
Read more at Reverse ETL
If True and this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.
Metadata for the windowed feature, parameterized by TPrim (the primitive type of the feature) and TRich (the decoded type of the feature, if decoder is provided).
from chalk import windowed, Windowed

@features
class User:
    id: int
    email_count: Windowed[int] = windowed(days=range(1, 30))
    logins: Windowed[int] = windowed("10m", "1d", "30d")

User.email_count["7d"]
An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
An S3 or GCS URI that points to the certificate authority file that should be used to verify broker certificates. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
Protocol used to communicate with brokers. Valid values are "PLAINTEXT", "SSL", "SASL_PLAINTEXT", and "SASL_SSL". Defaults to "PLAINTEXT".
Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are "PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", and "OAUTHBEARER". Defaults to "PLAIN".
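A configuration sketch pulling these options together; the broker address, topic, and credential values are illustrative, and the bootstrap_server and topic keyword names are assumptions beyond the fields described above:
import os
from chalk.streams import KafkaSource

source = KafkaSource(
    bootstrap_server="kafka.mycompany.com:9092",
    topic="payments",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="chalk-consumer",
    sasl_password=os.environ["KAFKA_PASSWORD"],
)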
The ChalkClient is the primary Python interface for interacting with Chalk. You can use it to query data, trigger resolver runs, gather offline data, and more.
Create a ChalkClient. Clients can either be created explicitly, with environment variables, or using credentials in ~/.chalk.yml. The __init__ method specifies overloads for this purpose. See the overloaded methods for details.
from chalk.client import ChalkClient

client = ChalkClient(branch="testing")
client.query(
    input={
        User.name: "Katherine Johnson"
    },
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
Compute feature values using online resolvers. See Overview for more information.
The features for which there are known values, mapped to those values. For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}.
Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].
The time at which to evaluate the query. If not specified, the current time will be used.
This parameter is complex in the context of online_query since the online store only stores the most recent value of an entity's features. If now is in the past, it is extremely likely that None will be returned for cache-only features.
This parameter is primarily provided to support:
If you are trying to perform an exploratory analysis of past feature values, prefer offline_query.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.
The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'.
If True, the output of each of the query plan stages will be stored. This option dramatically impacts the performance of the query, so it should only be used for debugging.
from chalk.client import ChalkClient

result = ChalkClient().query(
    input={
        User.name: "Katherine Johnson"
    },
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
result.get_feature_value(User.fico_score)
Execute multiple queries (represented by the queries= argument) in a single request. This is useful if the queries are "rooted" in different @features classes -- i.e. if you want to load features for User and Merchant and there is no natural relationship object which is related to both of these classes, multi_query allows you to submit two independent queries.
In contrast, query_bulk executes a single query with multiple inputs/outputs.
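A sketch of submitting two unrelated queries at once, assuming an OnlineQuery helper in chalk.client and the User/Merchant feature classes:
from chalk.client import ChalkClient, OnlineQuery

result = ChalkClient().multi_query(
    queries=[
        OnlineQuery(
            input={User.id: ["u_123"]},
            output=[User.fico_score],
        ),
        OnlineQuery(
            input={Merchant.id: ["m_456"]},
            output=[Merchant.risk_score],
        ),
    ],
)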
Compute feature values for many rows of inputs using online resolvers. See Overview for more information on online query.
This method is similar to query, except it takes in a list of inputs and produces one output per row of inputs.
This method is appropriate if you want to fetch the same set of features for many different input primary keys.
This method contrasts with multi_query, which executes multiple fully independent queries.
This endpoint is not available in all environments.
The time at which to evaluate the query. If not specified, the current time will be used. The length of this list must be the same as the length of the values in input.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
An output containing results: list[BulkOnlineQueryResult], where each result contains dataframes of the results of each query.
from chalk.client import ChalkClient

ChalkClient().query_bulk(
    input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
Compute feature values for many inputs using online resolvers. See Overview for more information.
This endpoint is not available in all environments.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
from chalk.client import ChalkClient

ChalkClient().query(
    input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
Compute feature values from the offline store or by running offline/online resolvers. See Dataset for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
A unique name that if provided will be used to generate and save a Dataset constructed from the list of features computed from the inputs.
If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.
You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.
The maximum number of samples to include in the DataFrame. If not specified, all samples will be returned.
If True, progress bars will be shown while the query is running. Primarily intended for use in a Jupyter-like notebook environment. This flag will also be propagated to the methods of the resulting Dataset.
Used to control whether or not resolvers are allowed to run in order to compute feature values.
If True, all output features will be recomputed by resolvers. If False, all output features will be sampled from the offline store. If a list, all output features in recompute_features will be recomputed, and all other output features will be sampled from the offline store.
If True, the output of each of the query plan stages will be stored in S3/GCS. This will dramatically impact the performance of the query, so it should only be used for debugging. These files will be visible in the web dashboard's query detail view, and can be downloaded in full by clicking on a plan node in the query plan visualizer.
from chalk.client import ChalkClient
from datetime import datetime

uids = [1, 2, 3, 4]
at = datetime.now()
dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
        User.ts: [at] * len(uids),
    },
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset'
)
df = dataset.get_data_as_pandas()
Get a Chalk Dataset containing data from a previously created dataset.
If an offline query has been created with a dataset name, .get_dataset will return a Chalk Dataset.
The Dataset wraps a lazily-loading Chalk DataFrame that enables us to analyze our data without loading all of it directly into memory.
See Overview for more information.
The name of the Dataset to return. You must have previously supplied this dataset name when creating the offline query. Dataset names are unique for each environment.
from chalk.client import ChalkClient
from datetime import datetime
import pandas as pd

uids = [1, 2, 3, 4]
at = datetime.now()
X = ChalkClient().offline_query(
    input={
        User.id: uids,
        User.ts: [at] * len(uids),
    },
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset_name'
)

# Some time later...
dataset = ChalkClient().get_dataset(
    dataset_name='my_dataset_name'
)
...

# If memory allows:
df: pd.DataFrame = dataset.get_data_as_pandas()
Triggers a resolver to run. See Triggered Runs for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
from chalk.client import ChalkClient

ChalkClient().trigger_resolver_run(
    resolver_fqn="mymodule.fn"
)
Retrieves the status of a resolver run. See Triggered Runs for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
from chalk.client import ChalkClient

ChalkClient().get_run_status(
    run_id="3",
)
ResolverRunResponse(
    id="3",
    status=ResolverRunStatus.SUCCEEDED
)
Targets feature observation values for deletion and performs deletion online and offline.
An optional list of the feature names of the features that should be deleted for the targeted primary keys. Not specifying this and not specifying the "tags" field will result in all features being targeted for deletion for the specified primary keys. Note that this parameter and the "tags" parameter are mutually exclusive.
An optional list of tags that specify features that should be targeted for deletion. If a feature has a tag in this list, its observations for the primary keys you listed will be targeted for deletion. Not specifying this and not specifying the "features" field will result in all features being targeted for deletion for the specified primary keys. Note that this parameter and the "features" parameter are mutually exclusive.
Holds any errors (if any) that occurred during the drop request. Deletion of a feature may partially-succeed.
from chalk.client import ChalkClient

ChalkClient().delete_features(
    namespace="user",
    features=["name", "email", "age"],
    primary_keys=[1, 2, 3]
)
Performs a drop on features, which deletes all of their data (both online and offline). Once a feature is reset in this manner, its type can be changed.
Holds any errors (if any) that occurred during the drop request. Dropping a feature may partially-succeed.
from chalk.client import ChalkClient

ChalkClient().drop_features(
    namespace="user",
    features=["name", "email", "age"],
)
Upload data to Chalk for use in offline resolvers or to prime a cache.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
The errors encountered from uploading features.
from chalk.client import ChalkClient

ChalkClient().upload_features(
    input={
        User.id: 1,
        User.name: "Katherine Johnson"
    }
)
Upload data to Chalk for use in offline resolvers or to prime a cache.
One of:
- A list of mappings, each of which includes the features for which there are known values mapped to those values. Each mapping can have different keys, but each mapping must have the same root features class.
- A mapping where each feature key is mapped to a list of the values for that feature. You can consider this a mapping that describes columns (keys, i.e. features) and rows (the list of values in the map for each feature). Each list must be the same length.
- A pandas, polars, or Chalk DataFrame.
The environment under which to run the upload. API tokens can be scoped to an environment. If no environment is specified in the upload, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
The errors encountered from uploading features.
from chalk.client import ChalkClient

ChalkClient().multi_upload_features(
    input=[
        {
            User.id: 1,
            User.name: "Katherine Johnson"
        },
        {
            User.id: 2,
            User.name: "Eleanor Roosevelt"
        }
    ]
)
Get the most recent feature values from the offline store.
See Overview for more information.
A pandas.DataFrame with columns equal to the names of the features in output, and values representing the value of the most recent observation.
from chalk.client import ChalkClient

sample_df = ChalkClient().sample(
    output=[
        Account.id,
        Account.title,
        Account.user.full_name
    ],
    max_samples=10
)
Create a new branch based off of a deployment from the server. By default, uses the latest live deployment.
The specific deployment ID to use for the branch. If not specified, the latest live deployment on the server will be used. You can see which deployments are available by clicking on the 'Deployments' tab on the project page in the Chalk dashboard.
A response object containing metadata about the branch.
from chalk.client import ChalkClient
client = ChalkClient()
client.create_branch("my-new-branch")
Point the ChalkClient at the given branch.
If the branch does not exist or if branch deployments are not enabled for the current environment, this method raises an error.
from chalk.client import ChalkClient
client = ChalkClient()
client.create_branch("my-new-branch")
client.set_branch("my-new-branch")
Returns a BranchGraphSummary object that contains the state of the branch server: which resolvers and features are defined, and the history of live notebook updates on the server.
The branch to query. If not specified, the branch is expected to be included in the constructor for ChalkClient.
Tests a streaming resolver and its ability to parse and resolve messages. See Streams for more information.
The number of messages to digest from the stream source. As messages may not be incoming into the stream, this action may time out.
A filepath from which test messages will be ingested. This file should be newline delimited json as follows:
{"message_key": "my-key", "message_body": {"field1": "value1", "field2": "value2"}}
{"message_key": "my-key", "message_body": {"field1": "value1", "field2": "value2"}}
Each line may optionally contain a timezone string as a value to the key "message_timestamp".
Alternatively, keys can be supplied in code along with the "test_message_bodies" argument. Both arguments must be the same length.
Message bodies can be supplied in code as strings or bytes along with the "test_message_keys" argument. Both arguments must be the same length.
A simple wrapper around a status and optional error message.
Calling .features() will return the test results, if they exist. Otherwise, check .errors and .message for errors.
from chalk.streams import stream, KafkaSource
from chalk.client import ChalkClient
from chalk.features import Features, features
import pydantic

# This code is an example of a simple streaming feature setup.
stream_source = KafkaSource(...)

@features(etl_offline_to_online=True, max_staleness="7d")
class StreamingFeature:
    id: str
    user_id: str
    card_id: str

class StreamingMessage(pydantic.BaseModel):
    card_id: str
    user_id: str

@stream(source=stream_source)
def our_stream_resolver(
    m: StreamingMessage,
) -> Features[StreamingFeature.id, StreamingFeature.card_id, StreamingFeature.user_id]:
    return StreamingFeature(
        id=f"{m.card_id}-{m.user_id}",
        card_id=m.card_id,
        user_id=m.user_id,
    )

# Once you have done a `chalk apply`, you can test the streaming resolver with custom messages as follows
client = ChalkClient()
keys = ["my_key"] * 10
messages = [StreamingMessage(card_id="1", user_id=str(i)).json() for i in range(10)]
resp = client.test_streaming_resolver(
    resolver="our_stream_resolver",
    message_keys=keys,
    message_bodies=messages,
)
print(resp.features)
The value of the requested feature. If an error was encountered in resolving this feature, this field will be empty.
The ChalkError
describes an error from running a resolver
or from a feature that can't be validated.
The category of the error, given in the type field for the error codes. This will be one of "REQUEST", "NETWORK", or "FIELD".
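For example, you might inspect the errors on a query response like this (a sketch; the attribute names follow the fields described in this section):
from chalk.client import ChalkClient

resp = ChalkClient().query(
    input={User.id: "u_123"},
    output=[User.name],
)
for error in resp.errors or []:
    # Each ChalkError carries a category ("REQUEST", "NETWORK", or "FIELD"),
    # a detailed code, and a human-readable message.
    print(error.category, error.code, error.message)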
Loads a pl.LazyFrame
containing the output. This method is appropriate for working with larger-than-memory datasets.
Use .to_polars()
if you want a DataFrame
instead.
A pl.LazyFrame
materializing query output data.
Waits for the revision job to complete.
ChalkClient.offline_query
returns a DatasetRevision
instance immediately after
submitting the revision job. This method can be used to wait for the
revision job to complete.
Once the revision job is complete, the status
attribute of the
DatasetRevision
instance will be updated to reflect the status of the
revision job.
If the revision job was successful, you can then use methods such as
get_data_as_pandas()
without having to wait for the revision job to
complete.
Downloads the resolver replay data for the given resolver in the revision, provided the revision had store_plan_stages enabled.
The replay data is functionally similar to viewing the intermediate results on the plan explorer.
If the resolver appears in only one stage of the plan, the resolver's replay data is returned directly.
If the resolver instead appears in multiple stages of the plan, a mapping of the operation's ID to the replay data
will be returned. If the resolver does not appear in the plan, an exception will be thrown.
Whether to return a pl.LazyFrame
or a pl.DataFrame
. If True
, a pl.LazyFrame
will be returned.
Wrapper around Offline Query results.
Datasets are obtained by invoking ChalkClient.offline_query()
.
Dataset
instances store important metadata and enable the retrieval of
offline query outputs.
from datetime import datetime

from chalk.client import ChalkClient, Dataset

uids = [1, 2, 3, 4]
at = datetime.now()
dataset: Dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
        User.ts: [at] * len(uids),
    },
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset'
)
df = dataset.get_data_as_pandas()
dataset.recompute(features=[User.fraud_score], branch="feature/testing")
Loads a pl.LazyFrame
containing the output. This method is appropriate for working with larger-than-memory datasets.
Use .to_polars()
if you want a DataFrame
instead.
A pl.LazyFrame
materializing query output data.
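A minimal sketch of working with the lazy output (the method name to_polars_lazyframe is an assumption, and the fully-qualified column names shown are illustrative):
import polars as pl

# Method name assumed from the description above.
lf = dataset.to_polars_lazyframe()
result = (
    lf.filter(pl.col("user.email").is_not_null())
      .select(["user.id", "user.email"])
      .collect()  # only materializes the filtered projection
)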
Downloads output files pertaining to the revision to the given path.
Datasets are stored in Chalk as sharded Parquet files. With this method, you can download those raw files into a directory for processing with other tools.
from datetime import datetime

from chalk.client import ChalkClient, Dataset

uids = [1, 2, 3, 4]
at = datetime.now()
dataset: Dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
        User.ts: [at] * len(uids),
    },
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset'
)
dataset.download_data('my_directory')
Creates a new revision of this Dataset
by recomputing the specified features.
Carries out the new computation on the branch specified when constructing the client.
A list of specific features to recompute. Features that don't exist in the dataset will be added. If not provided, all the existing features in the dataset will be recomputed.
If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.
If True, progress bars will be shown while recomputation is running.
This flag will also be propagated to the methods of the resulting
Dataset
.
If no branch was provided to the ChalkClient.
from chalk.client import ChalkClient
dataset = ChalkClient(branch="data_science").offline_query(...)
df = dataset.get_data_as_polars()
# make changes to resolvers in your project
dataset.recompute()
new_df = dataset.get_data_as_polars() # receive newly computed data
Downloads the resolver replay data for the given resolver in the latest revision of the dataset.
The replay data is functionally similar to viewing the intermediate results on the plan explorer.
If the resolver appears in only one stage of the plan, the resolver's replay data is returned directly. If the resolver instead appears in multiple stages of the plan, a mapping of the operation's ID to the replay data will be returned. If the resolver does not appear in the plan, an exception will be thrown.
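For example (a sketch; the method name resolver_replay and the resolver get_fraud_score are assumptions used for illustration):
from chalk.client import ChalkClient

dataset = ChalkClient().offline_query(
    input={User.id: [1, 2, 3]},
    output=[User.fraud_score],
    dataset_name="replay_example",
    store_plan_stages=True,  # required so replay data is recorded, per the description above
)
# Pass the resolver whose intermediate inputs and outputs you want to inspect.
replay = dataset.resolver_replay(get_fraud_score)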
The category of an error.
For more detailed error information, see ErrorCode
Request errors are raised before execution of your resolver code. They may occur due to invalid feature names in the input or a request that cannot be satisfied by the resolvers you have defined.
Field errors are raised while running a feature resolver for a particular field. For this type of error, you'll find a feature and resolver attribute in the error type. When a feature resolver crashes, you will receive a null value in the response. To differentiate between a resolver returning a null value and a failure in the resolver, you need to check the error schema.
The detailed error code.
For a simpler category of error, see ErrorCodeCategory
.
Convenience method for accessing feature result from the data response.
The FeatureResult
for the feature, if it exists.
from chalk.client import ChalkClient
data = ChalkClient().query(...)
data.get_feature(User.name).ts
datetime.datetime(2023, 2, 5, 23, 25, 26, 427605)
data.get_feature("user.name").meta.cache_hit
False
Context in which to execute a query.
Raised when constructing a ChalkClient
without valid credentials.
When this exception is raised, no explicit client_id
and client_secret
were provided, there was no ~/.chalk.yml
file with applicable credentials,
and the environment variables CHALK_CLIENT_ID
and CHALK_CLIENT_SECRET
were not set.
You may need to run chalk login
from your command line, or check that your
working directory is set to the root of your project.
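For example, you can sidestep this exception by passing credentials explicitly when constructing the client (the values shown are placeholders):
from chalk.client import ChalkClient

client = ChalkClient(
    client_id="my-client-id",          # placeholder
    client_secret="my-client-secret",  # placeholder
)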
Duration is used to describe time periods in natural language. To specify using natural language, write the count of the unit you would like, followed by the representation of the unit.
Chalk supports the following units:
| Signifier | Meaning |
|---|---|
| w | Weeks |
| d | Days |
| h | Hours |
| m | Minutes |
| s | Seconds |
| ms | Milliseconds |

Chalk also supports the special keyword "infinity" for unbounded durations.

Examples

| Signifier | Meaning |
|---|---|
| "10h" | 10 hours |
| "1w 2m" | 1 week and 2 minutes |
| "1h 10m 2s" | 1 hour, 10 minutes, and 2 seconds |
| "infinity" | Unbounded time duration |
Read more at Duration
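For example, a duration string can be passed anywhere a Duration is expected, such as max_staleness on a feature (the Transaction class here is illustrative):
from chalk.features import features, feature

@features(max_staleness="1d")
class Transaction:
    id: str
    # Cached for 1 hour and 30 minutes before a resolver re-runs.
    risk_score: float = feature(max_staleness="1h 30m")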
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environments can take one of three types:

- None (default) - a candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments, and no others

See more at Environments
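A minimal sketch of scoping a resolver to particular environments (the resolver and the User.credit_score feature are illustrative):
from chalk import online

@online(environment=["staging", "dev"])
def fake_credit_score(uid: User.id) -> User.credit_score:
    # Return a constant outside of production instead of calling the vendor API.
    return 680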
Tags allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
Like Environments, tags control when resolvers run based on the Online Context or Training Context matching the tags provided to the resolver decorator. Resolvers optionally take a keyword argument named tags that can take one of three types:
- None (default) - The resolver will be a candidate to run for every set of tags.
- str - The resolver will run only if this tag is provided.
- list[str] - The resolver will run only if all of the specified tags match.

See more at Tags
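A minimal sketch of tagging a resolver (the tag values and the User.email_domain feature are illustrative):
from chalk import online

@online(tags=["team:risk", "fraud"])
def get_email_domain(email: User.email) -> User.email_domain:
    return email.split("@")[-1]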
Get the tags for a feature, feature class, or resolver.
If the supplied variable is not a feature, feature class, or resolver.
Feature tags
@features(tags="group:risk")
class User:
id: str
# :tags: pii
email: str
tags(User.id)
['group:risk']
Feature class tags
tags(User)
['group:risk']
Feature + feature class tags
tags(User.email)
['pii', 'group:risk']
Get the description of a feature, feature class, or resolver.
If the supplied variable is not a feature, feature class, or resolver.
from chalk.features import features, description

@features
class RocketShip:
    # Comments above a feature become
    # descriptions for the feature!
    software_version: str
description(RocketShip.software_version)
'Comments above a feature become descriptions for the feature!'
Determine whether a feature is a feature time.
See Time for more details on FeatureTime
.
True
if the feature is a FeatureTime
and False
otherwise
from datetime import datetime

from chalk.features import features, feature_time, is_feature_time

@features
class User:
    id: str
    updated_at: datetime = feature_time()

assert is_feature_time(User.updated_at) is True
assert is_feature_time(User.id) is False
Any class decorated by @dataclass
.
There isn't a base class for dataclass, so we use this TypeAlias to indicate any class decorated with @dataclass.
The base type for Chalk exceptions.
This exception makes error handling easier, as you can look only for this exception class.
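For example, catching the base class lets you handle any Chalk client error in one place (the import path shown is an assumption; check where the exception is exported in your chalkpy version):
from chalk.client import ChalkClient
from chalk.client.exc import ChalkBaseException  # import path assumed

try:
    data = ChalkClient().query(
        input={User.id: "u_123"},
        output=[User.name],
    )
except ChalkBaseException as e:
    # Covers authentication, request, and server-side errors raised by the client.
    print(f"Chalk request failed: {e}")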
from chalk.monitoring import Chart, Series
Chart(name="Request count").with_trigger(
Series
.feature_null_ratio_metric()
.where(feature=User.fico_score) > 0.2,
)
Change the window period for a Chart
.
Triggers are applied when a certain series is above or below a given value. The expression specifies the series, the comparison operator, and the threshold value, as shown in the sketch below.
A description for your Trigger. Descriptions provided here will be included in the alert message in Slack or PagerDuty.
For Slack alerts, you can use the mrkdwn syntax described here: https://api.slack.com/reference/surfaces/formatting#basics
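For example (a sketch; passing description as a keyword to with_trigger is an assumption based on the parameter described above):
from chalk.monitoring import Chart, Series

Chart(name="Email null ratio").with_trigger(
    Series.feature_null_ratio_metric().where(feature=User.email) > 0.1,
    description=(
        "More than 10% of *user.email* values were null in the window. "
        "Check the upstream enrichment resolver."
    ),
)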
Class describing a series of data in two dimensions, as in a line chart. Series should be instantiated with one of the classmethods that specifies the metric to be tracked.
Creates a Series
of metric kind FeatureStaleness
.
The time window to calculate the metric over.
A new FeatureStalenessSeries
instance that inherits from the Series
class.
Creates a Series
of metric kind FeatureValue
.
The time window to calculate the metric over.
A new FeatureValueSeries
instance that inherits from the Series
class.
Creates a Series
of metric kind ResolverLatency
.
The time window to calculate the metric over.
A new ResolverLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind QueryLatency
.
The time window to calculate the metric over.
A new QueryLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind CronLatency
.
The time window to calculate the metric over.
A new CronLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind StreamMessageLatency
.
The time window to calculate the metric over.
A new StreamMessageLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind StreamWindowLatency
.
The time window to calculate the metric over.
A new StreamWindowLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind StreamLag
.
The time window to calculate the metric over.
A new StreamLagSeries
instance that inherits from the Series
class.
Given two DataFrame
s, left
and right
, check if left == right
,
and raise otherwise.
If False
, allows the assert/test to succeed if the required columns are present,
irrespective of the order in which they appear.
If False
, allows the assert/test to succeed if the required rows are present,
irrespective of the order in which they appear; because this requires
sorting, it cannot be set on frames that contain un-sortable columns.
If left
does not equal right
If chalkpy[runtime]
is not installed.
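A minimal sketch (the import path and the check_row_order parameter name are assumptions based on the description above), requiring chalkpy[runtime]:
from chalk.features import DataFrame
from chalk.testing import assert_frame_equal  # import path assumed

left = DataFrame([User(id="1", name="Ann")])
right = DataFrame([User(id="1", name="Ann")])

# Passes as long as the same rows are present, regardless of row order.
assert_frame_equal(left, right, check_row_order=False)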
from datetime import datetime, timedelta, timezone

from chalk import online
from chalk.features import DataFrame, after
# NOTE: the `after` and `freeze_time` import locations are assumptions; freeze_time
# here refers to any helper (e.g. freezegun's) that freezes the clock as a context manager.
from freezegun import freeze_time

@online
def get_average_spend_30d(
    spend: User.cards[after(days_ago=30)],
) -> User.average_spend_30d:
    return spend.mean()

with freeze_time(datetime(2021, 1, 1, tzinfo=timezone.utc)):
    now = datetime.now(tz=timezone.utc)
    get_average_spend_30d(
        spend=DataFrame([
            Card(spend=10, ts=now - timedelta(days=31)),
            Card(spend=20, ts=now - timedelta(days=29)),
            Card(spend=30, ts=now - timedelta(days=28)),
        ])
    )