SQL Integration

Chalk can ingest your data through a SQL interface from any integration that supports it. You can write your queries as SQL strings or with SQLAlchemy. In offline contexts, event tables can be ingested incrementally.

Basic queries

There are multiple ways to construct resolvers using SQL. It’s up to you to decide which best fits your workflow.

Instead of creating a session and engine yourself, you use a PostgreSQLSource (or any of the other SQL-supporting sources in the docs) as the base of your queries. The examples below assume a PostgreSQL source named PG, defined as follows:

from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource(name='PG')

SQL File Resolvers

This is the preferred method of specifying a resolver. Only a single .chalk.sql SQL file is needed: no Python required!

get_user.chalk.sql
-- type: online
-- resolves: user
-- source: PG
-- count: 1
select email, full_name from user_table where id=${user.id}

Here, the ${user.id} variable defines the input of the query, and user_table is the name of the PostgreSQL table. The comments are parsed as YAML and supply the metadata Chalk uses to build the resolver.
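To make the file layout concrete, here is a minimal sketch of how the header comments and the ${...} inputs of a .chalk.sql file can be separated. The parse_chalk_sql helper is hypothetical and is not Chalk's parser: it only understands flat "-- key: value" lines, not nested YAML such as an incremental block.

```python
import re


def parse_chalk_sql(text: str) -> tuple[dict[str, str], str, list[str]]:
    """Split a .chalk.sql file into (metadata, SQL body, input placeholders).

    Hypothetical helper for illustration; it only handles flat
    `-- key: value` comment lines, not nested YAML blocks.
    """
    metadata: dict[str, str] = {}
    body_lines: list[str] = []
    for line in text.splitlines():
        stripped = line.strip()
        if stripped.startswith("--"):
            # "-- type: online" -> ("type", "online")
            key, _, value = stripped[2:].partition(":")
            metadata[key.strip()] = value.strip()
        elif stripped:
            body_lines.append(stripped)
    sql = "\n".join(body_lines)
    # Each ${...} placeholder names a feature the resolver takes as input.
    inputs = re.findall(r"\$\{([^}]*)\}", sql)
    return metadata, sql, inputs


GET_USER = """\
-- type: online
-- resolves: user
-- source: PG
-- count: 1
select email, full_name from user_table where id=${user.id}
"""

meta, sql, inputs = parse_chalk_sql(GET_USER)
print(meta)    # {'type': 'online', 'resolves': 'user', 'source': 'PG', 'count': '1'}
print(inputs)  # ['user.id']
```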

SQL file resolvers can also return multiple rows, for aggregation operations, offline queries, and more. Note that the example below omits both the count key and the where clause of the SQL query.

get_users.chalk.sql
-- type: online
-- resolves: user
-- source: PG
select email, full_name from user_table

It’s possible to use SELECT * in a SQL file resolver, but be careful!

get_all_columns_users.chalk.sql
-- type: online
-- resolves: user
-- source: PG
select * from user_table

Implicitly, this tries to align every scalar feature from the User feature set to a column name in user_table. If a feature name is misnamed or absent from the table, you’ll get a “missing columns” error.
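The alignment that SELECT * relies on can be pictured as a set difference between the feature set's scalar feature names and the table's column names. This is a hypothetical sketch for intuition only, not Chalk's validation code, and the feature and column names are made up.

```python
def check_select_star(scalar_features: list[str], table_columns: list[str]) -> None:
    """Raise if any scalar feature has no same-named column (illustrative only)."""
    missing = sorted(set(scalar_features) - set(table_columns))
    if missing:
        raise ValueError(f"missing columns: {missing}")


# Every feature has a matching column, so nothing is raised.
check_select_star(["id", "email", "full_name"], ["id", "email", "full_name"])

# The table calls the column "name", so the full_name feature has no match.
try:
    check_select_star(["id", "email", "full_name"], ["id", "email", "name"])
except ValueError as err:
    print(err)  # missing columns: ['full_name']
```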

Configuration

The following keys are supported in .chalk.sql file comments.

  • resolves (str): The feature namespace to which the outputs belong. In the examples above, user.email and user.full_name are the outputs.
  • source (str): The database, given either by name, as in the examples above, or by type if there is only one database of that type. Thus, if you have a single PostgreSQL source, you can also write source: postgresql. Supported sources are ['snowflake', 'postgres', 'postgresql', 'mysql', 'bigquery', 'cloudsql', 'redshift', 'sqlite', 'kafka'].
  • type ("online" | "offline" | "streaming" | null): The type of resolver. Defaults to online if not specified.
  • incremental (dict[str, str]): Parameters for incremental queries. For more information, see the section on incremental queries below. The supported parameters are:
      • mode ("row" | "group" | "parameter"): The incrementalization mode, which decides how new data is ingested.
      • lookback_period (string): The length of the window, measured back from the last observed row, that Chalk will re-ingest, e.g. 1h.
      • incremental_column (string): The timestamp column in the underlying table to use as the basis for incrementalization. Required in row and group modes.
  • count ("1" | null): Returns a single row. Equivalent to the common query finalizer .one().
  • timeout (Duration | null): The maximum time to wait before the query times out. See [Duration](/api-docs#Duration) for more details.
  • tags (list[str] | null): The user tags associated with this resolver. Applies to online and offline resolvers.
  • environment (list[str] | null): The environments associated with this resolver.
  • cron (str | null): The schedule for a cron run, e.g. 1h.
  • max_staleness (str | null): The maximum staleness for the resolver, e.g. 24h.
  • owner (str | null): The owner of the resolver.
  • fields (dict[str, str]): An optional mapping from SQL column to Chalk feature. For example, with SELECT name AS arbitrary_column_name, you can map arbitrary_column_name to a feature in the namespace given by resolves with the mapping arbitrary_column_name: chalk_feature_name.

Chalk also supports SQL file streaming resolvers:

get_store_features.chalk.sql
-- source: kafka
-- resolves: store_features
-- type: streaming
select store_id as id, sum(amount) as purchases
from topic
group by 1

SQL Linting Configuration

If you are using SQLFluff or another SQL linter, you may need to configure it to accept the ${...} variable pattern. For SQLFluff, set the templater to placeholder and add the following to your config file.

# Allows sqlfluff to correctly parse
# ${arbitrary_placeholder_name}

[sqlfluff:templater:placeholder]
param_regex =\$\{[^}]*\}
1 = 'arbitrary_placeholder_name'
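Before wiring the pattern into SQLFluff, you can sanity-check what param_regex matches by running the same expression through Python's re module. The render_for_lint helper below is hypothetical and only approximates the placeholder templater: it swaps every match for the quoted literal from the config.

```python
import re

# Same pattern as param_regex in the SQLFluff config above.
PARAM_REGEX = re.compile(r"\$\{[^}]*\}")


def render_for_lint(sql: str, replacement: str = "'arbitrary_placeholder_name'") -> str:
    """Replace each ${...} placeholder with a literal a linter can parse."""
    return PARAM_REGEX.sub(replacement, sql)


query = "select email, full_name from user_table where id=${user.id}"
print(PARAM_REGEX.findall(query))  # ['${user.id}']
print(render_for_lint(query))
# select email, full_name from user_table where id='arbitrary_placeholder_name'
```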

Reading from SQL Sources (SQLAlchemy)

Chalk has explicit support for SQLAlchemy, letting you write queries in a familiar fashion.

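from chalk import online
from chalk.features import Features
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource(name='PG')

# User is a Chalk feature class; UserSQL is a SQLAlchemy model of the user table.
@online
def get_user(uid: User.id) -> Features[User.email, User.full_name]:
    return (
        pg.query(User(email=UserSQL.email, full_name=UserSQL.name))
        .filter_by(id=uid)
        .first()
    )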

In the .query(...) call, you map the target columns of the SQL statement to the feature namespace. Here, we assign User.email to UserSQL.email and User.full_name to UserSQL.name.

Reading from SQL Sources (SQL strings)

You can also write SQL queries as strings in Python. When the name of the column matches the name of the feature with non-alphanumeric characters removed, the mapping from column to feature is automatic.

pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string(
            "select full_name, email from users where id=:user_id",
            args=dict(user_id=uid),
        )
        .first()
    )

Here, the resolver expects features named full_name and email, both of which are columns in the response for the SQL query.

If the column names don’t align exactly, you can pass the fields parameter to map query columns to features.

pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string(
            "select name, email from users where id=:user_id",
            args=dict(user_id=uid),
            fields=dict(name=User.full_name),
        )
        .first()
    )

Here, the email column of the query automatically aligns with the expected User.email feature, but the name column of the query is explicitly mapped to the User.full_name feature.
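The matching rule can be sketched in plain Python. This is a hypothetical illustration, not Chalk's implementation: it assumes non-alphanumeric characters are stripped from both the column name and the feature name before comparing, that explicit fields entries take priority, and that columns matching nothing are simply left unmapped.

```python
import re
from typing import Dict, List, Optional


def normalize(name: str) -> str:
    # Drop every non-alphanumeric character: "full_name" -> "fullname".
    return re.sub(r"[^0-9A-Za-z]", "", name)


def map_columns(
    columns: List[str], features: List[str], fields: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
    """Map query columns to feature names (hypothetical sketch)."""
    fields = fields or {}
    by_normalized = {normalize(feature): feature for feature in features}
    mapping: Dict[str, str] = {}
    for column in columns:
        if column in fields:
            # An explicit fields entry always wins.
            mapping[column] = fields[column]
        elif normalize(column) in by_normalized:
            mapping[column] = by_normalized[normalize(column)]
        # Columns that match nothing are left out of this sketch's mapping.
    return mapping


# Mirrors the query_string example: name is mapped explicitly, email automatically.
print(map_columns(["name", "email"], ["full_name", "email"], fields={"name": "full_name"}))
# {'name': 'full_name', 'email': 'email'}
```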

Parameterizing string queries

You can also use parameterized queries with Chalk. Prefix each parameter name with a colon, and pass a dictionary mapping parameter names to values through args:

pg.query_string(
    query="select * from users where user_id=:user_id",
    args=dict(user_id="uid123"),
)

Use a backslash to escape any literal : characters in your query. Writing the query as a raw string (r"...") keeps Python from interpreting the backslash itself:

pg.query_string(
    query=r"select * from users where user_id=:user_id and name='\:colon'",
    args=dict(user_id="uid123"),
)
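The same colon convention is used by SQLAlchemy's text() construct, where :name is a bind parameter and \: is a literal colon. The regular expression below is a hypothetical approximation of that rule, for intuition only; it does not handle every case a real driver does, such as PostgreSQL :: casts.

```python
import re

# A colon followed by a name, and not preceded by a backslash, starts a bind parameter.
BIND_PARAM = re.compile(r"(?<!\\):(\w+)")


def bind_parameter_names(query: str) -> list[str]:
    return BIND_PARAM.findall(query)


def unescape_colons(query: str) -> str:
    # In this sketch, an escaped "\:" becomes a plain ":" in the final SQL.
    return query.replace("\\:", ":")


query = r"select * from users where user_id=:user_id and name='\:colon'"
print(bind_parameter_names(query))  # ['user_id']
print(unescape_colons(query))
# select * from users where user_id=:user_id and name=':colon'
```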

Python resolvers with SQL Sources (.sql files)

Finally, if you want to combine SQL files with Python resolvers, you can use the query_sql_file method.

For example, if you have a SQL query defined in query.sql:

query.sql
select * from users where user_id=:user_id

You can reference this file in a Python resolver, either using the absolute path from the root of your project or relative to the resolver’s file.

For example, if the snippet below lived in the same directory as query.sql, we could refer to it as follows:

pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_sql_file(
            path="query.sql",
            args=dict(user_id=uid)
        )
        .first()
    )

Auto-mapping of column name to feature name also applies for the query_sql_file method.
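The two ways of referring to a SQL file can be illustrated with pathlib. The resolve_sql_path helper is hypothetical: the order in which it tries the two locations (resolver directory first, then project root) is an assumption made for the sketch, not documented Chalk behavior.

```python
import tempfile
from pathlib import Path


def resolve_sql_path(path: str, resolver_file: str, project_root: str) -> Path:
    """Locate a SQL file (hypothetical lookup order).

    Tries the path relative to the resolver's own directory first, then
    relative to the project root. Chalk's actual precedence may differ.
    """
    candidate = Path(resolver_file).parent / path
    if candidate.is_file():
        return candidate
    return Path(project_root) / path


with tempfile.TemporaryDirectory() as root:
    resolvers_dir = Path(root) / "resolvers"
    resolvers_dir.mkdir()
    (resolvers_dir / "query.sql").write_text("select * from users where user_id=:user_id")
    resolver_file = str(resolvers_dir / "users.py")  # the file holding get_user

    # Relative to the resolver's file.
    relative_hit = resolve_sql_path("query.sql", resolver_file, root).read_text()
    # Relative to the project root.
    root_hit = resolve_sql_path("resolvers/query.sql", resolver_file, root).read_text()

print(relative_hit == root_hit)  # True
```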

Incremental queries

Why incremental?

The first time that a resolver with an incremental query is executed, Chalk ingests all data from the source. On subsequent runs of the resolver, Chalk only looks for new rows in the table to ingest. Using incremental queries limits the amount of data that Chalk needs to ingest, lowering latency for updates and reducing costs.
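The first-run versus later-run behavior can be modeled with a small in-memory toy. IncrementalReader is hypothetical and is not Chalk's code; it simply keeps the maximum timestamp it has seen and, after the first read, returns only rows newer than it.

```python
from datetime import datetime
from typing import Optional


class IncrementalReader:
    """Toy model of incremental ingestion (hypothetical, not Chalk's code).

    The first read ingests every row; later reads only ingest rows whose
    timestamp column is newer than the maximum timestamp seen so far.
    """

    def __init__(self, column: str) -> None:
        self.column = column
        self.max_seen: Optional[datetime] = None

    def read(self, table: list) -> list:
        if self.max_seen is None:
            batch = list(table)  # first run: ingest everything
        else:
            batch = [row for row in table if row[self.column] > self.max_seen]
        if batch:
            self.max_seen = max(row[self.column] for row in batch)
        return batch


logins = [
    {"user_id": 1, "attempted_at": datetime(2022, 11, 1, 9, 0)},
    {"user_id": 2, "attempted_at": datetime(2022, 11, 1, 9, 5)},
]
reader = IncrementalReader("attempted_at")
print(len(reader.read(logins)))  # 2: the first run ingests everything
logins.append({"user_id": 1, "attempted_at": datetime(2022, 11, 1, 9, 10)})
print(len(reader.read(logins)))  # 1: only the new row
print(len(reader.read(logins)))  # 0: nothing new
```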

When to use

Incremental queries are useful for ingesting immutable tables or queries, like event tables or logs. This type of data is frequently found in the offline context, as it represents logs of real-time events.

Using incremental queries (SQLAlchemy)

Imagine you have a login events table, where you keep track of login attempts to your website. You can ingest this table as follows:

pg = PostgreSQLSource(name='PG')

@offline
def fn() -> DataFrame[Login.ts, Login.attempted_at, Login.user_id, Login.status]:
    return pg.query(
        Login(
            attempted_at=LoginHistorySQL.attempted_at,
            status=LoginHistorySQL.status,
            ts=LoginHistorySQL.created_at,
            user_id=LoginHistorySQL.user_id,
        )
    ).incremental()

Incremental queries must map a FeatureTime feature (Login.ts above). Using .incremental is equivalent to .all, except that Chalk can page over the underlying table using the column mapped to the FeatureTime feature.

Incremental queries with SQL File Resolvers

With Chalk SQL File resolvers, you can describe your incremental parameters in typical YAML format.

incremental_query.chalk.sql
-- type: offline
-- resolves: login
-- source: PG
-- incremental:
--   mode: row
--   lookback_period: 60m
--   incremental_column: attempted_at
select attempted_at, status, user_id from logins

Using incremental queries (SQL-strings)

Chalk also supports incremental queries with raw SQL strings. If you use .incremental with string queries, you must specify the incremental_column parameter.

pg = PostgreSQLSource(name='PG')

@offline
def fn() -> DataFrame[Login.attempted_at, Login.user_id, Login.status]:
    return (
        pg.query_string("select attempted_at, status, user_id from logins")
          .incremental(incremental_column="attempted_at")
    )

Handling late-arriving messages

If your underlying data source has “late arriving records”, you may need to use the lookback_period argument to incremental. When lookback_period is specified, Chalk subtracts the lookback_period from the “maximum observed timestamp” that it uses as a lower-bound for ingestion.

Concretely, if your resolver body looks like this:

return (
    db.query_string("SELECT * FROM payments")
    .incremental(incremental_column="updated_at", lookback_period="30m")
)

then Chalk will rewrite your SQL query to:

SELECT * FROM payments
WHERE updated_at >= (<previous_max_updated_at> - <lookback_period>)

This means that rows that arrive up to 30 minutes late will be properly ingested. The trade-off is that Chalk will re-ingest some redundant data.
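The lower-bound arithmetic can be checked with datetime, where lookback_period="30m" corresponds to timedelta(minutes=30). The scenario is hypothetical: the previous run ingested payments 1 and 2 and recorded 12:00 as its maximum, and payment 3 landed in the table afterward with an older updated_at.

```python
from datetime import datetime, timedelta


def lookback_lower_bound(previous_max: datetime, lookback: timedelta) -> datetime:
    # Mirrors: updated_at >= (<previous_max_updated_at> - <lookback_period>)
    return previous_max - lookback


previous_max = datetime(2022, 11, 15, 12, 0)
bound = lookback_lower_bound(previous_max, timedelta(minutes=30))
print(bound)  # 2022-11-15 11:30:00

payments = [
    {"id": 1, "updated_at": datetime(2022, 11, 15, 11, 20)},  # outside the window: skipped
    {"id": 2, "updated_at": datetime(2022, 11, 15, 12, 0)},   # already ingested: read again
    {"id": 3, "updated_at": datetime(2022, 11, 15, 11, 45)},  # arrived late: now captured
]
selected = [p["id"] for p in payments if p["updated_at"] >= bound]
print(selected)  # [2, 3]
```

Without the lookback, the bound would be 12:00 and payment 3 would never be ingested; with it, payment 2 is the redundant data the text mentions.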

Incrementalization modes

The default incrementalization mode for .incremental is mode='row'. Three modes are supported:

  • row: Chalk ingests features from all rows whose incremental_column is newer than the previously observed max timestamp.
  • group: Chalk ingests features from all groups that aggregate at least one row that has been added or changed since the previously observed max timestamp.
  • parameter: Chalk passes the chalk_incremental_timestamp value (including lookback_period) to your query, and leaves your query unchanged.

"Group" incremental mode

Group mode incremental ingestion is appropriate when you are aggregating rows in a table in order to compute features.

For example, if you are running the following query:

SELECT
    business_id,
    SUM(amount) as sum_payments_amount,
    COUNT(*) as count_payments,
    max(updated_at) as ts
FROM payments
GROUP BY 1

to ingest features for this feature class:

@features
class Business:
    id: int
    sum_payments_amount: float
    count_payments: int
    ts: FeatureTime

then you can specify the following resolver:

@offline(...)
def resolve_aggregate_payment_features() -> DataFrame[Business]:
    query = """
        SELECT
            business_id,
            SUM(amount) as sum_payments_amount,
            COUNT(*) as count_payments,
            max(updated_at) as ts
        FROM payments
        GROUP BY 1
    """

    return db.query_string(query, fields={"business_id": Business.id}) \
                .incremental(incremental_column="updated_at", mode="group")

and Chalk will automatically rewrite your query into this form:

SELECT
    business_id,
    SUM(amount) as sum_payments_amount,
    COUNT(*) as count_payments,
    max(updated_at) as ts
FROM payments
WHERE business_id IN (
    SELECT DISTINCT(business_id) FROM payments
    WHERE updated_at >= :chalk_incremental_timestamp
)
GROUP BY 1

This means that if you have a payments table like this:

| id | business_id | amount | updated_at               |
| 1  | 1           | 10     | 2022-11-01T00:00:00.000Z |
| 2  | 1           | 5      | 2022-11-15T00:00:00.000Z |
| 3  | 2           | 7      | 2022-11-15T00:00:00.000Z |
| 4  | 3           | 17     | 2022-10-01T00:00:00.000Z |

and your query had previously run on 2022-11-07, then Chalk would return the following aggregate values:

| business_id | sum_payments_amount | count_payments | ts                       |
| 1           | 15                  | 2              | 2022-11-15T00:00:00.000Z |
| 2           | 7                   | 1              | 2022-11-15T00:00:00.000Z |

Businesses 1 and 2 are both present because each has at least one payment after 2022-11-07. Business 3 is excluded, since it has no payments after 2022-11-07.
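You can reproduce this example with Python's built-in sqlite3 module, which stands in for PostgreSQL here. The query is the rewritten group-mode query shown above; ISO-8601 timestamps compare correctly as text, so a TEXT column is enough for the comparison, and ts is max(updated_at) within each group.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE payments (id INTEGER, business_id INTEGER, amount INTEGER, updated_at TEXT)"
)
conn.executemany(
    "INSERT INTO payments VALUES (?, ?, ?, ?)",
    [
        (1, 1, 10, "2022-11-01T00:00:00.000Z"),
        (2, 1, 5, "2022-11-15T00:00:00.000Z"),
        (3, 2, 7, "2022-11-15T00:00:00.000Z"),
        (4, 3, 17, "2022-10-01T00:00:00.000Z"),
    ],
)

# The rewritten group-mode query from above.
GROUP_QUERY = """
SELECT
    business_id,
    SUM(amount) as sum_payments_amount,
    COUNT(*) as count_payments,
    max(updated_at) as ts
FROM payments
WHERE business_id IN (
    SELECT DISTINCT(business_id) FROM payments
    WHERE updated_at >= :chalk_incremental_timestamp
)
GROUP BY 1
"""

rows = conn.execute(
    GROUP_QUERY, {"chalk_incremental_timestamp": "2022-11-07T00:00:00.000Z"}
).fetchall()
for row in sorted(rows):
    print(row)
# (1, 15, 2, '2022-11-15T00:00:00.000Z')
# (2, 7, 1, '2022-11-15T00:00:00.000Z')
```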

"Parameter" incremental mode

In parameter incremental mode, Chalk leaves your query untouched. Chalk will simply pass the max incremental timestamp to your query as a bind parameter named chalk_incremental_timestamp.

Concretely, if you write:

@offline(...)
def parameter_incremental_mode_resolver() -> DataFrame[...]:
    return (
        db.query_string("SELECT * FROM payments WHERE updated_at >= :chalk_incremental_timestamp")
           .incremental(mode="parameter")
    )

Then Chalk will execute your query verbatim, and will keep track of the appropriate value for chalk_incremental_timestamp between executions of your resolver.
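Because the query runs verbatim, the only moving part is the bind parameter. The sketch below uses sqlite3 (which also supports :name placeholders) to run the same SQL twice with a stored timestamp. The state dictionary and the value it advances to are hypothetical bookkeeping standing in for what Chalk tracks between runs.

```python
import sqlite3

# The query from the resolver above, executed exactly as written.
QUERY = "SELECT * FROM payments WHERE updated_at >= :chalk_incremental_timestamp"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO payments VALUES (?, ?)",
    [(1, "2022-11-01T00:00:00Z"), (2, "2022-11-15T00:00:00Z")],
)

# Hypothetical bookkeeping: the timestamp carried between resolver runs.
state = {"chalk_incremental_timestamp": "2022-11-07T00:00:00Z"}
first = conn.execute(QUERY, state).fetchall()
print(first)  # [(2, '2022-11-15T00:00:00Z')]

# Between runs the stored timestamp advances and a new row arrives.
state["chalk_incremental_timestamp"] = "2022-11-15T00:00:01Z"
conn.execute("INSERT INTO payments VALUES (3, '2022-11-20T00:00:00Z')")
second = conn.execute(QUERY, state).fetchall()
print(second)  # [(3, '2022-11-20T00:00:00Z')]
```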

Incremental interaction with FeatureTime

When Chalk executes an incremental query, it has to update the “max timestamp” value that it will use as the lower bound for the next query. By default, Chalk sets this value to the time at the start of the query.

If your resolver returns a FeatureTime feature, Chalk instead updates the “max timestamp” value to the maximum FeatureTime value returned. This gives you more precise control over the incremental behavior.
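The rule for choosing the next lower bound can be written as a small function. next_max_timestamp is hypothetical; in particular, falling back to the query start time when a FeatureTime-returning resolver produces no rows is an assumption, not documented behavior.

```python
from datetime import datetime
from typing import Optional, Sequence


def next_max_timestamp(
    query_started_at: datetime, feature_times: Optional[Sequence[datetime]] = None
) -> datetime:
    """Pick the lower bound for the next incremental run (illustrative sketch).

    Without FeatureTime values, use the time the query started; otherwise use
    the largest FeatureTime the resolver returned. The empty-result fallback
    to the start time is an assumption.
    """
    if feature_times:
        return max(feature_times)
    return query_started_at


started = datetime(2022, 11, 20, 8, 0)
print(next_max_timestamp(started))  # 2022-11-20 08:00:00
print(next_max_timestamp(started, [datetime(2022, 11, 19, 23, 0), datetime(2022, 11, 20, 7, 30)]))
# 2022-11-20 07:30:00
```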

Tagged SQL sources

Chalk supports applying tags to SQL sources. This allows you to define a single resolver that routes traffic to different backing databases depending on the tags supplied at query time, which is useful for limiting the blast radius of traffic from different use cases.

First, define a SQL source group:

from chalk.sql import SQLSourceGroup, PostgreSQLSource

sql_group = SQLSourceGroup(
    name='primary_group',
    default=PostgreSQLSource(name="default_replica"),
    tagged_sources={
        'risk': PostgreSQLSource(name='risk_replica'),
        'analytics': PostgreSQLSource(name='analytics_replica'),
    }
)

Then, define a resolver that uses the group:

@online
def users() -> DataFrame[User]:
    return sql_group.query_string("select id, name from users").all()

Then, when you submit queries, the query tags will control which datasource is used to execute the query:

from chalk.client import ChalkClient

c = ChalkClient()

# This query uses the risk datasource
c.query(input={User.id: 1}, output=[User.name], tags=['risk'])

# This query uses the analytics datasource
c.query(input={User.id: 1}, output=[User.name], tags=['analytics'])

# This query uses the default datasource
c.query(input={User.id: 1}, output=[User.name])
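The routing behavior can be summarized with a toy class. SourceGroupSketch is hypothetical and uses plain strings in place of real sources. Two rules in it are assumptions, not documented behavior: when several query tags name tagged sources, the first one wins, and tags that name no source fall back to the default.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Sequence


@dataclass
class SourceGroupSketch:
    """Toy stand-in for SQLSourceGroup routing; sources are just names here."""

    default: str
    tagged_sources: Dict[str, str] = field(default_factory=dict)

    def pick(self, tags: Optional[Sequence[str]] = None) -> str:
        # Assumption: the first query tag that names a tagged source wins.
        for tag in tags or ():
            if tag in self.tagged_sources:
                return self.tagged_sources[tag]
        return self.default


group = SourceGroupSketch(
    default="default_replica",
    tagged_sources={"risk": "risk_replica", "analytics": "analytics_replica"},
)
print(group.pick(["risk"]))       # risk_replica
print(group.pick(["analytics"]))  # analytics_replica
print(group.pick())               # default_replica
```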