Offline Queries

Offline queries pull data from the offline store or calculate features through resolvers that are marked as offline.

Offline queries can also execute online resolvers if no offline resolver is available for a requested feature.

In an offline query, you can request features for multiple entities at distinct time points. By default, an offline query returns a row for each primary key containing the most recent computed value for each requested output feature. The main use case for offline queries is creating datasets.

Chalk supports a number of clients for offline queries. In this section, our examples use our Python client, which integrates nicely with Jupyter notebooks.

Making offline queries

As mentioned earlier, offline queries can be made through one of Chalk’s API clients.

from chalk.client import ChalkClient
from datetime import datetime

client = ChalkClient()
dataset = client.offline_query(
  input={'user.id': [1,2,3,4]},         # Input
  output=['user.name'],                 # Output
  # tags=['test'],                      # Environment
  # branch='branch',
  # recompute_features=True,            # Run Resolvers
  # run_asynchronously=True,            # Run Configuration
  # max_samples = 10,
  # lower_bound=datetime(2024, 10, 12), # Bounds
  # upper_bound=datetime(2024, 10, 20),
)
df = dataset.get_data_as_pandas()

Offline queries return Chalk Datasets, which we cover in more detail in the next section. At a high level, Chalk Datasets are flexible wrappers for the results of your offline query. They can be converted to pandas or polars DataFrames for ease of use in downstream ML tasks.

Input

Offline queries can receive input from a DataFrame, mapping, or SQL query. Regardless of the format, the primary key feature of the feature class must be included in the input.

DataFrames and mappings as input

The input parameter accepts chalk.DataFrame, pandas.DataFrame, or a mapping (like a Python dictionary). DataFrames should include one column for each known feature in the input. Mappings should map from each feature to a list of its values:

input={
    User.id: ['id1', 'id2'],
    User.age: [23, 40]
}
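Since each key maps to a column of values, the lists presumably need the same length, with one entry per input row. The sketch below checks that before a query is sent. It uses plain string feature names, and `validate_input_mapping` is a hypothetical helper for illustration, not part of the Chalk client.

```python
from typing import Any, Mapping, Sequence


def validate_input_mapping(mapping: Mapping[str, Sequence[Any]]) -> int:
    """Return the number of input rows, or raise if the columns disagree.

    Hypothetical helper for illustration; not part of the Chalk client.
    """
    if not mapping:
        raise ValueError("input mapping is empty")
    lengths = {feature: len(values) for feature, values in mapping.items()}
    if len(set(lengths.values())) != 1:
        raise ValueError(f"columns have different lengths: {lengths}")
    return next(iter(lengths.values()))


rows = validate_input_mapping({"user.id": ["id1", "id2"], "user.age": [23, 40]})
print(rows)
```

A check like this catches a truncated column locally, before the query is submitted.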

Input times

Timestamps can be passed in the input_times argument. If none are provided, Chalk will use the current time.

input={
    User.id: ['id1', 'id1'],
},
input_times=[datetime.now() - timedelta(days=1), datetime.now() - timedelta(days=2)],
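To make the effect of input_times concrete, here is a small sketch of a point-in-time lookup in plain Python. For each (primary key, timestamp) pair it returns the latest value observed at or before that timestamp. The observation log and the `value_as_of` function are hypothetical and only model the idea; they do not describe how Chalk stores or retrieves data. Pairing one timestamp with each input row is an assumption read off the example above.

```python
from bisect import bisect_right
from datetime import datetime, timedelta

now = datetime(2024, 10, 20, 12, 0)

# Hypothetical observation log: per primary key, (observed_at, value) sorted by time.
observations = {
    "id1": [
        (now - timedelta(days=3), 21),
        (now - timedelta(hours=36), 22),
        (now - timedelta(hours=6), 23),
    ],
}


def value_as_of(pkey, ts):
    """Latest value observed at or before ts, or None if nothing was observed yet."""
    history = observations.get(pkey, [])
    times = [observed_at for observed_at, _ in history]
    idx = bisect_right(times, ts)
    return history[idx - 1][1] if idx else None


ids = ["id1", "id1"]
input_times = [now - timedelta(days=1), now - timedelta(days=2)]
results = [value_as_of(pkey, ts) for pkey, ts in zip(ids, input_times)]
print(results)
```

The same primary key yields different values because each row is evaluated at its own timestamp.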

SQL queries as input

offline_query can also take spine_sql_query as an alternative input parameter for retrieving feature values from your offline data store.

Spine SQL queries are recommended in the following scenarios:

  1. You want to retrieve data for multiple rows at once. Chalk will compute an efficient query plan for loading multiple rows of data at once. Chalk will also reuse data between rows where appropriate.
  2. You want to query from Chalk as your offline data store. You can reduce unnecessary back-and-forth requests by having Chalk execute the SQL query and handle the result rows directly.
  3. You want to request features from multiple feature namespaces for each row of output.

Spine SQL queries accept features from multiple feature namespaces as columns of the result. Each column must either correspond to an existing feature or be included in the output list. For each referenced feature namespace, the feature namespace’s primary key must be included as a column.

The “ts” column is always interpreted as the query execution time for the row. See our documentation on temporal consistency for more details.

# This spine_sql_query queries the table `transactions` in the Snowflake offline store instance.
# Assumes `chalk_client` is a ChalkClient instance and that `datetime` and `timedelta` are imported.
cutoff = datetime.now() - timedelta(days=30)
dataset = chalk_client.offline_query(
    spine_sql_query=f"""
        SELECT
            t.txn_time AS ts,
            t.seller_id AS "seller.id",
            t.buyer_id AS "buyer.id",
            t.amount AS "txn.amount",
            t.payment_type AS "txn.payment_type"
        FROM transactions AS t
        WHERE t.update_at >= '{cutoff.isoformat()}'
    """,
    output=[
        Seller.id,
        Buyer.id,
        Buyer.account_created_date,
        Txn.payment_type,
        # Computed in the seller namespace from the 'seller.id' spine feature.
        Seller.recent_transactions_volume,
        # Computed in the buyer namespace from the 'buyer.id' spine feature.
        Buyer.total_spent_last_30d,
        # Passed through from the SQL query
        Txn.amount,
    ],
)
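To see what shape of spine such a query produces, the sketch below runs a similar SELECT against an in-memory SQLite table. SQLite stands in for the offline store purely for illustration, the table contents are invented, and the cutoff is passed as a bound parameter, which is a SQLite convenience rather than anything spine_sql_query is known to support.

```python
import sqlite3
from datetime import datetime, timedelta

now = datetime(2024, 10, 20)

# In-memory stand-in for the offline store's `transactions` table (invented data).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transactions "
    "(txn_time TEXT, seller_id INTEGER, buyer_id INTEGER, amount REAL)"
)
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?, ?, ?)",
    [
        ((now - timedelta(days=1)).isoformat(), 10, 100, 25.0),
        ((now - timedelta(days=45)).isoformat(), 11, 101, 40.0),
    ],
)

cutoff = (now - timedelta(days=30)).isoformat()
cursor = conn.execute(
    """
    SELECT
        t.txn_time AS ts,
        t.seller_id AS "seller.id",
        t.buyer_id AS "buyer.id",
        t.amount AS "txn.amount"
    FROM transactions AS t
    WHERE t.txn_time >= ?
    """,
    (cutoff,),
)

# Column aliases become the spine's column names; each row carries its own `ts`.
spine_columns = [column[0] for column in cursor.description]
spine_rows = cursor.fetchall()
print(spine_columns)
print(spine_rows)
conn.close()
```

Only the transaction from the last 30 days survives the filter. Its row carries a ts value alongside columns named after fully qualified features.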

Output

The output argument is the list of features to sample.

output=[
    User.returned_transactions_last_60,
    User.user_account_name_match_score,
    User.socure_score,
    User.identity.has_verified_phone,
    User.identity.is_voip_phone,
    User.identity.account_age_days,
    User.identity.email_age,
]

Recompute features

Users can request that features (or a subset of features) be recomputed by resolvers at query time instead of sampled from the offline store. For more information, see recompute_features.

Timebounds

In some cases, users may not have a list of primary keys to sample with and would instead like to see results from within a period of time. In that case, leave the input argument empty and supply a lower bound and an upper bound along with the requested output features. The max_samples parameter can also be used to limit the number of rows returned.

dataset: Dataset = ChalkClient().offline_query(
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    lower_bound=datetime.now() - timedelta(days=7),
    upper_bound=datetime.now(),
)
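As a rough mental model of a time-bounded query, the sketch below filters a list of stored rows to a time window and caps the result at max_samples. The data and the `sample_window` function are hypothetical. Treating both bounds as inclusive and keeping the first rows found are assumptions made for illustration, not documented Chalk behavior.

```python
from datetime import datetime, timedelta
from itertools import islice

now = datetime(2024, 10, 20)

# Hypothetical stored rows: (observed_at, user_id).
stored = [
    (now - timedelta(days=10), 1),
    (now - timedelta(days=5), 2),
    (now - timedelta(days=3), 3),
    (now - timedelta(days=1), 4),
]


def sample_window(rows, lower_bound, upper_bound, max_samples=None):
    """Rows whose timestamp lies in [lower_bound, upper_bound], capped at max_samples."""
    in_window = (row for row in rows if lower_bound <= row[0] <= upper_bound)
    # islice with a stop of None yields every row, so the cap is optional.
    return list(islice(in_window, max_samples))


sampled = sample_window(stored, now - timedelta(days=7), now, max_samples=2)
print([user_id for _, user_id in sampled])
```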

Environment

The user can specify tags, environment, or branch as parameters to offline_query, just as in an online query.

Debugging offline queries

Like online queries, offline queries can be debugged with two flags. The explain flag logs the query execution plan. The store_plan_stages flag stores the inputs and outputs of each plan stage, which then become visible in the web dashboard’s query detail view. More on these and other tips here.

Running large offline queries

Offline queries that run on a large number of rows require their own compute resources and infrastructure to ensure that they complete and do not contend for resources with other workloads such as the engine or branch server. Chalk handles this by launching a Kubernetes job that runs asynchronously on dedicated compute nodes. To do so, set the run_asynchronously flag to True in the offline_query method.

from chalk.client import ChalkClient

client = ChalkClient()
client.offline_query(
  input={'user.id': range(1_000_000)},
  output=['user.name'],
  run_asynchronously=True,
)

If you’d like to adjust the compute or memory resources for the job, set the resources parameter to a custom ResourceRequests object. Ensure that the requested values are compatible with the resources available on your Kubernetes cluster.

Another option is to parallelize the offline query by dividing the input into shards. If the num_shards parameter is set, Chalk will split the input into num_shards shards and run each shard in a separate pod. The num_workers parameter sets the maximum number of pods that can run concurrently. If the number of shards is large compared to the size of the Kubernetes cluster, set num_workers to a smaller number so that Chalk does not try to run more pods at once than the cluster can schedule.

from chalk.client import ChalkClient

client = ChalkClient()
client.offline_query(
  input={'user.id': range(1_000_000)},
  output=['user.name'],
  run_asynchronously=True,
  num_shards=50,
  num_workers=10,
)

In the example above, Chalk will split the input spine into 50 shards of 20,000 rows each. It will then launch a pod for each shard, running no more than 10 pods concurrently.
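The shard arithmetic and the concurrency cap can be sketched in plain Python. Here threads stand in for pods, `split_into_shards` and `run_shard` are hypothetical names, and splitting into contiguous ranges is an assumption; how Chalk actually partitions the input is not specified here.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_shards(n_rows: int, num_shards: int) -> list[range]:
    """Split row indices 0..n_rows-1 into num_shards contiguous, near-equal ranges."""
    base, extra = divmod(n_rows, num_shards)
    shards, start = [], 0
    for i in range(num_shards):
        # The first `extra` shards take one additional row when the split is uneven.
        size = base + (1 if i < extra else 0)
        shards.append(range(start, start + size))
        start += size
    return shards


def run_shard(shard: range) -> int:
    """Stand-in for the per-shard work; here it only counts rows."""
    return len(shard)


shards = split_into_shards(1_000_000, num_shards=50)

# At most 10 shards are processed at a time, mirroring num_workers=10.
with ThreadPoolExecutor(max_workers=10) as pool:
    rows_per_shard = list(pool.map(run_shard, shards))

print(len(shards), rows_per_shard[0], sum(rows_per_shard))
```

With 1,000,000 rows and 50 shards the split is even, so every shard holds 20,000 rows. The worker cap limits only how many shards are in flight at once, not how many exist.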

If you lose track of a long-running offline query, you can still retrieve its outputs later. The offline query dashboard lets you download all shards of the output, individually or as a group. You can also retrieve the outputs in Python with the ChalkClient.get_dataset method by passing it the Query Id copied from the offline query dashboard.

from chalk.client import ChalkClient

client = ChalkClient()
dataset = client.get_dataset("21d9ceb7-c1ef-4c08-a39c-f8381e825a66") # your query id
df = dataset.get_data_as_pandas()

Running offline queries on a schedule