Feature Engine
Programmatically manage data sources using the Python SDK
The Chalk Python SDK provides a programmatic API for managing data sources, allowing you to create, list, update, test, and delete integrations without using the dashboard UI.
All data source operations are available through client.api.datasources on a ChalkClient instance.
The API communicates with the Chalk API server over gRPC and reuses the same authentication as
the query client.
Data sources created through this API are environment-level configuration — they persist across deployments and appear in the dashboard, just like sources added through the UI. This is an administrative API intended for infrastructure-as-code scripts, CI/CD pipelines, or notebooks, not for use inside resolver code.
from chalk.client import ChalkClient, IntegrationKind
client = ChalkClient()List all integrations configured in the current environment.
sources = client.api.datasources.list()
for s in sources:
print(f"{s.kind} {s.name} id={s.id}")Returns a list of Datasource objects with id, name, kind, environment_id,
created_at, and updated_at fields.
Use client.api.datasources.create() to add a new integration. The kind parameter
accepts either an IntegrationKind enum or a string. Data source names must only contain
letters, numbers, and underscores.
The created source will immediately appear in the dashboard and be available for use after clicking Redeploy in the dashboard.
import base64
with open("path/to/service-account-key.json") as f:
sa_key_b64 = base64.b64encode(f.read().encode()).decode()
created = client.api.datasources.create(
kind=IntegrationKind.BIGQUERY,
name="my_bigquery_source",
config={
"BQ_PROJECT": "my-gcp-project",
"BQ_DATASET": "my_dataset",
"BQ_CREDENTIALS_BASE64": sa_key_b64,
},
)created = client.api.datasources.create(
kind=IntegrationKind.SNOWFLAKE,
name="my_snowflake_source",
config={
"SNOWFLAKE_USER": "admin",
"SNOWFLAKE_PASSWORD": "secret",
"SNOWFLAKE_ACCOUNT": "xy12345.us-east-1",
"SNOWFLAKE_DATABASE": "PROD_DB",
"SNOWFLAKE_WAREHOUSE": "COMPUTE_WH",
"SNOWFLAKE_SCHEMA": "PUBLIC",
"SNOWFLAKE_ROLE": "ANALYST_ROLE",
},
)created = client.api.datasources.create(
kind=IntegrationKind.POSTGRESQL,
name="my_postgres_source",
config={
"PGHOST": "db.example.com",
"PGPORT": "5432",
"PGDATABASE": "mydb",
"PGUSER": "admin",
"PGPASSWORD": "secret",
},
)Config values can be either literal strings or references to secrets synced from an external
cloud secret manager (e.g., AWS Secrets Manager or GCP Secret Manager). Use LinkedSecretRef
to point at a secret by its ID instead of inlining the value.
from chalk.client import ChalkClient, IntegrationKind, LinkedSecretRef
client = ChalkClient()
created = client.api.datasources.create(
kind=IntegrationKind.SNOWFLAKE,
name="my_snowflake_source",
config={
"SNOWFLAKE_USER": "admin",
"SNOWFLAKE_PRIVATE_KEY_B64": LinkedSecretRef("snowflake-prod-key"),
"SNOWFLAKE_ACCOUNT": "xy12345.us-east-1",
"SNOWFLAKE_DATABASE": "PROD_DB",
"SNOWFLAKE_WAREHOUSE": "COMPUTE_WH",
"SNOWFLAKE_SCHEMA": "PUBLIC",
"SNOWFLAKE_ROLE": "ANALYST_ROLE",
},
)LinkedSecretRef works with update() as well — pass it for any config key you want to
reference from your cloud secret manager rather than providing a literal value.
A linked secret may be referenced either by its provider-local secret name, or by a
same-store full identifier: an AWS Secrets Manager ARN in the same account and region, a GCP
Secret Manager resource name (projects/.../secrets/...) in the same project and region, or
an Azure Key Vault secret URL (https://<vault>.vault.azure.net/secrets/...) in the same
vault. Chalk normalizes full identifiers to the provider-local secret name and always reads
the latest version. Cross-account, cross-region, cross-project, and cross-vault identifiers
are not supported for injection, and explicit non-latest versions are rejected.
Fetch a data source by ID, test its connectivity, update its configuration, or remove it.
# Get a data source by ID
source = client.api.datasources.get(id=created.id)
# Test connectivity
result = client.api.datasources.test(id=created.id)
print(f"Success: {result.success}, Message: {result.message}")
# Update config or name (only provided keys are changed)
updated = client.api.datasources.update(
id=created.id,
config={"BQ_DATASET": "new_dataset"},
name="my_updated_source",
)
# Delete
client.api.datasources.delete(id=created.id)The IntegrationKind enum supports the following data source types:
| Kind | Value |
|---|---|
IntegrationKind.ATHENA | AWS Athena |
IntegrationKind.BIGQUERY | Google BigQuery |
IntegrationKind.CLICKHOUSE | ClickHouse |
IntegrationKind.DATABRICKS | Databricks |
IntegrationKind.DYNAMODB | Amazon DynamoDB |
IntegrationKind.KAFKA | Apache Kafka |
IntegrationKind.KINESIS | Amazon Kinesis |
IntegrationKind.MSSQL | Microsoft SQL Server |
IntegrationKind.MYSQL | MySQL |
IntegrationKind.POSTGRESQL | PostgreSQL |
IntegrationKind.PUBSUB | Google Pub/Sub |
IntegrationKind.REDSHIFT | Amazon Redshift |
IntegrationKind.SNOWFLAKE | Snowflake |
IntegrationKind.SPANNER | Google Spanner |
IntegrationKind.TRINO | Trino |