SQL Integration

Chalk can ingest data using a SQL interface from any of our supported SQL data source integrations. The full list of supported SQL data sources can be found in our API reference.

Basic queries

Chalk supports running SQL from files or from strings. When you run queries that invoke your SQL resolver, Chalk automatically pushes your input parameters into the WHERE clause of your query. For more details, see our section on push-down filters.

The examples on this page use our PostgreSQL data source, but can be generalized to any of our other SQL data sources.

SQL file resolvers

SQL file resolvers are Chalk’s recommended method for writing SQL resolvers.

get_user.chalk.sql
-- type: online
-- resolves: user
-- source: PG
select id, email, full_name from user_table

Here, we define an online resolver that returns some features for the User feature class from the user_table table in the PostgreSQL source PG. The comments are parsed as YAML and give Chalk the metadata it needs to configure the resolver. SQL file resolvers can return multiple rows for aggregation operations, offline queries, and more.

It’s also possible to use SELECT * in a SQL file resolver, but be careful!

get_all_columns_users.chalk.sql
-- type: online
-- resolves: user
-- source: PG
select * from user_table

Implicitly, this query tries to align every scalar feature from the User feature class to a column name in user_table. If a feature's name does not match a column, or the column is absent from the table, you’ll get a “missing columns” error.
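
The alignment check can be pictured with a small sketch. This is not Chalk's implementation: the function name, the feature list, and the column list below are hypothetical, and the sketch assumes only that each scalar feature needs a column with exactly the same name.

```python
def missing_columns(feature_names, column_names):
    """Return the features that have no same-named column, in feature order."""
    available = set(column_names)
    return [name for name in feature_names if name not in available]


# Hypothetical scalar features of the User feature class.
user_features = ["id", "email", "full_name"]
# Hypothetical columns of user_table: "name" where "full_name" was expected.
user_table_columns = ["id", "email", "name", "created_at"]

unmatched = missing_columns(user_features, user_table_columns)
if unmatched:
    print(f"missing columns: {unmatched}")
```

Listing the columns explicitly, as in get_user.chalk.sql, avoids this class of error because only the selected columns need to line up with features.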

To programmatically generate SQL file resolvers, check out Generated SQL file resolvers.

Parameterized feature inputs

Like other resolvers, SQL file resolvers can take features as input. In this example, we want our resolver to require User.id as input:

-- type: online
-- resolves: user
-- source: PG
select id, email, full_name
from user_table
where id = ${user.id}

Use ${} with snake case to reference the desired feature.

Use ${now} in your query as a special argument representing the time of the query. For more details, see our time documentation.

SQL strings

You can run SQL queries either as Python strings or from .sql files.

When the name of the column matches the name of the feature with non-alphanumeric characters removed, the mapping from column to feature is automatic.

pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string("select full_name, email from users where id=1")
        .first()
    )

get_user’s return type indicates that it expects features named full_name and email, which are returned as columns from the SQL query.

If the column names don’t align exactly, you can include the parameter fields to specify the mapping from the query to the fields.

pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string(
            "select name, email from users where id=1",
            fields=dict(name=User.full_name),
        )
        .first()
    )

Here, the email column of the query automatically aligns with the expected User.email feature, but the name column of the query is explicitly mapped to the User.full_name feature.
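
The two mapping rules above (automatic matching after removing non-alphanumeric characters, and explicit overrides through fields) can be sketched in plain Python. The helper names normalize and map_columns are hypothetical and features are represented as plain strings; this illustrates the rule as described, not Chalk's internals.

```python
import re


def normalize(name):
    """Drop every non-alphanumeric character from a name."""
    return re.sub(r"[^0-9A-Za-z]", "", name)


def map_columns(columns, features, fields=None):
    """Map columns to features: explicit `fields` entries win,
    then columns are matched to features by normalized name."""
    fields = fields or {}
    by_normalized = {normalize(feature): feature for feature in features}
    mapping = {}
    for column in columns:
        if column in fields:
            mapping[column] = fields[column]
        elif normalize(column) in by_normalized:
            mapping[column] = by_normalized[normalize(column)]
    return mapping


features = ["full_name", "email"]

# "fullname" matches "full_name" once the underscore is removed.
auto = map_columns(["fullname", "email"], features)

# "name" has no automatic match, so it is mapped explicitly.
explicit = map_columns(["name", "email"], features, fields={"name": "full_name"})
```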

Parameterizing string queries

You can parameterize queries to pass variables. Parameterize names with a colon, and pass a dictionary from parameter name to parameter value:

pg.query_string(
    query="select * from users where user_id=:user_id",
    args=dict(user_id="uid123"),
)

Use a backslash to escape any literal : characters you need to use in your query:

pg.query_string(
    query=r"select * from users where user_id=:user_id and name='\:colon'",
    args=dict(user_id="uid123"),
)
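
Colon-prefixed named parameters are a common convention, and Python's built-in sqlite3 module uses the same style, so the binding behavior can be tried locally. The sketch below uses sqlite3 rather than pg.query_string, and the table contents are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table users (user_id text, name text)")
conn.executemany(
    "insert into users values (?, ?)",
    [("uid123", "Ada"), ("uid456", "Bo")],
)

# The value is bound by the driver rather than spliced into the SQL text,
# so quotes inside the value cannot change the structure of the query.
rows = conn.execute(
    "select * from users where user_id=:user_id",
    {"user_id": "uid123"},
).fetchall()
print(rows)  # [('uid123', 'Ada')]

# A value that looks like SQL is compared as an ordinary string.
suspicious = conn.execute(
    "select * from users where user_id=:user_id",
    {"user_id": "uid123' or '1'='1"},
).fetchall()
```

This is the main reason to prefer bound parameters over building the query with string formatting.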

SQL string files

Instead of passing a string directly in your Python code, you can load the SQL content from a file with the query_sql_file function.

For example, here is a query.sql file containing the same query from above:

query.sql
select * from users where user_id=:user_id

You can reference this file in a Python resolver, either using the absolute path from the root of your project or relative to the resolver’s file.

For example, if the snippet below lived in the same directory as query.sql, we could refer to it as follows:

pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_sql_file(
            path="query.sql",
            args=dict(user_id=uid)
        )
        .first()
    )

Auto-mapping of column name to feature name also applies for the query_sql_file method. Parameterization also works the same way.

Push-down filtering

Chalk automatically adds query inputs to the WHERE clause of your SQL queries. We recommend relying on our push-down filtering logic rather than parameterizing your queries by hand.

The following example will show a transaction feature and how we use push-down filtering to retrieve transactions belonging to a specific category.

Here’s the feature class:

pg = PostgreSQLSource(name='PG')

@features
class Transaction:
    id: int
    category: str
    amount: float

And the SQL file resolver:

-- type: online
-- resolves: transaction
-- source: PG
SELECT
    id, merchant_category AS category, amount
FROM
    txn_table;

Finally, here is our query for grocery transactions:

client = ChalkClient()
client.query(
    input={
        Transaction.id: [1, 2, 3, 4],
        Transaction.category: "Groceries",
    },
    output=[
        Transaction.id,
        Transaction.amount,
    ],
)

When this query is run, Chalk adds each input parameter to the WHERE clause, effectively running this query against your database:

SELECT
    id, merchant_category AS category, amount
FROM
    txn_table
WHERE
    id IN (1, 2, 3, 4)
    AND merchant_category = 'Groceries';

If your database column names differ from your feature names, the column name must be aliased in the SELECT clause of your query, as shown above with the merchant_category column.
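
To make the rewrite concrete, here is a minimal sketch of the idea using sqlite3. It is not Chalk's push-down logic: the push_down helper is hypothetical, it assumes the base query has no WHERE clause of its own, and it trusts the column names it is given.

```python
import sqlite3


def push_down(base_query, filters):
    """Append a WHERE clause built from {column: value-or-list} filters.

    Returns the rewritten SQL and the positional parameters to bind.
    """
    clauses, params = [], []
    for column, value in filters.items():
        if isinstance(value, (list, tuple)):
            marks = ", ".join("?" for _ in value)
            clauses.append(f"{column} IN ({marks})")
            params.extend(value)
        else:
            clauses.append(f"{column} = ?")
            params.append(value)
    sql = base_query.rstrip().rstrip(";")
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    return sql, params


conn = sqlite3.connect(":memory:")
conn.execute(
    "create table txn_table (id integer, merchant_category text, amount real)"
)
conn.executemany(
    "insert into txn_table values (?, ?, ?)",
    [(1, "Groceries", 12.5), (2, "Travel", 300.0),
     (3, "Groceries", 8.0), (5, "Groceries", 40.0)],
)

sql, params = push_down(
    "SELECT id, merchant_category AS category, amount FROM txn_table;",
    # Filters are keyed by the underlying column, not by the alias.
    {"id": [1, 2, 3, 4], "merchant_category": "Groceries"},
)
rows = conn.execute(sql, params).fetchall()
```

Only transactions 1 and 3 come back: transaction 2 fails the category filter and transaction 5 is not among the requested ids.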

Configuration

Supported SQL file resolver comment keys

The following are supported keys that may be included in .chalk.sql file comments.

SQL File Resolvers
resolves (str)
Describes the namespace to which the outputs belong. In the above example, user.email and user.full_name are the outputs.
source (str)
Describes the database by name, as in the above example, or the type if there is only one database of that type. Thus, if you have one PostgreSQL source, you can also write source: postgresql.
type ("online" | "offline" | "streaming" | null)
The type of resolver. If not specified, online is the default.
incremental (dict[str, str])
Parameters for incremental queries. For more information, see the below section on incremental queries.
incremental query parameters:
  mode ("row" | "group" | "parameter")
  The incrementalization mode decides how to ingest new data. Defaults to "row".
  lookback_period (string)
  The length of the window from the last observed row that Chalk will re-ingest, e.g. 1h. Defaults to 0.
  incremental_column (string)
  The timestamp column in the underlying table to use as the basis for incrementalization. Must be supplied in row and group modes.
  incremental_timestamp ("feature_time" | "resolver_execution_time")
  The timestamp used for timedelta calculations. Defaults to feature_time.
count (Literal[1, "one", "one_or_none"])
Returns one. Equivalent to the common query finalizer .one().
timeout (Duration | null)
The maximum time to wait before timing out the query. See Duration for more details.
tags (list[str] | null)
The user tags associated with this resolver. For online and offline resolvers.
environment (list[str] | null)
The environments associated with this resolver.
cron (str | null)
The schedule for a cron run, e.g. 1h.
max_staleness (str | null)
The max staleness for the resolver, e.g. 24h.
owner (str | null)
The owner of the resolver.
fields (dict[str, str])
An optional mapping from SQL column to Chalk feature. For example, with SELECT name AS arbitrary_column_name, we can map the arbitrary_column_name to a Chalk feature belonging to the namespace described by the resolves field with the mapping arbitrary_column_name: chalk_feature_name.
unique_on (list[str])
A list of features that must be unique for each row of the output. This enables unique optimizations in the resolver execution. Only applicable to resolvers that return a DataFrame.
partitioned_by (list[str])
A list of features that correspond to partition keys in the data source. This field indicates that this resolver executes its query against a data storage system that is partitioned by a particular set of columns. This is most common with data-warehouse sources like Snowflake, BigQuery or Databricks.

Proper comment formatting

All comments must be inserted before the body of the SQL query. Each comment line is parsed either as a YAML-formatted line describing the resolver or as a docstring. Below, the last comment will appear as a docstring since it is not in key: value format.

get_all_columns_users.chalk.sql
-- type: online
-- resolves: user
-- source: PG
-- This comment is not in yaml format, so it will be parsed as a docstring
select * from user_table
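
The split between metadata lines and docstring lines can be approximated in a few lines of Python. This is a deliberately simplified sketch, not Chalk's parser: split_header is a hypothetical helper, it handles only flat key: value pairs (no lists or nested dictionaries), and it would misread a docstring line that happens to start with a single word followed by a colon.

```python
def split_header(sql_text):
    """Split leading `--` comment lines into metadata, a docstring,
    and the remaining query body."""
    metadata, docstring, body = {}, [], []
    in_header = True
    for line in sql_text.strip().splitlines():
        stripped = line.strip()
        if in_header and stripped.startswith("--"):
            comment = stripped[2:].strip()
            key, sep, value = comment.partition(":")
            # A single-word key followed by a colon counts as metadata.
            if sep and key.strip().isidentifier():
                metadata[key.strip()] = value.strip()
            else:
                docstring.append(comment)
        else:
            in_header = False
            body.append(line)
    return metadata, " ".join(docstring), "\n".join(body)


example = """
-- type: online
-- resolves: user
-- source: PG
-- This comment is not in yaml format, so it will be parsed as a docstring
select * from user_table
"""

metadata, docstring, body = split_header(example)
```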

For field values that can be lists or dictionaries, such as tags or incremental settings, we can either enumerate the values inline or with an extra indentation. Remember, if your values include a colon, you must use quotes around your value in order for the line to have valid YAML format.

Both of the following formats are valid and equivalent.

get_all_columns_users.chalk.sql
-- type: online
-- resolves: user
-- source: PG
-- tags: ["single:1", "double:2"]
select * from user_table

get_all_columns_users.chalk.sql
-- type: online
-- resolves: user
-- source: PG
-- tags:
--    - single:1
--    - double:2
select * from user_table

Streaming SQL file resolvers

Chalk supports streaming with SQL file resolvers:

get_store_features.chalk.sql
-- source: kafka
-- resolves: store_features
-- type: streaming
select store_id as id, sum(amount) as purchases
from topic
group by 1

SQL linting configuration

If you are using SQLFluff or another SQL linter, you may need to configure it to accept the ${...} placeholder pattern. For SQLFluff, set the templater to placeholder and add the following to your config file.

# Allows sqlfluff to correctly parse
# ${arbitrary_placeholder_name}

[sqlfluff:templater:placeholder]
param_regex =\$\{[^}]*\}
1 = 'arbitrary_placeholder_name'

Generated SQL file resolvers

You can programmatically generate SQL resolvers with make_sql_file_resolver. This function is useful if you have many SQL tables and want to automate management of their resolvers.

from chalk import make_sql_file_resolver
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource(name='PG')

definitions = [
    {
        "resolver_name": "get_user_features",
        "entity_name": "User",
        "table": "user_table",
        "pkey_column": "id",
        "features": ["feature1", "feature2"],
    },
    # ... more definitions
]

for definition in definitions:
    targets = ", ".join(definition["features"])

    make_sql_file_resolver(
        name=definition["resolver_name"],
        sql=f"select {definition['pkey_column']}, {targets} from {definition['table']}",
        source=pg, # "PG" is also acceptable
        resolves=definition["entity_name"],
    )

make_sql_file_resolver adds this resolver to your registry as if it were a SQL file resolver, but without creating the .chalk.sql file.

All SQL file resolvers require source and resolves. These values can be provided as SQL comments within your sql value or as parameters. If comments and parameters are both provided, parameters will take precedence. This function call is equivalent to the previous example:

make_sql_file_resolver(
    name=definition["resolver_name"],
    sql=f"""
        -- source: PG
        -- resolves: {definition['entity_name']}
        select {definition['pkey_column']}, {targets} from {definition['table']}
    """,
)

Incremental queries

Overview

The first time that a resolver with an incremental query is executed, Chalk ingests all data from the source. On subsequent runs of the resolver, Chalk only looks for new rows in the table to ingest. Using incremental queries limits the amount of data that Chalk needs to ingest, lowering latency for updates and reducing costs.

Incremental queries are useful for ingesting immutable tables or queries, like event tables or logs. This type of data is frequently found in the offline context, as it represents logs of real-time events.

Incremental queries use the incremental_column parameter to page over the underlying table.
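
The paging behavior can be illustrated with sqlite3. This is a sketch of the general high-water-mark technique under made-up data, not Chalk's ingestion code; the ingest helper and the logins rows are hypothetical, and timestamps are stored as ISO 8601 strings so that string comparison matches time order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table logins (attempted_at text, status text, user_id integer)"
)


def ingest(conn, last_seen):
    """Return rows newer than last_seen (every row when last_seen is None)
    together with the updated high-water mark."""
    sql = "select attempted_at, status, user_id from logins"
    params = ()
    if last_seen is not None:
        sql += " where attempted_at > ?"
        params = (last_seen,)
    rows = conn.execute(sql + " order by attempted_at", params).fetchall()
    new_mark = max((row[0] for row in rows), default=last_seen)
    return rows, new_mark


conn.executemany(
    "insert into logins values (?, ?, ?)",
    [("2024-01-01T10:00:00Z", "ok", 1), ("2024-01-01T11:00:00Z", "failed", 2)],
)
first_run, mark = ingest(conn, None)   # first run: the whole table

conn.execute("insert into logins values ('2024-01-01T12:00:00Z', 'ok', 1)")
second_run, mark = ingest(conn, mark)  # later run: only the new row
```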

Incremental queries with SQL file resolvers

Imagine you have a login events table, where you keep track of login attempts to your website. You can ingest this table with a SQL file resolver as follows:

incremental_query.chalk.sql
-- type: offline
-- resolves: login
-- source: PG
-- incremental:
--   mode: row
--   lookback_period: 60m
--   incremental_column: attempted_at
select attempted_at, status, user_id from logins

Configuration for incremental resolvers can be passed in the incremental dictionary of the file’s comments. For more details, see the configuration reference above.

Incremental queries with SQL strings

To use incremental mode with SQL string resolvers, use .incremental along with the incremental_column parameter.

pg = PostgreSQLSource(name='PG')

@offline
def fn() -> DataFrame[Login.attempted_at, Login.user_id, Login.status]:
    return (
        pg.query_string("select attempted_at, status, user_id from logins")
          .incremental(incremental_column="attempted_at")
    )

Handling late-arriving messages

If your underlying data source has “late arriving records”, you may need to use the lookback_period argument to incremental. When lookback_period is specified, Chalk subtracts the lookback_period from the “maximum observed timestamp” that it uses as a lower-bound for ingestion.

Concretely, if your resolver body looks like this:

db.query_string("SELECT * FROM payments").incremental(
    incremental_column="updated_at", lookback_period="30m"
)

then Chalk will rewrite your SQL query to:

SELECT * FROM payments
WHERE updated_at >= (<previous_max_updated_at> - <lookback_period>)

This means that rows that arrive up to 30 minutes late will be properly ingested. The trade-off is that Chalk will re-ingest some redundant data.
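
The lower-bound arithmetic is simple enough to check by hand. The sketch below uses Python's datetime module with made-up timestamps; the lower_bound helper is hypothetical and only mirrors the subtraction in the rewritten query.

```python
from datetime import datetime, timedelta, timezone


def lower_bound(previous_max, lookback):
    """Lower bound for the next ingestion: previous max minus the lookback."""
    return previous_max - lookback


previous_max = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
bound = lower_bound(previous_max, timedelta(minutes=30))

# A record stamped 11:45 that only landed in the table after the last run.
late_row = datetime(2024, 1, 1, 11, 45, tzinfo=timezone.utc)
# A record stamped 11:15 falls outside the 30-minute window.
too_late_row = datetime(2024, 1, 1, 11, 15, tzinfo=timezone.utc)

print(late_row >= bound)      # True
print(too_late_row >= bound)  # False
```

A longer lookback_period catches later arrivals at the cost of re-reading more rows on every run.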

Managing incremental resolvers

Each incremental resolver tracks the timestamp of its latest ingested row in an internal property called chalk_incremental_timestamp. This property can be referenced within your SQL query when using parameter as your incrementalization mode.

The Chalk CLI provides several commands for managing your incremental resolvers.

Incrementalization modes

The default incrementalization mode for .incremental is mode='row'. Three modes are supported:

  • row: Chalk ingests features from all rows whose incremental_column is newer than the previously observed max timestamp.
  • group: Chalk ingests features from all groups that aggregate at least one row added or changed since the previously observed max timestamp.
  • parameter: Chalk passes the chalk_incremental_timestamp value (including lookback_period) to your query, and leaves your query unchanged.

"Group" incremental mode

Group mode incremental ingestion is appropriate when you are aggregating rows in a table in order to compute features.

For example, if you are running the following query:

SELECT
    business_id,
    SUM(amount) as sum_payments_amount,
    COUNT(*) as count_payments,
    max(updated_at) as ts
FROM
    payments
GROUP BY
    1

to ingest features for this feature class:

@features
class Business:
    id: int
    sum_payments_amount: float
    count_payments: int
    ts: FeatureTime

then you can specify the following resolver:

@offline(...)
def resolve_aggregate_payment_features() -> DataFrame[Business]:
    query = """
        SELECT
            business_id,
            SUM(amount) as sum_payments_amount,
            COUNT(*) as count_payments,
            max(updated_at) as ts
        FROM
            payments
        GROUP BY
            1
    """

    return db.query_string(query, fields={"business_id": Business.id}) \
                .incremental(incremental_column="updated_at", mode="group")

and Chalk will automatically rewrite your query into this form:

SELECT
    business_id,
    SUM(amount) as sum_payments_amount,
    COUNT(*) as count_payments,
    max(updated_at) as ts
FROM payments
WHERE business_id IN (
    SELECT DISTINCT(business_id) FROM payments
    WHERE updated_at >= :chalk_incremental_timestamp
)
GROUP BY 1

This means that if you have a payments table like this:

| id | business_id | amount | updated_at               |
| 1  | 1           | 10     | 2022-11-01T00:00:00.000Z |
| 2  | 1           | 5      | 2022-11-15T00:00:00.000Z |
| 3  | 2           | 7      | 2022-11-15T00:00:00.000Z |
| 4  | 3           | 17     | 2022-10-01T00:00:00.000Z |

and your query had previously run on 2022-11-07, then Chalk would return the following aggregate values:

| business_id | sum_payments_amount | count_payments | ts                       |
| 1           | 15                  | 2              | 2022-11-15T00:00:00.000Z |
| 2           | 7                   | 1              | 2022-11-15T00:00:00.000Z |

Both business 1 and 2 are present, because they have at least one payment after 2022-11-07. Business 3 is excluded, since it has no payments after 2022-11-07.
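
You can reproduce this example locally by running the rewritten query against the same four payments in sqlite3. This is only a way to check the SQL semantics; sqlite3 stands in for your real database, an ORDER BY is added to make the row order deterministic, and the cutoff value is passed by hand rather than tracked by Chalk.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table payments "
    "(id integer, business_id integer, amount integer, updated_at text)"
)
conn.executemany(
    "insert into payments values (?, ?, ?, ?)",
    [
        (1, 1, 10, "2022-11-01T00:00:00.000Z"),
        (2, 1, 5, "2022-11-15T00:00:00.000Z"),
        (3, 2, 7, "2022-11-15T00:00:00.000Z"),
        (4, 3, 17, "2022-10-01T00:00:00.000Z"),
    ],
)

rewritten = """
SELECT
    business_id,
    SUM(amount) AS sum_payments_amount,
    COUNT(*) AS count_payments,
    MAX(updated_at) AS ts
FROM payments
WHERE business_id IN (
    SELECT DISTINCT(business_id) FROM payments
    WHERE updated_at >= :chalk_incremental_timestamp
)
GROUP BY 1
ORDER BY 1
"""

rows = conn.execute(
    rewritten, {"chalk_incremental_timestamp": "2022-11-07T00:00:00.000Z"}
).fetchall()
for row in rows:
    print(row)
```

Note that business 1's aggregate still includes its 2022-11-01 payment: group mode recomputes the whole group, not just the rows that changed.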

"Parameter" incremental mode

In parameter incremental mode, Chalk leaves your query untouched. Chalk will simply pass the max incremental timestamp to your query as a bind parameter named chalk_incremental_timestamp.

Concretely, if you write:

@offline(...)
def parameter_incremental_mode_resolver() -> DataFrame[...]:
    return (
        db.query_string("SELECT * FROM payments WHERE updated_at >= :chalk_incremental_timestamp")
           .incremental(mode="parameter")
    )

Then Chalk will execute your query verbatim, and will keep track of the appropriate value for chalk_incremental_timestamp between executions of your resolver.

Incremental interaction with FeatureTime

When Chalk executes an incremental query, it has to update the “max timestamp” value that it will use as the lower bound for the next query. By default, Chalk sets this value to the time at the start of the query.

If your resolver returns a FeatureTime feature, Chalk will update the “max timestamp” value to the “max” FeatureTime value that is returned instead. This allows you to control the incremental behavior more precisely.
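
The rule for choosing the next lower bound can be summarized in a short sketch. The next_max_timestamp helper is hypothetical, and its handling of a run that returns no rows (falling back to the query start time) is an assumption of this sketch rather than documented behavior.

```python
from datetime import datetime, timezone


def next_max_timestamp(query_started_at, feature_times):
    """Pick the lower bound for the next run.

    Uses the largest returned FeatureTime value when there is one;
    otherwise falls back to the time the query started (assumption).
    """
    if feature_times:
        return max(feature_times)
    return query_started_at


started = datetime(2022, 11, 20, tzinfo=timezone.utc)
returned = [
    datetime(2022, 11, 1, tzinfo=timezone.utc),
    datetime(2022, 11, 15, tzinfo=timezone.utc),
]

with_feature_time = next_max_timestamp(started, returned)
without_feature_time = next_max_timestamp(started, [])
```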

Tagged SQL sources

Chalk supports applying tags to SQL sources. This allows you to define a single resolver that routes traffic to multiple different backing databases depending on tags supplied at query time. This is useful for limiting the blast-radius of traffic from different use-cases.

First, define a SQL source group:

from chalk.sql import SQLSourceGroup, PostgreSQLSource

sql_group = SQLSourceGroup(
    name='primary_group',
    default=PostgreSQLSource(name="default_replica"),
    tagged_sources={
        'risk': PostgreSQLSource(name='risk_replica'),
        'analytics': PostgreSQLSource(name='analytics_replica'),
    }
)

Then, define a resolver that uses the group:

@online
def users() -> DataFrame[User]:
    return sql_group.query_string("select id, name from users").all()

Then, when you submit queries, the query tags will control which data source is used to execute the query:

client = ChalkClient()

# This query uses the risk data source
client.query(input={User.id: 1}, output=[User.name], tags=['risk'])

# This query uses the analytics data source
client.query(input={User.id: 1}, output=[User.name], tags=['analytics'])

# This query uses the default data source
client.query(input={User.id: 1}, output=[User.name])
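
Conceptually, the routing is a lookup with a fallback. The sketch below models it with plain strings in place of source objects; pick_source is a hypothetical helper, and the choice to use the first matching tag when several are supplied is an assumption of this sketch, not documented behavior.

```python
def pick_source(tagged_sources, default, tags=None):
    """Return the source for the first query tag that has an entry,
    or the default source when no tag matches."""
    for tag in tags or []:
        if tag in tagged_sources:
            return tagged_sources[tag]
    return default


tagged = {"risk": "risk_replica", "analytics": "analytics_replica"}

print(pick_source(tagged, "default_replica", ["risk"]))       # risk_replica
print(pick_source(tagged, "default_replica", ["analytics"]))  # analytics_replica
print(pick_source(tagged, "default_replica"))                 # default_replica
```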

Additional query options

SQLAlchemy

Chalk supports SQLAlchemy:

pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.email, User.full_name]:
    return (
        pg.query(User(email=UserSQL.email, full_name=UserSQL.name))
        .filter_by(id=uid)
        .first()
    )

In the .query(...) call, you map the target columns of the SQL statement to the feature namespace. Here, we assign User.email to UserSQL.email and User.full_name to UserSQL.name.

Incremental queries with SQLAlchemy

To create an incremental SQLAlchemy query, use .incremental. Chalk will page over the underlying table using the column mapped to FeatureTime.

pg = PostgreSQLSource(name='PG')

@offline
def fn() -> DataFrame[Login.ts, Login.attempted_at, Login.user_id, Login.status]:
    return pg.query(
        Login(
            ts=LoginHistorySQL.created_at,
            attempted_at=LoginHistorySQL.attempted_at,
            user_id=LoginHistorySQL.user_id,
            status=LoginHistorySQL.status,
        )
    ).incremental()