Integrations
Integrate with SQL-like sources.
Chalk can ingest data using a SQL interface from any of our supported SQL data source integrations. The full list of supported SQL data sources can be found in our API reference.
Chalk supports running SQL from files or from strings. When you run queries that invoke your SQL resolver, Chalk automatically pushes your input parameters into the WHERE clause of your query. For more details, see our section on push-down filters.
The examples on this page use our PostgreSQL data source, but can be generalized to any of our other SQL data sources.
SQL file resolvers are Chalk’s recommended method for writing SQL resolvers.
-- type: online
-- resolves: user
-- source: PG
select id, email, full_name from user_table
Here, we define an online resolver that returns some features for the User feature class from the user_table table in the PostgreSQL source PG. The comments are parsed as YAML and give Chalk the metadata it needs to configure the resolver. SQL file resolvers can return multiple rows for aggregation operations, offline queries, and more.
It’s also possible to use SELECT * in a SQL file resolver, but be careful!
-- type: online
-- resolves: user
-- source: PG
select * from user_table
Implicitly, this query tries to align every scalar feature from the User feature class to a column name in user_table. If a feature is misnamed or absent from the table, you’ll get a “missing columns” error.
To programmatically generate SQL file resolvers, check out Generated SQL file resolvers.
Like other resolvers, SQL file resolvers can take features as input. In this example, we want our resolver to require User.id as input:
-- type: online
-- resolves: user
-- source: PG
select id, email, full_name
from user_table
where id = ${user.id}
Use ${} with snake case to reference the desired feature.
Use ${now} in your query as a special argument representing the time of the query. For more details, see our time documentation.
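One way to picture the ${...} substitution is as a rewrite into bind parameters. The sketch below is an illustration only, not Chalk's implementation: the helper bind_placeholders and the :p0-style parameter names are hypothetical, and the query runs against an in-memory SQLite table rather than PostgreSQL.

```python
import re
import sqlite3

# Matches ${user.id}, ${now}, and similar placeholders.
PLACEHOLDER = re.compile(r"\$\{([^}]*)\}")


def bind_placeholders(sql, values):
    """Replace each ${name} with a named bind parameter (hypothetical helper)."""
    params = {}

    def replace(match):
        key = f"p{len(params)}"
        params[key] = values[match.group(1)]
        return f":{key}"

    return PLACEHOLDER.sub(replace, sql), params


conn = sqlite3.connect(":memory:")
conn.execute("create table user_table (id integer, email text, full_name text)")
conn.executemany(
    "insert into user_table values (?, ?, ?)",
    [(1, "a@example.com", "Ada"), (2, "b@example.com", "Bob")],
)

sql, params = bind_placeholders(
    "select id, email, full_name from user_table where id = ${user.id}",
    {"user.id": 2},
)
rows = conn.execute(sql, params).fetchall()
```

Binding the value as a parameter, rather than formatting it into the SQL text, keeps the input from being interpreted as SQL.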
You can run SQL queries either as Python strings or from .sql files.
When the name of the column matches the name of the feature with non-alphanumeric characters removed, the mapping from column to feature is automatic.
pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string("select full_name, email from users where id=1")
        .first()
    )
get_user’s return type indicates that it expects features named full_name and email, which are returned as columns from the SQL query.
If the column names don’t align exactly, you can include the fields parameter to specify the mapping from the query’s columns to the features.
pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string(
            "select name, email from users where id=1",
            fields=dict(name=User.full_name),
        )
        .first()
    )
Here, the email column of the query automatically aligns with the expected User.email feature, but the name column of the query is explicitly mapped to the User.full_name feature.
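The matching rule can be sketched in a few lines. This is a minimal illustration under stated assumptions, not Chalk's code: map_columns and normalize are hypothetical helpers, feature names are plain strings, and lowercasing before comparison is our assumption (the text above only mentions removing non-alphanumeric characters).

```python
import re


def normalize(name):
    """Drop non-alphanumeric characters and lowercase (lowercasing is an assumption)."""
    return re.sub(r"[^0-9a-zA-Z]", "", name).lower()


def map_columns(columns, features, fields=None):
    """Map each SQL column to a feature name, preferring the explicit `fields` mapping."""
    fields = fields or {}
    by_normalized = {normalize(feature): feature for feature in features}
    mapping = {}
    for column in columns:
        if column in fields:
            mapping[column] = fields[column]  # explicit mapping wins
        elif normalize(column) in by_normalized:
            mapping[column] = by_normalized[normalize(column)]  # automatic match
        else:
            raise KeyError(f"no feature matches column {column!r}")
    return mapping


automatic = map_columns(["full_name", "email"], ["full_name", "email"])
explicit = map_columns(
    ["name", "email"], ["full_name", "email"], fields={"name": "full_name"}
)
```

In the second call, email is matched automatically while name needs the explicit entry, mirroring the resolver above.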
You can parameterize queries to pass variables. Parameterize names with a colon, and pass a dictionary from parameter name to parameter value:
pg.query_string(
    query="select * from users where user_id=:user_id",
    args=dict(user_id="uid123"),
)
Use a backslash to escape any literal : characters you need to use in your query. A raw Python string keeps the backslash intact:
pg.query_string(
    query=r"select * from users where user_id=:user_id and name='\:colon'",
    args=dict(user_id="uid123"),
)
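To make the escaping rule concrete, here is a small sketch of how a :name parameter and an escaped \: could be told apart. The helper bind_colon_params is hypothetical and is not how Chalk parses queries. It also ignores cases such as PostgreSQL's :: cast syntax.

```python
import re

# Either an escaped colon (\:) or a :name parameter.
TOKEN = re.compile(r"\\:|:([A-Za-z_][A-Za-z0-9_]*)")


def bind_colon_params(query, args):
    """Turn :name into positional '?' markers and unescape '\\:' (hypothetical helper)."""
    ordered = []

    def replace(match):
        if match.group(0) == "\\:":
            return ":"  # an escaped colon stays a literal character
        ordered.append(args[match.group(1)])
        return "?"

    return TOKEN.sub(replace, query), ordered


sql, values = bind_colon_params(
    r"select * from users where user_id=:user_id and name='\:colon'",
    {"user_id": "uid123"},
)
```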
Instead of passing a string directly into your Python code, you can load the SQL content from a file with the query_sql_file function.
For example, here is a query.sql file containing the same query from above:
select * from users where user_id=:user_id
You can reference this file in a Python resolver, either using the absolute path from the root of your project or relative to the resolver’s file.
For example, if the snippet below lived in the same directory as query.sql, we could refer to it as follows:
pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_sql_file(
            path="query.sql",
            args=dict(user_id=uid),
        )
        .first()
    )
Auto-mapping of column name to feature name also applies for the query_sql_file method. Parameterization also works the same way.
Chalk automatically adds query inputs to the WHERE clause of your SQL queries. We recommend relying on our push-down filtering logic rather than parameterizing your queries by hand.
The following example will show a transaction feature and how we use push-down filtering to retrieve transactions belonging to a specific category.
Here’s the feature class:
pg = PostgreSQLSource(name='PG')

@features
class Transaction:
    id: int
    category: str
    amount: float
And the SQL file resolver:
-- type: online
-- resolves: transaction
-- source: PG
SELECT
id, merchant_category AS category, amount
FROM
txn_table;
Finally, here is our query for grocery transactions:
client = ChalkClient()
client.query(
    input={
        Transaction.id: [1, 2, 3, 4],
        Transaction.category: "Groceries",
    },
    output=[
        Transaction.id,
        Transaction.amount,
    ],
)
When this query is run, Chalk adds each input parameter to the WHERE clause, effectively running this query against your database:
SELECT
id, merchant_category AS category, amount
FROM
txn_table
WHERE
id IN (1, 2, 3, 4)
AND merchant_category = 'Groceries';
If your database column names differ from your feature names, the column name must be aliased in the SELECT clause of your query, as shown above with the merchant_category column.
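The effect of push-down filtering can be approximated locally. The sketch below is not Chalk's rewriter: push_down is a hypothetical helper that wraps the resolver query in a subquery and filters on its output column names (so the alias category can be used directly), and it runs on an in-memory SQLite table with made-up rows.

```python
import sqlite3


def push_down(query, filters):
    """Wrap `query` and filter on its output columns (a sketch, not Chalk's rewriter)."""
    clauses, params = [], []
    for column, value in filters.items():
        # Column names are interpolated as-is; real code must validate them.
        if isinstance(value, (list, tuple)):
            marks = ", ".join("?" for _ in value)
            clauses.append(f"{column} IN ({marks})")
            params.extend(value)
        else:
            clauses.append(f"{column} = ?")
            params.append(value)
    base = query.strip().rstrip(";")
    if not clauses:
        return base, params
    return f"SELECT * FROM ({base}) AS q WHERE " + " AND ".join(clauses), params


conn = sqlite3.connect(":memory:")
conn.execute("create table txn_table (id integer, merchant_category text, amount real)")
conn.executemany(
    "insert into txn_table values (?, ?, ?)",
    [(1, "Groceries", 12.5), (2, "Travel", 300.0), (3, "Groceries", 8.0), (5, "Groceries", 1.0)],
)

sql, params = push_down(
    "SELECT id, merchant_category AS category, amount FROM txn_table;",
    {"id": [1, 2, 3, 4], "category": "Groceries"},
)
rows = conn.execute(sql + " ORDER BY id", params).fetchall()
```

Only transactions 1 and 3 come back: transaction 2 fails the category filter and transaction 5 is outside the requested ids.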
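The following are supported keys that may be included in .chalk.sql file comments.
- resolves (str): The feature class that the query resolves. With resolves: user, for example, user.email and user.full_name are the outputs.
- source (str): The data source to query, given by name, as in the above example, or by type if there is only one database of that type. Thus, if you have one PostgreSQL source, you can also write source: postgresql.
- type ("online" | "offline" | "streaming" | null): The resolver type. online is the default.
- incremental (dict[str, str]): Settings for incremental queries, with the following keys:
    - mode ("row" | "group" | "parameter"): The incrementalization mode. Defaults to "row".
    - lookback_period (string): A duration such as 1h. Defaults to 0.
    - incremental_column (string): The timestamp column used to page over the table in row and group modes.
    - incremental_timestamp ("feature_time" | "resolver_execution_time"): Defaults to feature_time.
- count (Literal[1, "one", "one_or_none"]): The expected number of rows, comparable to calling .one() in a Python SQL resolver.
- timeout (Duration | null)
- tags (list[str] | null)
- environment (list[str] | null)
- cron (str | null): A schedule such as 1h.
- max_staleness (str | null): A duration such as 24h.
- owner (str | null)
- fields (dict[str, str]): A mapping from SQL column names to feature names. For example, given SELECT name AS arbitrary_column_name, we can map arbitrary_column_name to a Chalk feature belonging to the namespace described by the resolves field with the mapping arbitrary_column_name: chalk_feature_name.
- unique_on (list[str])
- partitioned_by (list[str])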
All comments must be inserted before the body of the SQL query. Each comment line is parsed as either a YAML-formatted line describing the resolver or a docstring. Below, the last comment will appear as a docstring since it is not in key:value format.
-- type: online
-- resolves: user
-- source: PG
-- This comment is not in yaml format, so it will be parsed as a docstring
select * from user_table
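The split between metadata and docstring can be sketched as follows. This is a simplified illustration, not Chalk's parser: split_header is a hypothetical helper that only recognizes single-line key: value comments with a regular expression, so nested or multi-line YAML values are out of scope.

```python
import re

# A comment counts as metadata when it looks like "key: value".
KEY_VALUE = re.compile(r"^([A-Za-z_][A-Za-z0-9_]*):\s*(.*)$")


def split_header(sql_text):
    """Split leading `--` comments into metadata, a docstring, and the query body."""
    metadata, doc_lines, body = {}, [], []
    in_header = True
    for line in sql_text.strip().splitlines():
        stripped = line.strip()
        if in_header and stripped.startswith("--"):
            comment = stripped[2:].strip()
            match = KEY_VALUE.match(comment)
            if match:
                metadata[match.group(1)] = match.group(2)
            else:
                doc_lines.append(comment)  # not key: value, so treat as docstring
        else:
            in_header = False  # comments after the body starts are left in the query
            body.append(line)
    return metadata, " ".join(doc_lines), "\n".join(body)


metadata, docstring, query = split_header(
    """
-- type: online
-- resolves: user
-- source: PG
-- This comment is not in yaml format, so it will be parsed as a docstring
select * from user_table
"""
)
```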
For field values that can be lists or dictionaries, such as tags or incremental settings, we can either enumerate the values inline or list them on separate, indented lines. Remember, if your values include a colon, you must use quotes around your value in order for the line to have valid YAML format. Both of the following formats are valid and equivalent.
-- type: online
-- resolves: user
-- source: PG
-- tags: ["single:1", "double:2"]
select * from user_table
-- type: online
-- resolves: user
-- source: PG
-- tags:
-- - single:1
-- - double:2
select * from user_table
Chalk supports streaming with SQL file resolvers:
-- source: kafka
-- resolves: store_features
-- type: streaming
select store_id as id, sum(amount) as purchases
from topic
group by 1
If you are using SQLFluff or another SQL linter, you may need to set configurations to accept the variable pattern. For SQLFluff, set the templater to placeholder and add the following to your config file.
# Allows sqlfluff to correctly parse
# ${arbitrary_placeholder_name}
[sqlfluff:templater:placeholder]
param_regex =\$\{[^}]*\}
1 = 'arbitrary_placeholder_name'
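Before relying on the pattern, it can be checked with Python's re module. This only confirms what the regular expression matches. It says nothing about how SQLFluff applies it internally, and the sample query is made up.

```python
import re

# Same pattern as param_regex in the config above.
param_regex = re.compile(r"\$\{[^}]*\}")

sql = "select id from user_table where id = ${user.id} and seen_at = ${now}"
found = param_regex.findall(sql)
```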
You can programmatically generate SQL resolvers with make_sql_file_resolver. This function is useful if you have many SQL tables and want to automate management of their resolvers.
from chalk import make_sql_file_resolver
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource(name='PG')

# Dictionary keys must be quoted strings.
definitions = [
    {
        "resolver_name": "get_user_features",
        "entity_name": "User",
        "table": "user_table",
        "pkey_column": "id",
        "features": ["feature1", "feature2"],
    },
    # ...
]

for definition in definitions:
    targets = ", ".join(definition["features"])
    make_sql_file_resolver(
        name=definition["resolver_name"],
        sql=f"select {definition['pkey_column']}, {targets} from {definition['table']}",
        source=pg,  # "PG" is also acceptable
        resolves=definition["entity_name"],
    )
make_sql_file_resolver adds this resolver to your registry as if it were a SQL file resolver, but without creating the .chalk.sql file.
All SQL file resolvers require source and resolves. These values can be provided as SQL comments within your sql value or as parameters. If comments and parameters are both provided, parameters will take precedence. This function call is equivalent to the previous example:
make_sql_file_resolver(
    name=definition["resolver_name"],
    sql=f"""
-- source: PG
-- resolves: {definition['entity_name']}
select {definition['pkey_column']}, {targets} from {definition['table']}
""",
)
The first time that a resolver with an incremental query is executed, Chalk ingests all data from the source. On subsequent runs of the resolver, Chalk only looks for new rows in the table to ingest. Using incremental queries limits the amount of data that Chalk needs to ingest, lowering latency for updates and reducing costs.
Incremental queries are useful for ingesting immutable tables or queries, like event tables or logs. This type of data is frequently found in the offline context, as it represents logs of real-time events.
Incremental queries use the incremental_column parameter to page over the underlying table.
Imagine you have a login events table, where you keep track of login attempts to your website. You can ingest this table with a SQL file resolver as follows:
-- type: offline
-- resolves: login
-- source: PG
-- incremental:
-- mode: row
-- lookback_period: 60m
-- incremental_column: attempted_at
select attempted_at, status, user_id from logins
Configuration for incremental resolvers can be passed in the incremental dictionary of the file’s comments. For more details, see the configuration reference above.
To use incremental mode with SQL string resolvers, use .incremental along with the incremental_column parameter.
pg = PostgreSQLSource(name='PG')

@offline
def fn() -> DataFrame[Login.attempted_at, Login.user_id, Login.status]:
    return (
        pg.query_string("select attempted_at, status, user_id from logins")
        .incremental(incremental_column="attempted_at")
    )
If your underlying data source has “late arriving records”, you may need to use the lookback_period argument to incremental. When lookback_period is specified, Chalk subtracts the lookback_period from the “maximum observed timestamp” that it uses as a lower bound for ingestion.
Concretely, if your resolver body looks like this:
(
    db.query_string("SELECT * FROM payments")
    .incremental(incremental_column="updated_at", lookback_period="30m")
)
then Chalk will rewrite your SQL query to:
SELECT * FROM payments
WHERE updated_at >= (<previous_max_updated_at> - <lookback_period>)
This means that rows that arrive up to 30 minutes late will be properly ingested. The trade-off is that Chalk will re-ingest some redundant data.
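The lower-bound arithmetic is simple enough to check by hand. In this sketch, incremental_lower_bound is a hypothetical helper and the three rows are invented for illustration. It only mirrors the rule described above.

```python
from datetime import datetime, timedelta, timezone


def incremental_lower_bound(previous_max, lookback_period=timedelta(0)):
    """Lower bound for the next ingestion: previous max timestamp minus the lookback."""
    return previous_max - lookback_period


previous_max = datetime(2022, 11, 7, 12, 0, tzinfo=timezone.utc)
bound = incremental_lower_bound(previous_max, timedelta(minutes=30))

# updated_at values for three hypothetical payments rows
rows = {
    "on_time": datetime(2022, 11, 7, 12, 5, tzinfo=timezone.utc),
    "late_arrival": datetime(2022, 11, 7, 11, 40, tzinfo=timezone.utc),
    "too_old": datetime(2022, 11, 7, 11, 0, tzinfo=timezone.utc),
}
ingested = sorted(name for name, updated_at in rows.items() if updated_at >= bound)
```

With a 30 minute lookback the bound moves back to 11:30, so the row stamped 11:40 is picked up even though it is older than the previous maximum, while the 11:00 row is not.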
Each incremental resolver tracks the timestamp of its latest ingested row in an internal property called chalk_incremental_timestamp. This property can be referenced within your SQL query when using parameter as your incrementalization mode.
The Chalk CLI provides several commands for managing your incremental resolvers:
- chalk incremental status shows the status and latest timestamp of the given resolver.
- chalk incremental set allows you to directly modify the internal timestamps the resolver uses to track its state.
- chalk incremental drop clears the resolver’s tracking state so that it will restart data ingestion on its next run.
The default incrementalization mode for .incremental is mode='row'. Three modes are supported:
- row: Chalk only ingests rows where incremental_column is newer than the previously observed max timestamp.
- group: Chalk recomputes every group that contains at least one row newer than the previously observed max timestamp.
- parameter: Chalk passes the chalk_incremental_timestamp value (including lookback_period) to your query, and leaves your query unchanged.
Group mode incremental ingestion is appropriate when you are aggregating rows in a table in order to compute features.
For example, if you are running the following query:
SELECT
business_id,
SUM(amount) as sum_payments_amount,
COUNT(*) as count_payments,
max(updated_at) as ts
FROM
payments
GROUP BY
1
to ingest features for this feature class:
@features
class Business:
id: int
sum_payments_amount: float
count_payments: int
ts: FeatureTime
then you can specify the following resolver:
@offline(...)
def resolve_aggregate_payment_features() -> DataFrame[Business]:
    query = """
        SELECT
            business_id,
            SUM(amount) as sum_payments_amount,
            COUNT(*) as count_payments,
            max(updated_at) as ts
        FROM
            payments
        GROUP BY
            1
    """
    return db.query_string(query, fields={"business_id": Business.id}) \
        .incremental(incremental_column="updated_at", mode="group")
and Chalk will automatically rewrite your query into this form:
SELECT
business_id,
SUM(amount) as sum_payments_amount,
COUNT(*) as count_payments,
max(updated_at) as ts
FROM payments
WHERE business_id IN (
SELECT DISTINCT(business_id) FROM payments
WHERE updated_at >= :chalk_incremental_timestamp
)
GROUP BY 1
This means that if you have a payments table like this:
| id | business_id | amount | updated_at |
| 1 | 1 | 10 | 2022-11-01T00:00:00.000Z |
| 2 | 1 | 5 | 2022-11-15T00:00:00.000Z |
| 3 | 2 | 7 | 2022-11-15T00:00:00.000Z |
| 4 | 3 | 17 | 2022-10-01T00:00:00.000Z |
and your query had previously run on 2022-11-07, then Chalk would return the following aggregate values:
| business_id | sum_payments_amount | count_payments | ts |
| 1 | 15 | 2 | 2022-11-15T00:00:00.000Z |
| 2 | 7 | 1 | 2022-11-15T00:00:00.000Z |
Both business 1 and 2 are present, because they have at least one payment after 2022-11-07. Business 3 is excluded, since it has no payments after 2022-11-07.
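The rewritten group-mode query can be tried out on the sample payments table. This sketch uses an in-memory SQLite database instead of your real source, stores timestamps as ISO 8601 strings (which compare correctly as text), and adds an ORDER BY only to make the output order deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table payments (id integer, business_id integer, amount integer, updated_at text)"
)
conn.executemany(
    "insert into payments values (?, ?, ?, ?)",
    [
        (1, 1, 10, "2022-11-01T00:00:00.000Z"),
        (2, 1, 5, "2022-11-15T00:00:00.000Z"),
        (3, 2, 7, "2022-11-15T00:00:00.000Z"),
        (4, 3, 17, "2022-10-01T00:00:00.000Z"),
    ],
)

# The group-mode rewrite: recompute whole groups that saw a recent update.
rewritten = """
SELECT business_id, SUM(amount), COUNT(*), MAX(updated_at)
FROM payments
WHERE business_id IN (
    SELECT DISTINCT business_id FROM payments
    WHERE updated_at >= :chalk_incremental_timestamp
)
GROUP BY 1
ORDER BY 1
"""
rows = conn.execute(
    rewritten, {"chalk_incremental_timestamp": "2022-11-07T00:00:00.000Z"}
).fetchall()
```

Business 1 is aggregated over both of its payments, including the one from 2022-11-01, because the outer query recomputes the whole group once any of its rows is recent. Its max(updated_at) is therefore 2022-11-15.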
In parameter incremental mode, Chalk leaves your query untouched. Chalk will simply pass the max incremental timestamp to your query as a bind parameter named chalk_incremental_timestamp.
Concretely, if you write:
@offline(...)
def parameter_incremental_mode_resolver() -> DataFrame[...]:
    return (
        db.query_string("SELECT * FROM payments WHERE updated_at >= :chalk_incremental_timestamp")
        .incremental(mode="parameter")
    )
Then Chalk will execute your query verbatim, and will keep track of the appropriate value for chalk_incremental_timestamp between executions of your resolver.
When Chalk executes an incremental query, it has to update the “max timestamp” value that it will use as the lower bound for the next query. By default, Chalk sets this value to the time at the start of the query.
If your resolver returns a FeatureTime feature, Chalk will update the “max timestamp” value to the “max” FeatureTime value that is returned instead. This allows you to control the incremental behavior more precisely.
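The update rule can be summarized in a few lines. next_max_timestamp is a hypothetical helper written for this page, not part of Chalk. It assumes rows without a FeatureTime are represented as None.

```python
from datetime import datetime, timezone


def next_max_timestamp(query_start, feature_times):
    """Use the max returned FeatureTime if any row carries one, else the query start."""
    observed = [ts for ts in feature_times if ts is not None]
    return max(observed) if observed else query_start


query_start = datetime(2022, 11, 20, tzinfo=timezone.utc)
returned = [
    datetime(2022, 11, 1, tzinfo=timezone.utc),
    datetime(2022, 11, 15, tzinfo=timezone.utc),
]

with_feature_time = next_max_timestamp(query_start, returned)
without_feature_time = next_max_timestamp(query_start, [])
```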
Chalk supports applying tags to SQL sources. This allows you to define a single resolver that routes traffic to multiple different backing databases depending on tags supplied at query time. This is useful for limiting the blast-radius of traffic from different use-cases.
First, define a SQL source group:
from chalk.sql import SQLSourceGroup, PostgreSQLSource
sql_group = SQLSourceGroup(
name='primary_group',
default=PostgreSQLSource(name="default_replica"),
tagged_sources={
'risk': PostgreSQLSource(name='risk_replica'),
'analytics': PostgreSQLSource(name='analytics_replica'),
}
)
Then, define a resolver that uses the group:
@online
def users() -> DataFrame[User]:
    return sql_group.query_string("select id, name from users").all()
Then, when you submit queries, the query tags will control which data source is used to execute the query:
client = ChalkClient()
# This query uses the risk datasource
client.query(input={User.id: 1}, output=[User.name], tags=['risk'])
# This query uses the analytics data source
client.query(input={User.id: 1}, output=[User.name], tags=['analytics'])
# This query uses the default datasource
client.query(input={User.id: 1}, output=[User.name])
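Conceptually, the routing is a lookup from query tags to sources. The sketch below uses plain strings in place of data sources, and pick_source is a hypothetical helper. In particular, choosing the first matching tag when several tags match is our assumption, not documented Chalk behavior.

```python
def pick_source(tags, tagged_sources, default):
    """Return the source for the first matching tag, else the default (assumed rule)."""
    for tag in tags or []:
        if tag in tagged_sources:
            return tagged_sources[tag]
    return default


tagged_sources = {"risk": "risk_replica", "analytics": "analytics_replica"}

risk = pick_source(["risk"], tagged_sources, "default_replica")
analytics = pick_source(["analytics"], tagged_sources, "default_replica")
fallback = pick_source(None, tagged_sources, "default_replica")
```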
Chalk supports SQLAlchemy:
pg = PostgreSQLSource(name='PG')

@online
def get_user(uid: User.id) -> Features[User.email, User.full_name]:
    return (
        pg.query(User(email=UserSQL.email, full_name=UserSQL.name))
        .filter_by(id=uid)
        .first()
    )
In the .query(...) call, you map the target columns of the SQL statement to the feature namespace. Here, we assign User.email to UserSQL.email and User.full_name to UserSQL.name.
To create an incremental SQLAlchemy query, use .incremental. Chalk will page over the underlying table using the column mapped to FeatureTime.
pg = PostgreSQLSource(name='PG')

@offline
def fn() -> DataFrame[Login.ts, Login.attempted_at, Login.user_id, Login.status]:
    return pg.query(
        Login(
            ts=LoginHistorySQL.created_at,
            attempted_at=LoginHistorySQL.attempted_at,
            user_id=LoginHistorySQL.user_id,
            status=LoginHistorySQL.status,
        )
    ).incremental()