Features define the data that you want to compute and store. Your features are defined as Python classes that look like dataclasses. For example:
from chalk.features import features, DataFrame
from chalk import _

@features
class CreditCard:
    id: int
    user_id: "User.id"
    limit: float

@features
class User:
    id: int
    name: str
    email: str
    credit_cards: DataFrame[CreditCard]
    total_limit: float = _.credit_cards[_.limit].sum()
Features can be nested, as with credit_cards
above.
In this section, we'll dive into API components that make up the
building blocks of features.
The individual or team responsible for these features. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Added metadata for features for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.
Setting etl_offline_to_online on a feature class assigns it to all features on the class which do not explicitly specify etl_offline_to_online.
When a feature is expensive or slow to compute, you may wish to cache its value.
Chalk uses the terminology "maximum staleness" to describe how recently a feature
value needs to have been computed to be returned without re-running a resolver.
Assigning a max_staleness
to the feature class assigns it to all features on the
class which do not explicitly specify a max_staleness
value of their own.
@features(
    owner="andy@chalk.ai",
    max_staleness="30m",
    etl_offline_to_online=True,
    tags="user-group",
)
class User:
    id: str
    # Comments here appear in the web!
    # :tags: pii
    name: str | None
    # :owner: userteam@mycompany.com
    location: LatLng
Add metadata and configuration to a feature.
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature. Read more at Owner
from chalk.features import features, feature
from datetime import date

@features
class User:
    id: str
    # :owner: user-team@company.com
    name: str
    dob: date = feature(owner="user-team@company.com")
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team. Read more at Tags
from chalk.features import features, feature
from datetime import date

@features
class User:
    id: str
    # :tags: pii
    name: str
    dob: date = feature(tags=["pii"])
The maximum version for a feature. Versioned features can be
referred to with the @
operator:
from chalk.features import features, feature

@features
class User:
    id: str
    score: int = feature(version=2)

str(User.score @ 2)
"user.score@2"
See more at Versioning
The default version for a feature. When you reference a
versioned feature without the @
operator, you reference
the default_version
. Set to 1
by default.
from chalk.features import features, feature

@features
class User:
    id: str
    score: int = feature(version=2, default_version=2)

str(User.score)
"user.score"
See more at Default versions
When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Read more at Caching
When True (default), Chalk will cache all values, including nulls.
When False, Chalk will not update the null entry in the cache.
When "evict_nulls", Chalk will evict the entry that would have been null from the cache, if it exists.
Concretely, suppose the current state of a database is {a: 1, b: 2}, and you write a row {a: 2, b: None}. Here is the expected result in the db:
- {a: 2, b: None} when cache_nulls=True (default)
- {a: 2, b: 2} when cache_nulls=False
- {a: 2} when cache_nulls="evict_nulls"
When True (default), Chalk will cache all values, including default values.
When False, Chalk will not update the default entry in the cache.
When "evict_defaults", Chalk will evict the entry that would have been a default value from the cache, if it exists.
Concretely, suppose the current state of a database is {a: 1, b: 2}, you write a row {a: 2, b: "default"}, and the default value for feature b is "default". Here is the expected result in the db:
- {a: 2, b: "default"} when cache_defaults=True
- {a: 2, b: 2} when cache_defaults=False
- {a: 2} when cache_defaults="evict_defaults"
The cache_nulls and cache_defaults options can be used together on the same feature, with the following exceptions: if cache_nulls=False, then cache_defaults cannot be "evict_defaults", and if cache_nulls="evict_nulls", then cache_defaults cannot be False.
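As a minimal sketch (assuming cache_nulls is passed to the feature() function alongside max_staleness; the email feature is hypothetical):
from chalk.features import features, feature

@features
class User:
    id: str
    # Cache computed emails for 30 minutes, but evict (rather than cache)
    # entries that would otherwise be stored as null.
    email: str | None = feature(max_staleness="30m", cache_nulls="evict_nulls")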
When True
, Chalk copies this feature into the online environment
when it is computed in offline resolvers.
Read more at Reverse ETL
If specified, when this feature is computed, Chalk will check that x >= min
.
from chalk.features import features, feature

@features
class User:
    id: str
    fico_score: int = feature(min=300, max=850)
If specified, when this feature is computed, Chalk will check that x <= max
.
from chalk.features import features, feature

@features
class User:
    id: str
    fico_score: int = feature(min=300, max=850)
If specified, when this feature is computed, Chalk will check that len(x) >= min_length
.
from chalk.features import features, feature

@features
class User:
    id: str
    name: str = feature(min_length=1)
If specified, when this feature is computed, Chalk will check that len(x) <= max_length
.
from chalk.features import features, feature

@features
class User:
    id: str
    name: str = feature(max_length=1000)
If True
, if this feature does not meet the validation criteria, Chalk will not persist
the feature value and will treat it as failed.
The backing pyarrow.DataType
for the feature. This parameter can
be used to control the storage format of data. For example, if you
have a lot of data that could be represented as smaller data types,
you can use this parameter to save space.
import pyarrow as pa
from chalk.features import features, feature

@features
class WatchEvent:
    id: str
    duration_hours: float = feature(dtype=pa.float16())
The default value of the feature if it otherwise can't be computed.
If you don't need to specify other metadata, you can also assign a default
in the same way you would assign a default to a dataclass
:
from chalk.features import features

@features
class User:
    num_purchases: int = 0
An underscore expression for defining the feature. Typically,
this value is assigned directly to the feature without needing
to use the feature(...)
function. However, if you want to define
other properties, like a default
or max_staleness
, you'll
want to use the expression
keyword argument.
from chalk.features import features, feature
from chalk import _

@features
class Receipt:
    subtotal: int
    tax: int = 0  # Default value, without other metadata
    total: int = feature(expression=_.subtotal + _.tax, default=0)
See more at Expressions
If True
, this feature is considered deprecated, which impacts the dashboard, alerts,
and warnings.
from chalk.features import features, feature

@features
class User:
    id: str
    name: str = feature(deprecated=True)
The type of the input feature, given by _TRich
.
from chalk.features import Primary, features, feature

@features
class User:
    uid: Primary[int]
    # Uses a default value of 0 when one cannot be computed.
    num_purchases: int = 0
    # Description of the name feature.
    # :owner: fraud@company.com
    # :tags: fraud, credit
    name: str = feature(
        max_staleness="10m",
        etl_offline_to_online=True,
    )
    score: int = feature(
        version=2, default_version=2,
    )
Specify a feature that represents a one-to-one relationship.
This function allows you to explicitly specify a join condition between
two @features
classes. When there is only one way to join two classes,
we recommend using the foreign-key definition instead of this has_one
function. For example, if you have a User
class and a Card
class, and
each user has one card, you can define the Card
and User
classes as
follows:
from chalk.features import features

@features
class User:
    id: str

@features
class Card:
    id: str
    user_id: User.id
    user: User
from chalk.features import DataFrame, features, has_one

@features
class Card:
    id: str
    user_id: str
    balance: float

@features
class User:
    id: str
    card: Card = has_one(
        lambda: User.id == Card.user_id
    )
Specify a feature that represents a one-to-many relationship.
The join condition between @features
classes.
This argument is callable to allow for forward
references to members of this class and the joined
class.
from chalk.features import DataFrame, features, has_many

@features
class Card:
    id: str
    user_id: str
    balance: float

@features
class User:
    id: str
    cards: DataFrame[Card] = has_many(
        lambda: User.id == Card.user_id
    )
The function after
can be used with DataFrame
to compute windowed features.
after
filters a DataFrame
relative to the current time in context, such that if
the after
filter is defined as now - {time_window}
, the filter will include
all features with timestamps t where now - {time_window} <= t <= now
.
This time could be in the past if you’re using an offline resolver.
Using window functions ensures that you maintain point-in-time correctness.
The parameters to after
take many keyword arguments describing the
time relative to the present.
The feature to use for the filter. By default, index
is the FeatureTime
of the referenced feature class.
from chalk.features import DataFrame, features

@features
class Card:
    ...

@features
class User:
    cards: DataFrame[Card]

User.cards[after(hours_ago=1, minutes_ago=30)]
The function before
can be used with DataFrame
to compute windowed features.
before
filters a DataFrame
relative to the current time in context such that
if the before
filter is defined as now - {time_window}
, the filter will include
all features with timestamps t where t <= now - {time_window}
.
This time could be in the past if you’re using an offline resolver.
Using window functions ensures that you maintain point-in-time correctness.
The parameters to before
take many keyword arguments describing the
time relative to the present.
The feature to use for the filter. By default, index
is the FeatureTime
of the referenced feature class.
from chalk.features import DataFrame, features

@features
class Card:
    ...

@features
class User:
    cards: DataFrame[Card]

User.cards[before(hours_ago=1, minutes_ago=30)]
Declare a windowed feature.
from chalk import windowed, Windowed
from chalk.features import features

@features
class User:
    failed_logins: Windowed[int] = windowed("10m", "24h")
Create a windowed feature.
See more at Windowed Aggregations
The size of the buckets for the window function.
Buckets are specified as strings in the format "1d"
, "2h"
, "1h30m"
, etc.
You may also choose to specify the buckets using the days, hours, and minutes
parameters instead. The buckets parameter is helpful if you want to use
multiple units to express the bucket size, like "1h30m"
.
Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".
Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".
Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver.
See more at Caching
Sets a maximum age for values eligible to be retrieved from the offline store, defined in relation to the query's current point-in-time.
Feature versions allow you to manage a feature as its definition changes over time.
The version
keyword argument allows you to specify the
maximum number of versions available for this feature.
See more at Versioning
When True
, Chalk copies this feature into the online environment
when it is computed in offline resolvers.
See more at Reverse ETL
If True
, if this feature does not meet the validation criteria, Chalk will not persist
the feature value and will treat it as failed.
A list of Validations to apply to this feature.
See more at https://docs.chalk.ai/api-docs#Validation
The expression to compute the feature. This is an underscore expression, like _.transactions[_.amount].sum()
.
Metadata for the windowed feature, parameterized by
TPrim
(the primitive type of the feature) and
TRich
(the decoded type of the feature, if decoder
is provided).
from chalk import windowed, Windowed
from chalk.features import features

@features
class User:
    id: int
    email_count: Windowed[int] = windowed(days=range(1, 30))
    logins: Windowed[int] = windowed("10m", "1d", "30d")

User.email_count["7d"]
Create a windowed feature with grouping.
See more at https://docs.chalk.ai/docs/materialized_aggregations
The size of the buckets for the window function.
Buckets are specified as strings in the format "1d"
, "2h"
, "1h30m"
, etc.
You may also choose to specify the buckets using the days, hours, and minutes
parameters instead. The buckets parameter is helpful if you want to use
multiple units to express the bucket size, like "1h30m"
.
The expression to compute the feature. This is an underscore expression,
like _.transactions[_.amount].sum()
.
Configuration for aggregating data. Pass bucket_duration
with a
Duration to configure the bucket size for aggregation.
See more at https://docs.chalk.ai/docs/materialized_aggregations
Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".
Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".
Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".
You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.
Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.
If True
, if this feature does not meet the validation criteria, Chalk will not persist
the feature value and will treat it as failed.
The backing pyarrow.DataType
for the feature. This parameter can
be used to control the storage format of data. For example, if you
have a lot of data that could be represented as smaller data types,
you can use this parameter to save space.
import pyarrow as pa
from chalk import windowed, Windowed
from chalk.features import features

@features
class User:
    id: str
    email_count: Windowed[int] = windowed(
        "10m", "30m",
        dtype=pa.int16(),
    )
from chalk import _, group_by_windowed, DataFrame
from chalk.features import features

@features
class Email:
    id: int
    user_id: "User.id"
    category: str

@features
class User:
    id: int
    emails: DataFrame[Email]
    emails_by_category: DataFrame = group_by_windowed(
        "10m", "30m",
        expression=_.emails.group_by(_.category).count(),
    )
Configuration for window aggregates. At least one of bucket_duration and bucket_durations must be provided. If both are provided, bucket_duration acts as a default for the window materialization, which may be overridden on a per-window basis by bucket_durations.
The duration of each bucket in the window, using a
chalk.Duration
string, e.g. "1m"
, "1h"
, "1d"
.
To use different bucket durations for different window
sizes, see bucket_durations
below.
A mapping from the desired bucket duration to the window size(s) that should use that bucket duration.
If bucket_duration
is also provided, any window
durations not specified in this mapping will pick up
the bucket duration from the bucket_duration
parameter.
This parameter is useful when you have some very large windows and some very small windows. For example, if you have a 365-day window and a 10-minute window, you wouldn't want to maintain 365 days of 10-minute buckets in the online store. However, using a 1-day bucket for the 10-minute window would also lead to significantly more events fitting into the window than you might want.
In this case, you could specify:
count: Windowed[int] = windowed(
    "1d", "365d",
    materialization={
        # 1-day buckets by default (used for the 365d window)
        "bucket_duration": "1d",
        "bucket_durations": {
            # 10-minute buckets for the 1d window
            "10m": "1d",
        },
    },
    expression=_.events.count(),
)
Marks a feature as the primary feature for a feature class.
Features named id
on feature classes without an explicit primary
feature are declared primary keys by default, and don't need to be
marked with Primary
.
If you have primary key feature with a name other than id
,
you can use this marker to indicate the primary key.
from chalk.features import features
from chalk import Primary

@features
class User:
    username: Primary[str]
Specify explicit data validation for a feature.
The feature()
function can also specify these validations,
but this class allows you to specify both strict and non-strict
validations at the same time.
from chalk.features import features, feature, Validation

@features
class User:
    fico_score: int = feature(
        validations=[
            Validation(min=300, max=850, strict=True),
            Validation(min=300, max=320, strict=False),
            Validation(min=840, max=850, strict=False),
        ]
    )

    # If only one set of validations were needed,
    # you can use the feature function instead:
    first_name: str = feature(
        min_length=2, max_length=64, strict=True
    )
The Vector class can be used as a type annotation to denote a Vector feature.
Instances of this class will be provided when working with raw vectors inside of resolvers. Generally, you do not need to construct instances of this class directly, as Chalk will automatically convert list-like features into Vector instances when working with a Vector annotation.
data: numpy.ndarray | list[float] | pyarrow.FixedSizeListScalar. The vector values.
from chalk.features import Vector, features

@features
class Document:
    embedding: Vector[1536]
Define a nearest neighbor relationship for performing Vector similarity search.
metric: "l2" | "ip" | "cos". The metric to use to compute distance between two vectors. L2 Norm ("l2"), Inner Product ("ip"), and Cosine ("cos") are supported. Defaults to "l2".
A nearest neighbor relationship filter.
Decorator to create an online resolver.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
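For illustration, a minimal sketch of a resolver scoped to two environments (the feature names and the constant fallback are assumptions):
from chalk import online

@online(environment=["staging", "dev"])
def get_fraud_score_stub(name: User.name) -> User.fraud_score:
    # Bypass the vendor outside of production and return a constant value.
    return 0.0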
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.
Cron can sample all examples, a subset of all examples, or a custom provided set of examples.
Read more at Scheduling
Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.
Whether this resolver should be invoked once during planning time
to build a static computation graph. If True
, all inputs will
either be StaticOperators
(for has-many and DataFrame
relationships)
or StaticExpressions
(for individual features). The resolver must
return a StaticOperator
as output.
A ResolverProtocol
which can be called as a normal function! You can unit-test
resolvers as you would unit-test any other code.
Read more at Unit Tests
@online
def name_match(
    name: User.full_name,
    account_name: User.bank_account.title
) -> User.account_name_match_score:
    if name.lower() == account_name.lower():
        return 1.
    return 0.
Decorator to create an offline resolver.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.
Cron can sample all examples, a subset of all examples, or a custom provided set of examples.
Read more at Scheduling
Allows you to specify an individual or team who is responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.
Whether this resolver should be invoked once during planning time to
build a static computation graph. If True
, all inputs will either
be StaticOperators
(for has-many and dataframe relationships) or
StaticExpressions
(for individual features). The resolver must
return a StaticOperator
as output.
A ResolverProtocol
which can be called as a normal function! You can unit-test
resolvers as you would unit-test any other code.
Read more at Unit Tests
@offline(cron="1h")
def get_fraud_score(
    email: User.email,
    name: User.name,
) -> User.fraud_score:
    return socure.get_sigma_score(email, name)
Run an online or offline resolver on a schedule.
This class lets you add a filter or sample function to your cron schedule for a resolver. See the overloaded signatures for more information.
The period of the cron job. Can be either a crontab ("0 * * * *"
)
or a Duration
("2h"
).
Optionally, a function to filter down the arguments to consider.
See Filtering examples for more information.
Explicitly provide the sample function for the cron job.
See Custom examples for more information.
Using a filter
def only_active_filter(v: User.active):
    return v

@online(cron=Cron(schedule="1d", filter=only_active_filter))
def score_user(d: User.signup_date) -> User.score:
    return ...

Using a sample function
def s() -> DataFrame[User.id]:
    return DataFrame.read_csv(...)

@offline(cron=Cron(schedule="1d", sample=s))
def fn(balance: User.account.balance) -> ...:
    ...
A subset of the global variables mentioned inside of the function, which are saved here in order to allow the function to be emulated symbolically.
Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
The arguments to pass to the decorated function.
If one of the arguments is a DataFrame
with a
filter or projection applied, the resolver will
only be called with the filtered or projected
data. Read more at
https://docs.chalk.ai/docs/unit-tests#data-frame-inputs
The result of calling the decorated function
with args
. Useful for unit-testing.
Read more at Unit Tests
@online
def get_num_bedrooms(
    rooms: Home.rooms[Room.name == 'bedroom']
) -> Home.num_bedrooms:
    return len(rooms)

rooms = [
    Room(id=1, name="bedroom"),
    Room(id=2, name="kitchen"),
    Room(id=3, name="bedroom"),
]
assert get_num_bedrooms(rooms) == 2
An immutable context that can be accessed from Python resolvers. This context wraps a JSON-compatible dictionary or JSON string with type restrictions.
from chalk.client import ChalkClient
from chalk.features import features
from chalk import ChalkContext, online
import requests
import json

...

@features
class User:
    id: int
    endpoint_url: str
    endpoint_response: str

@online
def get_user_endpoint_response(endpoint_url: User.endpoint_url) -> User.endpoint_response:
    context_headers = {}
    optional_correlation_id = ChalkContext.get("request_correlation_id")
    if optional_correlation_id is not None:
        context_headers["correlation-id"] = optional_correlation_id
    response = requests.get(endpoint_url, headers=context_headers)
    return json.dumps(response.json())

ChalkClient().query(
    input={User.id: 1, User.endpoint_url: "https://api.example.com/message"},
    output=[User.endpoint_response],
    query_context={"request_correlation_id": "df0cc84b-bb0e-41b1-82cd-74ccd968b2fa"},
)
The type of machine to use.
You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.
Chalk includes a DataFrame
class that models tabular data in much the same
way that pandas
does. However, there are some key differences that
allow the Chalk DataFrame
to increase type safety and performance.
Like pandas, Chalk's DataFrame
is a two-dimensional data structure with
rows and columns. You can perform operations like filtering, grouping,
and aggregating on a DataFrame
. However, there are two main differences:
- DataFrame is lazy and can be backed by multiple data sources, whereas a pandas.DataFrame executes eagerly in memory.
- DataFrame[...] can be used to represent a type of data with pre-defined filters.
Unlike pandas, the implementation of a Chalk DataFrame is lazy, and can be executed against many different backend sources of data. For example, in unit tests, a DataFrame uses an implementation backed by polars. But if your DataFrame was returned from a SQL source, filters and aggregations may be pushed down to the database for efficient execution.
Each column of a Chalk DataFrame is typed by a Feature type.
For example, you might have a resolver returning a DataFrame
containing user ids and names:
@features
class User:
    id: int
    name: str
    email: str

@online
def get_users() -> DataFrame[User.id, User.name]:
    return DataFrame([
        User(id=1, name="Alice"),
        User(id=2, name="Bob")
    ])
Construct a Chalk DataFrame
.
The data. Can be an existing pandas.DataFrame
,
polars.DataFrame
or polars.LazyFrame
,
a sequence of feature instances, or a dict
mapping
a feature to a sequence of values.
The strategy to use to handle missing values.
A feature value is "missing" if it is an ellipsis (...
),
or it is None
and the feature is not annotated as Optional[...]
.
The available strategies are:
- 'error': Raise a TypeError if any missing values are found. Do not attempt to replace missing values with the default value for the feature.
- 'default_or_error': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, raise a TypeError.
- 'default_or_allow': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, leave it as None. This is the default strategy.
- 'allow': Allow missing values to be stored in the DataFrame. This option may result in non-nullable features being assigned None values.
Row-wise construction
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
Column-wise construction
df = DataFrame({
    User.id: [1, 2],
    User.first: ["Sam", "Iris"],
    User.last: ["Wu", "Xi"]
})
Construction from polars.DataFrame
import polars
df = DataFrame(polars.DataFrame({
    "user.id": [1, 2],
    "user.first": ["Sam", "Iris"],
    "user.last": ["Wu", "Xi"]
}))
Aggregate the DataFrame
by the specified columns.
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.median(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
Compute a histogram with fixed width bins.
The column to compute the histogram on. If not supplied, the DataFrame
is assumed to contain a single column.
DataFrame({
    Taco.price: list(range(100, 200)),
}).histogram_list(nbins=4, base=100)
[25, 25, 25, 25]
Group based on a time value (date
or datetime
).
The groups are defined by a time-based window, and optionally,
other columns in the DataFrame
. The "width" of the window is
defined by the period
parameter, and the spacing between the
windows is defined by the every
parameter. Note that if the
every
parameter is smaller than the period
parameter, then
the windows will overlap, and a single row may be assigned to
multiple groups.
As an example, consider the following DataFrame
:
val: a b c d e f g h
─────────●─────────●─────────●─────────●───────▶
time: A B C D
┌─────────┐
1 │ a b │ 1: [a, b]
└────┬────┴────┐
2 ◀───▶│ b c │ 2: [b, c]
every└────┬────┴────┐
3 ◀────────▶│ c d e │ 3: [c, d, e]
period └────┬────┴────┐
4 │d e f│ 4: [d, e, f]
└────┬────┴────┐
5 │ f │ 5: [f]
└────┬────┴────┐
6 │ │
└────┬────┴────┐
7 │ g h │ 7: [g, h]
└────┬────┴────┐
8 │g h │ 8: [g, h]
└─────────┘
In the above example, the sixth time bucket is empty, and
will not be included in the resulting DataFrame
.
from datetime import datetime
from chalk import DataFrame, op

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
        User.ts: [datetime(2020, 1, 1), datetime(2020, 1, 1), datetime(2020, 1, 3)],
    },
).group_by_hopping(
    index=User.ts,
    group={User.id: User.id},
    agg={User.val: op.median(User.val)},
    period="1d",
)
╭─────────┬──────────┬──────────╮
│ User.id │ User.ts │ User.val │
╞═════════╪══════════╪══════════╡
│ 1 │ 2020-1-1 │ 3 │
├─────────┼──────────┼──────────┤
│ 3 │ 2020-1-3 │ 10 │
╰─────────┴──────────┴──────────╯
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

# Set the fraud score to 0 for all users
df.with_column(User.fraud_score, 0)

# Concatenation of first & last as full_name
df.with_column(
    User.full_name, op.concat(User.first, User.last)
)

# Alias a column name
df.with_column(
    User.first_name, User.first
)
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

# Set the fraud score to 0 for all users
df.with_columns({User.fraud_score: 0})

# Concatenation of first & last as full_name
df.with_columns({
    User.full_name: op.concat(User.first, User.last)
})

# Alias a column name
df.with_columns({
    User.first_name: User.first
})
Read a .csv file as a DataFrame
.
values = DataFrame.read_csv(
    "s3://...",
    columns={0: MyFeatures.id, 1: MyFeatures.name},
    has_header=False,
)
Sort the DataFrame
by the given columns.
df = DataFrame({
    User.a: [1, 2, 3],
    User.b: [3, 2, 1],
})
df.sort(User.a)
a b
-----------
0 1 3
1 2 2
2 3 1
Get the underlying DataFrame
as a polars.LazyFrame
.
The underlying polars.LazyFrame
.
Get the underlying DataFrame
as a pyarrow.Table
.
The underlying pyarrow.Table
. This format is the canonical
representation of the data in Chalk.
Get the underlying DataFrame as a pandas.DataFrame.
prefixed: Whether to prefix the column names with the feature namespace (i.e. user.name if prefixed=True, name if prefixed=False).
The data formatted as a pandas.DataFrame.
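For example, a short sketch of converting to pandas, reusing the User features from the construction examples above:
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi"),
])

# Column names are prefixed with the namespace by default, e.g. "user.first".
pdf = df.to_pandas()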
Operations for aggregations in DataFrame
.
The class methods on this class are used to create
aggregations for use in DataFrame.group_by
.
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.sum(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 4.5 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
        User.active: [True, True, False],
    }
).group_by(
    group={User.id: User.id},
    agg={
        User.val: op.product(User.val),
        User.active: op.product(User.active),
    },
)
╭─────────┬──────────┬─────────────╮
│ User.id │ User.val │ User.active │
╞═════════╪══════════╪═════════════╡
│ 1 │ 2 │ 1 │
├─────────┼──────────┼─────────────┤
│ 3 │ 10 │ 0 │
╰─────────┴──────────┴─────────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.max(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 4 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.min(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 0.5 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.median(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.mean(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 3 │
├─────────┼──────────┤
│ 3 │ 6.5 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.count(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 2 │
├─────────┼──────────┤
│ 3 │ 1 │
╰─────────┴──────────╯
Concatenate the string values of col
and col2
in a DataFrame
.
from chalk import op
from chalk.features import DataFrame

DataFrame(
    [
        User(id=1, val='a'),
        User(id=1, val='b'),
        User(id=3, val='c'),
        User(id=3, val='d'),
    ]
).group_by(
    group={User.id: User.id},
    agg={User.val: op.concat(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ "ab" │
├─────────┼──────────┤
│ 3 │ "cd" │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=True,
).group_by(
    group={User.id: User.id},
    agg={User.val: op.last(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 1 │
├─────────┼──────────┤
│ 3 │ 5 │
╰─────────┴──────────╯
from chalk import op
from chalk.features import DataFrame

DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=False,
).group_by(
    group={User.id: User.id},
    agg={User.val: op.first(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ 1 │
├─────────┼──────────┤
│ 3 │ 5 │
╰─────────┴──────────╯
Filter the aggregation to apply to only rows where all the filters in f are true. If no rows match the filter, the aggregation for the column will be null, and the resulting feature type must be a nullable type.
The aggregation, allowing you to continue to chain methods.
from chalk import op
from chalk.features import DataFrame

df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
    group={User.id: User.id},
    agg={User.val: op.sum(User.val).where(User.val > 5)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1 │ null │
├─────────┼──────────┤
│ 3 │ 10 │
╰─────────┴──────────╯
Create a Snowflake data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
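For example, a minimal sketch that points at a named integration (the integration name "RISK" is illustrative):
from chalk.sql import SnowflakeSource

snowflake = SnowflakeSource(name="RISK")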
Create a PostgreSQL data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a MySQL data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a DynamoDB data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details. DynamoDBSources can be queried via PartiQL SQL
resolvers.
You may override the ambient AWS credentials by providing either a client ID and secret, or a role ARN.
Create a BigQuery data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a Redshift data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a CloudSQL data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Testing SQL source.
If you have only one SQLiteInMemorySource integration, there's no need to provide a distinguishing name.
The SQL source for use in Chalk resolvers.
source = SQLiteInMemorySource(name="RISK")
Create a Databricks data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Create a Spanner data source. SQL-based data sources
created without arguments assume a configuration in your
Chalk Dashboard. Those created with the name=
keyword
argument will use the configuration for the integration
with the given name. And finally, those created with
explicit arguments will use those arguments to configure
the data source. See the overloaded signatures for more
details.
Incremental settings for Chalk SQL queries.
In "row"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only return "new" results, by adding a clause that looks like:
"WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>"
In "group"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter" mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
. This will incrementalize your query using the timestamp
of the latest row that has been ingested.
Chalk will also include another query parameter named "chalk_last_execution_timestamp"
that can be used instead.
This will incrementalize your query using the last time the query was executed.
incremental_timestamp:
If incremental_timestamp is "feature_time", we will incrementalize your query using the timestamp of the latest row that has been ingested. This is the default.
If incremental_timestamp is "resolver_execution_time", we will incrementalize your query using the last time the query was executed instead.
The timestamp to set as the lower bound
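As a sketch of "row" mode incrementalization (the Payment feature class, table, and column names are assumptions for illustration):
from chalk import offline
from chalk.features import DataFrame
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@offline
def load_payments() -> DataFrame[Payment.id, Payment.amount]:
    # "updated_at" plays the role of the incremental_column described above.
    return (
        pg.query_string("SELECT id, amount, updated_at FROM payments")
        .incremental(incremental_column="updated_at", lookback_period="1h", mode="row")
    )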
Run a query from a SQL file.
This method allows you to query the SQL file within a Python resolver. However, Chalk can also infer resolvers from SQL files. See SQL file resolvers for more information.
The path to the file with the sql file,
relative to the caller's file, or to the
directory that your chalk.yaml
file lives in.
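For illustration, a minimal sketch of calling a SQL file from a Python resolver (the path and feature classes are assumptions):
from chalk import offline
from chalk.features import DataFrame
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@offline
def load_users() -> DataFrame[User.id, User.name]:
    # "queries/users.sql" is resolved relative to this file (or to the
    # directory containing chalk.yaml).
    return pg.query_sql_file(path="queries/users.sql").all()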
Automatically ingest a table.
from chalk.sql import PostgreSQLSource
from chalk.features import features
PostgreSQLSource().with_table(
    name="users",
    features=User,
).with_table(
    name="accounts",
    features=Account,
    # Override one of the column mappings.
    column_to_feature={
        "acct_id": Account.id,
    },
)
Return at most one result or raise an exception.
Returns None
if the query selects no rows. Raises if
multiple object identities are returned, or if multiple
rows are returned for a query that returns only scalar
values as opposed to full identity-mapped entities.
A query that can be returned from a resolver.
Return the results represented by this Query as a DataFrame
.
A query that can be returned from a resolver.
Operates like .all()
, but tracks previous_latest_row_timestamp
between query executions in
order to limit the amount of data returned.
previous_latest_row_timestamp will be set to the start of the query execution, or, if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.
In "row"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only return "new" results, by adding a clause that looks like:
WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>
In "group"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter"
mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
.
Defaults to "row"
, which indicates that only rows newer than the last observed row should be
considered. When set to "group"
, Chalk will only ingest features from groups which are newer
than the last observation time. This requires that the query is grouped by a primary key.
This should reference a timestamp column in your underlying table, typically something
like "updated_at"
, "created_at"
, "event_time"
, etc.
A query that can be returned from a resolver.
Materialize the query.
Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.
Note: this requires the usage of the fields={...}
argument when used in conjunction with query_string
or query_sql_file
.
Return at most one result or raise an exception.
Returns None
if the query selects no rows. Raises if
multiple object identities are returned, or if multiple
rows are returned for a query that returns only scalar
values as opposed to full identity-mapped entities.
A query that can be returned from a resolver.
Operates like .all()
, but tracks previous_latest_row_timestamp
between query executions in
order to limit the amount of data returned.
previous_latest_row_timestamp will be set to the start of the query execution, or, if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.
In "row"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()
), but modifies the query to
only return "new" results, by adding a clause that looks like:
WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>
In "group"
mode:
incremental_column
MUST be set.
Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.
This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:
SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id
would be rewritten like this:
SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
SELECT DISTINCT(user_id)
FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id
In "parameter"
mode:
incremental_column
WILL BE IGNORED.
This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query.
Chalk will include a query parameter named "chalk_incremental_timestamp"
. Depending on your SQL
dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp
or
%(chalk_incremental_timestamp)s
.
This should reference a timestamp column in your underlying table, typically something
like "updated_at"
, "created_at"
, "event_time"
, etc.
Defaults to "row"
, which indicates that only rows newer than the last observed row should be
considered. When set to "group"
, Chalk will only ingest features from groups which are newer
than the last observation time. This requires that the query is grouped by a primary key.
A query that can be returned from a resolver.
Generate a Chalk SQL file resolver from a filepath and a sql string.
This will generate a resolver in your web dashboard that can be queried,
but will not output a .chalk.sql
file.
The optional parameters are overrides for the comment key-value pairs at the top of the sql file resolver. Comment key-value pairs specify important resolver information such as the source, feature namespace to resolve, and other details. Note that these will override any values specified in the sql string. See Configuration for more information.
See SQL file resolvers for more information on SQL file resolvers.
Can either be a BaseSQLSource or a string.
If a string is provided, it will be used to infer the source by
first scanning for a source with the same name, then inferring
the source if it is a type, e.g. snowflake
if there is only
one database of that type. Optional if source
is specified in sql
.
from chalk import make_sql_file_resolver
from chalk.features import features

@features
class User:
    id: int
    name: str

make_sql_file_resolver(
    name="my_resolver",
    sql="SELECT user_id as id, name FROM users",
    source="snowflake",
    resolves=User,
    kind="offline",
)
Decorator to create a stream resolver.
This parameter is defined when the streaming resolver returns a windowed feature.
Tumbling windows are fixed-size, contiguous and non-overlapping time intervals. You can think of
tumbling windows as adjacently arranged bins of equal width.
Tumbling windows are most often used alongside max_staleness
to allow the features
to be sent to the online store and offline store after each window period.
Continuous windows, unlike tumbling window, are overlapping and exact. When you request the value of a continuous window feature, Chalk looks at all the messages received in the window and computes the value on-demand.
See more at Window modes
A callable that will interpret an input prior to the invocation of the resolver. Parse functions can serve many functions, including pre-parsing bytes, skipping unrelated messages, or supporting rekeying.
See more at Parsing
A mapping from input BaseModel
attribute to Chalk feature attribute to support continuous streaming re-keying.
This parameter is required for continuous resolvers.
Features that are included here do not have to be explicitly returned in the stream resolver:
the feature will automatically be set to the key value used for aggregation.
See more at Keys
An optional string specifying an input attribute as the timestamp used for windowed aggregations. See more at Custom event timestamping
If set to False
, the stream resolver will not update materialized aggregations, but is still eligible for ETL.
A callable function! You can unit-test stream resolvers as you would unit-test any other code.
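For illustration, a minimal sketch of a stream resolver (the source name, message schema, and output features are assumptions):
from pydantic import BaseModel
from chalk.features import Features
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="transactions")

class TransactionMessage(BaseModel):
    user_id: str
    amount: float

@stream(source=source)
def process_transaction(message: TransactionMessage) -> Features[User.id, User.last_transaction_amount]:
    # Map each message onto the features it resolves.
    return Features(
        id=message.user_id,
        last_transaction_amount=message.amount,
    )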
Decorator to create a sink. Read more at Sinks
Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.
Environment can take one of three types:
- None (default) - candidate to run in every environment
- str - run only in this environment
- list[str] - run in any of the specified environments and no others
Read more at Environments
Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.
You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.
Read more at Tags
The individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.
A callable function! You can unit-test sinks as you would unit test any other code. Read more at Unit Tests
@sink
def process_updates(
    uid: User.id,
    email: User.email,
    phone: User.phone,
):
    user_service.update(
        uid=uid,
        email=email,
        phone=phone,
    )

process_updates(123, "sam@chalk.ai", "555-555-5555")
An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
An S3 or GCS URI that points to the certificate authority file that should be used to verify broker certificates. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.
Protocol used to communicate with brokers. Valid values are "PLAINTEXT", "SSL", "SASL_PLAINTEXT", and "SASL_SSL". Defaults to "PLAINTEXT".
Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are "PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", and "OAUTHBEARER". Defaults to "PLAIN".
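For illustration, a sketch of configuring these options on a Kafka source (the broker address, topic, and mechanism are placeholders, and passing them to the KafkaSource constructor is an assumption based on the parameters above):
from chalk.streams import KafkaSource

source = KafkaSource(
    bootstrap_server="broker.example.com:9092",
    topic="transactions",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
)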
The subscription id of your PubSub topic from which you want to consume messages. To enable consuming from this subscription, ensure that the service account has the permissions 'pubsub.subscriptions.consume' and 'pubsub.subscriptions.get'.
Base class for all stream sources generated from @stream
.
Identifier for the dead-letter queue (DLQ) for the stream. If not specified, failed messages will be dropped. Stream name for Kinesis, topic name for Kafka, subscription id for PubSub.
The ChalkClient
is the primary Python interface for interacting with Chalk.
You can use it to query data, trigger resolver runs, gather offline data, and more.
Create a ChalkClient
with the given credentials.
The client secret to use to authenticate. Can either be a service token secret or a user token secret.
The ID or name of the environment to use for this client.
Not necessary if your client_id
and client_secret
are for a service token scoped to a single environment.
If not present, the client will use the environment variable
CHALK_ENVIRONMENT
.
The API server to use for this client. Required if you are
using a Chalk Dedicated deployment. If not present, the client
will check for the presence of the environment variable
CHALK_API_SERVER
, and use that if found.
If specified, Chalk will route all requests from this client
instance to the relevant branch. Some methods allow you to
override this instance-level branch configuration by passing
in a branch
argument.
If True
, the client will pick up the branch from the
current git branch.
If specified, Chalk will route all requests from this client
instance to the relevant tagged deployment. This cannot be
used with the branch
argument.
If specified, Chalk will route all requests from this client instance to the relevant preview deployment.
The query server to use for this client. Required if you are using a standalone Chalk query engine deployment. If not present, the client will default to the value of api_server.
The default wait timeout, in seconds, to wait for long-running jobs to complete when accessing query results. Jobs will not time out if this timeout elapses. For no timeout, set to None. The default is no timeout.
The default wait timeout, in seconds, to wait for network requests to complete. If not specified, the default is no timeout.
If client_id
or client_secret
are not provided, there
is no ~/.chalk.yml
file with applicable credentials,
and the environment variables CHALK_CLIENT_ID
and
CHALK_CLIENT_SECRET
are not set.
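For example, a minimal sketch of constructing a client explicitly (the credential values are placeholders; in practice they often come from ~/.chalk.yml or the CHALK_CLIENT_ID and CHALK_CLIENT_SECRET environment variables):
from chalk.client import ChalkClient

client = ChalkClient(
    client_id="client-id",
    client_secret="client-secret",
    environment="production",
)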
Compute features values using online resolvers. See Chalk Clients for more information.
The features for which there are known values, mapped to those values.
For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}.
Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].
If an empty sequence, the output will be set to all features on the namespace of the query. For example, if you pass as input {"user.id": 1234}, then the query is defined on the User namespace, and all features on the User namespace (excluding has-one and has-many relationships) will be used as outputs.
The time at which to evaluate the query. If not specified, the current time will be used.
This parameter is complex in the context of online_query since the online store only stores the most recent value of an entity's features. If now is in the past, it is extremely likely that None will be returned for cache-only features. This parameter is primarily provided to support evaluating a query as of a specific point in time; if you are trying to perform an exploratory analysis of past feature values, prefer offline_query.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
You can specify a correlation ID to be used in logs and web interfaces.
This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.
The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'. If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.
If query_name is specified, this parameter selects the version of the named query you're making. This is only useful if you want your query to use a NamedQuery with a specific name and a specific version. If query_name has not been supplied, this parameter is ignored. A sketch of how these parameters line up with a NamedQuery appears after the example below.
Returns metadata about the query execution under OnlineQueryResult.meta. This could make the query slightly slower. For more information, see Chalk Clients.
If True, the output of each of the query plan stages will be stored. This option dramatically impacts the performance of the query, so it should only be used for debugging.
If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.
from chalk.client import ChalkClient
result = ChalkClient().query(
input={
User.name: "Katherine Johnson"
},
output=[User.fico_score],
staleness={User.fico_score: "10m"},
)
result.get_feature_value(User.fico_score)
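If you rely on the query_name and query_name_version parameters above, the sketch below shows how they can line up with a NamedQuery definition. The NamedQuery constructor arguments shown are assumptions for illustration; consult the NamedQuery reference for the exact signature.
from chalk import NamedQuery
from chalk.client import ChalkClient

# Assumed NamedQuery definition deployed alongside your features;
# argument names here are illustrative.
NamedQuery(
    name="loan_application_model",
    version="1",
    input=[User.id],
    output=[User.fico_score, User.email],
)

# A query whose query_name matches the NamedQuery picks up its outputs and options.
ChalkClient().query(
    input={User.id: 1234},
    query_name="loan_application_model",
    query_name_version="1",
)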
Execute multiple queries (represented by the queries= argument) in a single request. This is useful if the queries are "rooted" in different @features classes -- i.e. if you want to load features for User and Merchant and there is no natural relationship object which is related to both of these classes, multi_query allows you to submit two independent queries.
Returns a BulkOnlineQueryResponse, which is functionally a list of query results. Each of these results can be accessed by index. Individual results can be further checked for errors and converted to pandas or polars DataFrames.
In contrast, query_bulk executes a single query with multiple inputs/outputs.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
An output containing results as a list[BulkOnlineQueryResult], where each result contains a DataFrame of the results of each query or any errors.
from chalk.client import ChalkClient, OnlineQuery
queries = [
OnlineQuery(
input={User.name: ['Katherine Johnson']},
output=[User.fico_score],
),
OnlineQuery(
input={Merchant.name: ['Eight Sleep']},
output=[Merchant.address],
),
]
result = ChalkClient().multi_query(queries)
result[0].get_feature_value(User.fico_score)
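Each result in the response can also be inspected for errors or converted to a DataFrame; the attribute and method names in the snippet below (errors, to_pandas) are assumptions for illustration rather than a definitive API.
# Hypothetical follow-up to the example above; names are assumptions --
# check the BulkOnlineQueryResponse reference for your client version.
user_result = result[0]
if not user_result.errors:
    df = user_result.to_pandas()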
Compute features values for many rows of inputs using online resolvers. See Chalk Clients for more information on online query.
This method is similar to query, except it takes in a list of inputs and produces one output per row of inputs.
This method is appropriate if you want to fetch the same set of features for many different input primary keys.
This method contrasts with multi_query, which executes multiple fully independent queries.
This endpoint is not available in all environments.
The time at which to evaluate the query. If not specified, the current time will be used.
The length of this list must be the same as the length of the values in input.
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
An output containing results as a list[BulkOnlineQueryResult], where each result contains a DataFrame of the results of each query.
from chalk.client import ChalkClient
ChalkClient().query_bulk(
input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
output=[User.fico_score],
staleness={User.fico_score: "10m"},
)
Plan a query without executing it.
The features for which there are known values, mapped to those values.
For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}.
Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].
Maximum staleness overrides for any output features or intermediate features. See Caching for more information.
The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.
The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'. If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.
If query_name is specified, this parameter selects the version of the named query you're making. This is only useful if you want your query to use a NamedQuery with a specific name and a specific version. If query_name has not been supplied, this parameter is ignored.
The number of input rows that this plan will be run with. If unknown, specify None.
The query plan, including the resolver execution order and the resolver execution plan for each resolver.
from chalk.client import ChalkClient
result = ChalkClient().plan_query(
input=[User.id],
output=[User.fico_score],
staleness={User.fico_score: "10m"},
)
result.rendered_plan
result.output_schema
Check whether the expected results of a query match Chalk query outputs.
This function should be used in integration tests.
If you're using pytest, pytest.fail will be executed on an error. Otherwise, an AssertionError will be raised.
A feature set or a mapping of {feature: value} of givens. All values will be encoded to the json representation.
A feature set or a mapping of {feature: value} of expected outputs. For values where you do not care about the result, use an ... for the feature value (i.e. when an error is expected).
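A minimal integration-test sketch follows, assuming the method is ChalkClient.check and that it accepts input= and assertions= keyword arguments; both names are assumptions for illustration and should be checked against the client reference.
from chalk.client import ChalkClient

def test_fico_score_for_known_user():
    # Hypothetical sketch: the method name and keyword arguments are assumptions.
    ChalkClient(branch="test").check(
        input={User.id: 1234},
        assertions={
            User.fico_score: 720,
            User.email: ...,  # use ... for values you do not care about
        },
    )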