Integrations
Integrate with SQL-like sources.
Chalk can ingest your data using a SQL interface from any of the integrations that support it. You can describe your queries using SQL strings or SQLAlchemy. Offline, event tables can be ingested incrementally.
There are multiple ways to construct resolvers using SQL. It’s up to you to decide which best fits your workflow.
Instead of creating a session and engine, you use a PostgreSQLSource (or any of the SQL-supporting sources in the docs) as the base of your queries. Suppose we have a PostgreSQL source PG defined as follows:
pg = PostgreSQLSource(name='PG')
SQL file resolvers are the preferred method of specifying a resolver: only a single .chalk.sql file is needed, and no Python is required.
-- type: online
-- resolves: user
-- source: PG
-- count: 1
select email, full_name from user_table where id=${user.id}
Here, the ${user.id} variable defines the inputs of the query, and user_table matches the PostgreSQL table name. The comments are parsed as YAML and provide additional metadata that Chalk uses to construct the resolver.
The following keys are supported in .chalk.sql file comments:

resolves (str): the feature namespace that the query resolves. Here, user.email and user.full_name are the outputs.
source (str): the name of the integration (here, PG) or the kind of source, e.g. source: postgresql. Supported sources are ['snowflake', 'postgres', 'postgresql', 'mysql', 'bigquery', 'cloudsql', 'redshift', 'sqlite', 'kafka'].
type ("online" | "offline" | "streaming" | null): the resolver type; online is the default.
incremental (dict[str, str]): incremental ingestion settings, with the keys mode ("row" | "group" | "parameter"), lookback_period (string, e.g. 1h), and incremental_column (string, required for the row and group modes).
count ("1" | null): if "1", the query is expected to return a single row, equivalent to .one().
timeout (Duration | null)
tags (list[str] | null)
environment (list[str] | null)
cron (str | null): a schedule on which to run the resolver, e.g. 1h.
max_staleness (str | null): e.g. 24h.
owner (str | null)
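For example, the user resolver above could set several of these keys at once. The following is a sketch only; the max_staleness, timeout, tags, environment, and owner values are illustrative, not requirements of the source:
-- type: online
-- resolves: user
-- source: PG
-- count: 1
-- max_staleness: 24h
-- timeout: 10s
-- tags: ['team:identity']
-- environment: ['production']
-- owner: identity@example.com
select email, full_name from user_table where id=${user.id}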
Chalk also supports SQL file streaming resolvers:
-- source: kafka
-- resolves: store_features
-- type: streaming
select store_id as id, sum(amount) as purchases
from topic
group by 1
If you are using SQLFluff or another SQL linter, you may need to adjust its configuration to accept the variable pattern. For SQLFluff, set the templater to placeholder and add the following to your config file:
# Allows sqlfluff to correctly parse
# ${arbitrary_placeholder_name}
[sqlfluff:templater:placeholder]
param_regex = \$\{[^}]*\}
1 = 'arbitrary_placeholder_name'
Chalk has explicit support for SQLAlchemy, letting you write queries in a familiar fashion.
pg = PostgreSQLSource(name='PG')
@online
def get_user(uid: User.id) -> Features[User.email, User.full_name]:
    return (
        pg.query(User(email=UserSQL.email, full_name=UserSQL.name))
        .filter_by(id=uid)
        .first()
    )
In the .query(...) call, you map the target columns of the SQL statement to the feature namespace. Here, we assign User.email to UserSQL.email and User.full_name to UserSQL.name.
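Here, UserSQL is a standard SQLAlchemy model mapped to the underlying table. A minimal sketch of what it might look like, assuming a users table with name and email columns (the table and column names are illustrative):
from sqlalchemy import Column, Integer, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class UserSQL(Base):
    # Hypothetical SQLAlchemy model backing the queries in this section;
    # adjust the table and column names to match your schema.
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(Text)
    email = Column(Text)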
You can also write SQL queries as strings in Python. When the name of the column matches the name of the feature with non-alphanumeric characters removed, the mapping from column to feature is automatic.
pg = PostgreSQLSource(name='PG')
@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string("select full_name, email from users where id=1")
        .first()
    )
Here, the resolver expects features named full_name and email, both of which are columns in the response to the SQL query. If the column names don’t align exactly, you can use the fields parameter to specify the mapping from query columns to features.
pg = PostgreSQLSource(name='PG')
@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_string(
            "select name, email from users where id=1",
            fields=dict(name=User.full_name),
        )
        .first()
    )
Here, the email column of the query automatically aligns with the expected User.email feature, but the name column of the query is explicitly mapped to the User.full_name feature.
You can also use parameterized queries with Chalk. Mark parameter names with a colon, and pass a dictionary mapping parameter names to parameter values:
pg.query_string(
    query="select * from users where user_id=:user_id",
    args=dict(user_id="uid123"),
)
Use a backslash to escape any literal :
characters you need to use in your query:
pg.query_string(
    query="select * from users where user_id=:user_id and name='\:colon'",
    args=dict(user_id="uid123"),
)
Finally, if you want to combine SQL files with Python resolvers, you can use the query_sql_file function. For example, suppose you have a SQL query defined in query.sql:
select * from users where user_id=:user_id
You can reference this file in a Python resolver, using either the absolute path from the root of your project or a path relative to the resolver’s file. For example, if the snippet below lived in the same directory as query.sql, we could refer to it as follows:
pg = PostgreSQLSource(name='PG')
@online
def get_user(uid: User.id) -> Features[User.full_name, User.email]:
    return (
        pg.query_sql_file(
            path="query.sql",
            args=dict(user_id=uid),
        )
        .first()
    )
Auto-mapping of column names to feature names also applies to the query_sql_file method.
The first time that a resolver with an incremental query is executed, Chalk ingests all data from the source. On subsequent runs of the resolver, Chalk only looks for new rows in the table to ingest. Using incremental queries limits the amount of data that Chalk needs to ingest, lowering latency for updates and reducing costs.
Incremental queries are useful for ingesting immutable tables or queries, like event tables or logs. This type of data is frequently found in the offline context, as it represents logs of real-time events.
Imagine you have a login events table, where you keep track of login attempts to your website. You can ingest this table as follows:
pg = PostgreSQLSource(name='PG')
@offline
def fn() -> DataFrame[Login.ts, Login.attempted_at, Login.user_id, Login.status]:
    return pg.query(
        Login(
            attempted_at=LoginHistorySQL.attempted_at,
            status=LoginHistorySQL.status,
            ts=LoginHistorySQL.created_at,
            user_id=LoginHistorySQL.user_id,
        )
    ).incremental()
Incremental queries need to map a FeatureTime feature (above, Login.ts). Using .incremental is equivalent to .all, except that Chalk can page over the underlying table using the column mapped to the FeatureTime feature.
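For reference, here is a minimal sketch of the Login feature class assumed by the resolver above; the field names and types shown are illustrative:
from datetime import datetime
from chalk.features import features, FeatureTime

@features
class Login:
    # Hypothetical feature class backing the incremental resolver above.
    id: int
    user_id: int
    status: str
    attempted_at: datetime
    ts: FeatureTime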
With Chalk SQL File resolvers, you can describe your incremental parameters in typical YAML format.
-- type: offline
-- resolves: login
-- source: PG
-- incremental:
--   mode: row
--   lookback_period: 60m
--   incremental_column: attempted_at
select attempted_at, status, user_id from logins
Chalk also supports incremental queries with raw SQL strings. If you use .incremental
with string queries,
you must specify the incremental_column
parameter.
pg = PostgreSQLSource(name='PG')
@offline
def fn() -> DataFrame[Login.attempted_at, Login.user_id, Login.status]:
    return (
        pg.query_string("select attempted_at, status, user_id from logins")
        .incremental(incremental_column="attempted_at")
    )
If your underlying data source has “late arriving records”, you may need to use the lookback_period argument to incremental. When lookback_period is specified, Chalk subtracts the lookback_period from the “maximum observed timestamp” that it uses as a lower bound for ingestion.
Concretely, if your resolver body looks like this:
db.query_string("SELECT * FROM payments") \
    .incremental(incremental_column="updated_at", lookback_period="30m")
then Chalk will rewrite your SQL query to:
SELECT * FROM payments
WHERE updated_at >= (<previous_max_updated_at> - <lookback_period>)
This means that rows that arrive up to 30 minutes late will be properly ingested. The trade-off is that Chalk will re-ingest some redundant data.
The default incrementalization mode for .incremental is mode='row'. Three modes are supported:

row: Chalk ingests only the rows whose incremental_column value is newer than the previously observed max timestamp.
group: Chalk rewrites your query so that only groups containing rows newer than the previously observed max timestamp are re-aggregated (see the example below).
parameter: Chalk passes the chalk_incremental_timestamp value (including lookback_period) to your query as a bind parameter, and leaves your query unchanged.

Group mode incremental ingestion is appropriate when you are aggregating rows in a table in order to compute features.
For example, if you are running the following query:
SELECT
    business_id,
    SUM(amount) as sum_payments_amount,
    COUNT(*) as count_payments,
    max(updated_at) as ts
FROM payments
GROUP BY 1
to ingest features for this feature class:
@features
class Business:
    id: int
    sum_payments_amount: float
    count_payments: int
    ts: FeatureTime
then you can specify the following resolver:
@batch(...)
def resolve_aggregate_payment_features() -> DataFrame[Business]:
    query = """
        SELECT
            business_id,
            SUM(amount) as sum_payments_amount,
            COUNT(*) as count_payments,
            max(updated_at) as ts
        FROM payments
        GROUP BY 1
    """
    return db.query_string(query, fields={"business_id": Business.id}) \
        .incremental(incremental_column="updated_at", mode="group")
and Chalk will automatically rewrite your query into this form:
SELECT
    business_id,
    SUM(amount) as sum_payments_amount,
    COUNT(*) as count_payments,
    max(updated_at) as ts
FROM payments
WHERE business_id IN (
    SELECT DISTINCT(business_id) FROM payments
    WHERE updated_at >= :chalk_incremental_timestamp
)
GROUP BY 1
This means that if you have a payments table like this:
| id  | business_id | amount | updated_at               |
| --- | ----------- | ------ | ------------------------ |
| 1   | 1           | 10     | 2022-11-01T00:00:00.000Z |
| 2   | 1           | 5      | 2022-11-15T00:00:00.000Z |
| 3   | 2           | 7      | 2022-11-15T00:00:00.000Z |
| 4   | 3           | 17     | 2022-10-01T00:00:00.000Z |
and your query had previously run on 2022-11-07
, then Chalk would return the following aggregate values:
| business_id | sum_payments_amount | count_payments | ts                       |
| ----------- | ------------------- | -------------- | ------------------------ |
| 1           | 15                  | 2              | 2022-11-15T00:00:00.000Z |
| 2           | 7                   | 1              | 2022-11-15T00:00:00.000Z |
Both business 1 and business 2 are present, because each has at least one payment after 2022-11-07. Business 3 is excluded, since it has no payments after 2022-11-07.
In parameter incremental mode, Chalk leaves your query untouched. Chalk will simply pass the max incremental timestamp to your query as a bind parameter named chalk_incremental_timestamp.
Concretely, if you write:
@batch(...)
def parameter_incremental_mode_resolver() -> DataFrame[...]:
    return (
        db.query_string("SELECT * FROM payments WHERE updated_at >= :chalk_incremental_timestamp")
        .incremental(mode="parameter")
    )
Then Chalk will execute your query verbatim, and will keep track of the appropriate value for chalk_incremental_timestamp
between executions of your resolver.
When Chalk executes an incremental query, it has to update the “max timestamp” value that it will use as the lower bound for the next query. By default, Chalk sets this value to the time at the start of the query.
If your resolver returns a FeatureTime feature, Chalk will update the “max timestamp” value
to the “max” FeatureTime
value that is returned instead. This allows you to control the incremental
behavior more precisely.
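The incremental Login resolver earlier does exactly this: because it maps LoginHistorySQL.created_at to the Login.ts FeatureTime feature, the lower bound for the next run comes from the data itself. Below is a minimal sketch of the same idea with a raw SQL string; the created_at column and the fields mapping are illustrative:
@offline
def resolve_logins() -> DataFrame[Login.ts, Login.attempted_at, Login.user_id, Login.status]:
    # Because Login.ts is a FeatureTime feature, Chalk advances the incremental
    # lower bound to the max created_at value returned, not the query start time.
    return (
        pg.query_string(
            "select created_at, attempted_at, status, user_id from logins",
            fields=dict(created_at=Login.ts),
        )
        .incremental(incremental_column="created_at")
    )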