Features define the data that you want to compute and store. Your features are defined as Python classes that look like dataclasses. For example:
from chalk.features import features, DataFrame
from chalk import _

@features
class CreditCard:
    id: int
    user_id: "User.id"
    limit: float

@features
class User:
    id: int
    name: str
    email: str
    credit_cards: DataFrame[CreditCard]
    total_limit: float = _.credit_cards[_.limit].sum()
Features can be nested, as with credit_cards
above.
In this section, we'll dive into API components that make up the
building blocks of features.
The individual or team responsible for these features. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Added metadata for features for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.
Setting etl_offline_to_online on a feature class assigns it to all features on the class which do not explicitly specify etl_offline_to_online.
When a feature is expensive or slow to compute, you may wish to cache its value.
Chalk uses the terminology "maximum staleness" to describe how recently a feature
value needs to have been computed to be returned without re-running a resolver.
Assigning a max_staleness
to the feature class assigns it to all features on the
class which do not explicitly specify a max_staleness
value of their own.
@features(
    owner="andy@chalk.ai",
    max_staleness="30m",
    etl_offline_to_online=True,
    tags="user-group",
)
class User:
    id: str
    # Comments here appear in the web!
    # :tags: pii
    name: str | None
    # :owner: userteam@mycompany.com
    location: LatLng
Add metadata and configuration to a feature.
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature. Read more at Owner
from chalk.features import features, feature
from datetime import date
@features
class User:
id: str
# :owner: user-team@company.com
name: str
dob: date = feature(owner="user-team@company.com")
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team. Read more at Tags
from chalk.features import features, feature
from datetime import date

@features
class User:
    id: str
    # :tags: pii
    name: str
    dob: date = feature(tags=["pii"])
The maximum version for a feature. Versioned features can be
referred to with the @
operator:
@features
class User:
id: str
score: int = feature(version=2)
str(User.score @ 2)
"user.score@2"
See more at Versioning
The default version for a feature. When you reference a versioned feature without the @ operator, you reference the default_version. Set to 1 by default.
@features
class User:
id: str
score: int = feature(version=2, default_version=2)
str(User.score)
"user.score"
See more at Default versions
When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Read more at Caching
When True (default), Chalk will cache all values, including nulls.
When False, Chalk will not update the null entry in the cache.
When "evict_nulls", Chalk will evict the entry that would have been null from the cache, if it exists.
Concretely, suppose the current state of a database is {a: 1, b: 2}, and you write a row {a: 2, b: None}. Here is the expected result in the db:
- {a: 2, b: None} when cache_nulls=True (default)
- {a: 2, b: 2} when cache_nulls=False
- {a: 2} when cache_nulls="evict_nulls"
If cache_defaults is set, it will override the value of cache_nulls.
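As a sketch, this behavior is configured per feature via the feature() function, assuming cache_nulls is accepted there alongside max_staleness:
from chalk.features import features, feature

@features
class User:
    id: str
    # A computed null evicts any previously cached value for this feature.
    email: str | None = feature(max_staleness="1d", cache_nulls="evict_nulls")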
When True (default), Chalk will cache all values, including default values.
When False, Chalk will not update the default entry in the cache.
When "evict_defaults", Chalk will evict the entry that would have been a default value from the cache, if it exists.
Concretely, suppose the current state of a database is {a: 1, b: 2}, you write a row {a: 2, b: "default"}, and the default value for feature b is "default". Here is the expected result in the db:
- {a: 2, b: "default"} when cache_defaults=True
- {a: 2, b: 2} when cache_defaults=False
- {a: 2} when cache_defaults="evict_defaults"
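A parallel sketch for default values, again assuming cache_defaults is passed to feature():
from chalk.features import features, feature

@features
class User:
    id: str
    # Computed default values are not written back to the cache.
    plan: str = feature(default="free", max_staleness="1d", cache_defaults=False)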
When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.
Read more at Reverse ETL
If specified, when this feature is computed, Chalk will check that x >= min.
from chalk.features import features, feature
@features
class User:
id: str
fico_score: int = feature(min=300, max=850)
If specified, when this feature is computed, Chalk will check that x <= max.
from chalk.features import features, feature
@features
class User:
id: str
fico_score: int = feature(min=300, max=850)
If specified, when this feature is computed, Chalk will check that len(x) >= min_length.
from chalk.features import features, feature
@features
class User:
id: str
name: str = feature(min_length=1)
If specified, when this feature is computed, Chalk will check that len(x) <= max_length.
from chalk.features import features, feature

@features
class User:
    id: str
    name: str = feature(max_length=1000)
If True and this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.
A list of validations to apply to this feature. Generally, max, min, max_length, and min_length are more convenient, but the parameter strict applies to all of those parameters. Use this parameter if you want to mix strict and non-strict validations.
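For example, a feature might combine a strict range with a narrower non-strict warning band (a sketch; the Validation import path is assumed here):
from chalk.features import features, feature, Validation

@features
class User:
    id: str
    fico_score: int = feature(
        validations=[
            # Hard bounds: values outside this range fail validation.
            Validation(min=300, max=850, strict=True),
            # Soft bounds: values outside this range only warn.
            Validation(min=350, max=800, strict=False),
        ]
    )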
The backing pyarrow.DataType
for the feature. This parameter can
be used to control the storage format of data. For example, if you
have a lot of data that could be represented as smaller data types,
you can use this parameter to save space.
import pyarrow as pa
from chalk.features import features, feature

@features
class WatchEvent:
    id: str
    duration_hours: float = feature(dtype=pa.float16())
The default value of the feature if it otherwise can't be computed.
If you don't need to specify other metadata, you can also assign a default in the same way you would assign a default to a dataclass:
from chalk.features import features

@features
class User:
    num_purchases: int = 0
An underscore expression for defining the feature. Typically, this value is assigned directly to the feature without needing to use the feature(...) function. However, if you want to define other properties, like a default or max_staleness, you'll want to use the expression keyword argument.
from chalk.features import features, feature
from chalk import _

@features
class Receipt:
    subtotal: int
    tax: int = 0  # Default value, without other metadata
    total: int = feature(expression=_.subtotal + _.tax, default=0)
See more at Underscore
If True, this feature is considered deprecated, which impacts the dashboard, alerts, and warnings.
from chalk.features import features, feature

@features
class User:
    id: str
    name: str = feature(deprecated=True)
The type of the input feature, given by _TRich
.
from chalk.features import Primary, features, feature
@features
class User:
uid: Primary[int]
# Uses a default value of 0 when one cannot be computed.
num_purchases: int = 0
# Description of the name feature.
# :owner: fraud@company.com
# :tags: fraud, credit
name: str = feature(
max_staleness="10m",
etl_offline_to_online=True
)
score: int = feature(
version=2, default_version=2
)
Specify a feature that represents a one-to-one relationship.
This function allows you to explicitly specify a join condition between two @features classes. When there is only one way to join two classes, we recommend using the foreign-key definition instead of this has_one function. For example, if you have a User class and a Card class, and each user has one card, you can define the Card and User classes as follows:
@features
class User:
    id: str

@features
class Card:
    id: str
    user_id: User.id
    user: User
from chalk.features import features, has_one

@features
class Card:
    id: str
    user_id: str
    balance: float

@features
class User:
    id: str
    card: Card = has_one(
        lambda: User.id == Card.user_id
    )
Specify a feature that represents a one-to-many relationship.
The join condition between @features
classes.
This argument is callable to allow for forward
references to members of this class and the joined
class.
from chalk.features import DataFrame, features, has_many

@features
class Card:
    id: str
    user_id: str
    balance: float

@features
class User:
    id: str
    cards: DataFrame[Card] = has_many(
        lambda: User.id == Card.user_id
    )
The function after can be used with DataFrame to compute windowed features.
after filters a DataFrame relative to the current time in context, such that if the after filter is defined as now - {time_window}, the filter will include all features with timestamps t where now - {time_window} <= t <= now.
This time could be in the past if you're using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.
The parameters to after take many keyword arguments describing the time relative to the present.
The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.
from chalk.features import DataFrame, features, after

@features
class Card:
    ...

@features
class User:
    cards: DataFrame[Card]

User.cards[after(hours_ago=1, minutes_ago=30)]
The function before can be used with DataFrame to compute windowed features.
before filters a DataFrame relative to the current time in context such that if the before filter is defined as now - {time_window}, the filter will include all features with timestamps t where t <= now - {time_window}.
This time could be in the past if you're using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.
The parameters to before take many keyword arguments describing the time relative to the present.
The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.
from chalk.features import DataFrame, features, before

@features
class Card:
    ...

@features
class User:
    cards: DataFrame[Card]

User.cards[before(hours_ago=1, minutes_ago=30)]
Declare a windowed feature.
@features
class User:
    failed_logins: Windowed[int] = windowed("10m", "24h")
Create a windowed feature.
See more at Windowed
The size of the buckets for the window function. Buckets are specified as strings in the format "1d", "2h", "1h30m", etc. You may also choose to specify the buckets using the days, hours, and minutes parameters instead. The buckets parameter is helpful if you want to use multiple units to express the bucket size, like "1h30m".
Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".
Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".
Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver.
See more at Caching
Sets a maximum age for values eligible to be retrieved from the offline store, defined in relation to the query's current point-in-time.
Feature versions allow you to manage a feature as its definition changes over time.
The version
keyword argument allows you to specify the
maximum number of versions available for this feature.
See more at Versioning
When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.
See more at Reverse ETL
If True and this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.
A list of Validations to apply to this feature.
See more at https://docs.chalk.ai/api-docs#Validation
The expression to compute the feature. This is an underscore expression, like _.transactions[_.amount].sum().
Configuration for aggregating data. Pass bucket_duration with a Duration to configure the bucket size for aggregation.
If True, each of the windows will use a bucket duration equal to its window duration.
See more at Materialized window aggregations
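A minimal sketch of a materialized window aggregation combining the expression and materialization parameters described above; the feature classes here are illustrative:
from chalk import windowed, Windowed, _
from chalk.features import features, DataFrame

@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float

@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    # Sum of transaction amounts per window, pre-aggregated into 1-day buckets.
    total_spend: Windowed[float] = windowed(
        "7d", "30d",
        expression=_.transactions[_.amount].sum(),
        materialization={"bucket_duration": "1d"},
    )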
Metadata for the windowed feature, parameterized by TPrim (the primitive type of the feature) and TRich (the decoded type of the feature, if decoder is provided).
from chalk import windowed, Windowed
from chalk.features import features

@features
class User:
    id: int
    email_count: Windowed[int] = windowed(days=range(1, 30))
    logins: Windowed[int] = windowed("10m", "1d", "30d")

User.email_count["7d"]
Create a windowed feature with grouping.
See more at Grouped materialized window aggregations
The size of the buckets for the window function. Buckets are specified as strings in the format "1d", "2h", "1h30m", etc. You may also choose to specify the buckets using the days, hours, and minutes parameters instead. The buckets parameter is helpful if you want to use multiple units to express the bucket size, like "1h30m".
The expression to compute the feature. This is an underscore expression, like _.transactions[_.amount].sum().
Configuration for aggregating data. Pass bucket_duration with a Duration to configure the bucket size for aggregation.
See more at Materialized window aggregations
Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".
Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".
Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
If True and this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.
The backing pyarrow.DataType
for the feature. This parameter can
be used to control the storage format of data. For example, if you
have a lot of data that could be represented as smaller data types,
you can use this parameter to save space.
import pyarrow as pa
from chalk import windowed, Windowed
from chalk.features import features

@features
class User:
    id: str
    email_count: Windowed[int] = windowed(
        "10m", "30m",
        dtype=pa.int16(),
    )
from chalk import group_by_windowed, DataFrame, _
from chalk.features import features

@features
class Email:
    id: int
    user_id: "User.id"
    category: str  # grouping key used below

@features
class User:
    id: int
    emails: DataFrame[Email]
    emails_by_category: DataFrame = group_by_windowed(
        "10m", "30m",
        expression=_.emails.group_by(_.category).count(),
    )
Marks a feature as the primary feature for a feature class.
Features named id on feature classes without an explicit primary feature are declared primary keys by default, and don't need to be marked with Primary.
If you have a primary key feature with a name other than id, you can use this marker to indicate the primary key.
from chalk.features import features
from chalk import Primary
@features
class User:
username: Primary[str]
Specify explicit data validation for a feature.
The feature()
function can also specify these validations,
but this class allows you to specify both strict and non-strict
validations at the same time.
from chalk.features import features, feature, Validation

@features
class User:
    fico_score: int = feature(
        validations=[
            Validation(min=300, max=850, strict=True),
            Validation(min=300, max=320, strict=False),
            Validation(min=840, max=850, strict=False),
        ]
    )

    # If only one set of validations were needed,
    # you can use the feature function instead:
    first_name: str = feature(
        min_length=2, max_length=64, strict=True
    )
The Vector class can be used as a type annotation to denote a Vector feature.
Instances of this class will be provided when working with raw vectors inside of resolvers. Generally, you do not need to construct instances of this class directly, as Chalk will automatically convert list-like features into Vector instances when working with a Vector annotation.
data: numpy.ndarray | list[float] | pyarrow.FixedSizeListScalar - the vector values.
from chalk.features import Vector, features
@features
class Document:
embedding: Vector[1536]
Define a nearest neighbor relationship for performing Vector similarity search.
metric: "l2" | "ip" | "cos" - the metric to use to compute distance between two vectors. L2 Norm ("l2"), Inner Product ("ip"), and Cosine ("cos") are supported. Defaults to "l2".
A nearest neighbor relationship filter.
Decorator to create an online resolver.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.
Cron can sample all examples, a subset of all examples, or a custom provided set of examples.
Read more at Scheduling
Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.
Whether this resolver should be invoked once during planning time to build a static computation graph. If True, all inputs will either be StaticOperators (for has-many and DataFrame relationships) or StaticExpressions (for individual features). The resolver must return a StaticOperator as output.
A ResolverProtocol
which can be called as a normal function! You can unit-test
resolvers as you would unit-test any other code.
Read more at Unit Tests
@online
def name_match(
name: User.full_name,
account_name: User.bank_account.title
) -> User.account_name_match_score:
if name.lower() == account_name.lower():
return 1.
return 0.
Decorator to create an offline resolver.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.
Cron can sample all examples, a subset of all examples, or a custom provided set of examples.
Read more at Scheduling
Allows you to specify an individual or team who is responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.
Whether this resolver should be invoked once during planning time to build a static computation graph. If True, all inputs will either be StaticOperators (for has-many and DataFrame relationships) or StaticExpressions (for individual features). The resolver must return a StaticOperator as output.
A ResolverProtocol
which can be called as a normal function! You can unit-test
resolvers as you would unit-test any other code.
Read more at Unit Tests
@offline(cron="1h")
def get_fraud_score(
email: User.email,
name: User.name,
) -> User.fraud_score:
return socure.get_sigma_score(email, name)
Run an online or offline resolver on a schedule.
This class lets you add a filter or sample function to your cron schedule for a resolver. See the overloaded signatures for more information.
The period of the cron job. Can be either a crontab ("0 * * * *") or a Duration ("2h").
Optionally, a function to filter down the arguments to consider.
See Filtering examples for more information.
Explicitly provide the sample function for the cron job.
See Custom examples for more information.
Using a filter
def only_active_filter(v: User.active):
    return v

@online(cron=Cron(schedule="1d", filter=only_active_filter))
def score_user(d: User.signup_date) -> User.score:
    return ...

Using a sample function
def s() -> DataFrame[User.id]:
    return DataFrame.read_csv(...)

@offline(cron=Cron(schedule="1d", sample=s))
def fn(balance: User.account.balance) -> ...:
    ...
Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- [`None`](https://docs.python.org/3/library/constants.html#None) (default) - candidate to run in every environment
- [`str`](https://docs.python.org/3/library/stdtypes.html#str) - run only in this environment
- `list[str]` - run in any of the specified environment and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
The arguments to pass to the decorated function. If one of the arguments is a DataFrame with a filter or projection applied, the resolver will only be called with the filtered or projected data. Read more at https://docs.chalk.ai/docs/unit-tests#data-frame-inputs
The result of calling the decorated function with args. Useful for unit-testing. Read more at Unit Tests
@online
def get_num_bedrooms(
rooms: Home.rooms[Room.name == 'bedroom']
) -> Home.num_bedrooms:
return len(rooms)
rooms = [
Room(id=1, name="bedroom"),
Room(id=2, name="kitchen"),
Room(id=3, name="bedroom"),
]
assert get_num_bedrooms(rooms) == 2
The type of machine to use.
You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.
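As a sketch, the hint might be passed to the resolver decorator; both the parameter name machine_type and the pool name "large" are assumptions and must match what is configured in your deployment:
from chalk import online
from chalk.features import features

@features
class Document:
    id: str
    text: str
    word_count: int

@online(machine_type="large")  # "large" is a hypothetical machine pool name
def count_words(text: Document.text) -> Document.word_count:
    return len(text.split())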
Chalk includes a DataFrame
class that models tabular data in much the same
way that pandas
does. However, there are some key differences that
allow the Chalk DataFrame
to increase type safety and performance.
Like pandas, Chalk's DataFrame is a two-dimensional data structure with rows and columns. You can perform operations like filtering, grouping, and aggregating on a DataFrame. However, there are two main differences:
- DataFrame is lazy and can be backed by multiple data sources, where a pandas.DataFrame executes eagerly in memory.
- DataFrame[...] can be used to represent a type of data with pre-defined filters.
Unlike pandas, the implementation of a Chalk DataFrame is lazy, and can be executed against many different backend sources of data. For example, in unit tests, a DataFrame uses an implementation backed by polars. But if your DataFrame was returned from a SQL source, filters and aggregations may be pushed down to the database for efficient execution.
Each column of a Chalk DataFrame is typed by a Feature type. For example, you might have a resolver returning a DataFrame containing user ids and names:
@features
class User:
id: int
name: str
email: str
@online
def get_users() -> DataFrame[User.id, User.name]:
return DataFrame([
User(id=1, name="Alice"),
User(id=2, name="Bob")
])
Construct a Chalk DataFrame.
The data. Can be an existing pandas.DataFrame, polars.DataFrame or polars.LazyFrame, a sequence of feature instances, or a dict mapping a feature to a sequence of values.
The strategy to use to handle missing values. A feature value is "missing" if it is an ellipsis (...), or it is None and the feature is not annotated as Optional[...]. The available strategies are:
- 'error': Raise a TypeError if any missing values are found. Do not attempt to replace missing values with the default value for the feature.
- 'default_or_error': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, raise a TypeError.
- 'default_or_allow': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, leave it as None. This is the default strategy.
- 'allow': Allow missing values to be stored in the DataFrame. This option may result in non-nullable features being assigned None values.
Row-wise construction
df = DataFrame([
User(id=1, first="Sam", last="Wu"),
User(id=2, first="Iris", last="Xi")
])
Column-wise construction
df = DataFrame({
User.id: [1, 2],
User.first: ["Sam", "Iris"],
User.last: ["Wu", "Xi"]
})
Construction from polars.DataFrame
import polars
df = DataFrame(polars.DataFrame({
"user.id": [1, 2],
"user.first": ["Sam", "Iris"],
"user.last": ["Wu", "Xi"]
}))
Aggregate the DataFrame
by the specified columns.
from chalk.features import DataFrame
from chalk import op

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
Compute a histogram with fixed width bins.
The column to compute the histogram on. If not supplied, the DataFrame
is assumed to contain a single column.
DataFrame({
Taco.price: list(range(100, 200)),
}).histogram_list(nbins=4, base=100)
[25, 25, 25, 25]
Group based on a time value (date or datetime).
The groups are defined by a time-based window, and optionally,
other columns in the DataFrame
. The "width" of the window is
defined by the period
parameter, and the spacing between the
windows is defined by the every
parameter. Note that if the
every
parameter is smaller than the period
parameter, then
the windows will overlap, and a single row may be assigned to
multiple groups.
As an example, consider the following DataFrame:
val: a b c d e f g h
─────────●─────────●─────────●─────────●───────▶
time: A B C D
┌─────────┐
1 │ a b │ 1: [a, b]
└────┬────┴────┐
2 ◀───▶│ b c │ 2: [b, c]
every└────┬────┴────┐
3 ◀────────▶│ c d e │ 3: [c, d, e]
period └────┬────┴────┐
4 │d e f│ 4: [d, e, f]
└────┬────┴────┐
5 │ f │ 5: [f]
└────┬────┴────┐
6 │ │
└────┬────┴────┐
7 │ g h │ 7: [g, h]
└────┬────┴────┐
8 │g h │ 8: [g, h]
└─────────┘
In the above example, the sixth time bucket is empty, and will not be included in the resulting DataFrame.
from chalk import DataFrame, op
from datetime import datetime

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
        User.ts: [datetime(2020, 1, 1), datetime(2020, 1, 1), datetime(2020, 1, 3)],
    },
).group_by_hopping(
    index=User.ts,
    group={User.id: User.id},
    agg={User.val: op.median(User.val)},
    period="1d",
)
╭─────────┬──────────┬──────────╮
│ User.id │ User.ts │ User.val │
╞═════════╪══════════╪══════════╡
│ 1 │ 2020-1-1 │ 3 │
├─────────┼──────────┼──────────┤
│ 3 │ 2020-1-3 │ 10 │
╰─────────┴──────────┴──────────╯
df = DataFrame([
User(id=1, first="Sam", last="Wu"),
User(id=2, first="Iris", last="Xi")
])
# Set the fraud score to 0 for all users
df.with_column(User.fraud_score, 0)
# Concatenation of first & last as full_name
df.with_column(
User.full_name, op.concat(User.first, User.last)
)
# Alias a column name
df.with_column(
User.first_name, User.first
)
df = DataFrame([
User(id=1, first="Sam", last="Wu"),
User(id=2, first="Iris", last="Xi")
])
# Set the fraud score to 0 for all users
df.with_columns({User.fraud_score: 0})
# Concatenation of first & last as full_name
df.with_columns({
User.full_name: op.concat(User.first, User.last)
})
# Alias a column name
df.with_columns({
User.first_name: User.first
})
Read a .csv file as a DataFrame
.
values = DataFrame.read_csv(
"s3://...",
columns={0: MyFeatures.id, 1: MyFeatures.name},
has_header=False,
)
Sort the DataFrame
by the given columns.
df = DataFrame({
User.a: [1, 2, 3],
User.b: [3, 2, 1],
})
df.sort(User.a)
a b
-----------
0 1 3
1 2 2
2 3 1
Get the underlying DataFrame as a polars.LazyFrame.
The underlying polars.LazyFrame.
Get the underlying DataFrame as a pyarrow.Table.
The underlying pyarrow.Table. This format is the canonical representation of the data in Chalk.
Get the underlying DataFrame as a pandas.DataFrame.
prefixed: whether to prefix the column names with the feature namespace (i.e. user.name if prefixed=True, name if prefixed=False).
The data formatted as a pandas.DataFrame.
Operations for aggregations in DataFrame.
The class methods on this class are used to create aggregations for use in DataFrame.group_by.
from chalk.features import DataFrame
df = DataFrame(
{
User.id: [1, 1, 3],
User.val: [0.5, 4, 10],
}
).group_by(
group={User.id: User.id},
agg={User.val: op.sum(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 4.5 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk.features import DataFrame
df = DataFrame(
{
User.id: [1, 1, 3],
User.val: [0.5, 4, 10],
User.active: [True, True, False],
}
).group_by(
group={User.id: User.id},
agg={
User.val: op.product(User.val),
User.active: op.product(User.active),
}
)
╭─────────┬──────────┬─────────────╮
│ User.id │ User.val │ User.active │
╞═════════╪══════════╪═════════════╡
│ 1 │ 2 │ 1 │
├─────────┼──────────┼─────────────┤
│ 3 │ 10 │ 0 │
╰─────────┴──────────┴─────────────╯
from chalk.features import DataFrame
df = DataFrame(
{
User.id: [1, 1, 3],
User.val: [0.5, 4, 10],
}
).group_by(
group={User.id: User.id},
agg={User.val: op.max(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 4 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk.features import DataFrame
df = DataFrame(
{
User.id: [1, 1, 3],
User.val: [0.5, 4, 10],
}
).group_by(
group={User.id: User.id},
agg={User.val: op.min(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 0.5 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk.features import DataFrame
df = DataFrame(
{
User.id: [1, 1, 3],
User.val: [1, 5, 10],
}
).group_by(
group={User.id: User.id},
agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk.features import DataFrame
df = DataFrame(
{
User.id: [1, 1, 3],
User.val: [1, 5, 10],
}
).group_by(
group={User.id: User.id},
agg={User.val: op.mean(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 6.5 │
╰─────────┴──────────╯
from chalk.features import DataFrame
df = DataFrame(
{
User.id: [1, 1, 3],
User.val: [1, 5, 10],
}
).group_by(
group={User.id: User.id},
agg={User.val: op.count(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 2 │
├─────────┼──────────┤
│ 3 │ 1 │
╰─────────┴──────────╯
Concatenate the string values of col and col2 in a DataFrame.
from chalk.features import DataFrame
DataFrame(
[
User(id=1, val='a'),
User(id=1, val='b'),
User(id=3, val='c'),
User(id=3, val='d'),
]
).group_by(
group={User.id: User.id},
agg={User.val: op.concat(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ "ab" │
├─────────┼──────────┤
│ 3 │ "cd" │
╰─────────┴──────────╯
from chalk.features import DataFrame
DataFrame(
[
User(id=1, val=1),
User(id=1, val=3),
User(id=3, val=7),
User(id=3, val=5),
]
).sort(
User.val, descending=True,
).group_by(
group={User.id: User.id},
agg={User.val: op.last(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 1 │
├─────────┼──────────┤
│ 3 │ 5 │
╰─────────┴──────────╯
from chalk.features import DataFrame
DataFrame(
[
User(id=1, val=1),
User(id=1, val=3),
User(id=3, val=7),
User(id=3, val=5),
]
).sort(
User.val, descending=False
).group_by(
group={User.id: User.id},
agg={User.val: op.first(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 1 │
├─────────┼──────────┤
│ 3 │ 5 │
╰─────────┴──────────╯
Filter the aggregation to apply to only rows where all the filters in f are true. If no rows match the filter, the aggregation for the column will be null, and the resulting feature type must be a nullable type.
The aggregation, allowing you to continue to chain methods.
from chalk.features import DataFrame
df = DataFrame(
{
User.id: [1, 1, 3],
User.val: [0.5, 4, 10],
}
).group_by(
group={User.id: User.id},
agg={User.val: op.sum(User.val).where(User.val > 5)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ null │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
Create a Snowflake data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
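A sketch of the three configuration styles described above:
from chalk.sql import SnowflakeSource

# 1. No arguments: use the Snowflake integration configured in the Chalk Dashboard.
snowflake = SnowflakeSource()

# 2. Named integration: use the configuration stored under this name.
risk_snowflake = SnowflakeSource(name="RISK")

# 3. Explicit arguments: configure the source directly. The exact keyword
#    arguments are listed in the overloaded signatures and are not shown here.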
Create a PostgreSQL data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a MySQL data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a DynamoDB data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details. DynamoDBSources can be queried via PartiQL SQL
resolvers.
You may override the ambient AWS credentials by providing either a client ID and secret, or a role ARN.
Create a BigQuery data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a Redshift data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a CloudSQL data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Testing SQL source.
If you have only one SQLiteInMemorySource integration, there's no need to provide a distinguishing name.
The SQL source for use in Chalk resolvers.
source = SQLiteInMemorySource(name="RISK")
Create a Databricks data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a Spanner data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Incremental settings for Chalk SQL queries.
In "row"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only return "new" results, by adding a clause that looks like:
"WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>"
In "group"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter" mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
. This will incrementalize your query using the timestamp
of the latest row that has been ingested.
Chalk will also include another query parameter named "chalk_last_execution_timestamp"
that can be used instead.
This will incrementalize your query using the last time the query was executed.
incremental_timestamp:
If incremental_timestamp is "feature_time", we will incrementalize your query using the timestamp of the latest row that has been ingested. This is the default.
If incremental_timestamp is "resolver_execution_time", we will incrementalize your query using the last time the query was executed instead.
The timestamp to set as the lower bound
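As a sketch, an offline resolver might apply these settings through .incremental() on a query; the Payment feature class and the specific column and argument values are illustrative:
from chalk import offline
from chalk.features import features, DataFrame
from chalk.sql import PostgreSQLSource

source = PostgreSQLSource()

@features
class Payment:
    id: int
    user_id: int
    amount: float

@offline
def get_payments() -> DataFrame[Payment.id, Payment.user_id, Payment.amount]:
    return source.query_string(
        "SELECT id, user_id, amount FROM payments",
        fields={"id": Payment.id, "user_id": Payment.user_id, "amount": Payment.amount},
    ).incremental(
        incremental_column="updated_at",
        mode="row",
        lookback_period="1h",
    )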
Run a query from a SQL file.
This method allows you to query the SQL file within a Python resolver. However, Chalk can also infer resolvers from SQL files. See SQL file resolvers for more information.
The path to the SQL file, relative to the caller's file, or to the directory that your chalk.yaml file lives in.
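For instance, a resolver might run a SQL file that lives next to it (a sketch, assuming the argument is named path):
from chalk import offline
from chalk.features import DataFrame
from chalk.sql import PostgreSQLSource

source = PostgreSQLSource()

@offline
def get_users() -> DataFrame[User.id, User.name]:
    # users.sql sits in the same directory as this resolver.
    return source.query_sql_file(path="users.sql").all()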
Automatically ingest a table.
from chalk.sql import PostgreSQLSource
from chalk.features import features
PostgreSQLSource().with_table(
name="users",
features=User,
).with_table(
name="accounts",
features=Account,
# Override one of the column mappings.
column_to_feature={
"acct_id": Account.id,
},
)
Return at most one result or raise an exception.
Returns None
if the query selects no rows. Raises if
multiple object identities are returned, or if multiple
rows are returned for a query that returns only scalar
values as opposed to full identity-mapped entities.
A query that can be returned from a resolver.
Return the results represented by this Query as a DataFrame.
A query that can be returned from a resolver.
Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.
previous_latest_row_timestamp will be set to the start of the query execution, or, if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.
In "row"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only return "new" results, by adding a clause that looks like:
WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>
In "group"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter"
mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
.
Defaults to "row"
, which indicates that only rows newer than the last observed row should be
considered. When set to "group"
, Chalk will only ingest features from groups which are newer
than the last observation time. This requires that the query is grouped by a primary key.
This should reference a timestamp column in your underlying table, typically something
like "updated_at"
, "created_at"
, "event_time"
, etc.
A query that can be returned from a resolver.
Materialize the query.
Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.
Note: this requires the usage of the fields={...} argument when used in conjunction with query_string or query_sql_file.
Return at most one result or raise an exception.
Returns None
if the query selects no rows. Raises if
multiple object identities are returned, or if multiple
rows are returned for a query that returns only scalar
values as opposed to full identity-mapped entities.
A query that can be returned from a resolver.
Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.
previous_latest_row_timestamp will be set to the start of the query execution, or, if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.
In "row"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only return "new" results, by adding a clause that looks like:
WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>
In "group"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter"
mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
.
This should reference a timestamp column in your underlying table, typically something like "updated_at", "created_at", "event_time", etc.
Defaults to "row", which indicates that only rows newer than the last observed row should be considered. When set to "group", Chalk will only ingest features from groups which are newer than the last observation time. This requires that the query is grouped by a primary key.
A query that can be returned from a resolver.
Generate a Chalk SQL file resolver from a filepath and a sql string.
This will generate a resolver in your web dashboard that can be queried,
but will not output a .chalk.sql
file.
The optional parameters are overrides for the comment key-value pairs at the top of the sql file resolver. Comment key-value pairs specify important resolver information such as the source, feature namespace to resolve, and other details. Note that these will override any values specified in the sql string. See Configuration for more information.
See SQL file resolvers for more information on SQL file resolvers.
Can either be a BaseSQLSource or a string. If a string is provided, it will be used to infer the source by first scanning for a source with the same name, then inferring the source if it is a type, e.g. snowflake if there is only one database of that type. Optional if source is specified in sql.
from chalk import make_sql_file_resolver
from chalk.features import features
@features
class User:
id: int
name: str
make_sql_file_resolver(
name="my_resolver",
sql="SELECT user_id as id, name FROM users",
source="snowflake",
resolves=User,
kind="offline",
)
Decorator to create a stream resolver.
This parameter is defined when the streaming resolver returns a windowed feature.
Tumbling windows are fixed-size, contiguous and non-overlapping time intervals. You can think of
tumbling windows as adjacently arranged bins of equal width.
Tumbling windows are most often used alongside max_staleness
to allow the features
to be sent to the online store and offline store after each window period.
Continuous windows, unlike tumbling window, are overlapping and exact. When you request the value of a continuous window feature, Chalk looks at all the messages received in the window and computes the value on-demand.
See more at Window modes
A callable that will interpret an input prior to the invocation of the resolver. Parse functions can serve many functions, including pre-parsing bytes, skipping unrelated messages, or supporting rekeying.
See more at Parsing
A mapping from input BaseModel
attribute to Chalk feature attribute to support continuous streaming re-keying.
This parameter is required for continuous resolvers.
Features that are included here do not have to be explicitly returned in the stream resolver:
the feature will automatically be set to the key value used for aggregation.
See more at Keys
An optional string specifying an input attribute as the timestamp used for windowed aggregations. See more at Custom event timestamping
A callable function! You can unit-test stream resolvers as you would unit-test any other code.
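A minimal stream resolver sketch, assuming a Kafka integration and a Pydantic model for the message body:
from pydantic import BaseModel
from chalk.features import features, Features
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="user_events")  # assumes an integration named "user_events"

class UserUpdate(BaseModel):
    id: str
    email: str

@features
class User:
    id: str
    email: str

@stream(source=source)
def process_user_update(message: UserUpdate) -> Features[User.id, User.email]:
    return User(id=message.id, email=message.email)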
Decorator to create a sink. Read more at Sinks
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
The individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
A callable function! You can unit-test sinks as you would unit test any other code. Read more at Unit Tests
@sink
def process_updates(
uid: User.id,
email: User.email,
phone: User.phone,
):
user_service.update(
uid=uid,
email=email,
phone=phone
)
process_updates(123, "sam@chalk.ai", "555-555-5555")
An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
An S3 or GCS URI that points to the certificate authority file that should be used to verify broker certificates. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
Protocol used to communicate with brokers. Valid values are "PLAINTEXT", "SSL", "SASL_PLAINTEXT", and "SASL_SSL". Defaults to "PLAINTEXT".
Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are "PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER". Defaults to "PLAIN".
The subscription id of your PubSub topic from which you want to consume messages. To enable permission for consuming this stream, ensure that the service account has the permissions 'pubsub.subscriptions.consume' and 'pubsub.subscriptions.get'.
Base class for all stream sources generated from @stream.
Identifier for the dead-letter queue (DLQ) for the stream. If not specified, failed messages will be dropped. Stream name for Kinesis, topic name for Kafka, subscription id for PubSub.
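A configuration sketch using the parameters above; the broker address, topic, and keyword names other than security_protocol and sasl_mechanism are assumptions:
from chalk.streams import KafkaSource

source = KafkaSource(
    bootstrap_server="kafka.mycompany.com:9092",  # assumed parameter name
    topic="user-events",                          # assumed parameter name
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
)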
The ChalkClient
is the primary Python interface for interacting with Chalk.
You can use it to query data, trigger resolver runs, gather offline data, and more.
Create a ChalkClient
with the given credentials.
The client secret to use to authenticate. Can either be a service token secret or a user token secret.
The ID or name of the environment to use for this client. Not necessary if your client_id and client_secret are for a service token scoped to a single environment. If not present, the client will use the environment variable CHALK_ENVIRONMENT.
The API server to use for this client. Required if you are using a Chalk Dedicated deployment. If not present, the client will check for the presence of the environment variable CHALK_API_SERVER, and use that if found.
If specified, Chalk will route all requests from this client instance to the relevant branch. Some methods allow you to override this instance-level branch configuration by passing in a branch argument.
If True, the client will pick up the branch from the current git branch.
If specified, Chalk will route all requests from this client
instance to the relevant tagged deployment. This cannot be
used with the branch
argument.
If specified, Chalk will route all requests from this client instance to the relevant preview deployment.
The query server to use for this client. Required if you are using a standalone Chalk query engine deployment. If not present, the client will default to the value of api_server.
The default wait timeout, in seconds, to wait for long-running jobs to complete when accessing query results. Jobs will not time out if this timeout elapses. For no timeout, set to None. The default is no timeout.
The default wait timeout, in seconds, to wait for network requests to complete. If not specified, the default is no timeout.
If client_id
or client_secret
are not provided, there
is no ~/.chalk.yml
file with applicable credentials,
and the environment variables CHALK_CLIENT_ID
and
CHALK_CLIENT_SECRET
are not set.
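A construction sketch combining the parameters above (values are placeholders):
from chalk.client import ChalkClient

client = ChalkClient(
    client_id="client-id-from-dashboard",
    client_secret="client-secret-from-dashboard",
    environment="production",
)

# With no arguments, credentials are read from ~/.chalk.yml or from the
# CHALK_CLIENT_ID and CHALK_CLIENT_SECRET environment variables.
client = ChalkClient()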
Compute features values using online resolvers. See Chalk Clients for more information.
The features for which there are known values, mapped to those values. For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}.
Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].
If an empty sequence, the output will be set to all features on the namespace of the query. For example, if you pass as input {"user.id": 1234}, then the query is defined on the User namespace, and all features on the User namespace (excluding has-one and has-many relationships) will be used as outputs.
The time at which to evaluate the query. If not specified, the current time will be used. This parameter is complex in the context of online_query since the online store only stores the most recent value of an entity's features. If now is in the past, it is extremely likely that None will be returned for cache-only features.
This parameter is primarily provided to support:
If you are trying to perform an exploratory analysis of past feature values, prefer offline_query.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
You can specify a correlation ID to be used in logs and web interfaces.
This should be globally unique, i.e. a uuid
or similar. Logs generated
during the execution of your query will be tagged with this correlation id.
The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'.
If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.
If query_name
is specified, this specifies the version of the named query you're making.
This is only useful if you want your query to use a NamedQuery
with a specific name and a
specific version. If a query_name
has not been supplied, then this parameter is ignored.
Returns metadata about the query execution under OnlineQueryResult.meta. This could make the query slightly slower. For more information, see Chalk Clients.
If True, the output of each of the query plan stages will be stored. This option dramatically impacts the performance of the query, so it should only be used for debugging.
If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.
from chalk.client import ChalkClient
result = ChalkClient().query(
input={
User.name: "Katherine Johnson"
},
output=[User.fico_score],
staleness={User.fico_score: "10m"},
)
result.get_feature_value(User.fico_score)
Execute multiple queries (represented by queries=
argument) in a single request. This is useful if the
queries are "rooted" in different @features
classes -- i.e. if you want to load features for User
and
Merchant
and there is no natural relationship object which is related to both of these classes, multi_query
allows you to submit two independent queries.
Returns a BulkOnlineQueryResponse
, which is functionally a list of query results. Each of these results
can be accessed by index. Individual results can be further checked for errors and converted
to pandas or polars DataFrames.
In contrast, query_bulk
executes a single query with multiple inputs/outputs.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
An output containing results as a list[BulkOnlineQueryResult]
,
where each result contains a DataFrame
of the results of each
query or any errors.
from chalk.client import ChalkClient, OnlineQuery
queries = [
OnlineQuery(
input={User.name: 'Katherine Johnson'},
output=[User.fico_score],
),
OnlineQuery(
input={Merchant.name: 'Eight Sleep'},
output=[Merchant.address],
),
]
result = ChalkClient().multi_query(queries)
result[0].get_feature_value(User.fico_score)
Compute feature values for many rows of inputs using online resolvers. See Chalk Clients for more information on online query.
This method is similar to query
, except it takes in a list
of inputs, and produces one
output per row of inputs.
This method is appropriate if you want to fetch the same set of features for many different input primary keys.
This method contrasts with multi_query
, which executes multiple fully independent queries.
This endpoint is not available in all environments.
The time at which to evaluate the query. If not specified, the current time will be used.
The length of this list must be the same as the length of the values in input
.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
An output containing results as a list[BulkOnlineQueryResult]
,
where each result contains a DataFrame
of the results of each query.
from chalk.client import ChalkClient
ChalkClient().query_bulk(
input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
output=[User.fico_score],
staleness={User.fico_score: "10m"},
)
Plan a query without executing it.
The features for which there are known values, mapped to those values.
For example, {User.id: 1234}
. Features can also be expressed as snakecased strings,
e.g. {"user.id": 1234}
Outputs are the features that you'd like to compute from the inputs.
For example, [User.age, User.name, User.email]
.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
The semantic name for the query you're making, for example, "loan_application_model"
.
Typically, each query that you make from your application should have a name.
Chalk will present metrics and dashboard functionality grouped by 'query_name'.
If your query name matches a NamedQuery
, the query will automatically pull outputs
and options specified in the matching NamedQuery
.
If query_name
is specified, this specifies the version of the named query you're making.
This is only useful if you want your query to use a NamedQuery
with a specific name and a
specific version. If a query_name
has not been supplied, then this parameter is ignored.
The number of input rows that this plan will be run with. If unknown, specify None
.
The query plan, including the resolver execution order and the resolver execution plan for each resolver.
from chalk.client import ChalkClient
result = ChalkClient().plan_query(
input=[User.id],
output=[User.fico_score],
staleness={User.fico_score: "10m"},
)
result.rendered_plan
result.output_schema
Check whether the expected results of a query match Chalk's query outputs.
This function should be used in integration tests.
If you're using pytest
, pytest.fail
will be executed on an error.
Otherwise, an AssertionError
will be raised.
A feature set or a mapping of {feature: value}
of givens.
All values will be encoded to the json representation.
A feature set or a mapping of {feature: value}
of expected outputs.
For values where you do not care about the result, use an ...
for the
feature value (i.e. when an error is expected).
A list of the features that you expect to be read from the online store, e.g.
cache_hits=[Actor.name, Actor.num_appearances]
A map from the expected feature name to the expected errors for that feature, e.g.
expected_feature_errors={
User.id: [ChalkError(...), ChalkError(...)]
}
errors={
"user.id": [ChalkError(...), ChalkError(...)]
}
The time at which to evaluate the query. If not specified, the current time will be used.
This parameter is complex in the context of online_query
since the online store
only stores the most recent value of an entity's features. If now
is in the past,
it is extremely likely that None
will be returned for cache-only features.
This parameter is primarily provided to support a narrow set of specialized use cases. If you are trying to perform an exploratory analysis of past feature values, prefer offline_query instead.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The semantic name for the query you're making, for example, "loan_application_model"
.
Typically, each query that you make from your application should have a name.
Chalk will present metrics and dashboard functionality grouped by 'query_name'.
If your query name matches a NamedQuery
, the query will automatically pull outputs
and options specified in the matching NamedQuery
.
If query_name
is specified, this specifies the version of the named query you're making.
This is only useful if you want your query to use a NamedQuery
with a specific name and a
specific version. If a query_name
has not been supplied, then this parameter is ignored.
If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.
The relative tolerance to allow for float equality.
If you specify both float_rel_tolerance
and float_abs_tolerance
,
the numbers will be considered equal if either tolerance is met.
Equivalent to:
abs(a - b) <= float_rel_tolerance * max(abs(a), abs(b))
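As an illustration only (not the client's internal implementation), combining the relative and absolute tolerances described above might look like the following sketch; floats_equal is a hypothetical helper name.
def floats_equal(a: float, b: float, rel_tol: float, abs_tol: float) -> bool:
    # "Equal if either tolerance is met": relative check OR absolute check.
    diff = abs(a - b)
    return diff <= rel_tol * max(abs(a), abs(b)) or diff <= abs_tol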
from chalk.client import ChalkClient
result = ChalkClient().check(
input={Actor.id: "nm0000001"},
assertions={Actor.num_movies: 40},
)
Chalk Feature Value Mismatch
┏━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Kind ┃ Name ┃ Value ┃
┡━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Expect │ actor.id │ nm0000001 │
│ Actual │ actor.id │ nm0000001 │
│ Expect │ actor.num_appearanc… │ 40 │
│ Actual │ actor.num_appearanc… │ 41 │
└────────┴──────────────────────┴───────────┘
Get a Chalk Dataset
containing data from a previously created dataset.
If an offline query has been created with a dataset name, .get_dataset
will
return a Chalk Dataset
.
The Dataset
wraps a lazily-loading Chalk DataFrame
that enables us to analyze
our data without loading all of it directly into memory.
See Offline Queries for more information.
The name of the Dataset
to return.
You must have previously supplied this dataset name when creating the offline query.
Dataset names are unique for each environment.
If 'dataset_name' is provided, then 'job_id' should not be provided.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
A UUID returned in the Dataset
object from an offline query.
Dataset ids are unique for each environment.
If 'dataset_id' is provided, then 'dataset_name' and 'revision_id' should not be provided.
The unique id of the DatasetRevision
to return.
If a previously-created dataset did not have a name, you can look it
up using its unique job id instead.
If 'revision_id' is provided, then 'dataset_name' and 'dataset_id' should not be provided.
from chalk.client import ChalkClient
uids = [1, 2, 3, 4]
at = datetime.now(timezone.utc)
X = ChalkClient().offline_query(
input={
User.id: uids,
},
input_times=[at] * len(uids),
output=[
User.id,
User.fullname,
User.email,
User.name_email_match_score,
],
dataset='my_dataset_name'
)
# Some time later, retrieve the dataset by name...
dataset = ChalkClient().get_dataset(
    dataset_name='my_dataset_name'
)
# ...or look it up by the job id returned from the offline query.
dataset = ChalkClient().get_dataset(
    job_id='00000000-0000-0000-0000-000000000000'
)
# If memory allows, materialize the results as a pandas DataFrame:
df: pd.DataFrame = dataset.get_data_as_pandas()
Compute feature values from the offline store or by running offline/online resolvers.
See Dataset
for more information.
The time at which the given inputs should be observed for point-in-time correctness. If given a list of
times, the list must match the length of the input
lists. Each element of input_time corresponds with the
feature values at the same index of the input
lists.
See Temporal Consistency for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
A unique name that if provided will be used to generate and
save a Dataset
constructed from the list of features computed
from the inputs.
If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.
You can specify a correlation ID to be used in logs and web interfaces.
This should be globally unique, i.e. a uuid
or similar. Logs generated
during the execution of your query will be tagged with this correlation id.
The maximum number of samples to include in the DataFrame
.
If not specified, all samples will be returned.
If True, progress bars will be shown while the query is running.
Primarily intended for use in a Jupyter-like notebook environment.
This flag will also be propagated to the methods of the resulting
Dataset
.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
Used to control whether resolvers are allowed to run in order to compute feature values.
If True, all output features will be recomputed by resolvers. If False, all output features will be sampled from the offline store. If a list, all output features in recompute_features will be recomputed, and all other output features will be sampled from the offline store.
A list of features that will always be sampled, and thus always excluded from recompute. Should not overlap with any features used in "recompute_features" argument.
If specified, the query will only be run on data observed after this timestamp. Accepts strings in ISO 8601 format.
If specified, the query will only be run on data observed before this timestamp. Accepts strings in ISO 8601 format.
If True, the output of each of the query plan stages will be stored in S3/GCS. This will dramatically impact the performance of the query, so it should only be used for debugging. These files will be visible in the web dashboard's query detail view, and can be downloaded in full by clicking on a plan node in the query plan visualizer.
If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.
A SQL query that will query your offline store and use the result as input. See Input for more information.
Override resource requests for processes with isolated resources, e.g., offline queries and cron jobs.
See ResourceRequests
for more information.
Boots a kubernetes job to run the queries in their own pods, separate from the engine and branch servers. This is useful for large datasets and jobs that require a long time to run.
from chalk.client import ChalkClient
uids = [1, 2, 3, 4]
at = datetime.now(tz=timezone.utc)
dataset = ChalkClient().offline_query(
input={
User.id: uids,
},
input_times=[at] * len(uids),
output=[
User.id,
User.fullname,
User.email,
User.name_email_match_score,
],
dataset_name='my_dataset'
)
df = dataset.get_data_as_pandas()
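For example, a sketch of recomputing only one output feature while sampling the rest from the offline store; it reuses uids and at from the example above, and the feature names are illustrative.
dataset = ChalkClient().offline_query(
    input={User.id: uids},
    input_times=[at] * len(uids),
    output=[
        User.id,
        User.email,
        User.name_email_match_score,
    ],
    # Recompute only this feature; sample the others from the offline store.
    recompute_features=[User.name_email_match_score],
    dataset_name='my_dataset_recomputed',
)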
Triggers a resolver to run. See Triggered Runs for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
If specified, the resolver will only ingest data observed before this timestamp. Accepts strings in ISO 8601 format.
If specified, the resolver will only ingest data observed after this timestamp. Accepts strings in ISO 8601 format.
from chalk.client import ChalkClient
ChalkClient().trigger_resolver_run(
resolver_fqn="mymodule.fn"
)
Retrieves the status of a resolver run. See Triggered Runs for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
from chalk.client import ChalkClient
ChalkClient().get_run_status(
run_id="3",
)
ResolverRunResponse(
id="3",
status=ResolverRunStatus.SUCCEEDED
)
Targets feature observation values for deletion and performs deletion online and offline.
An optional list of the feature names of the features that should be deleted for the targeted primary keys. Not specifying this and not specifying the "tags" field will result in all features being targeted for deletion for the specified primary keys. Note that this parameter and the "tags" parameter are mutually exclusive.
An optional list of tags that specify features that should be targeted for deletion. If a feature has a tag in this list, its observations for the primary keys you listed will be targeted for deletion. Not specifying this and not specifying the "features" field will result in all features being targeted for deletion for the specified primary keys. Note that this parameter and the "features" parameter are mutually exclusive.
Holds any errors (if any) that occurred during the delete request. Deletion of a feature may partially succeed.
from chalk.client import ChalkClient
ChalkClient().delete_features(
namespace="user",
features=["name", "email", "age"],
primary_keys=[1, 2, 3]
)
Performs a drop on features, which deletes all of their data (both online and offline). Once a feature has been reset in this manner, its type can be changed.
Holds any errors (if any) that occurred during the drop request. Dropping a feature may partially succeed.
from chalk.client import ChalkClient
ChalkClient().drop_features(
namespace="user",
features=["name", "email", "age"],
)
Upload data to Chalk for use in offline resolvers or to prime a cache.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
The errors encountered from uploading features.
from chalk.client import ChalkClient
ChalkClient().upload_features(
input={
User.id: 1,
User.name: "Katherine Johnson"
}
)
Upload data to Chalk for use in offline resolvers or to prime a cache.
One of three types: pandas, polars, or chalk.DataFrame.
The environment under which to run the upload. API tokens can be scoped to an environment. If no environment is specified in the upload, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
The errors encountered from uploading features.
from chalk.client import ChalkClient
ChalkClient().multi_upload_features(
input=[
{
User.id: 1,
User.name: "Katherine Johnson"
},
{
User.id: 2,
User.name: "Eleanor Roosevelt"
}
]
)
Get the most recent feature values from the offline store.
See Offline Queries for more information.
A pandas.DataFrame
with columns equal to the names of the features in output,
and values representing the value of the most recent observation.
from chalk.client import ChalkClient
sample_df = ChalkClient().sample(
output=[
Account.id,
Account.title,
Account.user.full_name
],
max_samples=10
)
Create a new branch based off of a deployment from the server. By default, uses the latest live deployment.
The specific deployment ID to use for the branch. If not specified, the latest live deployment on the server will be used. You can see which deployments are available by clicking on the 'Deployments' tab on the project page in the Chalk dashboard.
A response object containing metadata about the branch.
from chalk.client import ChalkClient
client = ChalkClient()
client.create_branch("my-new-branch")
Point the ChalkClient
at the given branch.
If branch_name
is None, this points the client at the
active non-branch deployment.
If the branch does not exist or if branch deployments are not enabled for the current environment, this method raises an error.
from chalk.client import ChalkClient
client = ChalkClient()
client.create_branch("my-new-branch")
client.set_branch("my-new-branch")
client.set_branch(None)
Returns a BranchGraphSummary
object that contains the
state of the branch server: Which resolver/features are
defined, and the history of live notebook updates on the
server.
The branch to query. If not specified, the branch is
expected to be included in the constructor for ChalkClient
.
Sets the incremental cursor for a resolver or scheduled query.
The resolver. Can be a function or the string name of a function.
Exactly one of resolver
and scheduled_query
is required.
from chalk.client import ChalkClient
client = ChalkClient()
client.set_incremental_cursor(
resolver="my_resolver",
max_ingested_timestamp=datetime.now(),
)
Gets the incremental cursor for a resolver or scheduled query.
The resolver. Can be a function or the string name of a function.
Exactly one of resolver
and scheduled_query
is required.
An object containing the max_ingested_timestamp
and incremental_timestamp
.
from chalk.client import ChalkClient
client = ChalkClient()
client.get_incremental_cursor(resolver="my_resolver")
Tests a streaming resolver and its ability to parse and resolve messages. See Streams for more information.
The number of messages to digest from the stream source. Because messages may not be currently arriving on the stream, this action may time out.
A filepath from which test messages will be ingested. This file should be newline-delimited JSON, as follows:
{"message_key": "my-key", "message_body": {"field1": "value1", "field2": "value2"}}
{"message_key": "my-key", "message_body": {"field1": "value1", "field2": "value2"}}
Each line may optionally contain a timezone string as a value to the key "message_timestamp".
Alternatively, keys can be supplied in code along with the "test_message_bodies" argument. Both arguments must be the same length.
Message bodies can be supplied in code as strings, bytes, or Pydantic models along with the "test_message_keys" argument. Both arguments must be the same length.
A simple wrapper around a status and optional error message.
Inspecting StreamResolverTestResponse.features
will return the test results, if they exist.
Otherwise, check StreamResolverTestResponse.errors
and StreamResolverTestResponse.message
for errors.
from chalk.streams import stream, KafkaSource
from chalk.client import ChalkClient
from chalk.features import Features, features
import pydantic
# This code is an example of a simple streaming feature setup. Define the source
stream_source=KafkaSource(...)
# Define the features
@features(etl_offline_to_online=True, max_staleness="7d")
class StreamingFeature:
id: str
user_id: str
card_id: str
# Define the streaming message model
class StreamingMessage(pydantic.BaseModel):
card_id: str
user_id: str
# Define the mapping resolver
@stream(source=stream_source)
def our_stream_resolver(
m: StreamingMessage,
) -> Features[StreamingFeature.id, StreamingFeature.card_id, StreamingFeature.user_id]:
return StreamingFeature(
id=f"{m.card_id}-{m.user_id}",
card_id=m.card_id,
user_id=m.user_id,
)
# Once you have done a `chalk apply`, you can test the streaming resolver with custom messages as follows
client = ChalkClient()
keys = ["my_key"] * 10
messages = [StreamingMessage(card_id="1", user_id=str(i)).json() for i in range(10)]
resp = client.test_streaming_resolver(
resolver="our_stream_resolver",
message_keys=keys,
message_bodies=messages,
)
print(resp.features)
The value of the requested feature. If an error was encountered in resolving this feature, this field will be empty.
The ChalkError
describes an error from running a resolver
or from a feature that can't be validated.
The category of the error, given in the type field for the error codes. This will be one of "REQUEST", "NETWORK", or "FIELD".
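For example, a minimal sketch of inspecting errors on a query response, assuming the User features from earlier examples:
from chalk.client import ChalkClient

result = ChalkClient().query(
    input={User.id: 1234},
    output=[User.fico_score],
)
if result.errors:
    for error in result.errors:
        # Each ChalkError carries a code, a category, and a human-readable message.
        print(error.category, error.code, error.message)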
Class wrapper around revisions for Datasets.
Loads a pl.DataFrame
containing the output. Use .to_polars_lazyframe()
if you want
a LazyFrame
instead, which allows local filtering of datasets that are larger than memory.
Whether to return the primary key feature in a column
named "__chalk__.__id__"
in the resulting pl.LazyFrame
.
Whether to return the input-time feature in a column
named "__chalk__.CHALK_TS"
in the resulting pl.LazyFrame
.
If set to a non-empty str
, used as the input-time column name.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
A polars.DataFrame
materializing query output data.
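A brief usage sketch, assuming the dataset from an earlier offline_query call exposes its revisions as dataset.revisions and that the flags described above are named output_id and output_ts (treat these names as assumptions):
revision = dataset.revisions[-1]  # the latest DatasetRevision
df = revision.to_polars(
    output_id=True,  # assumed flag name: include "__chalk__.__id__"
    output_ts=True,  # assumed flag name: include "__chalk__.CHALK_TS"
)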
Loads a pl.LazyFrame
containing the output. This method is appropriate for working with larger-than-memory datasets.
Use .to_polars()
if you want a DataFrame
instead.
Whether to return the primary key feature in a column
named "__chalk__.__id__"
in the resulting pl.LazyFrame
.
Whether to return the input-time feature in a column
named "__chalk__.CHALK_TS"
in the resulting pl.LazyFrame
.
If set to a non-empty str
, used as the input-time column name.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
A pl.LazyFrame
materializing query output data.
Loads a pl.LazyFrame
containing the output.
Whether to return the primary key feature in a column
named "__chalk__.__id__"
in the resulting pl.LazyFrame
.
Whether to return the input-time feature in a column
named "__chalk__.CHALK_TS"
in the resulting pl.LazyFrame
.
If set to a non-empty str
, used as the input-time column name.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
A pl.LazyFrame
materializing query output data.
Loads a pd.DataFrame
containing the output.
Whether to return the primary key feature in a column
named "__chalk__.__id__"
in the resulting pd.DataFrame
.
Whether to return the input-time feature in a column
named "__chalk__.CHALK_TS"
in the resulting pd.DataFrame
.
If set to a non-empty str
, used as the input-time column name.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
A pd.DataFrame
materializing query output data.
Loads a Chalk DataFrame
containing the output.
Whether to return the primary key feature in a column
named "__chalk__.__id__"
in the resulting DataFrame
.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
Returns a list of the output URIs for the revision. Data will be stored in Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.
Returns an object that loads the summary statistics of a dataset revision.
The dataframe can be retrieved by calling to_polars()
or to_pandas()
on the return object.
Data will be stored in Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.
Returns an object that loads a preview of a dataset revision.
The dataframe can be retrieved by calling to_polars()
or to_pandas()
on the return object.
Data will be stored in Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.
Waits for an offline query job to complete. Raises if the query is unsuccessful, otherwise returns itself on success.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
Downloads output files pertaining to the revision to given path.
Datasets are stored in Chalk as sharded Parquet files. With this method, you can download those raw files into a directory for processing with other tools.
Whether to return the primary key feature in a column
named "__chalk__.__id__"
in the resulting DataFrame
.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
Loads a pl.LazyFrame
containing the inputs.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
A pl.LazyFrame
materializing query input data.
Waits for the revision job to complete.
ChalkClient.offline_query
returns a DatasetRevision
instance immediately after
submitting the revision job. This method can be used to wait for the
revision job to complete.
Once the revision job is complete, the status
attribute of the
DatasetRevision
instance will be updated to reflect the status of the
revision job.
If the revision job was successful, you can then use methods such as
get_data_as_pandas()
without having to wait for the revision job to
complete.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
Downloads the resolver replay data for the given resolver in the revision, provided the revision had store_plan_stages enabled.
The replay data is functionally similar to viewing the intermediate results on the plan explorer.
If the resolver appears in only one stage of the plan, the resolver's replay data is returned directly. If the resolver instead appears in multiple stages of the plan, a mapping of the operation's ID to the replay data will be returned. If the resolver does not appear in the plan, an exception will be thrown.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
Wrapper around Offline Query results.
Datasets are obtained by invoking ChalkClient.offline_query()
.
Dataset
instances store important metadata and enable the retrieval of
offline query outputs.
from chalk.client import ChalkClient, Dataset
uids = [1, 2, 3, 4]
at = datetime.now(tz=timezone.utc)
dataset: Dataset = ChalkClient().offline_query(
input={
User.id: uids,
},
input_times=[at] * len(uids),
output=[
User.id,
User.fullname,
User.email,
User.name_email_match_score,
],
dataset_name='my_dataset'
)
df = dataset.get_data_as_pandas()
dataset.recompute(features=[User.fraud_score], branch="feature/testing")
A list of all DatasetRevision
instances belonging to this dataset.
Loads a pl.DataFrame
containing the output. Use .to_polars_lazyframe()
if you want
a LazyFrame
instead, which allows local filtering of datasets that are larger than memory.
A pl.DataFrame
materializing query output data.
Loads a pl.LazyFrame
containing the output.
A pl.LazyFrame
materializing query output data.
Loads a pd.DataFrame
containing the output.
A pd.DataFrame
materializing query output data.
Loads a pd.DataFrame
containing the output of the most recent revision.
Whether to return the primary key feature in a column
named "__chalk__.__id__"
in the resulting pd.DataFrame
.
Whether to return the input-time feature in a column
named "__chalk__.CHALK_TS"
in the resulting pd.DataFrame
.
If set to a non-empty str
, used as the input-time column name.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
A pd.DataFrame
materializing query output data.
Returns a list of the output URIs for the revision. Data will be stored in Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.
Whether to return the primary key feature in a column
named "__chalk__.__id__"
in the resulting pd.DataFrame
.
Whether to return the input-time feature in a column
named "__chalk__.CHALK_TS"
in the resulting pd.DataFrame
.
If set to a non-empty str
, used as the input-time column name.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
Waits for an offline query job to complete. Returns a list of errors if unsuccessful, or None if successful.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
Downloads output files pertaining to the revision to the given path.
Datasets are stored in Chalk as sharded Parquet files. With this method, you can download those raw files into a directory for processing with other tools.
An executor to use to download the data in parallel. If not specified, the default executor will be used.
How long to wait, in seconds, for job completion before raising a TimeoutError
.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
from chalk.client import ChalkClient, Dataset
from datetime import datetime, timezone
uids = [1, 2, 3, 4]
at = datetime.now(tz=timezone.utc)
dataset = ChalkClient().offline_query(
input={User.id: uids},
input_times=[at] * len(uids),
output=[
User.id,
User.fullname,
User.email,
User.name_email_match_score,
],
dataset_name='my_dataset',
)
dataset.download_data('my_directory')
Returns an object that loads the summary statistics of a dataset revision.
The dataframe can be retrieved by calling to_polars()
or to_pandas()
on the return object.
Data will be stored in Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.
Returns an object that loads a preview of a dataset revision.
The dataframe can be retrieved by calling to_polars()
or to_pandas()
on the return object.
Data will be stored in Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.
Loads a pl.LazyFrame
containing the inputs that were used to create the dataset.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
A pl.LazyFrame
materializing query input data.
Creates a new revision of this Dataset
by recomputing the specified features.
Carries out the new computation on the branch specified when constructing the client.
A list of specific features to recompute. Features that don't exist in the dataset will be added. Features that already exist in the dataset will be recomputed. If not provided, all the existing features in the dataset will be recomputed.
If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.
If True, progress bars will be shown while recomputation is running.
This flag will also be propagated to the methods of the resulting
Dataset
.
If True, the output of each of the query plan stages will be stored in S3/GCS. This will dramatically impact the performance of the query, so it should only be used for debugging. These files will be visible in the web dashboard's query detail view, and can be downloaded in full by clicking on a plan node in the query plan visualizer.
You can specify a correlation ID to be used in logs and web interfaces.
This should be globally unique, i.e. a uuid
or similar. Logs generated
during the execution of your query will be tagged with this correlation id.
If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.
Boots a kubernetes job to run the queries in their own pods, separate from the engine and branch servers. This is useful for large datasets and jobs that require a long time to run. This must be specified as True to run this job asynchronously, even if the previous revision was run asynchronously.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
If no branch was provided to the Chalk Client.
from chalk.client import ChalkClient
dataset = ChalkClient(branch="data_science").offline_query(...)
df = dataset.get_data_as_polars()
# make changes to resolvers in your project
dataset.recompute()
new_df = dataset.get_data_as_polars() # receive newly computed data
Downloads the resolver replay data for the given resolver in the latest revision of the dataset.
The replay data is functionally similar to viewing the intermediate results on the plan explorer.
If the resolver appears in only one stage of the plan, the resolver's replay data is returned directly. If the resolver instead appears in multiple stages of the plan, a mapping of the operation's ID to the replay data will be returned. If the resolver does not appear in the plan, an exception will be thrown.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None
. If no timeout is specified, the client's default
timeout is used.
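A rough sketch of replaying a resolver from a revision that was run with store_plan_stages=True; the method name resolver_replay and the get_fico_score resolver are assumptions used only for illustration.
dataset = ChalkClient().offline_query(
    input={User.id: [1, 2, 3]},
    output=[User.fico_score],
    store_plan_stages=True,  # required for replay data to exist
    dataset_name='replay_debugging',
)
replay = dataset.resolver_replay(get_fico_score)  # pass the resolver function itself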
The category of an error.
For more detailed error information, see ErrorCode
Request errors are raised before execution of your resolver code. They may occur due to invalid feature names in the input or a request that cannot be satisfied by the resolvers you have defined.
Field errors are raised while running a feature resolver for a particular field. For this type of error, you'll find a feature and resolver attribute in the error type. When a feature resolver crashes, you will receive a null value in the response. To differentiate between a resolver returning a null value and a failure in the resolver, you need to check the error schema.
The detailed error code.
For a simpler category of error, see ErrorCodeCategory
.
Override resource requests for processes with isolated resources, e.g., offline queries and cron jobs. Note that making these too large could prevent your job from being scheduled, so please test before using these in a recurring pipeline.
CPU requests: Increasing this will make some Chalk operations that are parallel and CPU-bound faster. Default unit is physical CPU cores, i.e. "8" means 8 CPU cores, "0.5" means half of a CPU core. An alternative unit is "millicore", which is one-thousandth of a CPU core, i.e. 500m is half of a CPU core.
Memory requests: you can use these to give your pod more memory, i.e. to prevent especially large jobs from OOMing. Default unit is bytes, i.e. 1000000000 is 1 gigabyte of memory. You can also specify a suffix such as K, M, or G for kilobytes, megabytes, and gigabytes, respectively. It's also possible to use the power of two equivalents, such as Ki, Mi, and Gi.
Chalk can use this for spilling intermediate state of some large computations, i.e. joins, aggregations, and sorting. Default unit is bytes, i.e. 1000000000 is 1 gigabyte of memory. You can also specify a suffix such as K, M, or G for kilobytes, megabytes, and gigabytes, respectively. It's also possible to use the power of two equivalents, such as Ki, Mi, and Gi.
Ephemeral storage for miscellaneous file system access. Should probably not be below 1Gi to ensure there's enough space for the Docker image, etc. Should also not be too high or else the pod will not be scheduled.
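A hedged sketch of passing resource overrides to an offline query; the import path, the resources parameter name, and the cpu/memory field names are assumptions inferred from the descriptions above.
from chalk.client import ChalkClient
from chalk.client import ResourceRequests  # import path assumed

dataset = ChalkClient().offline_query(
    input={User.id: [1, 2, 3]},
    output=[User.fico_score],
    resources=ResourceRequests(cpu="4", memory="16Gi"),  # field names assumed
    dataset_name='large_backfill',
)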
Convenience method for accessing feature result from the data response.
The FeatureResult
for the feature, if it exists.
from chalk.client import ChalkClient
data = ChalkClient().query(...)
data.get_feature(User.name).ts
datetime.datetime(2023, 2, 5, 23, 25, 26, 427605)
data.get_feature("user.name").meta.cache_hit
False
Create a named query.
Named queries are aliases for specific queries that can be used by API clients.
A name for the named query—this can be versioned with the version parameter, but must otherwise be unique. The name of the named query shows up in the dashboard and is used to specify the outputs for a query.
A string specifying the version of the named query: version is not required, but if specified it must be a valid "semantic version".
The features which will be provided by callers of this query.
For example, [User.id]
. Features can also be expressed as snakecased strings,
e.g. ["user.id"]
.
Outputs are the features that you'd like to compute from the inputs.
For example, [User.age, User.name, User.email]
.
If an empty sequence, the output will be set to all features on the namespace
of the query. For example, if you pass as input {"user.id": 1234}
, then the query
is defined on the User
namespace, and all features on the User
namespace
(excluding has-one and has-many relationships) will be used as outputs.
The owner of the query. This should be a Slack username or email address. This is used to notify the owner in case of incidents.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
from chalk import NamedQuery
# this query's name and version can be used to specify query outputs in an API request.
NamedQuery(
name="fraud_model",
version="1.0.0",
input=[User.id],
output=[User.age, User.fraud_score, User.credit_report.fico],
)
Context in which to execute a query.
Raised when constructing a ChalkClient
without valid credentials.
When this exception is raised, no explicit client_id
and client_secret
were provided, there was no ~/.chalk.yml
file with applicable credentials,
and the environment variables CHALK_CLIENT_ID
and CHALK_CLIENT_SECRET
were not set.
You may need to run chalk login
from your command line, or check that your
working directory is set to the root of your project.
Duration is used to describe time periods in natural language. To specify using natural language, write the count of the unit you would like, followed by the representation of the unit.
Chalk supports the following units:
| Signifier | Meaning |
| --- | --- |
| w | Weeks |
| d | Days |
| h | Hours |
| m | Minutes |
| s | Seconds |
| ms | Milliseconds |
As well as the special keywords "infinity"
and "all"
.
Examples:
| Signifier | Meaning |
| --- | --- |
| "10h" | 10 hours |
| "1w 2m" | 1 week and 2 minutes |
| "1h 10m 2s" | 1 hour, 10 minutes, and 2 seconds |
| "infinity" | Unbounded time duration |
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environments
can take one of three types:
None (default) - a candidate to run in every environment
str - run only in this environment
list[str] - run in any of the specified environments and no others
See more at Environments
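For example, a minimal sketch of environment-scoped resolvers; call_vendor_api is a hypothetical external call.
from chalk import online

@online(environment="production")
def get_fraud_score_prod(uid: User.id) -> User.fraud_score:
    return call_vendor_api(uid)  # hypothetical vendor call

@online(environment=["staging", "dev"])
def get_fraud_score_nonprod(uid: User.id) -> User.fraud_score:
    return 0.0  # constant value outside of production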
Tags allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
Like Environments, tags control when resolvers run based on the Online Context or Training Context matching the tags provided to the resolver decorator. Resolvers optionally take a keyword argument named tags that can take one of three types:
None (default) - The resolver will be a candidate to run for every set of tags.
str - The resolver will run only if this tag is provided.
list[str] - The resolver will run only if all of the specified tags match.
See more at Tags
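For example, a sketch of a resolver that is only a candidate to run when the request carries a matching tag:
from chalk import online

@online(tags="risk")
def get_fraud_score_risk(uid: User.id) -> User.fraud_score:
    return 0.0  # illustrative constant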
Get the tags for a feature, feature class, or resolver.
If the supplied variable is not a feature, feature class, or resolver.
Feature tags
@features(tags="group:risk")
class User:
id: str
# :tags: pii
email: str
tags(User.id)
['group:risk']
Feature class tags
tags(User)
['group:risk']
Feature + feature class tags
tags(User.email)
['pii', 'group:risk']
Get the description of a feature, feature class, or resolver.
If the supplied variable is not a feature, feature class, or resolver.
@features
class RocketShip:
# Comments above a feature become
# descriptions for the feature!
software_version: str
description(RocketShip.software_version)
'Comments above a feature become descriptions for the feature!'
Determine whether a feature is a feature time.
See Time for more details on FeatureTime
.
True
if the feature is a FeatureTime
and False
otherwise.
from chalk.features import features
@features
class User:
id: str
updated_at: datetime = feature_time()
assert is_feature_time(User.updated_at) is True
assert is_feature_time(User.id) is False
Any class decorated by @dataclass
.
There isn't a base class for dataclass, so we use this TypeAlias to indicate any class decorated with @dataclass.
The base type for Chalk exceptions.
This exception makes error handling easier, as you can look only for this exception class.
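A minimal sketch of catch-all error handling; the import path for the exception is assumed to be chalk.client, and handle_chalk_error is a hypothetical application-level handler.
from chalk.client import ChalkBaseException, ChalkClient

try:
    result = ChalkClient().query(
        input={"user.id": 1234},
        output=["user.fico_score"],
    )
except ChalkBaseException as e:
    handle_chalk_error(e)  # all Chalk-raised errors derive from this base class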
Use chalk.functions
to apply common conversions to your features.
Create a conditional expression, roughly equivalent to
if condition:
return if_true
else:
return if_false
Unlike a Python if/else, all three inputs (condition, if_true, if_false)
are evaluated
in parallel for all rows, and then the correct side is selected based on the result of
the condition expression.
from chalk import _
from chalk.features import features
@features
class Transaction:
id: int
amount: int
risk_score: float = _.if_then_else(
_.amount > 10_000,
_.amount * 0.1,
_.amount * 0.05,
)
Build a conditional expression.
import chalk.functions as F
from chalk.features import _, features
@features
class User:
id: str
age: float
age_group: str = (
F.when(_.age < 1)
.then("baby")
.when(_.age < 3)
.then("toddler")
.when(_.age < 13)
.then("child")
.when(_.age < 18)
.then("teen")
.otherwise(F.cast(F.cast(F.floor(_.age / 10), int), str) + "0s")
)
Evaluates if the string matches the pattern.
Patterns can contain regular characters as well as wildcards. Wildcard characters can be escaped using the single character specified for the escape parameter. Matching is case-sensitive.
Note: The wildcard %
represents 0, 1 or multiple characters
and the wildcard _
represents exactly one character.
For example, the pattern John%
will match any string that starts
with John
, such as John
, JohnDoe
, JohnSmith
, etc.
The pattern John_
will match any string that starts with John
and is followed by exactly one character, such as JohnD
, JohnS
, etc.
but not John
, JohnDoe
, JohnSmith
, etc.
import chalk.functions as F
from chalk.features import _, features
@features
class User:
id: str
name: str
is_john: bool = F.like(_.name, "John%")
Finds the first occurrence of the regular expression pattern in the string and returns the capturing group number group.
import chalk.functions as F
from chalk.features import _, features
@features
class HiddenNumber:
id: str
hidden_number: str = "O0OOO"
number: str = F.regexp_extract(_.hidden_number, r"([0-9]+)", 1)
Finds all occurrences of the regular expression pattern in string and returns the capturing group number group.
import chalk.functions as F
from chalk.features import _, features
@features
class Time:
id: str
time: str = "1y 342d 20h 60m 6s"
processed_time: list[str] = F.regexp_extract_all(_.time, "([0-9]+)([ydhms])", 2)
Evaluates the regular expression pattern and determines if it is contained within string.
This function is similar to the like
function, except that the pattern only needs to be
contained within string, rather than needing to match all the string.
In other words, this performs a contains operation rather than a match operation.
You can match the entire string by anchoring the pattern using ^
and $
.
import chalk.functions as F
from chalk.features import _, features
@features
class User:
id: str
name: str
is_john: bool = F.regexp_like(_.name, "^John.*$")
Extract a scalar from a JSON feature using a JSONPath expression. The value of the referenced path must be a JSON scalar (boolean, number, string).
import chalk.functions as F
from chalk import JSON
from chalk.features import _, features
@features
class User:
id: str
profile: JSON
favorite_color: str = F.json_value(_.profile, "$.prefs.color")
Extract the day of the month from a date.
The supported types for x are date and datetime.
Ranges from 1 to 31 inclusive.
from datetime import date
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
id: str
date: date
day: int = F.day_of_month(_.date)
Extract the day of the week from a date.
from datetime import date
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
id: str
date: date
day: int = F.day_of_week(_.date)
Compute the total number of seconds covered in a duration.
from datetime import date
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
id: str
signup: date
last_login: date
signup_to_last_login_days: float = F.total_seconds(_.last_login - _.signup) / (60 * 60 * 24)
Extract a single-column DataFrame
into a list of values for that column.
from datetime import datetime
import chalk.functions as F
from chalk import DataFrame
from chalk.features import _, features
@features
class Merchant:
id: str
events: "DataFrame[FraudEvent]"
fraud_codes: list[str] = F.array_agg(_.events[_.is_fraud == True, _.tag])
@features
class FraudEvent:
id: int
tag: str
is_fraud: bool
mer_id: Merchant.id
Returns the first n items from a dataframe or has-many
from datetime import datetime
import chalk.functions as F
from chalk import windowed, DataFrame, Windowed
from chalk.features import _, features, Primary
@features
class Merchant:
id: str
@features
class ConfirmedFraud:
id: int
trn_dt: datetime
is_fraud: int
mer_id: Merchant.id
@features
class MerchantFraud:
mer_id: Primary[Merchant.id]
merchant: Merchant
confirmed_fraud: DataFrame[ConfirmedFraud] = dataframe(
lambda: ConfirmedFraud.mer_id == MerchantFraud.mer_id,
)
first_five_merchant_window_fraud: Windowed[list[int]] = windowed(
"1d",
"30d",
expression=F.head(_.confirmed_fraud[_.trn_dt > _.chalk_window, _.id, _.is_fraud == 1], 5)
)
Returns a subset of the original array
Starting index of the slice (0-indexed). If negative, slice starts from the end of the array
from datetime import datetime
import chalk.functions as F
from chalk.features import _, features
@features
class Wordle:
id: str
words: list[str] = ["crane", "kayak", "plots", "fight", "exact", "zebra", "hello", "world"]
three_most_recent_words: list[str] = F.slice(_.words, -3, 3) # computes ["zebra", "hello", "world"]
Runs a sagemaker prediction on the specified endpoint, passing in the serialized bytes as a feature.
The content type of the input data. If not specified, the content type will be inferred from the endpoint.
An optional argument which specifies the target model for the prediction. This should only be used for multi-model SageMaker endpoints.
An optional argument which specifies the target variant for the prediction. This should only be used for multi-variant SageMaker endpoints.
import chalk.functions as F
from chalk.features import _, features
@features
class User:
id: str
encoded_sagemaker_data: bytes
prediction: float = F.sagemaker_predict(
_.encoded_sagemaker_data,
endpoint="prediction-model_1.0.1_2024-09-16",
target_model="model_v2.tar.gz",
target_variant="blue"
)
Create an offline query which runs on a schedule.
Scheduled queries do not produce datasets, but persist their results in the online and/or offline feature stores.
By default, scheduled queries use incrementalization to only ingest data that has been updated since the last run.
A unique name for the scheduled query. The name of the scheduled query will show up in the dashboard and will be used to set the incrementalization metadata.
A cron schedule or a Duration
object representing the interval at which
the query should run.
The features that this query will compute. Namespaces are exploded into all features in the namespace.
If set to None, Chalk will incrementalize resolvers in the query's root namespaces.
If set to a list of resolvers, this set will be used for incrementalization.
Incremental resolvers must return a feature time in their output, and must return a DataFrame
.
Most commonly, this will be the name of a SQL file resolver. Chalk will ingest all new data
from these resolvers and propagate changes to values in the root namespace.
A scheduled query object.
from chalk.features import ScheduledQuery
# this scheduled query will automatically run every 5 minutes after `chalk apply`
ScheduledQuery(
name="ingest_users",
schedule="*/5 * * * *",
output=[User],
store_online=True,
store_offline=True,
)
from chalk.monitoring import Chart, Series
Chart(name="Request count").with_trigger(
Series
.feature_null_ratio_metric()
.where(feature=User.fico_score) > 0.2,
)
Change the window period for a Chart
.
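A one-line sketch, assuming the method is named with_window_period:
from chalk.monitoring import Chart

Chart(name="Request count").with_window_period("30m")  # method name assumed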
Triggers are applied when a certain series is above or below a given value. The expression specifies the series, the operand, and the value, as in the example above.
A description of your Trigger
. Descriptions
provided here will be included in the alert message in
Slack or PagerDuty.
For Slack alerts, you can use the mrkdwn syntax described here: https://api.slack.com/reference/surfaces/formatting#basics
Class describing a series of data in two dimensions, as in a line chart. Series should be instantiated with one of the classmethods that specifies the metric to be tracked.
Creates a Series
of metric kind FeatureStaleness
.
The time window to calculate the metric over.
A new FeatureStalenessSeries
instance that inherits from the Series
class.
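For example, a sketch of alerting on staleness for a single feature; the classmethod name feature_staleness_metric, its window_period argument, and the numeric threshold are assumptions based on the naming pattern of feature_null_ratio_metric above.
from chalk.monitoring import Chart, Series

Chart(name="fico_score staleness").with_trigger(
    Series
    .feature_staleness_metric(window_period="10m")  # name and argument assumed
    .where(feature=User.fico_score) > 60,  # threshold is illustrative
)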
Creates a Series
of metric kind FeatureValue
.
The time window to calculate the metric over.
A new FeatureValueSeries
instance that inherits from the Series
class.
Creates a Series
of metric kind ResolverLatency
.
The time window to calculate the metric over.
A new ResolverLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind QueryLatency
.
The time window to calculate the metric over.
A new QueryLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind CronLatency
.
The time window to calculate the metric over.
A new CronLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind StreamMessageLatency
.
The time window to calculate the metric over.
A new StreamMessageLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind StreamWindowLatency
.
The time window to calculate the metric over.
A new StreamWindowLatencySeries
instance that inherits from the Series
class.
Creates a Series
of metric kind StreamLag
.
The time window to calculate the metric over.
A new StreamLagSeries
instance that inherits from the Series
class.
Given two DataFrame
s, left
and right
, check that left == right, and raise an error if they are not equal.
If False
, allows the assert/test to succeed if the required columns are present,
irrespective of the order in which they appear.
If False
, allows the assert/test to succeed if the required rows are present,
irrespective of the order in which they appear; as this requires
sorting, you cannot set on frames that contain un-sortable columns.
If left
does not equal right
If chalkpy[runtime]
is not installed.
from datetime import datetime, timedelta, timezone
from chalk import online, DataFrame
from chalk.features import after
# freeze_time is assumed to come from a time-freezing test utility
# (e.g. the freezegun package); substitute your project's equivalent.

@online
def get_average_spend_30d(
spend: User.cards[after(days_ago=30)],
) -> User.average_spend_30d:
return spend.mean()
with freeze_time(datetime(2021, 1, 1, tzinfo=timezone.utc)):
now = datetime.now(tz=timezone.utc)
get_average_spend_30d(
spend=DataFrame([
Card(spend=10, ts=now - timedelta(days=31)),
Card(spend=20, ts=now - timedelta(days=29)),
Card(spend=30, ts=now - timedelta(days=28)),
])
)