AWS Athena

Chalk supports AWS Athena as a SQL source. This allows users to load data from AWS Glue and other AWS data sources (Hive, DocumentDB, Iceberg, etc.) directly into Chalk features.

Adding Athena

On the settings page, you’ll find a form for adding your credentials. Note that Chalk unloads query results to the S3 staging directory you provide; these intermediate results appear under the chalk-unload folder.
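To make the staging layout concrete, here is a minimal sketch that derives the bucket and key prefix where the intermediate results would land. It assumes the chalk-unload folder sits directly under the staging directory; the helper name `chalk_unload_prefix` and the example bucket are hypothetical and not part of Chalk's API.

```python
from urllib.parse import urlparse


def chalk_unload_prefix(staging_dir: str) -> tuple[str, str]:
    """Split an S3 staging URI into (bucket, key prefix of the chalk-unload folder).

    Assumption: chalk-unload is created directly under the staging directory.
    """
    parsed = urlparse(staging_dir)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"expected an s3://bucket/prefix URI, got {staging_dir!r}")
    # Drop leading/trailing slashes so the prefix is a clean S3 key prefix.
    base = parsed.path.strip("/")
    prefix = f"{base}/chalk-unload/" if base else "chalk-unload/"
    return parsed.netloc, prefix


# Hypothetical staging directory, for illustration only.
bucket, prefix = chalk_unload_prefix("s3://example-bucket/athena-staging/")
print(bucket, prefix)
```

A prefix like this can be handy when scoping bucket permissions or inspecting intermediate files, but check the actual layout in your own bucket before relying on it.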

The parameters you enter for the Athena integration will also be available as environment variables.

Single Integration

Add your Athena data source in a Python file by constructing an AthenaSource and assigning it to a variable. Constructing the source is required for all Athena integrations. In this file, we’ll also define a User feature class with a transaction_volume feature, which will be resolved with Athena.

If you have only one Athena connection that you’d like to add to Chalk, you do not need to specify any arguments to construct the source in your code.

from chalk.features import features
from chalk.sql import AthenaSource

athena_source = AthenaSource()

@features
class User:
    id: str
    transaction_volume: float

Multiple Integrations

Chalk injects environment variables to support data integrations. But what happens when you have two data sources of the same kind? When you create a new data source from your dashboard, you have the option to name the integration. You can then reference this name directly in your code.

from chalk.sql import AthenaSource

athena_source_txns = AthenaSource(name="ATHENA_TRANSACTIONS")
athena_source_marketing = AthenaSource(name="ATHENA_MARKETING")

Then, access the Athena source in your resolvers. Note that all queries to Athena will be run with UNLOAD to handle larger-than-memory datasets.

from chalk import offline, DataFrame
from chalk.features import features
from chalk.sql import AthenaSource

athena_source = AthenaSource()

@features
class User:
    id: str
    transaction_volume: float

@offline
def get_transactions() -> DataFrame[User]:
    return athena_source.query_string(
        """
        SELECT
            id,
            transaction_volume
        FROM
            transactions
        """,
        fields={
            "id": User.id,
            "transaction_volume": User.transaction_volume,
        }
    ).all()
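
To illustrate the note above about UNLOAD, the following sketch shows the general shape of an Athena UNLOAD statement wrapped around a SELECT query. It is not Chalk's implementation: the helper `wrap_in_unload`, the output location, and the choice of Parquet as the format are assumptions made for illustration. Only the `UNLOAD (...) TO '...' WITH (format = '...')` syntax comes from Athena itself.

```python
def wrap_in_unload(select_sql: str, s3_location: str, fmt: str = "PARQUET") -> str:
    """Wrap a SELECT query in Athena's UNLOAD statement.

    Hypothetical helper: Chalk generates its own statement internally, and
    the format it uses is not stated in this document.
    """
    # UNLOAD takes the query in parentheses, so drop any trailing semicolon.
    query = select_sql.strip().rstrip(";").strip()
    # Escape single quotes so the location stays a valid SQL string literal.
    location = s3_location.replace("'", "''")
    return f"UNLOAD ({query})\nTO '{location}'\nWITH (format = '{fmt}')"


statement = wrap_in_unload(
    "SELECT id, transaction_volume FROM transactions;",
    # Hypothetical location under the staging directory's chalk-unload folder.
    "s3://example-bucket/athena-staging/chalk-unload/run-1/",
)
print(statement)
```

Because the results are written to S3 as files rather than returned through the query result set, this pattern lets a reader consume the output in pieces, which is how larger-than-memory datasets can be handled.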