Metaplanning is a feature that automates shard assignments for select offline queries.

Metaplanning is enabled by default for scheduled queries that have num_shards set. Contact our support team to enable metaplanning for all scheduled queries in your environments, including ones without an explicit num_shards set.

Any async offline query can be metaplanned by setting use_metaplanner=True in the API invocation.


How It Works

Offline queries selected for metaplanning go through a metaplanning workflow:

Offline Query Submitted

  1. Metaplan: choose what to shard on.
       • Has inputs? Shard on the inputs.
       • Has input SQL? Shard on the results of the input SQL.
       • No inputs? Shard on primary keys.
  2. Autoshard: count the input data (for example, 100,000 rows) and divide by the target rows per shard (10,000), producing Job 1 through Job 10.
  3. Sharded Query: each shard (Shard 1 through Shard N) covers 10k rows, and all shards run simultaneously.

Example: A query with 100,000 rows and the default target of 10,000 rows per shard creates 10 parallel shard jobs.
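
The arithmetic behind the workflow can be sketched in a few lines of Python. This is an illustration only, not Chalk's implementation: the helper names shard_source and plan_shards are hypothetical, the order in which inputs and input SQL are checked simply mirrors the workflow above, and rounding the shard count up (so a partial final shard holds the remainder) is an assumption.

```python
import math
from typing import List, Optional, Sequence, Tuple

# Default stated in this document for CHALK_AUTOSHARDER_TARGET_ROWS_PER_SHARD.
DEFAULT_TARGET_ROWS_PER_SHARD = 10_000


def shard_source(inputs: Optional[Sequence] = None,
                 input_sql: Optional[str] = None) -> str:
    """Hypothetical helper: name the data that shard keys would come from."""
    if inputs:
        return "inputs"
    if input_sql:
        return "input_sql_results"
    return "primary_keys"


def plan_shards(total_rows: int,
                target: int = DEFAULT_TARGET_ROWS_PER_SHARD) -> List[Tuple[int, int]]:
    """Hypothetical helper: split rows [0, total_rows) into (start, stop) ranges.

    Assumes the shard count is rounded up so that no shard exceeds the target.
    """
    if target <= 0:
        raise ValueError("target rows per shard must be positive")
    num_shards = math.ceil(total_rows / target)
    return [(i * target, min((i + 1) * target, total_rows))
            for i in range(num_shards)]


shards = plan_shards(100_000)
print(len(shards))  # 10 shard jobs for 100,000 rows at 10,000 rows per shard
```

Under these assumptions, a row count that is not an exact multiple of the target (say 25,000 rows) would yield three shards, the last one holding the remaining 5,000 rows.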


Configuration

The shard size can be controlled via the CHALK_AUTOSHARDER_TARGET_ROWS_PER_SHARD environment variable (default: 10,000 rows per shard).

For scheduled queries:

  • If num_shards is set, metaplanning is automatically engaged.
  • If num_shards is not set, metaplanning can still be engaged through an environment-level setting.

For async offline queries:

  • If use_metaplanner=True, the query will be metaplanned.
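
The rules above can be summarized as a small decision function. This is a sketch for reasoning about the rules, not part of the Chalk API: the function is_metaplanned, its kind values, and the env_metaplanning_enabled flag (standing in for the environment-level setting) are all hypothetical names.

```python
from typing import Optional


def is_metaplanned(kind: str,
                   num_shards: Optional[int] = None,
                   env_metaplanning_enabled: bool = False,
                   use_metaplanner: bool = False) -> bool:
    """Hypothetical summary of when a query is metaplanned."""
    if kind == "scheduled":
        # Engaged automatically when num_shards is set, otherwise only
        # when the environment-level setting has been turned on.
        return num_shards is not None or env_metaplanning_enabled
    if kind == "async_offline":
        # Engaged only when the caller opts in per query.
        return use_metaplanner
    raise ValueError(f"unknown query kind: {kind!r}")


print(is_metaplanned("scheduled", num_shards=4))
print(is_metaplanned("async_offline", use_metaplanner=True))
```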

Scheduled query example

ScheduledQuery(
    name="daily_user_scores",
    outputs=[User.id, User.score],
    schedule="0 0 * * *",
)

With metaplanning enabled, this query will:

  1. Compute shard keys to find all User IDs (since no input is specified)
  2. Autoshard the IDs based on the configured target
  3. Execute all shards in parallel

Async offline query examples

client.offline_query(
    input_sql='SELECT "user.id" FROM "chalk.resolvers.list_users"',
    outputs=[User.id, User.score],
    recompute_features=True,
    run_asynchronously=True,
    use_metaplanner=True,
)

This query will:

  1. Compute shard keys to find all User IDs by executing the input_sql
  2. Autoshard the IDs based on the configured target
  3. Execute all shards in parallel

client.offline_query(
    max_samples=1000,
    outputs=[User.id, User.score],
    recompute_features=True,
    run_asynchronously=True,
    use_metaplanner=True,
)

This query will:

  1. Compute shard keys to find 1000 User IDs
  2. Autoshard the IDs based on the configured target
  3. Execute all shards in parallel

See Also