Queries
Fetch offline feature values.
Offline queries pull data from the offline store or calculate features through resolvers that are marked as offline.
Offline queries can also execute online resolvers if no offline resolver is available for a requested feature.
In an offline query, you can request features for multiple entities at distinct time points. By default, an offline query returns a row for each primary key containing the most recent computed value for each requested output feature. The main use case for offline queries is creating datasets.
Chalk supports a number of clients for offline queries. In this section, our examples use our Python client, which integrates nicely with Jupyter notebooks.
As mentioned earlier, offline queries can be made through one of Chalk’s API clients.
from chalk.client import ChalkClient
from datetime import datetime
client = ChalkClient()
dataset = client.offline_query(
input={'user.id': [1,2,3,4]}, # Input
output=['user.name'], # Output
# tags=['test'], # Environment
# branch='branch',
# recompute_features=True, # Run Resolvers
# run_asynchronously=True, # Run Configuration
# max_samples = 10,
# lower_bound=datetime(2024, 10, 12), # Bounds
# upper_bound=datetime(2024, 10, 20),
)
df = dataset.get_data_as_pandas()
Offline queries return Chalk Datasets, which we cover in more detail in the
next section. At a high level, Chalk Dataset
s are flexible wrappers
for the results of your offline query. They can be converted to pandas or polars DataFrames for ease of use
in downstream ML tasks.
Offline queries can receive input from a DataFrame, mapping, or SQL query. Regardless of the format, the primary key feature of the feature class must be included in the input.
The input parameter accepts chalk.DataFrame, pandas.DataFrame, or a mapping (like a Python dictionary). DataFrames should include one column for each known feature in the input. Mappings should map from each feature to a list of its values:
input={
User.id: ['id1', 'id2'],
User.age: [23, 40]
}
Timestamps can be passed in the input_times argument. If none are provided, Chalk will use the current time.
input={
User.id: ['id1', 'id1'],
}
input_times=[datetime.now() - timedelta(days=1), datetime.now() - timedelta(days=2)]
offline_query
can also take spine_sql_query as an alternative
input parameter for retrieving feature values from your offline data store.
Spine SQL queries are recommended in the following scenarios:
Spine SQL queries accept features from multiple feature namespaces as columns of the result. Each column must either correspond to an existing feature or be included in the output list. For each referenced feature namespace, the feature namespace’s primary key must be included as a column.
The “ts” column is always interpreted as the query execution time for the row. See our documentation on temporal consistency for more details.
# This spine_sql_query queries the table `transactions` in the Snowflake offline store instance.
output = chalk_client.offline_query(
spine_sql_query=f"""
SELECT
t.txn_time AS ts,
t.seller_id AS "seller.id",
t.buyer_id AS "buyer.id",
t.amount AS "txn.amount",
t.payment_type AS "txn.payment_type"
FROM transactions AS t
WHERE t.update_at >= {now - timedelta(days=30)}
""",
outputs=[
Seller.id,
Buyer.id,
Buyer.account_created_date,
Txn.payment_type,
# Computed in the seller namespace from the 'seller.id' spine feature.
Seller.recent_transactions_volume,
# Computed in the buyer namespace from the 'buyer.id' spine feature.
Buyer.total_spent_last_30d,
# Passed through from the SQL query
Txn.amount,
],
)
This argument describes a list of features to sample.
output=[
User.returned_transactions_last_60,
User.user_account_name_match_score,
User.socure_score,
User.identity.has_verified_phone,
User.identity.is_voip_phone,
User.identity.account_age_days,
User.identity.email_age,
]
Users can request that features (or a subset of features) be recomputed by resolvers at query time instead of sampled from the offline store. For more information, see recompute_features.
In some cases, users may not have a list of primary keys to sample with,
and instead would like to see results within a period of time.
The user can then leave the inputs
argument empty and supply a
lower bound and an
upper bound along with the requested output features.
The parameter max_samples
can also be used to limit the number of rows returned.
dataset: Dataset = ChalkClient().offline_query(
output=[
User.id,
User.fullname,
User.email,
User.name_email_match_score,
],
lower_bound=datetime.now() - timedelta(days=7),
upper_bound=datetime.now(),
)
The user can specify
tags,
environment,
or branch
as parameters to offline_query
in the same fashion
as online query.
Like online queries,
offline queries can be debugged using the explain
flag, which logs the query execution plan, and
the store_plan_stages
flag, which stores the inputs and outputs of each of the plan stages,
which will be visible in the web dashboard’s query detail view.
In addition, the offline query output is a Dataset object that you can inspect within the dashboard using the Output Explorer. With the Output Explorer you can run SQL queries on the output to do more testing and validation. More on these and other tips here.
Offline queries that run on a large number of rows require their own compute resources and infrastructure
to ensure they complete and do not contend with resources from other workloads like the engine or branch server.
Chalk handles this by launching a Kubernetes job that runs asynchronously on dedicated compute nodes.
To do so, set the run_asynchronously
flag to True
in the offline_query
method.
from chalk.client import ChalkClient
client = ChalkClient()
client.offline_query(
input={'user.id': range(1_000_000)},
output=['user.name'],
run_asynchronously=True,
)
If you’d like to adjust the compute or memory resources for the job,
set the resources
parameter to a custom ResourceRequests
object.
Ensure that the request values are compatible with the resources available on Kubernetes cluster.
Another option to consider is to parallelize the computation of the offline query by dividing the input into shards.
If the num_shards
parameter is set,
Chalk will split the input into num_shards
shards and run each shard in a separate pod.
The num_workers
parameter sets the maximum number of pods that can run concurrently.
If the number of shards is large compared to the size of the Kubernetes cluster,
set the num_workers
parameter to a smaller number to ensure that Chalk can launch enough pods to run the query.
from chalk.client import ChalkClient
client = ChalkClient()
client.offline_query(
input={'user.id': range(1_000_000)},
output=['user.name'],
run_asynchronously=True,
num_shards=50,
num_workers=10,
)
For the above example, Chalk will split the input spine into 50 shards of 20,000 rows each. It will then launch jobs for each shard, while not exceeding 10 concurrent pods at once.
If you lose track of the state of a long-running offline query, you’ll always be able to retrieve the outputs later.
First, check the offline query dashboard: you’ll be able to download all shards of the output,
individually or as a group, there.
You can also retrieve the outputs via Python with the ChalkClient.get_dataset
method:
simply copy and paste the Query Id
from the offline query dashboard into your function call.
from chalk.client import ChalkClient
client = ChalkClient()
dataset = client.get_dataset("21d9ceb7-c1ef-4c08-a39c-f8381e825a66") # your query id
df = dataset.get_data_as_pandas()