Features define the data that you want to compute and store. Your features are defined as Python classes that look like dataclasses. For example:
from chalk.features import features, DataFrame
from chalk import _

@features
class CreditCard:
    id: int
    user_id: "User.id"
    limit: float

@features
class User:
    id: int
    name: str
    email: str
    credit_cards: DataFrame[CreditCard]
    total_limit: float = _.credit_cards[_.limit].sum()
Features can be nested, as with `credit_cards` above.
In this section, we'll dive into API components that make up the
building blocks of features.
The individual or team responsible for these features. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Added metadata for features for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
When `True`, Chalk copies this feature into the online environment when it is computed in offline resolvers.
Setting `etl_offline_to_online` on a feature class assigns it to all features on the class which do not explicitly specify `etl_offline_to_online`.
When a feature is expensive or slow to compute, you may wish to cache its value.
Chalk uses the terminology "maximum staleness" to describe how recently a feature
value needs to have been computed to be returned without re-running a resolver.
Assigning a `max_staleness` to the feature class assigns it to all features on the class which do not explicitly specify a `max_staleness` value of their own.
@features(
    owner="andy@chalk.ai",
    max_staleness="30m",
    etl_offline_to_online=True,
    tags="user-group",
)
class User:
    id: str

    # Comments here appear in the web!
    # :tags: pii
    name: str | None

    # :owner: userteam@mycompany.com
    location: LatLng
Add metadata and configuration to a feature.
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature. Read more at Owner
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team. Read more at Tags
The maximum version for a feature. Versioned features can be referred to with the `@` operator:
@features
class User:
    id: str
    score: int = feature(version=2)

str(User.score @ 2)
"user.score@2"
See more at Versioning
The default version for a feature. When you reference a versioned feature without the `@` operator, you reference the `default_version`. Set to `1` by default.
@features
class User:
    id: str
    score: int = feature(version=2, default_version=2)

str(User.score)
"user.score"
See more at Default versions
When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Read more at Caching
When `True` (default), Chalk will cache all values, including nulls. When `False`, Chalk will not update the null entry in the cache. When `"evict_nulls"`, Chalk will evict the entry that would have been null from the cache, if it exists.
Concretely, suppose the current state of a database is {a: 1, b: 2}, and you write a row {a: 2, b: None}. Here is the expected result in the db:
cache_nulls=True (default): {a: 2, b: None}
cache_nulls=False: {a: 2, b: 2}
cache_nulls="evict_nulls": {a: 2}
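For instance, here is a minimal sketch of combining this option with caching on a single feature (the feature class and names are illustrative, assuming `cache_nulls` is passed to `feature(...)` alongside `max_staleness`):

from chalk.features import features, feature

@features
class User:
    id: str
    # Cache for 30 minutes, but never write null values into the cache.
    email_domain: str | None = feature(
        max_staleness="30m",
        cache_nulls=False,
    )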
When `True`, Chalk copies this feature into the online environment when it is computed in offline resolvers.
Read more at Reverse ETL
If `True` and this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.
A list of validations to apply to this feature. Generally, `max`, `min`, `max_length`, and `min_length` are more convenient, but the `strict` parameter applies to all of those parameters. Use this parameter if you want to mix strict and non-strict validations.
The default value of the feature if it otherwise can't be computed.
If you don't need to specify other metadata, you can also assign a default in the same way you would assign a default to a dataclass:
from chalk.features import features

@features
class User:
    num_purchases: int = 0
An underscore expression for defining the feature. Typically, this value is assigned directly to the feature without needing to use the `feature(...)` function. However, if you want to define other properties, like a `default` or `max_staleness`, you'll want to use the `expression` keyword argument.
from chalk.features import features, feature
from chalk import _

@features
class Receipt:
    subtotal: int
    tax: int
    total: int = feature(expression=_.subtotal + _.tax, default=0)
See more at Underscore
If `True`, this feature is considered deprecated, which impacts the dashboard, alerts, and warnings.
The type of the input feature, given by `_TRich`.
from chalk.features import Primary, features, feature

@features
class User:
    uid: Primary[int]

    # Uses a default value of 0 when one cannot be computed.
    num_purchases: int = 0

    # Description of the name feature.
    # :owner: fraud@company.com
    # :tags: fraud, credit
    name: str = feature(
        max_staleness="10m",
        etl_offline_to_online=True
    )

    score: int = feature(
        version=2, default_version=2
    )
Specify a feature that represents a one-to-one relationship.
This function allows you to explicitly specify a join condition between two `@features` classes. When there is only one way to join two classes, we recommend using the foreign-key definition instead of this `has_one` function. For example, if you have a `User` class and a `Card` class, and each user has one card, you can define the `Card` and `User` classes as follows:
@features
class User:
    id: str

@features
class Card:
    id: str
    user_id: User.id
    user: User
from chalk.features import features, has_one

@features
class Card:
    id: str
    user_id: str
    balance: float

@features
class User:
    id: str
    card: Card = has_one(
        lambda: User.id == Card.user_id
    )
Specify a feature that represents a one-to-many relationship.
The join condition between `@features` classes. This argument is callable to allow for forward references to members of this class and the joined class.
from chalk.features import DataFrame, features, has_many

@features
class Card:
    id: str
    user_id: str
    balance: float

@features
class User:
    id: str
    cards: DataFrame[Card] = has_many(
        lambda: User.id == Card.user_id
    )
The function `after` can be used with `DataFrame` to compute windowed features.
`after` filters a `DataFrame` relative to the current time in context, such that if the `after` filter is defined as `now - {time_window}`, the filter will include all features with timestamps t where `now - {time_window} <= t <= now`.
This time could be in the past if you're using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.
The parameters to `after` take many keyword arguments describing the time relative to the present.
The feature to use for the filter. By default, `index` is the `FeatureTime` of the referenced feature class.
from chalk.features import DataFrame, features

@features
class Card:
    ...

@features
class User:
    cards: DataFrame[Card]

User.cards[after(hours_ago=1, minutes_ago=30)]
The function `before` can be used with `DataFrame` to compute windowed features.
`before` filters a `DataFrame` relative to the current time in context, such that if the `before` filter is defined as `now - {time_window}`, the filter will include all features with timestamps t where `t <= now - {time_window}`.
This time could be in the past if you're using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.
The parameters to `before` take many keyword arguments describing the time relative to the present.
The feature to use for the filter. By default, `index` is the `FeatureTime` of the referenced feature class.
from chalk.features import DataFrame, features

@features
class Card:
    ...

@features
class User:
    cards: DataFrame[Card]

User.cards[before(hours_ago=1, minutes_ago=30)]
Declare a windowed feature.
@features
class User:
    failed_logins: Windowed[int] = windowed("10m", "24h")
Create a windowed feature.
See more at Windowed
The size of the buckets for the window function. Buckets are specified as strings in the format `"1d"`, `"2h"`, `"1h30m"`, etc. You may also choose to specify the buckets using the `days`, `hours`, and `minutes` parameters instead. The `buckets` parameter is helpful if you want to use multiple units to express the bucket size, like `"1h30m"`.
Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like `"1d"`.
Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like `"1h"`.
Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like `"1m"`.
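For example, here is a small sketch using the convenience parameters rather than bucket strings (the feature names are illustrative):

from chalk import windowed, Windowed
from chalk.features import features

@features
class User:
    id: int
    # Equivalent to windowed("1d", "7d", "30d")
    failed_logins: Windowed[int] = windowed(days=[1, 7, 30])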
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver.
See more at Caching
Sets a maximum age for values eligible to be retrieved from the offline store, defined in relation to the query's current point-in-time.
Feature versions allow you to manage a feature as its definition changes over time.
The `version` keyword argument allows you to specify the maximum number of versions available for this feature.
See more at Versioning
When `True`, Chalk copies this feature into the online environment when it is computed in offline resolvers.
See more at Reverse ETL
If `True` and this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.
A list of validations to apply to this feature.
See more at https://docs.chalk.ai/api-docs#Validation
The expression to compute the feature. This is an underscore expression, like `_.transactions[_.amount].sum()`.
Configuration for aggregating data. Pass `bucket_duration` with a Duration to configure the bucket size for aggregation.
If `True`, each of the windows will use a bucket duration equal to its window duration.
See more at Materialized window aggregations
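As a hedged sketch of the aggregation configuration described above (the feature class, the `transactions` relationship, and the dict form of the `materialization` argument are assumptions for illustration):

from chalk import windowed, Windowed, _
from chalk.features import features

@features
class User:
    id: int
    # Materialize 7-day and 30-day spend, aggregated into 1-day buckets.
    total_spend: Windowed[float] = windowed(
        "7d", "30d",
        expression=_.transactions[_.amount].sum(),
        materialization={"bucket_duration": "1d"},
    )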
Metadata for the windowed feature, parameterized by `TPrim` (the primitive type of the feature) and `TRich` (the decoded type of the feature, if `decoder` is provided).
from chalk import windowed, Windowed

@features
class User:
    id: int
    email_count: Windowed[int] = windowed(days=range(1, 30))
    logins: Windowed[int] = windowed("10m", "1d", "30d")

User.email_count["7d"]
Create a windowed feature with grouping.
See more at Grouped materialized window aggregations
The size of the buckets for the window function. Buckets are specified as strings in the format `"1d"`, `"2h"`, `"1h30m"`, etc. You may also choose to specify the buckets using the `days`, `hours`, and `minutes` parameters instead. The `buckets` parameter is helpful if you want to use multiple units to express the bucket size, like `"1h30m"`.
The expression to compute the feature. This is an underscore expression, like `_.transactions[_.amount].sum()`.
Configuration for aggregating data. Pass `bucket_duration` with a Duration to configure the bucket size for aggregation.
See more at Materialized window aggregations
Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like `"1d"`.
Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like `"1h"`.
Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like `"1m"`.
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
If `True` and this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.
from chalk import group_by_windowed, DataFrame, _
from chalk.features import features

@features
class Email:
    id: int
    user_id: "User.id"
    category: str

@features
class User:
    id: int
    emails: DataFrame[Email]
    emails_by_category: DataFrame = group_by_windowed(
        "10m", "30m",
        expression=_.emails.group_by(_.category).count(),
    )
Marks a feature as the primary feature for a feature class.
Features named `id` on feature classes without an explicit primary feature are declared primary keys by default, and don't need to be marked with `Primary`.
If you have a primary key feature with a name other than `id`, you can use this marker to indicate the primary key.
from chalk.features import features
from chalk import Primary

@features
class User:
    username: Primary[str]
Specify explicit data validation for a feature.
The `feature()` function can also specify these validations, but this class allows you to specify both strict and non-strict validations at the same time.
from chalk.features import features, feature, Validation

@features
class User:
    fico_score: int = feature(
        validations=[
            Validation(min=300, max=850, strict=True),
            Validation(min=300, max=320, strict=False),
            Validation(min=840, max=850, strict=False),
        ]
    )

    # If only one set of validations were needed,
    # you can use the feature() function instead:
    first_name: str = feature(
        min_length=2, max_length=64, strict=True
    )
The `Vector` class can be used as a type annotation to denote a Vector feature.
Instances of this class will be provided when working with raw vectors inside of resolvers. Generally, you do not need to construct instances of this class directly, as Chalk will automatically convert list-like features into `Vector` instances when working with a `Vector` annotation.
data (`numpy.Array | list[float] | pyarrow.FixedSizeListScalar`): The vector values.
from chalk.features import Vector, features

@features
class Document:
    embedding: Vector[1536]
Define a nearest neighbor relationship for performing Vector similarity search.
metric (`"l2" | "ip" | "cos"`): The metric to use to compute distance between two vectors. L2 Norm (`"l2"`), Inner Product (`"ip"`), and Cosine (`"cos"`) are supported. Defaults to `"l2"`.
A nearest neighbor relationship filter.
Decorator to create an online resolver.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- `None` (default) - candidate to run in every environment
- `str` - run only in this environment
- `list[str]` - run in any of the specified environments and no others

Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.
Cron can sample all examples, a subset of all examples, or a custom provided set of examples.
Read more at Scheduling
You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.
Like tags, `when` can filter when a resolver is eligible to run. Unlike tags, `when` can use feature values, so that you can write resolvers like:

@online(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
    ...
Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.
Whether this resolver should be invoked once during planning time to build a static computation graph. If True, all inputs will either be StaticOperators (for has-many and dataframe relationships) or StaticExpressions (for individual features). The resolver must return a StaticOperator as output.
Whether this resolver returns all ids of a given namespace.
To have this annotation, the resolver must take no arguments
and return a DataFrame
. Typically, this annotation would
be used in a SQL-file resolver.
A list of features that must be unique for each row of the output. This enables unique optimizations in the resolver execution. Only applicable to resolvers that return a DataFrame.
A list of features that correspond to partition keys in the data source. This field indicates that this resolver executes its query against a data storage system that is partitioned by a particular set of columns. This is most common with data-warehouse sources like Snowflake, BigQuery or Databricks.
A `ResolverProtocol` which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.
Read more at Unit Tests
@online
def name_match(
    name: User.full_name,
    account_name: User.bank_account.title
) -> User.account_name_match_score:
    if name.lower() == account_name.lower():
        return 1.
    return 0.
Decorator to create an offline resolver.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- `None` (default) - candidate to run in every environment
- `str` - run only in this environment
- `list[str]` - run in any of the specified environments and no others

Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.
Cron can sample all examples, a subset of all examples, or a custom provided set of examples.
Read more at Scheduling
Like tags, `when` can filter when a resolver is eligible to run. Unlike tags, `when` can use feature values, so that you can write resolvers like:

@offline(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
    ...
Allows you to specify an individual or team who is responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.
Whether this resolver should be invoked once during planning time to build a static computation graph. If `True`, all inputs will either be `StaticOperators` (for has-many and dataframe relationships) or `StaticExpressions` (for individual features). The resolver must return a `StaticOperator` as output.
A `ResolverProtocol` which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.
Read more at Unit Tests
@offline(cron="1h")
def get_fraud_score(
    email: User.email,
    name: User.name,
) -> User.fraud_score:
    return socure.get_sigma_score(email, name)
Run an online or offline resolver on a schedule.
This class lets you add a filter or sample function to your cron schedule for a resolver. See the overloaded signatures for more information.
The period of the cron job. Can be either a crontab (`"0 * * * *"`) or a `Duration` (`"2h"`).
Optionally, a function to filter down the arguments to consider.
See Filtering examples for more information.
Explicitly provide the sample function for the cron job.
See Custom examples for more information.
Using a filter

def only_active_filter(v: User.active):
    return v

@online(cron=Cron(schedule="1d", filter=only_active_filter))
def score_user(d: User.signup_date) -> User.score:
    return ...

Using a sample function

def s() -> DataFrame[User.id]:
    return DataFrame.read_csv(...)

@offline(cron=Cron(schedule="1d", sample=s))
def fn(balance: User.account.balance) -> ...:
    ...
Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- [`None`](https://docs.python.org/3/library/constants.html#None) (default) - candidate to run in every environment
- [`str`](https://docs.python.org/3/library/stdtypes.html#str) - run only in this environment
- `list[str]` - run in any of the specified environment and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
The arguments to pass to the decorated function. If one of the arguments is a `DataFrame` with a filter or projection applied, the resolver will only be called with the filtered or projected data. Read more at https://docs.chalk.ai/docs/unit-tests#data-frame-inputs
The result of calling the decorated function with `args`. Useful for unit-testing.
Read more at Unit Tests
@online
def get_num_bedrooms(
    rooms: Home.rooms[Room.name == 'bedroom']
) -> Home.num_bedrooms:
    return len(rooms)

rooms = [
    Room(id=1, name="bedroom"),
    Room(id=2, name="kitchen"),
    Room(id=3, name="bedroom"),
]

assert get_num_bedrooms(rooms) == 2
The type of machine to use.
You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.
Chalk includes a `DataFrame` class that models tabular data in much the same way that `pandas` does. However, there are some key differences that allow the Chalk `DataFrame` to increase type safety and performance.
Like pandas, Chalk's `DataFrame` is a two-dimensional data structure with rows and columns. You can perform operations like filtering, grouping, and aggregating on a `DataFrame`. However, there are two main differences:
- `DataFrame` is lazy and can be backed by multiple data sources, where a `pandas.DataFrame` executes eagerly in memory.
- `DataFrame[...]` can be used to represent a type of data with pre-defined filters.

Unlike pandas, the implementation of a Chalk `DataFrame` is lazy, and can be executed against many different backend sources of data. For example, in unit tests, a `DataFrame` uses an implementation backed by polars. But if your `DataFrame` was returned from a SQL source, filters and aggregations may be pushed down to the database for efficient execution.
Each column of a Chalk `DataFrame` is typed by a `Feature` type. For example, you might have a resolver returning a `DataFrame` containing user ids and names:
@features
class User:
    id: int
    name: str
    email: str

@online
def get_users() -> DataFrame[User.id, User.name]:
    return DataFrame([
        User(id=1, name="Alice"),
        User(id=2, name="Bob")
    ])
Construct a Chalk `DataFrame`.
The data. Can be an existing `pandas.DataFrame`, `polars.DataFrame` or `polars.LazyFrame`, a sequence of feature instances, or a `dict` mapping a feature to a sequence of values.
The strategy to use to handle missing values.
A feature value is "missing" if it is an ellipsis (`...`), or it is `None` and the feature is not annotated as `Optional[...]`.
The available strategies are:
- `'error'`: Raise a `TypeError` if any missing values are found. Do not attempt to replace missing values with the default value for the feature.
- `'default_or_error'`: If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, raise a `TypeError`.
- `'default_or_allow'`: If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, leave it as `None`. This is the default strategy.
- `'allow'`: Allow missing values to be stored in the `DataFrame`. This option may result in non-nullable features being assigned `None` values.

Row-wise construction
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
Column-wise construction
df = DataFrame({
    User.id: [1, 2],
    User.first: ["Sam", "Iris"],
    User.last: ["Wu", "Xi"]
})
Construction from polars.DataFrame

import polars

df = DataFrame(polars.DataFrame({
    "user.id": [1, 2],
    "user.first": ["Sam", "Iris"],
    "user.last": ["Wu", "Xi"]
}))
Aggregate the `DataFrame` by the specified columns.

from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
Compute a histogram with fixed width bins.
The column to compute the histogram on. If not supplied, the `DataFrame` is assumed to contain a single column.
DataFrame({
    Taco.price: list(range(100, 200)),
}).histogram_list(nbins=4, base=100)
[25, 25, 25, 25]
Group based on a time value (`date` or `datetime`).
The groups are defined by a time-based window, and optionally, other columns in the `DataFrame`. The "width" of the window is defined by the `period` parameter, and the spacing between the windows is defined by the `every` parameter. Note that if the `every` parameter is smaller than the `period` parameter, then the windows will overlap, and a single row may be assigned to multiple groups.
As an example, consider the following `DataFrame`:
val: a b c d e f g h
─────────●─────────●─────────●─────────●───────▶
time: A B C D
┌─────────┐
1 │ a b │ 1: [a, b]
└────┬────┴────┐
2 ◀───▶│ b c │ 2: [b, c]
every└────┬────┴────┐
3 ◀────────▶│ c d e │ 3: [c, d, e]
period └────┬────┴────┐
4 │d e f│ 4: [d, e, f]
└────┬────┴────┐
5 │ f │ 5: [f]
└────┬────┴────┐
6 │ │
└────┬────┴────┐
7 │ g h │ 7: [g, h]
└────┬────┴────┐
8 │g h │ 8: [g, h]
└─────────┘
In the above example, the sixth time bucket is empty, and will not be included in the resulting `DataFrame`.
from chalk import DataFrame, op
from datetime import datetime

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
        User.ts: [datetime(2020, 1, 1), datetime(2020, 1, 1), datetime(2020, 1, 3)],
    },
).group_by_hopping(
    index=User.ts,
    group={User.id: User.id},
    agg={User.val: op.median(User.val)},
    period="1d",
)
╭─────────┬──────────┬──────────╮
│ User.id │ User.ts │ User.val │
╞═════════╪══════════╪══════════╡
│ 1 │ 2020-1-1 │ 3 │
├─────────┼──────────┼──────────┤
│ 3 │ 2020-1-3 │ 10 │
╰─────────┴──────────┴──────────╯
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

# Set the fraud score to 0 for all users
df.with_column(User.fraud_score, 0)

# Concatenation of first & last as full_name
df.with_column(
    User.full_name, op.concat(User.first, User.last)
)

# Alias a column name
df.with_column(
    User.first_name, User.first
)
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

# Set the fraud score to 0 for all users
df.with_columns({User.fraud_score: 0})

# Concatenation of first & last as full_name
df.with_columns({
    User.full_name: op.concat(User.first, User.last)
})

# Alias a column name
df.with_columns({
    User.first_name: User.first
})
Read a .csv file as a `DataFrame`.

values = DataFrame.read_csv(
    "s3://...",
    columns={0: MyFeatures.id, 1: MyFeatures.name},
    has_header=False,
)
Sort the `DataFrame` by the given columns.

df = DataFrame({
    User.a: [1, 2, 3],
    User.b: [3, 2, 1],
})
df.sort(User.a)

   a  b
-----------
0  1  3
1  2  2
2  3  1
Get the underlying `DataFrame` as a `polars.LazyFrame`.
The underlying `polars.LazyFrame`.
Get the underlying `DataFrame` as a `pyarrow.Table`.
The underlying `pyarrow.Table`. This format is the canonical representation of the data in Chalk.
Get the underlying `DataFrame` as a `pandas.DataFrame`.
prefixed: Whether to prefix the column names with the feature namespace (i.e. if `prefixed=True`, `user.name`; if `prefixed=False`, `name`).
The data formatted as a `pandas.DataFrame`.
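As a small sketch of the conversion described above (assuming the method is named `to_pandas` and reusing the `User` features from earlier examples):

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

# Columns come back as "user.id", "user.first", ... unless prefixed=False.
pandas_df = df.to_pandas(prefixed=False)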
Operations for aggregations in `DataFrame`.
The class methods on this class are used to create aggregations for use in `DataFrame.group_by`.

from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.sum(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 4.5 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
        User.active: [True, True, False],
    }
).group_by(
    group={User.id: User.id},
    agg={
        User.val: op.product(User.val),
        User.active: op.product(User.active),
    }
)
╭─────────┬──────────┬─────────────╮
│ User.id │ User.val │ User.active │
╞═════════╪══════════╪═════════════╡
│ 1 │ 2 │ 1 │
├─────────┼──────────┼─────────────┤
│ 3 │ 10 │ 0 │
╰─────────┴──────────┴─────────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.max(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 4 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.min(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 0.5 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.mean(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 6.5 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.count(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 2 │
├─────────┼──────────┤
│ 3 │ 1 │
╰─────────┴──────────╯
Concatenate the string values of `col` and `col2` in a `DataFrame`.

from chalk import op
from chalk.features import DataFrame

DataFrame(
    [
        User(id=1, val='a'),
        User(id=1, val='b'),
        User(id=3, val='c'),
        User(id=3, val='d'),
    ]
).group_by(
    group={User.id: User.id},
    agg={User.val: op.concat(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ "ab" │
├─────────┼──────────┤
│ 3 │ "cd" │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=True,
).group_by(
    group={User.id: User.id},
    agg={User.val: op.last(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 1 │
├─────────┼──────────┤
│ 3 │ 5 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=False
).group_by(
    group={User.id: User.id},
    agg={User.val: op.first(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 1 │
├─────────┼──────────┤
│ 3 │ 5 │
╰─────────┴──────────╯
Filter the aggregation to apply to only rows where all the filters in f are true. If no rows match the filter, the aggregation for the column will be null, and the resulting feature type must be a nullable type.
The aggregation, allowing you to continue to chain methods.
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.sum(User.val).where(User.val > 5)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ null │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
Create a Snowflake data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the `name=` keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
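A brief sketch of the first two construction styles described above (the integration name "RISK" is a placeholder):

from chalk.sql import SnowflakeSource

# Use the default Snowflake configuration from the Chalk Dashboard.
default_source = SnowflakeSource()

# Use the dashboard configuration for the integration named "RISK".
risk_source = SnowflakeSource(name="RISK")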
Create a PostgreSQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the `name=` keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
Create a MySQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the `name=` keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
Create a DynamoDB data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the `name=` keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details. DynamoDBSources can be queried via PartiQL SQL resolvers.
You may override the ambient AWS credentials by providing either a client ID and secret, or a role ARN.
Create a BigQuery data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the `name=` keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
Create a Redshift data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the `name=` keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
Create a CloudSQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the `name=` keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
Testing SQL source.
If you have only one SQLiteInMemorySource integration, there's no need to provide a distinguishing name.
The SQL source for use in Chalk resolvers.
source = SQLiteInMemorySource(name="RISK")
Create a Databricks data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the `name=` keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
Create a Spanner data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the `name=` keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
Incremental settings for Chalk SQL queries.
In "row"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only return "new" results, by adding a clause that looks like:
"WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>"
In "group"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter" mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
. This will incrementalize your query using the timestamp
of the latest row that has been ingested.
Chalk will also include another query parameter named "chalk_last_execution_timestamp"
that can be used instead.
This will incrementalize your query using the last time the query was executed.
incremental_timestamp:
- If `incremental_timestamp` is `"feature_time"`, we will incrementalize your query using the timestamp of the latest row that has been ingested. This is the default.
- If `incremental_timestamp` is `"resolver_execution_time"`, we will incrementalize your query using the last time the query was executed instead.
The timestamp to set as the lower bound
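A hedged sketch of row-mode incrementalization from a Python resolver (the `Payment` feature class, table, and column names are hypothetical; the parameter names follow the descriptions above):

from chalk import offline
from chalk.features import DataFrame
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@offline
def ingest_payments() -> DataFrame[Payment.id, Payment.amount, Payment.updated_at]:
    # Only rows with updated_at newer than the last observed row (minus a
    # one-hour lookback) are returned on each run.
    return pg.query_string(
        "SELECT id, amount, updated_at FROM payments",
        fields={"id": Payment.id, "amount": Payment.amount, "updated_at": Payment.updated_at},
    ).incremental(
        incremental_column="updated_at",
        lookback_period="60m",
        mode="row",
    )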
Run a query from a SQL file.
This method allows you to query the SQL file within a Python resolver. However, Chalk can also infer resolvers from SQL files. See SQL file resolvers for more information.
The path to the SQL file, relative to the caller's file, or to the directory that your `chalk.yaml` file lives in.
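A minimal sketch of calling this method from a resolver (the file path, `fields` mapping, and feature names are hypothetical):

from chalk import offline
from chalk.features import DataFrame
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@offline
def load_users() -> DataFrame[User.id, User.email]:
    # The .sql file is resolved relative to this Python file.
    return pg.query_sql_file(
        "queries/users.sql",
        fields={"id": User.id, "email": User.email},
    ).all()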
Automatically ingest a table.
from chalk.sql import PostgreSQLSource
from chalk.features import features

PostgreSQLSource().with_table(
    name="users",
    features=User,
).with_table(
    name="accounts",
    features=Account,
    # Override one of the column mappings.
    column_to_feature={
        "acct_id": Account.id,
    },
)
Return at most one result or raise an exception.
Returns `None` if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.
A query that can be returned from a resolver.
Return the results represented by this Query as a `DataFrame`.
A query that can be returned from a resolver.
Operates like `.all()`, but tracks `previous_latest_row_timestamp` between query executions in order to limit the amount of data returned.
`previous_latest_row_timestamp` will be set to the start of the query execution, or if you return a `FeatureTime`-mapped column, Chalk will update `previous_latest_row_timestamp` to the maximum observed `FeatureTime` value.
In `"row"` mode: `incremental_column` MUST be set.
Returns the results represented by this query as a list (like `.all()`), but modifies the query to only return "new" results, by adding a clause that looks like:
WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>
In `"group"` mode: `incremental_column` MUST be set.
Returns the results represented by this query as a list (like `.all()`), but modifies the query to only return results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter"
mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
.
Defaults to "row"
, which indicates that only rows newer than the last observed row should be
considered. When set to "group"
, Chalk will only ingest features from groups which are newer
than the last observation time. This requires that the query is grouped by a primary key.
This should reference a timestamp column in your underlying table, typically something
like "updated_at"
, "created_at"
, "event_time"
, etc.
Materialize the query.
Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.
Note: this requires the usage of the `fields={...}` argument when used in conjunction with `query_string` or `query_sql_file`.
Return at most one result or raise an exception.
Returns `None` if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.
A query that can be returned from a resolver.
A query that can be returned from a resolver.
Operates like `.all()`, but tracks `previous_latest_row_timestamp` between query executions in order to limit the amount of data returned.
`previous_latest_row_timestamp` will be set to the start of the query execution, or if you return a `FeatureTime`-mapped column, Chalk will update `previous_latest_row_timestamp` to the maximum observed `FeatureTime` value.
In `"row"` mode: `incremental_column` MUST be set.
Returns the results represented by this query as a list (like `.all()`), but modifies the query to only return "new" results, by adding a clause that looks like:
WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>
In `"group"` mode: `incremental_column` MUST be set.
Returns the results represented by this query as a list (like `.all()`), but modifies the query to only return results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter"
mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
.
This should reference a timestamp column in your underlying table, typically something
like "updated_at"
, "created_at"
, "event_time"
, etc.
Defaults to "row"
, which indicates that only rows newer than the last observed row should be
considered. When set to "group"
, Chalk will only ingest features from groups which are newer
than the last observation time. This requires that the query is grouped by a primary key.
Generate a Chalk SQL file resolver from a filepath and a SQL string.
This will generate a resolver in your web dashboard that can be queried, but will not output a `.chalk.sql` file.
The optional parameters are overrides for the comment key-value pairs at the top of the sql file resolver. Comment key-value pairs specify important resolver information such as the source, feature namespace to resolve, and other details. Note that these will override any values specified in the sql string. See Configuration for more information.
See SQL file resolvers for more information on SQL file resolvers.
Can either be a BaseSQLSource or a string. If a string is provided, it will be used to infer the source by first scanning for a source with the same name, then inferring the source if it is a type, e.g. `snowflake` if there is only one database of that type. Optional if `source` is specified in `sql`.
from chalk import make_sql_file_resolver
from chalk.features import features

@features
class User:
    id: int
    name: str

make_sql_file_resolver(
    name="my_resolver",
    sql="SELECT user_id as id, name FROM users",
    source="snowflake",
    resolves=User,
    kind="offline",
)
Decorator to create a stream resolver.
This parameter is defined when the streaming resolver returns a windowed feature.
Tumbling windows are fixed-size, contiguous and non-overlapping time intervals. You can think of tumbling windows as adjacently arranged bins of equal width. Tumbling windows are most often used alongside `max_staleness` to allow the features to be sent to the online store and offline store after each window period.
Continuous windows, unlike tumbling window, are overlapping and exact. When you request the value of a continuous window feature, Chalk looks at all the messages received in the window and computes the value on-demand.
See more at Window modes
A callable that will interpret an input prior to the invocation of the resolver. Parse functions can serve many functions, including pre-parsing bytes, skipping unrelated messages, or supporting rekeying.
See more at Parsing
A mapping from input `BaseModel` attribute to Chalk feature attribute to support continuous streaming re-keying. This parameter is required for continuous resolvers. Features that are included here do not have to be explicitly returned in the stream resolver: the feature will automatically be set to the key value used for aggregation.
See more at Keys
An optional string specifying an input attribute as the timestamp used for windowed aggregations. See more at Custom event timestamping
A callable function! You can unit-test stream resolvers as you would unit-test any other code.
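Below is a hedged sketch of a minimal stream resolver wired to a named Kafka integration (the message schema, feature class, and integration name are hypothetical):

from pydantic import BaseModel
from chalk.streams import stream, KafkaSource
from chalk.features import features, Features

source = KafkaSource(name="transactions")

class TxnMessage(BaseModel):
    user_id: str
    amount: float

@features
class User:
    id: str
    last_txn_amount: float

@stream(source=source)
def process_txn(message: TxnMessage) -> Features[User.id, User.last_txn_amount]:
    # Map each message onto the features it resolves.
    return User(id=message.user_id, last_txn_amount=message.amount)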
Decorator to create a sink. Read more at Sinks
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- `None` (default) - candidate to run in every environment
- `str` - run only in this environment
- `list[str]` - run in any of the specified environments and no others

Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.
The individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
A callable function! You can unit-test sinks as you would unit test any other code. Read more at Unit Tests
@sink
def process_updates(
    uid: User.id,
    email: User.email,
    phone: User.phone,
):
    user_service.update(
        uid=uid,
        email=email,
        phone=phone
    )

process_updates(123, "sam@chalk.ai", "555-555-5555")
An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
An S3 or GCS URI that points to the certificate authority file that should be used to verify broker certificates. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
Protocol used to communicate with brokers. Valid values are `"PLAINTEXT"`, `"SSL"`, `"SASL_PLAINTEXT"`, and `"SASL_SSL"`. Defaults to `"PLAINTEXT"`.
Authentication mechanism when `security_protocol` is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are `"PLAIN"`, `"GSSAPI"`, `"SCRAM-SHA-256"`, `"SCRAM-SHA-512"`, and `"OAUTHBEARER"`. Defaults to `"PLAIN"`.
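For reference, a hedged sketch of a Kafka source configured with SASL authentication (the broker address, topic, credential values, and keyword names other than `security_protocol` and `sasl_mechanism` are assumptions):

from chalk.streams import KafkaSource

source = KafkaSource(
    bootstrap_server="broker-1.example.com:9096",
    topic="transactions",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_username="chalk-consumer",
    sasl_password="...",
)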
Base class for all stream sources generated from `@stream`.
Identifier for the dead-letter queue (DLQ) for the stream. If not specified, failed messages will be dropped. Stream name for Kinesis, topic name for Kafka, subscription id for PubSub.
The `ChalkClient` is the primary Python interface for interacting with Chalk. You can use it to query data, trigger resolver runs, gather offline data, and more.
Create a `ChalkClient` with the given credentials.
The client secret to use to authenticate. Can either be a service token secret or a user token secret.
The ID or name of the environment to use for this client. Not necessary if your `client_id` and `client_secret` are for a service token scoped to a single environment. If not present, the client will use the environment variable `CHALK_ENVIRONMENT`.
The API server to use for this client. Required if you are using a Chalk Dedicated deployment. If not present, the client will check for the presence of the environment variable `CHALK_API_SERVER`, and use that if found.
If specified, Chalk will route all requests from this client instance to the relevant branch. Some methods allow you to override this instance-level branch configuration by passing in a `branch` argument.
If specified, Chalk will route all requests from this client instance to the relevant tagged deployment. This cannot be used with the `branch` argument.
If specified, Chalk will route all requests from this client instance to the relevant preview deployment.
The query server to use for this client. Required if you are using a standalone Chalk query engine deployment. If not present, the client will default to the value of api_server.
The default wait timeout, in seconds, to wait for long-running jobs to complete when accessing query results. Jobs will not time out if this timeout elapses. For no timeout, set to None. The default is no timeout.
The default wait timeout, in seconds, to wait for network requests to complete. If not specified, the default is no timeout.
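A short sketch of constructing a client with explicit credentials (the environment variable names and environment name are placeholders):

import os
from chalk.client import ChalkClient

client = ChalkClient(
    client_id=os.environ["CHALK_CLIENT_ID"],
    client_secret=os.environ["CHALK_CLIENT_SECRET"],
    environment="production",
)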
Compute features values using online resolvers. See Chalk Clients for more information.
The features for which there are known values, mapped to those values. For example, `{User.id: 1234}`. Features can also be expressed as snakecased strings, e.g. `{"user.id": 1234}`.
Outputs are the features that you'd like to compute from the inputs. For example, `[User.age, User.name, User.email]`.
If an empty sequence, the output will be set to all features on the namespace of the query. For example, if you pass as input `{"user.id": 1234}`, then the query is defined on the `User` namespace, and all features on the `User` namespace (excluding has-one and has-many relationships) will be used as outputs.
The time at which to evaluate the query. If not specified, the current time will be used. This parameter is complex in the context of `online_query` since the online store only stores the most recent value of an entity's features. If `now` is in the past, it is extremely likely that `None` will be returned for cache-only features.
This parameter is primarily provided to support:
If you are trying to perform an exploratory analysis of past feature values, prefer `offline_query`.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
You can specify a correlation ID to be used in logs and web interfaces.
This should be globally unique, i.e. a `uuid` or similar. Logs generated during the execution of your query will be tagged with this correlation id.
The semantic name for the query you're making, for example, `"loan_application_model"`. Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by `query_name`. If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.
If `query_name` is specified, this specifies the version of the named query you're making. This is only useful if you want your query to use a NamedQuery with a specific name and a specific version. If a `query_name` has not been supplied, then this parameter is ignored.
Returns metadata about the query execution under `OnlineQueryResult.meta`. This could make the query slightly slower. For more information, see Chalk Clients.
If `True`, the output of each of the query plan stages will be stored. This option dramatically impacts the performance of the query, so it should only be used for debugging.
If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.
from chalk.client import ChalkClient

result = ChalkClient().query(
    input={
        User.name: "Katherine Johnson"
    },
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)

result.get_feature_value(User.fico_score)
Execute multiple queries (represented by the `queries=` argument) in a single request. This is useful if the queries are "rooted" in different `@features` classes -- i.e. if you want to load features for `User` and `Merchant` and there is no natural relationship object which is related to both of these classes, `multi_query` allows you to submit two independent queries.
Returns a BulkOnlineQueryResponse, which is functionally a list of query results. Each of these results can be accessed by index. Individual results can be further checked for errors and converted to pandas or polars DataFrames.
In contrast, `query_bulk` executes a single query with multiple inputs/outputs.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
An output containing results: list[BulkOnlineQueryResult], where each result contains dataframes of the results of each query or any errors.
from chalk.client import ChalkClient, OnlineQuery

queries = [
    OnlineQuery(input={User.name: ['Katherine Johnson']}, output=[User.fico_score]),
    OnlineQuery(input={Merchant.name: ['Myrrh Chant']}, output=[Merchant.address]),
    OnlineQuery(input={NonFeature.wrong: ['Wrong!']}, output=[NonFeature.wrong]),
]

result = ChalkClient().multi_query(
    queries=queries,
)

result[0].get_feature_value(User.fico_score)
queries_with_errors = [q for q, r in zip(queries, result) if r.errors is not None]
Compute features values for many rows of inputs using online resolvers. See Chalk Clients for more information on online query.
This method is similar to query, except it takes in a list of inputs and produces one output per row of inputs.
This method is appropriate if you want to fetch the same set of features for many different input primary keys.
This method contrasts with multi_query, which executes multiple fully independent queries.
This endpoint is not available in all environments.
The time at which to evaluate the query. If not specified, the current time will be used.
The length of this list must be the same as the length of the values in input (see the sketch after the example below).
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
from chalk.client import ChalkClient
ChalkClient().query_bulk(
    input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
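Because the evaluation time can be supplied per row, a hedged sketch follows; the keyword now is an assumed name for the per-row time parameter, since the description above does not name it:
from datetime import datetime, timedelta, timezone
from chalk.client import ChalkClient

# One timestamp per input row; the list length must match the input lists.
times = [
    datetime.now(timezone.utc),
    datetime.now(timezone.utc) - timedelta(days=1),
]
ChalkClient().query_bulk(
    input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
    output=[User.fico_score],
    now=times,  # assumed name for the per-row evaluation time parameter
)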
Plan a query without executing it.
The features for which there are known values, mapped to those values.
For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}.
Outputs are the features that you'd like to compute from the inputs.
For example, [User.age, User.name, User.email].
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by query_name. If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.
If query_name is specified, this specifies the version of the named query you're making. This is only useful if you want your query to use a NamedQuery with a specific name and a specific version. If a query_name has not been supplied, then this parameter is ignored.
from chalk.client import ChalkClient
result = ChalkClient().plan_query(
    input=[User.id],
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
result.rendered_plan
result.output_schema
Get a Chalk Dataset containing data from a previously created dataset.
If an offline query has been created with a dataset name, .get_dataset will return a Chalk Dataset.
The Dataset wraps a lazily-loading Chalk DataFrame that enables us to analyze our data without loading all of it directly into memory.
See Offline Queries for more information.
The name of the Dataset to return. To look up a dataset by name, you must have supplied that name when running the offline query.
Dataset names are unique for each environment.
If 'dataset_name' is provided, then 'job_id' should not be provided.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
A UUID returned in the Dataset object from an offline query.
Dataset ids are unique for each environment.
If 'dataset_id' is provided, then 'dataset_name' and 'revision_id' should not be provided.
The unique id of the DatasetRevision to return. If a previously-created dataset did not have a name, you can look it up using its unique job id instead.
If 'revision_id' is provided, then 'dataset_name' and 'dataset_id' should not be provided.
from datetime import datetime, timezone
import pandas as pd
from chalk.client import ChalkClient

uids = [1, 2, 3, 4]
at = datetime.now(timezone.utc)
X = ChalkClient().offline_query(
    input={
        User.id: uids,
    },
    input_times=[at] * len(uids),
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset_name',
)

# Some time later...
dataset = ChalkClient().get_dataset(
    dataset_name='my_dataset_name'
)
# ...or, using the job id instead of the name:
dataset = ChalkClient().get_dataset(
    job_id='00000000-0000-0000-0000-000000000000'
)

# If memory allows:
df: pd.DataFrame = dataset.get_data_as_pandas()
Compute feature values from the offline store or by running offline/online resolvers.
See Dataset for more information.
The time at which the given inputs should be observed for point-in-time correctness. If given a list of times, the list must match the length of the input lists. Each element of input_times corresponds with the feature values at the same index of the input lists.
See Temporal Consistency for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
A unique name that, if provided, will be used to generate and save a Dataset constructed from the list of features computed from the inputs.
If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.
You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, e.g. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation ID.
The maximum number of samples to include in the DataFrame. If not specified, all samples will be returned.
If True, progress bars will be shown while the query is running.
Primarily intended for use in a Jupyter-like notebook environment.
This flag will also be propagated to the methods of the resulting Dataset.
How long to wait, in seconds, for job completion before raising a TimeoutError.
Jobs will continue to run in the background if they take longer than this timeout.
For no timeout, set to None. If no timeout is specified, the client's default timeout is used.
Used to control whether resolvers are allowed to run in order to compute feature values.
If True, all output features will be recomputed by resolvers. If False, all output features will be sampled from the offline store. If a list, all output features in recompute_features will be recomputed, and all other output features will be sampled from the offline store (see the sketch after the example below).
A list of features that will always be sampled, and thus always excluded from recompute. Should not overlap with any features used in "recompute_features" argument.
If specified, the query will only be run on data observed after this timestamp. Accepts strings in ISO 8601 format.
If specified, the query will only be run on data observed before this timestamp. Accepts strings in ISO 8601 format.
If True, the output of each of the query plan stages will be stored in S3/GCS. This will dramatically impact the performance of the query, so it should only be used for debugging. These files will be visible in the web dashboard's query detail view, and can be downloaded in full by clicking on a plan node in the query plan visualizer.
If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.
A SQL query that will query your offline store and use the result as input. See Input for more information.
Override resource requests for processes with isolated resources, e.g., offline queries and cron jobs.
See ResourceRequests for more information.
from datetime import datetime, timezone
from chalk.client import ChalkClient

uids = [1, 2, 3, 4]
at = datetime.now(tz=timezone.utc)
dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
    },
    input_times=[at] * len(uids),
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset',
)
df = dataset.get_data_as_pandas()
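As a hedged sketch of the recompute and observation-window options described above: recompute_features is the argument named in the parameter description, while lower_bound and upper_bound are assumed names for the ISO 8601 bounds.
from chalk.client import ChalkClient

# Recompute one feature with resolvers, sample the rest from the offline store,
# and restrict the query to a bounded observation window.
dataset = ChalkClient().offline_query(
    input={User.id: [1, 2, 3, 4]},
    output=[User.id, User.email, User.name_email_match_score],
    recompute_features=[User.name_email_match_score],
    lower_bound="2024-01-01T00:00:00+00:00",  # assumed parameter name
    upper_bound="2024-06-30T00:00:00+00:00",  # assumed parameter name
    dataset_name='my_recomputed_dataset',
)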
Triggers a resolver to run. See Triggered Runs for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
If specified, the resolver will only ingest data observed before this timestamp. Accepts strings in ISO 8601 format.
If specified, the resolver will only ingest data observed after this timestamp. Accepts strings in ISO 8601 format.
from chalk.client import ChalkClient
ChalkClient().trigger_resolver_run(
    resolver_fqn="mymodule.fn"
)
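If the run should only ingest a bounded window of data, a hedged sketch follows; lower_bound and upper_bound are assumed keyword names for the timestamps described above.
from chalk.client import ChalkClient

ChalkClient().trigger_resolver_run(
    resolver_fqn="mymodule.fn",
    lower_bound="2024-01-01T00:00:00+00:00",  # assumed parameter name
    upper_bound="2024-06-30T00:00:00+00:00",  # assumed parameter name
)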
Retrieves the status of a resolver run. See Triggered Runs for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
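This section has no example of its own, so here is a hedged sketch; it assumes the client exposes a get_run_status method and that trigger_resolver_run returns a response carrying the run's id.
from chalk.client import ChalkClient

client = ChalkClient()
run = client.trigger_resolver_run(resolver_fqn="mymodule.fn")
# Poll the run's status by its id (method and field names assumed).
status = client.get_run_status(run_id=run.id)
print(status.status)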