
Chalk API Reference

Features define the data that you want to compute and store. Your features are defined as Python classes that look like dataclasses. For example:

from chalk.features import features, DataFrame
from chalk import _
@features
class CreditCard:
   id: int
   user_id: "User.id"
   limit: float
@features
class User:
   id: int
   name: str
   email: str
   credit_cards: DataFrame[CreditCard]
   total_limit: float = _.credit_cards[_.limit].sum()

Features can be nested, as with credit_cards above. In this section, we'll dive into API components that make up the building blocks of features.

Chalk lets you spell out your features directly in Python.

Features are namespaced to a FeatureSet. To create a new FeatureSet, apply the @features decorator to a Python class with typed attributes. A FeatureSet is constructed and functions much like Python's own dataclass.

Parameters
owner: = None

The individual or team responsible for these features. The Chalk Dashboard will display this field, and alerts can be routed to owners.

tags: = None

Added metadata for features for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.

etl_offline_to_online: = None

When True, Chalk copies this feature into the online environment when it is computed in offline resolvers. Setting etl_offline_to_online on a feature class assigns it to all features on the class which do not explicitly specify etl_offline_to_online.

max_staleness: = None

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Assigning a max_staleness to the feature class assigns it to all features on the class which do not explicitly specify a max_staleness value of their own.

Other Parameters
Show All
cls:
Type[T] | None
= None
name: = None
singleton: = False
@features(
    owner="andy@chalk.ai",
    max_staleness="30m",
    etl_offline_to_online=True,
    tags="user-group",
)
class User:
    id: str
    # Comments here appear in the web!
    # :tags: pii
    name: str | None
    # :owner: userteam@mycompany.com
    location: LatLng

Add metadata and configuration to a feature.

Parameters
owner: = None

You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature. Read more at Owner

tags: = None

Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team. Read more at Tags

version: = None

The maximum version for a feature. Versioned features can be referred to with the @ operator:

@features
class User:
    id: str
    score: int = feature(version=2)
str(User.score @ 2)
"user.score@2"

See more at Versioning

default_version: = 1

The default version for a feature. When you reference a versioned feature without the @ operator, you reference the default_version. Set to 1 by default.

@features
class User:
    id: str
    score: int = feature(version=2, default_version=2)
str(User.score)
"user.score"

See more at Default versions

max_staleness:
... | Duration | None
= ...

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Read more at Caching

cache_nulls: = True

When True (default), Chalk will cache all values, including nulls.

When False, Chalk will not update the null entry in the cache.

When "evict_nulls", Chalk will evict the entry that would have been null from the cache, if it exists.

Concretely, suppose the current state of a database is {a: 1, b: 2}, and you write a row {a: 2, b: None}. Here is the expected result in the db:

  • {a: 2, b: None} when cache_nulls=True (default)
  • {a: 2, b: 2} when cache_nulls=False
  • {a: 2} when cache_nulls="evict_nulls"
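
For instance, a sketch combining max_staleness with cache_nulls (the email feature and values here are purely illustrative):

from chalk.features import features, feature
@features
class User:
    id: str
    # Cache lookups for 30 minutes, but evict entries that come back null
    email: str | None = feature(
        max_staleness="30m",
        cache_nulls="evict_nulls",
    )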

etl_offline_to_online: = None

When True, Chalk copies this feature into the online environment when it is computed in offline resolvers. Read more at Reverse ETL

min:
_TRich | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
_TRich | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

min_length: = None

If specified, when this feature is computed, Chalk will check that len(x) >= min_length.

max_length: = None

If specified, when this feature is computed, Chalk will check that len(x) <= max_length.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

validations: = None

A list of validations to apply to this feature. Generally, max, min, max_length, and min_length are more convenient, but the parameter strict applies to all of those parameters. Use this parameter if you want to mix strict and non-strict validations.
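
For example, a sketch of strict numeric validation using the min, max, and strict parameters described above (the age feature is illustrative):

from chalk.features import features, feature
@features
class User:
    id: str
    # Values outside [0, 130] fail validation and are not persisted
    age: int = feature(min=0, max=130, strict=True)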

default:
_TRich | ...
= ...

The default value of the feature if it otherwise can't be computed. If you don't need to specify other metadata, you can also assign a default in the same way you would assign a default to a dataclass:

from chalk.features import features
@features
class User:
    num_purchases: int = 0

expression: = None

An underscore expression for defining the feature. Typically, this value is assigned directly to the feature without needing to use the feature(...) function. However, if you want to define other properties, like a default or max_staleness, you'll want to use the expression keyword argument.

from chalk.features import features
from chalk import _
@features
class Receipt:
    subtotal: int
    tax: int
    total: int = feature(expression=_.subtotal + _.tax, default=0)

See more at Underscore

deprecated: = False

If True, this feature is considered deprecated, which impacts the dashboard, alerts, and warnings.
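
A minimal sketch of marking a feature as deprecated (the legacy_score feature is illustrative):

from chalk.features import features, feature
@features
class User:
    id: str
    # Still resolvable, but flagged as deprecated in the dashboard
    legacy_score: float = feature(deprecated=True)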

Other Parameters
Show All
description: = None
name: = None
primary: = None
encoder:
TEncoder[_TPrim, _TRich] | None
= None
decoder:
TDecoder[_TPrim, _TRich] | None
= None
dtype: = None
offline_ttl:
... | Duration | None
= ...
Returns
type:
_TRich

The type of the input feature, given by _TRich.

from chalk.features import Primary, features, feature
@features
class User:
    uid: Primary[int]
    # Uses a default value of 0 when one cannot be computed.
    num_purchases: int = 0
    # Description of the name feature.
    # :owner: fraud@company.com
    # :tags: fraud, credit
    name: str = feature(
        max_staleness="10m",
        etl_offline_to_online=True
    )
    score: int = feature(
        version=2, default_version=2
    )

Specify a feature that represents a one-to-one relationship.

This function allows you to explicitly specify a join condition between two @features classes. When there is only one way to join two classes, we recommend using the foreign-key definition instead of this has_one function. For example, if you have a User class and a Card class, and each user has one card, you can define the Card and User classes as follows:

@features
class User:
    id: str
@features
class Card:
    id: str
    user_id: User.id
    user: User

However, if User has two cards (say, a primary and secondary), the foreign key syntax cannot be used to define the relationship, and you should use the has_one function.

Read more at Has One

Parameters
f:

The join condition between @feature classes. This argument is callable to allow for forward references to members of this class and the joined class.

from chalk.features import DataFrame, features, has_one
@features
class Card:
    id: str
    user_id: str
    balance: float
@features
class User:
    id: str
    card: Card = has_one(
        lambda: User.id == Card.user_id
    )
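
For the two-card case described above, one possible sketch keeps a separate foreign key per relationship (the primary_card_id and secondary_card_id features are assumptions for illustration):

from chalk.features import features, has_one
@features
class Card:
    id: str
@features
class User:
    id: str
    primary_card_id: str
    secondary_card_id: str
    primary_card: Card = has_one(
        lambda: User.primary_card_id == Card.id
    )
    secondary_card: Card = has_one(
        lambda: User.secondary_card_id == Card.id
    )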

Specify a feature that represents a one-to-many relationship.

Parameters
f:

The join condition between @features classes. This argument is callable to allow for forward references to members of this class and the joined class.

max_staleness:
Duration | None | ...
= ...

The maximum staleness of the joined feature. The items in the joined feature aggregate, storing the latest values of the joined feature for each primary key in the joined feature.

from chalk.features import DataFrame, features, has_many
@features
class Card:
    id: str
    user_id: str
    balance: float
@features
class User:
    id: str
    cards: DataFrame[Card] = has_many(
        lambda: User.id == Card.user_id
    )

The function after can be used with DataFrame to compute windowed features.

after filters a DataFrame relative to the current time in context, such that if the after filter is defined as now - {time_window}, the filter will include all features with timestamps t where now - {time_window} <= t <= now. This time could be in the past if you’re using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.

after takes several keyword arguments describing the time relative to the present.

Parameters

days_ago: = 0

Number of days ago.

hours_ago: = 0

Number of hours ago.

minutes_ago: = 0

Number of minutes ago.

seconds_ago: = 0

Number of seconds ago.

index: = None

The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.

Other Parameters
Show All
weeks_ago: = 0
Returns
type:

A filter for the DataFrame.

from chalk.features import DataFrame, features
@features
class Card:
    ...
@features
class User:
    cards: DataFrame[Card]
User.cards[after(hours_ago=1, minutes_ago=30)]

The function before can be used with DataFrame to compute windowed features.

before filters a DataFrame relative to the current time in context such that if the before filter is defined as now - {time_window}, the filter will include all features with timestamps t where t <= now - {time_window}. This time could be in the past if you’re using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.

before takes several keyword arguments describing the time relative to the present.

Parameters

days_ago: = 0

Number of days ago.

hours_ago: = 0

Number of hours ago.

minutes_ago: = 0

Number of minutes ago.

seconds_ago: = 0

Number of seconds ago.

index: = None

The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.

Other Parameters
Show All
weeks_ago: = 0
Returns
type:

A filter for a DataFrame.

from chalk.features import DataFrame, features
@features
class Card:
    ...
@features
class User:
    cards: DataFrame[Card]
User.cards[before(hours_ago=1, minutes_ago=30)]

Declare a windowed feature.

Examples

@features
class User:
    failed_logins: Windowed[int] = windowed("10m", "24h")
Functions

Create a windowed feature.

See more at Windowed

Parameters
buckets: = ()

The size of the buckets for the window function. Buckets are specified as strings in the format "1d", "2h", "1h30m", etc. You may also choose to specify the buckets using the days, hours, and minutes parameters instead. The buckets parameter is helpful if you want to use multiple units to express the bucket size, like "1h30m".

days:

Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".

hours:

Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".

minutes:

Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".

owner: = None

You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.

tags: = None

Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.

default:
TRich | ...
= ...

The default value of the feature if it otherwise can't be computed.

max_staleness:
Duration | ... | None
= ...

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver.

See more at Caching

offline_ttl:
Duration | ... | None
= ...

Sets a maximum age for values eligible to be retrieved from the offline store, defined in relation to the query's current point-in-time.

version: = None

Feature versions allow you to manage a feature as its definition changes over time.

The version keyword argument allows you to specify the maximum number of versions available for this feature.

See more at Versioning

etl_offline_to_online: = None

When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.

See more at Reverse ETL

min:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

min_length: = None

If specified, when this feature is computed, Chalk will check that len(x) >= min_length.

max_length: = None

If specified, when this feature is computed, Chalk will check that len(x) <= max_length.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

validations: = None

A list of Validations to apply to this feature.

See more at https://docs.chalk.ai/api-docs#Validation

expression:

The expression to compute the feature. This is an underscore expression, like _.transactions[_.amount].sum().

materialization:
MaterializationWindowConfig | True | None
= None

Configuration for aggregating data. Pass bucket_duration with a Duration to configure the bucket size for aggregation. If True, each of the windows will use a bucket duration equal to its window duration.

See more at Materialized window aggregations

Other Parameters
Show All
description: = None
name: = None
encoder:
TEncoder[TPrim, TRich] | None
= None
decoder:
TDecoder[TPrim, TRich] | None
= None
dtype: = None
Returns
type:
Windowed[TRich]

Metadata for the windowed feature, parameterized by TPrim (the primitive type of the feature) and TRich (the decoded type of the feature, if decoder is provided).

from chalk import windowed, Windowed
@features
class User:
    id: int
    email_count: Windowed[int] = windowed(days=range(1, 30))
    logins: Windowed[int] = windowed("10m", "1d", "30d")
User.email_count["7d"]
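
Putting the expression and materialization parameters together, a hedged sketch (the Transaction class and its amount field are assumptions for illustration):

from chalk import windowed, Windowed, _
from chalk.features import features, DataFrame
@features
class Transaction:
    id: int
    user_id: "User.id"
    amount: float
@features
class User:
    id: int
    transactions: DataFrame[Transaction]
    # Sum of transaction amounts over 1- and 7-day windows; with
    # materialization=True, each window uses a bucket equal to its duration.
    transaction_sum: Windowed[float] = windowed(
        "1d", "7d",
        expression=_.transactions[_.amount].sum(),
        materialization=True,
    )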

Create a windowed feature with grouping.

See more at Grouped materialized window aggregations

Parameters
buckets: = ()

The size of the buckets for the window function. Buckets are specified as strings in the format "1d", "2h", "1h30m", etc. You may also choose to specify the buckets using the days, hours, and minutes parameters instead. The buckets parameter is helpful if you want to use multiple units to express the bucket size, like "1h30m".

expression:

The expression to compute the feature. This is an underscore expression, like _.transactions[_.amount].sum().

materialization:
MaterializationWindowConfig | True

Configuration for aggregating data. Pass bucket_duration with a Duration to configure the bucket size for aggregation.

See more at Materialized window aggregations

days:

Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".

hours:

Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".

minutes:

Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".

owner: = None

You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.

tags: = None

Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.

default:
TRich | ...
= ...

The default value of the feature if it otherwise can't be computed.

min:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

Other Parameters
Show All
description: = None
name: = None
dtype: = None
Returns
type:
from chalk import group_by_windowed, DataFrame, _
from chalk.features import features
@features
class Email:
    id: int
    user_id: "User.id"
    category: str
@features
class User:
    id: int
    emails: DataFrame[Email]
    emails_by_category: DataFrame = group_by_windowed(
        "10m", "30m",
        expression=_.emails.group_by(_.category).count(),
    )
Primary
Class

Marks a feature as the primary feature for a feature class.

Features named id on feature classes without an explicit primary feature are declared primary keys by default, and don't need to be marked with Primary.

If you have a primary key feature with a name other than id, you can use this marker to indicate the primary key.

Examples

from chalk.features import features
from chalk import Primary
@features
class User:
    username: Primary[str]
Functions
Parameters
item:
typing.Union[Type, str, int]

The type of the feature value.

Returns

The type, with a special annotation indicating that it is a primary key.

from chalk.features import features
from chalk import Primary
@features
class User:
    username: Primary[str]

Specify explicit data validation for a feature.

The feature() function can also specify these validations, but this class allows you to specify both strict and non-strict validations at the same time.

Functions

Set validation parameters for a feature.

Parameters
min:
T | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
T | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

min_length: = None

If specified, when this feature is computed, Chalk will check that len(x) >= min_length.

max_length: = None

If specified, when this feature is computed, Chalk will check that len(x) <= max_length.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

from chalk.features import features, feature
@features
class User:
    fico_score: int = feature(
        validations=[
            Validation(min=300, max=850, strict=True),
            Validation(min=300, max=320, strict=False),
            Validation(min=840, max=850, strict=False),
        ]
    )
    # If only one set of validations were needed,
    # you can use the [`feature`](#feature) function instead:
    first_name: str = feature(
        min_length=2, max_length=64, strict=True
    )
Vector
Class

The Vector class can be used as a type annotation to denote a Vector feature.

Instances of this class will be provided when working with raw vectors inside of resolvers. Generally, you do not need to construct instances of this class directly, as Chalk will automatically convert list-like features into Vector instances when working with a Vector annotation.

Parameters

data:
numpy.Array | list[float] | pyarrow.FixedSizeListScalar

The vector values.

Examples

from chalk.features import Vector, features
@features
class Document:
    embedding: Vector[1536]
Attributes
precision
'fp16' | 'fp32' | 'fp64'

The precision of the Vector

Functions

Convert a vector to a PyArrow array.

Returns
type:
pa.FixedSizeListScalar

The vector, as a PyArrow array.

Convert a vector to a PyArrow array.

Returns
type:

The vector, as a PyArrow array.

Convert the vector to a Numpy array.

Parameters
writable: = False

Whether the numpy array should be writable. If so, an extra copy of the vector data will be made.

Returns
type:
np.ndarray

The vector, as a numpy array.

Convert the vector to a Python list.

Returns
type:

The vector, as a list of Python floats

Define a nearest neighbor relationship for performing Vector similarity search.

Parameters

The other vector feature. This vector must have the same dtype and dimensions.

metric:
'l2' | 'ip' | 'cos'
= 'l2'

The metric to use to compute distance between two vectors. L2 Norm ("l2"), Inner Product ("ip"), and Cosine ("cos") are supported. Defaults to "l2".
Returns
type:
Filter

A nearest neighbor relationship filter.
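
A sketch of how such a relationship might be declared, assuming this function is exposed as is_near on Vector features (the feature classes below are illustrative):

from chalk.features import DataFrame, Vector, features, has_many
@features
class Document:
    id: str
    embedding: Vector[1536]
@features
class SearchQuery:
    id: str
    embedding: Vector[1536]
    # Nearest documents by cosine distance
    nearest_documents: DataFrame[Document] = has_many(
        lambda: SearchQuery.embedding.is_near(
            Document.embedding, metric="cos"
        )
    )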

Decorator to create an online resolver.

Parameters
fn:
Callable[P, T] | None
= None

The function that you're decorating as a resolver.

environment: = None

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environments and no others

Read more at Environments

tags: = None

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

cron: = None

You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.

Cron can sample all examples, a subset of all examples, or a custom provided set of examples.

Read more at Scheduling

You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.

when: = None

Like tags, when can filter when a resolver is eligible to run. Unlike tags, when can use feature values, so that you can write resolvers like:

@online(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
    ...
owner: = None

Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

timeout: = None

You can specify the maximum Duration to wait for the resolver's result. Once the resolver's runtime exceeds the specified duration, a timeout error will be returned along with each output feature.

Read more at Timeout.

name: = None

An alternative short name for the resolver, to use instead of the function name.

resource_hint:
ResourceHint | None
= None

Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.

static: = False

Whether this resolver should be invoked once during planning time to build a static computation graph. If True, all inputs will either be StaticOperators (for has-many and dataframe relationships) or StaticExpressions (for individual features). The resolver must return a StaticOperator as output.

total: = False

Whether this resolver returns all ids of a given namespace. To have this annotation, the resolver must take no arguments and return a DataFrame. Typically, this annotation would be used in a SQL-file resolver.

unique_on:
Collection[Any] | None
= None

A list of features that must be unique for each row of the output. This enables unique optimizations in the resolver execution. Only applicable to resolvers that return a DataFrame.

partitioned_by:
Collection[Any] | None
= None

A list of features that correspond to partition keys in the data source. This field indicates that this resolver executes its query against a data storage system that is partitioned by a particular set of columns. This is most common with data-warehouse sources like Snowflake, BigQuery or Databricks.

Returns

A ResolverProtocol which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.

Read more at Unit Tests

@online
def name_match(
    name: User.full_name,
    account_name: User.bank_account.title
) -> User.account_name_match_score:
    if name.lower() == account_name.lower():
        return 1.
    return 0.

Decorator to create an offline resolver.

Parameters

environment: = None

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environments and no others

Read more at Environments

tags: = None

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

cron: = None

You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.

Cron can sample all examples, a subset of all examples, or a custom provided set of examples.

Read more at Scheduling

when: = None

Like tags, when can filter when a resolver is eligible to run. Unlike tags, when can use feature values, so that you can write resolvers like:

@offline(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
   ...
owner: = None

Allows you to specify an individual or team who is responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

timeout: = None

You can specify the maximum Duration to wait for the resolver's result. Once the resolver's runtime exceeds the specified duration, a timeout error will be raised.

Read more at Timeout.

resource_hint:
ResourceHint | None
= None

Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.

static: = False

Whether this resolver should be invoked once during planning time to build a static computation graph. If True, all inputs will either be StaticOperators (for has-many and dataframe relationships) or StaticExpressions (for individual features). The resolver must return a StaticOperator as output.

total: = False

Whether this resolver returns all ids of a given namespace. To have this annotation, the resolver must take no arguments and return a DataFrame. Typically, this annotation would be used in a SQL-file resolver.

Other Parameters
Show All
fn:
Callable[P, T] | None
= None
name: = None
unique_on:
Collection[Any] | None
= None
partitioned_by:
Collection[Any] | None
= None
Returns

A ResolverProtocol which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.

Read more at Unit Tests

@offline(cron="1h")
def get_fraud_score(
    email: User.email,
    name: User.name,
) -> User.fraud_score:
    return socure.get_sigma_score(email, name)
Cron
Class

Detailed options for specifying the schedule and filtering functions for Chalk batch jobs.

Functions

Run an online or offline resolver on a schedule.

This class lets you add a filter or sample function to your cron schedule for a resolver. See the overloaded signatures for more information.

Parameters

schedule:

The period of the cron job. Can be either a crontab ("0 * * * *") or a Duration ("2h").

filter: = None

Optionally, a function to filter down the arguments to consider.

See Filtering examples for more information.

sample: = None

Explicitly provide the sample function for the cron job.

See Custom examples for more information.

Using a filter

def only_active_filter(v: User.active):
    return v
@online(cron=Cron(schedule="1d", filter=only_active_filter))
def score_user(d: User.signup_date) -> User.score:
    return ...

Using a sample function

def s() -> DataFrame[User.id]:
    return DataFrame.read_csv(...)
@offline(cron=Cron(schedule="1d", sample=s))
def fn(balance: User.account.balance) -> ...:
    ...

A resolver, returned from the decorators @offline and @online.

Attributes

Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:
- [`None`](https://docs.python.org/3/library/constants.html#None) (default) - candidate to run in every environment
- [`str`](https://docs.python.org/3/library/stdtypes.html#str) - run only in this environment
- `list[str]` - run in any of the specified environments and no others

Read more at Environments

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

The docstring of the resolver.

The function name of the resolver.

The Python module where the function is defined.

The type annotations for the resolver.

The filename in which the resolver is defined.

The name of the resolver, either given by the name of the function, or by the keyword argument name given to @offline or @online.

resource_hint
ResourceHint | None

Whether this resolver is bound by CPU or I/O.

Whether the resolver is static. Static resolvers are "executed" once during planning time to produce a computation graph.

The fully qualified name for the resolver.

Functions

Returns the result of calling the function decorated with @offline or @online with the given arguments.

Parameters
args:
P.args
= ()

The arguments to pass to the decorated function. If one of the arguments is a DataFrame with a filter or projection applied, the resolver will only be called with the filtered or projected data. Read more at https://docs.chalk.ai/docs/unit-tests#data-frame-inputs

kwargs:
P.kwargs
= {}
Returns
type:
T_co

The result of calling the decorated function with args. Useful for unit-testing.

Read more at Unit Tests

@online
def get_num_bedrooms(
    rooms: Home.rooms[Room.name == 'bedroom']
) -> Home.num_bedrooms:
    return len(rooms)
rooms = [
    Room(id=1, name="bedroom"),
    Room(id=2, name="kitchen"),
    Room(id=3, name="bedroom"),
]
assert get_num_bedrooms(rooms) == 2
MachineType
Type Alias

The type of machine to use.

You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.

Chalk includes a DataFrame class that models tabular data in much the same way that pandas does. However, there are some key differences that allow the Chalk DataFrame to increase type safety and performance.

Like pandas, Chalk's DataFrame is a two-dimensional data structure with rows and columns. You can perform operations like filtering, grouping, and aggregating on a DataFrame. However, there are two main differences.

  • Lazy implementation - Chalk's DataFrame is lazy and can be backed by multiple data sources, where a pandas.DataFrame executes eagerly in memory.
  • Use as a type - Chalk's DataFrame[...] can be used to represent a type of data with pre-defined filters.

Lazy Execution

Unlike pandas, the implementation of a Chalk DataFrame is lazy, and can be executed against many different backend sources of data. For example, in unit tests, a DataFrame uses an implementation backed by polars. But if your DataFrame was returned from a SQL source, filters and aggregations may be pushed down to the database for efficient execution.

Use as a Type

Each column of a Chalk DataFrame is typed by a Feature type. For example, you might have a resolver returning a DataFrame containing user ids and names:

@features
class User:
   id: int
   name: str
   email: str

@online
def get_users() -> DataFrame[User.id, User.name]:
   return DataFrame([
       User(id=1, name="Alice"),
       User(id=2, name="Bob")
   ])

Note that the DataFrame type is parameterized by the columns that it contains. In this case, the DataFrame contains two columns, User.id and User.name.

The specific operations available on a DataFrame are discussed below. For a higher-level discussion, see DataFrame.

Attributes
filters
ClassVar[tuple[Filter, ...]]
columns
tuple[Feature, ...]

The maximum number of rows to return

The shape of the DataFrame as a tuple of (num_rows, num_columns).

Examples

DataFrame({User.id: [1, 2, 3, 4, 5]}).shape
(5, 1)
Functions

Construct a Chalk DataFrame.

Parameters
data: = None

The data. Can be an existing pandas.DataFrame, polars.DataFrame or polars.LazyFrame, a sequence of feature instances, or a dict mapping a feature to a sequence of values.

missing_value_strategy:
MissingValueStrategy
= 'default_or_allow'

The strategy to use to handle missing values.

A feature value is "missing" if it is an ellipsis (...), or it is None and the feature is not annotated as Optional[...].

The available strategies are:

  • 'error': Raise a TypeError if any missing values are found. Do not attempt to replace missing values with the default value for the feature.
  • 'default_or_error': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, raise a TypeError.
  • 'default_or_allow': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, leave it as None. This is the default strategy.
  • 'allow': Allow missing values to be stored in the DataFrame. This option may result in non-nullable features being assigned None values.

Row-wise construction

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

Column-wise construction

df = DataFrame({
    User.id: [1, 2],
    User.first: ["Sam", "Iris"],
    User.last: ["Wu", "Xi"]
})

Construction from polars.DataFrame

import polars
df = DataFrame(polars.DataFrame({
    "user.id": [1, 2],
    "user.first": ["Sam", "Iris"],
    "user.last": ["Wu", "Xi"]
}))

Filter the rows of a DataFrame or project out columns.

You can select columns out of a DataFrame from the set of columns already present to produce a new DataFrame scoped down to those columns.

Or, you can filter the rows of a DataFrame by using Python's built-in operations on feature columns.

Parameters

item:

Filters and projections to apply to the DataFrame.

Returns

A DataFrame with the filters and projections in item applied.

df = DataFrame({
    User.age: [21, 22, 23],
    User.email: [...],
})

Filtering

df = df[
    User.age > 21 and
    User.email == "joe@chalk.ai"
]

Projecting

df[User.name]

Filtering & Projecting

df = df[
    User.age > 21 and
    User.email == "joe@chalk.ai",
    User.name
]

Aggregate the DataFrame by the specified columns.

Parameters
group:
dict[Feature | Any, Feature | Any]

A mapping from the desired column name in the resulting DataFrame to the name of the column in the source DataFrame.

agg:

A mapping from the desired column name in the resulting DataFrame to the aggregation operation to perform on the source DataFrame.

Returns
type:

The DataFrame with the specified aggregations applied.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 3        │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Compute a histogram with fixed width bins.

Parameters
nbins: = None

If supplied, will be used to compute the binwidth.

If not supplied, computed from the data (actual max and min values).

base: = None

The value of the first histogram bin. Defaults to the minimum value of column.

eps: = 1e-13

Allowed floating point epsilon for histogram base

column: = None

The column to compute the histogram on. If not supplied, the DataFrame is assumed to contain a single column.

descending: = False

If True, the histogram buckets will be sorted in descending order.

Returns
type:

A list of the counts in each bin.

DataFrame({
  Taco.price: list(range(100, 200)),
}).histogram_list(nbins=4, base=100)
[25, 25, 25, 25]

Group based on a time value (date or datetime).

The groups are defined by a time-based window, and optionally, other columns in the DataFrame. The "width" of the window is defined by the period parameter, and the spacing between the windows is defined by the every parameter. Note that if the every parameter is smaller than the period parameter, then the windows will overlap, and a single row may be assigned to multiple groups.

As an example, consider the following DataFrame:


val:    a  b    c   d e     f           g h
    ─────────●─────────●─────────●─────────●───────▶
time:        A         B         C         D
    ┌─────────┐
1   │   a  b  │                                    1: [a, b]
    └────┬────┴────┐
2   ◀───▶│ b    c  │                               2: [b, c]
    every└────┬────┴────┐
3   ◀────────▶│ c   d e │                          3: [c, d, e]
      period  └────┬────┴────┐
4                  │d e     f│                     4: [d, e, f]
                   └────┬────┴────┐
5                       │   f     │                5: [f]
                        └────┬────┴────┐
6                            │         │
                             └────┬────┴────┐
7                                 │     g h │      7: [g, h]
                                  └────┬────┴────┐
8                                      │g h      │ 8: [g, h]
                                       └─────────┘

In the above example, the sixth time bucket is empty, and will not be included in the resulting DataFrame.

Parameters
index:
Feature | Any

The column to use as the index for the time-based grouping.

agg:

A mapping from the desired column name in the resulting DataFrame to the aggregation operation to perform on the source DataFrame.

every:

The spacing between the time-based windows. This parameter can be specified as a str or a timedelta. If specified as a str, then it must be a valid Duration.

group:
dict[Feature | Any, Feature | Any] | None
= None

A mapping from the desired column name in the resulting DataFrame to the name of the column in the source DataFrame. This parameter is optional, and if not specified, then the resulting DataFrame groups will be determined by the index parameter alone.

period: = None

The width of the time-based window. This parameter can be specified as a str or a timedelta. If specified as a str, then it must be a valid Duration. If None it is equal to every.

Other Parameters
Show All
offset: = None
start_by:
'window' | 'datapoint' | 'monday' | 'tuesday' | 'wednesday' | 'thursday' | 'friday' | 'saturday' | 'sunday'
= 'window'
Returns
type:

A new DataFrame with the specified time-based grouping applied. The resulting DataFrame will have a column for each of the keys in group, for each of the keys in agg, and for the index parameter.

from chalk import DataFrame, op
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
        User.ts: [datetime(2020, 1, 1), datetime(2020, 1, 1), datetime(2020, 1, 3)],
    },
).group_by_hopping(
     index=User.ts,
     group={User.id: User.id},
     agg={User.val: op.median(User.val)},
     period="1d",
)
╭─────────┬──────────┬──────────╮
│ User.id │ User.ts  │ User.val │
╞═════════╪══════════╪══════════╡
│ 1       │ 2020-1-1 │ 3        │
├─────────┼──────────┼──────────┤
│ 3       │ 2020-1-3 │ 10       │
╰─────────┴──────────┴──────────╯

Vertically stack the DataFrame with another DataFrame containing the same columns. The DataFrame other will be appended to the bottom of this DataFrame.

Parameters

other:

The other DataFrame to stack with this DataFrame.

Returns
type:

The DataFrame with the other DataFrame stacked on the bottom.

from chalk.features import DataFrame
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
df.vstack(df)

Return the number of unique values in the specified column.

Parameters
column: = None

The column to compute the number of unique values for. If None, then the number of unique values in the entire DataFrame is returned.

Returns
type:

The number of unique values in the specified column.

from chalk.features import DataFrame
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
df.num_unique(User.id)
2

Rename columns in the DataFrame.

Parameters

A mapping from the current feature for a column to the desired feature for the column.

Returns
type:

The DataFrame with the specified columns renamed.

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
]).rename({User.last: User.family})

Add a column to the DataFrame.

Parameters

The name of the column to add.

The definition of the column to add. This could be a constant value (e.g. 1 or True), an expression (e.g. op.max(User.score_1, User.score_2)), or a list of values (e.g. [1, 2, 3]).

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
# Set the fraud score to 0 for all users
df.with_column(User.fraud_score, 0)
# Concatenation of first & last as full_name
df.with_column(
    User.full_name, op.concat(User.first, User.last)
)
# Alias a column name
df.with_column(
    User.first_name, User.first
)

Add columns to the DataFrame.

Parameters

A Mapping from the desired name of the column in the DataFrame to the definition of the new column.

Returns
type:

A new DataFrame with all the existing columns, plus those specified in this function.

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
# Set the fraud score to 0 for all users
df.with_columns({User.fraud_score: 0})
# Concatenation of first & last as full_name
df.with_columns({
    User.full_name: op.concat(User.first, User.last)
})
# Alias a column name
df.with_columns({
    User.first_name: User.first
})

Deprecated. Use DataFrame(...) instead.

Deprecated. Use DataFrame(...) instead.

Read a .csv file as a DataFrame.

Parameters

The path to the .csv file. This may be a S3 or GCS storage url.

Whether the .csv file has a header row as the first row.

columns: = None

A mapping of index to feature name.

Returns
type:

A DataFrame with the contents of the file loaded as features.

values = DataFrame.read_csv(
    "s3://...",
    columns={0: MyFeatures.id, 1: MyFeatures.name},
    has_header=False,
)

Read a .avro file as a DataFrame.

Parameters

The path to the .avro file. This may be a S3 or GCS storage url.

Returns
type:

A DataFrame with the contents of the file loaded as features.

values = DataFrame.read_avro(
    "s3://...",
)

Compute the max value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the max value of each column.

Returns
type:

A DataFrame with the max value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).max()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 3       │ 10       │
╰─────────┴──────────╯

Compute the mean value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the mean value of each column.

Returns
type:

A DataFrame with the mean value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).mean()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 2       │ 5        │
╰─────────┴──────────╯

Compute the median value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the median value of each column.

Returns
type:

A DataFrame with the median value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).median()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 2       │ 4        │
╰─────────┴──────────╯

Compute the min value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the min value of each column.

Returns
type:

A DataFrame with the min value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).min()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 1        │
╰─────────┴──────────╯

Compute the standard deviation of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the standard deviation of each column.

Parameters
ddof: = 1

"Delta Degrees of Freedom": the divisor used in the calculation is N - ddof, where N represents the number of elements. By default, ddof is 1.

Returns
type:

A DataFrame with the standard deviation of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).std()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 4.5826   │
╰─────────┴──────────╯

Compute the sum of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the sum of each column.

Returns
type:

A DataFrame with the sum of each column.
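
Mirroring the examples for max and mean above, a usage sketch:

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).sum()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 6       │ 15       │
╰─────────┴──────────╯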

Compute the variance of each of the columns in the DataFrame.

Parameters
ddof: = 1

"Delta Degrees of Freedom": the divisor used in the calculation is N - ddof, where N represents the number of elements. By default, ddof is 1.

Returns
type:

A DataFrame with the variance of each column.

Returns whether any of the values in the DataFrame are truthy. Requires the DataFrame to only contain boolean values.

Returns whether all the values in the DataFrame are truthy. Requires the DataFrame to only contain boolean values.
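
Assuming these checks are exposed as any() and all() (names inferred from the descriptions above; the is_active feature is illustrative):

from chalk.features import DataFrame
df = DataFrame({User.is_active: [True, False, True]})
df.any()
True
df.all()
False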

Returns the number of rows in the DataFrame.

Returns
type:

The number of rows in the DataFrame.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
)
len(df)
3

Returns the number of rows in the DataFrame.

Returns
type:

The number of rows in the DataFrame.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
)
len(df)
3
df.count()
3

Get the only item from the DataFrame. This method will raise an error if the DataFrame contains more than one row or more than one column.

Sort the DataFrame by the given columns.

Parameters
by:
str | Feature | Any

Feature(s) to sort by. Strings are parsed as feature names.

more_by:
str | Feature | Any
= ()

Additional columns to sort by, specified as positional arguments.

descending: = False

Sort in descending order. When sorting by multiple columns, can be specified per feature by passing a sequence of booleans.

nulls_last: = False

Place null values last.

Returns
type:

A new DataFrame with the rows sorted.

df = DataFrame({
    User.a: [1, 2, 3],
    User.b: [3, 2, 1],
})
df.sort(User.a)
a  b
-----------
0     1  3
1     2  2
2     3  1

Get the underlying DataFrame as a polars.LazyFrame.

Parameters
prefixed: = True

Whether to prefix the column names with the feature namespace (i.e. if prefixed=True, user.name; if prefixed=False, name)

Returns

The underlying polars.LazyFrame.

Get the underlying DataFrame as a pyarrow.Table.

Parameters
prefixed: = True

Whether to prefix the column names with the feature namespace (i.e. if prefixed=True, user.name; if prefixed=False, name)

Returns
type:

The underlying pyarrow.Table. This format is the canonical representation of the data in Chalk.

Get the underlying DataFrame as a pandas.DataFrame.

Parameters

If True, use strings for column names. If False, use Feature objects.

prefixed: = True

Whether to prefix the column names with the feature namespace (i.e. if prefixed=True, user.name; if prefixed=False, name)
Returns

The data formatted as a pandas.DataFrame.
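
A usage sketch of the three conversions, assuming they are exposed as to_polars, to_pyarrow, and to_pandas (names inferred from the descriptions above):

from chalk.features import DataFrame
df = DataFrame({User.id: [1, 2]})
lazy = df.to_polars()               # polars.LazyFrame with columns like "user.id"
table = df.to_pyarrow()             # pyarrow.Table
pdf = df.to_pandas(prefixed=False)  # pandas.DataFrame with columns like "id"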

Get values in the DataFrame as Features instances.

df = DataFrame({
    SpaceShip.id: [1, 2],
    SpaceShip.volume: [4_000, 5_000]
})
df.to_features()
[
    SpaceShip(id=1, volume=4000),
    SpaceShip(id=2, volume=5000)
]

Slice the DataFrame.

Parameters
offset: = 0

The offset to start at.

length: = None

The number of rows in the slice. If None (the default), include all rows from offset to the end of the DataFrame.

Returns
type:

The dataframe with the slice applied.
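
A usage sketch, assuming the method is named slice:

from chalk.features import DataFrame
df = DataFrame({User.id: [1, 2, 3, 4]})
# Keep two rows starting at offset 1, i.e. ids 2 and 3
df.slice(offset=1, length=2)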

op
Class

Operations for aggregations in DataFrame.

The class methods on this class are used to create aggregations for use in DataFrame.group_by.

Functions

Add together the values of col and *cols in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

There must be at least one column to aggregate.

cols:
Feature | FeatureWrapper | str | Any
= ()

Subsequent columns to aggregate.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.sum(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 4.5      │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Multiply together the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column to aggregate. Used in DataFrame.group_by.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
        User.active: [True, True, False],
    }
).group_by(
     group={User.id: User.id},
     agg={
        User.val: op.product(User.val),
        User.active: op.product(User.active),
     }
)
╭─────────┬──────────┬─────────────╮
│ User.id │ User.val │ User.active │
╞═════════╪══════════╪═════════════╡
│ 1       │ 2        │ 1           │
├─────────┼──────────┼─────────────┤
│ 3       │ 10       │ 0           │
╰─────────┴──────────┴─────────────╯

Find the maximum of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the maximum value.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.max(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 4        │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Find the minimum of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the minimum value.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.min(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 0.5      │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Find the median of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the median value. In the case of an even number of elements, the median is the mean of the two middle elements.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 3        │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Find the mean of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the mean value.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.mean(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 3        │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Find the standard deviation of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the standard deviation.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.std(User.val)}
)

Find the variance of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the variance.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.variance(User.val)}
)

Find the count of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the count.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.count(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 2        │
├─────────┼──────────┤
│ 3       │ 1        │
╰─────────┴──────────╯

Concatenate the string values of col and col2 in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column to concatenate with col2.

col2:
Feature | FeatureWrapper | str | Any

The column with which to concatenate col.

sep: = ''

The separator to use when concatenating col and col2.

from chalk.features import DataFrame
DataFrame(
    [
        User(id=1, val='a'),
        User(id=1, val='b'),
        User(id=3, val='c'),
        User(id=3, val='d'),
    ]
).group_by(
    group={User.id: User.id},
    agg={User.val: op.concat(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ "ab"     │
├─────────┼──────────┤
│ 3       │ "cd"     │
╰─────────┴──────────╯

Deprecated. Use concat instead.

Find the last value of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the last value.

from chalk.features import DataFrame
DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=True,
).group_by(
    group={User.id: User.id},
    agg={User.val: op.last(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 1        │
├─────────┼──────────┤
│ 3       │ 5        │
╰─────────┴──────────╯

Find the first value of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the first value.

from chalk.features import DataFrame
DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=False
).group_by(
    group={User.id: User.id},
    agg={User.val: op.first(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 1        │
├─────────┼──────────┤
│ 3       │ 5        │
╰─────────┴──────────╯

A class for refining an aggregation defined by op.

Attributes
filters
list[Filter]
Functions

Filter the aggregation to apply to only rows where all the filters in f are true. If no rows match the filter, the aggregation for the column will be null, and the resulting feature type must be a nullable type.

Parameters
f:
Filter | Any
= ()

A set of filters to apply to the aggregation. Each of the filters must be true to apply the aggregation.

Returns

The aggregation, allowing you to continue to chain methods.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.sum(User.val).where(User.val > 5)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ null     │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Create a Snowflake data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
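
For example, a Snowflake source can be constructed with no arguments, or by integration name (a minimal sketch; the integration name "RISK" is hypothetical):

from chalk.sql import SnowflakeSource

# Uses the integration configuration named "RISK" from the Chalk Dashboard.
snowflake = SnowflakeSource(name="RISK")

# With no arguments, the single Snowflake configuration in the Dashboard is assumed.
default_snowflake = SnowflakeSource()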

Create a PostgreSQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a MySQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a DynamoDB data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details. DynamoDBSources can be queried via PartiQL SQL resolvers.

You may override the ambient AWS credentials by providing either a client ID and secret, or a role ARN.

Create a BigQuery data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a Redshift data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a CloudSQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a SQLite source for a file.

Parameters
filename:
str | PathLike

The name of the file.

name: = None

The name to use in testing

Additional arguments to use when constructing the SQLAlchemy engine.

Additional arguments to use when constructing an async SQLAlchemy engine.

Returns

The SQL source for use in Chalk resolvers.
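
As a sketch, a file-backed SQLite source might be constructed like this, assuming the class is exported from chalk.sql as SQLiteFileSource; the filename is hypothetical:

from chalk.sql import SQLiteFileSource

# Points at a local SQLite database file; useful for tests and local development.
source = SQLiteFileSource(filename="local_users.db", name="LOCAL")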

Testing SQL source.

If you have only one SQLiteInMemorySource integration, there's no need to provide a distinguishing name.

Parameters
name: = None

The name of the integration.

Additional arguments to use when constructing the SQLAlchemy engine.

Additional arguments to use when constructing an async SQLAlchemy engine.

Returns

The SQL source for use in Chalk resolvers.

source = SQLiteInMemorySource(name="RISK")

Create a Databricks data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a Spanner data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Incremental settings for Chalk SQL queries.

In "row" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:

"WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>"

In "group" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.

This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:

SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id

would be rewritten like this:

SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
    SELECT DISTINCT(user_id)
    FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id

In "parameter" mode: incremental_column WILL BE IGNORED.

This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query. Chalk will include a query parameter named "chalk_incremental_timestamp". Depending on your SQL dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp or %(chalk_incremental_timestamp)s. This will incrementalize your query using the timestamp of the latest row that has been ingested.

Chalk will also include another query parameter named "chalk_last_execution_timestamp" that can be used instead. This will incrementalize your query using the last time the query was executed.

incremental_timestamp:

If incremental_timestamp is "feature_time", we will incrementalize your query using the timestamp of the latest row that has been ingested. This is the default.

If incremental_timestamp is "resolver_execution_time", we will incrementalize your query using the last time the query was executed instead.

Attributes
mode
'row' | 'group' | 'parameter'

The amount of overlap to check for late-arriving rows.

The column on which to incrementalize.

incremental_timestamp
'feature_time' | 'resolver_execution_time'

The timestamp to use as the lower bound.

Functions

Run a query from a SQL string.

Parameters

The query that you'd like to run.

fields:
dict[str, Feature | str | Any] | None
= None

A mapping from the column names selected to features.

args: = None

Any args in the sql string specified by query need to have corresponding value assignments in args.

Returns

A query that can be returned from a @online or @offline resolver.
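
For example, a resolver might run a SQL string and map the selected columns back to features. This is a minimal sketch: the User features, table, and column names are assumptions.

from chalk import online
from chalk.features import Features
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@online
def get_user_contact(uid: User.id) -> Features[User.name, User.email]:
    # :user_id is bound from the args mapping below.
    return pg.query_string(
        query="SELECT name, email FROM users WHERE id = :user_id",
        fields={"name": User.name, "email": User.email},
        args={"user_id": uid},
    ).one()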

Run a query from a SQL file.

This method allows you to query the SQL file within a Python resolver. However, Chalk can also infer resolvers from SQL files. See SQL file resolvers for more information.

Parameters
path:
str | bytes | PathLike

The path to the SQL file, relative to the caller's file or to the directory in which your chalk.yaml file lives.

fields:
dict[str, Feature | str | Any] | None
= None

A mapping from the column names selected to features.

args: = None

Any args in the sql file specified by path need to have corresponding value assignments in args.

Returns

A query that can be returned from a @online or @offline resolver.
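
Similarly, a resolver might reference a .sql file that lives next to it. This is a sketch: the file name, features, and args are assumptions.

from chalk import online
from chalk.features import Features
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@online
def get_user_email(uid: User.id) -> Features[User.email]:
    # get_user_email.sql is assumed to sit next to this resolver's file.
    return pg.query_sql_file(
        path="get_user_email.sql",
        fields={"email": User.email},
        args={"user_id": uid},
    ).one()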

Query using a SQLAlchemy model.

Parameters

Arguments as they would normally be passed to a SQLAlchemy query.

Returns

A query that can be returned from a resolver.
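
A sketch of querying with a SQLAlchemy model, following the filter_by example later in this section; UserSQL is an assumed SQLAlchemy model and UserFeatures an assumed feature class:

from chalk.sql import PostgreSQLSource

session = PostgreSQLSource()
# Map columns of the SQLAlchemy model UserSQL onto the UserFeatures feature class.
session.query(UserFeatures(id=UserSQL.id, email=UserSQL.email)).first()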

Get an SQLAlchemy Engine. The engine will be created and cached on the first call of this method.

Returns

A SQLAlchemy engine.

Functions

Automatically ingest a table.

Parameters

The name of the table to ingest.

The feature class that this table should be mapping to, e.g. User.

Columns in the table that should be ignored, and not mapped to features, even if there is a matching name.

Features on the feature class that should be ignored, and not mapped to columns, even if there is a matching name.

Columns that must exist in the mapping.

Features that must exist in the mapping.

Explicit mapping of columns to features for names that do not match.

Settings for incrementally ingesting the table.

from chalk.sql import PostgreSQLSource
from chalk.features import features
PostgreSQLSource().with_table(
    name="users",
    features=User,
).with_table(
    name="accounts",
    features=Account,
    # Override one of the column mappings.
    column_to_feature={
        "acct_id": Account.id,
    },
)
Functions

Return the first result of this Query or None if the result doesn't contain any row.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return at most one result or raise an exception.

Returns None if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return exactly one result or raise an exception.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return the results represented by this Query as a DataFrame.

Returns
type:
DataframeFinalizedChalkQuery

A query that can be returned from a resolver.

Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.

previous_latest_row_timestamp will be set to the start of the query execution, or, if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.

In "row" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:

WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>

In "group" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.

This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:

SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id

would be rewritten like this:

SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
    SELECT DISTINCT(user_id)
    FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id

In "parameter" mode: incremental_column WILL BE IGNORED.

This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query. Chalk will include a query parameter named "chalk_incremental_timestamp". Depending on your SQL dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp or %(chalk_incremental_timestamp)s.

Parameters

Defaults to 0, which means we only return rows that are strictly newer than the last observed row.

mode:
'row' | 'group' | 'parameter'
= 'row'

Defaults to "row", which indicates that only rows newer than the last observed row should be considered. When set to "group", Chalk will only ingest features from groups which are newer than the last observation time. This requires that the query is grouped by a primary key.

incremental_column:
str | Feature | None
= None

This should reference a timestamp column in your underlying table, typically something like "updated_at", "created_at", "event_time", etc.

incremental_timestamp:
'feature_time' | 'resolver_execution_time'
= 'feature_time'

Defaults to "feature_time", which means that the timestamp associated with the last feature value will be used as the incremental time. Alternatively, setting this parameter to "resolver_execution_time" will use the timestamp at which the resolver last ran.

params: = None
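
As a sketch, "row" mode incrementalization on an assumed updated_at column might look like this; the payments table and the Payment features are hypothetical:

from chalk import offline
from chalk.features import DataFrame
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@offline
def ingest_payments() -> DataFrame[Payment.id, Payment.amount, Payment.updated_at]:
    # Only rows with updated_at newer than the last observed row are returned.
    return pg.query_string(
        query="SELECT id, amount, updated_at FROM payments",
        fields={
            "id": Payment.id,
            "amount": Payment.amount,
            "updated_at": Payment.updated_at,
        },
    ).incremental(incremental_column="updated_at")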

Apply the given filtering criterion to a copy of this Query, using keyword expressions.

Parameters
kwargs: = {}

The column names assigned to the desired values (i.e. name="Maria").

Returns

A query that can be returned from a resolver or further filtered.

from chalk.sql import PostgreSQLSource
session = PostgreSQLSource()
session.query(UserFeatures(id=UserSQL.id)).filter_by(name="Maria")

Apply the given filtering criterion to a copy of this Query, using SQL expressions.

Parameters

SQLAlchemy filter criterion

Returns

A query that can be returned from a resolver or further filtered.

Apply one or more ORDER BY criteria to the query and return the newly resulting Query.

Parameters
clauses: = ()

SQLAlchemy columns.

Returns

A query that can be returned from a resolver or further filtered.
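
A sketch combining filter and order_by with SQLAlchemy expressions; UserSQL is an assumed SQLAlchemy model, as in the filter_by example above:

from chalk.sql import PostgreSQLSource

session = PostgreSQLSource()
(
    session.query(UserFeatures(id=UserSQL.id, name=UserSQL.name))
    .filter(UserSQL.age >= 18)          # SQL expression filter
    .order_by(UserSQL.created_at.desc())  # ORDER BY created_at DESC
    .first()
)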

Materialize the query.

Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.

Note: this requires the usage of the fields={...} argument when used in conjunction with query_string or query_sql_file.

Returns

The raw result of executing the query. For .all(), returns a DataFrame. For .one() or .one_or_none(), returns a Features instance corresponding to the relevant feature class.

Functions

Materialize the query.

Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.

Returns
type:

A DataFrame with the results of the query.

Return at most one result or raise an exception.

Returns None if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return exactly one result or raise an exception.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return the results represented by this Query as a list.

Returns
type:
DataframeFinalizedChalkQuery

A query that can be returned from a resolver.

Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.

previous_latest_row_timestamp will be set to the start of the query execution, or, if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.

In "row" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:

WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>

In "group" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.

This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:

SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id

would be rewritten like this:

SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
    SELECT DISTINCT(user_id)
    FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id

In "parameter" mode: incremental_column WILL BE IGNORED.

This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query. Chalk will include a query parameter named "chalk_incremental_timestamp". Depending on your SQL dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp or %(chalk_incremental_timestamp)s.

Parameters

This should reference a timestamp column in your underlying table, typically something like "updated_at", "created_at", "event_time", etc.

Defaults to 0, which means we only return rows that are strictly newer than the last observed row.

mode:
'row' | 'group' | 'parameter'
= 'row'

Defaults to "row", which indicates that only rows newer than the last observed row should be considered. When set to "group", Chalk will only ingest features from groups which are newer than the last observation time. This requires that the query is grouped by a primary key.

incremental_timestamp:
'feature_time' | 'resolver_execution_time'
= 'feature_time'

Defaults to "feature_time", which means that the timestamp associated with the last feature value will be used as the incremental time. Alternatively, setting this parameter to "resolver_execution_time" will use the timestamp at which the resolver last ran.

Generate a Chalk SQL file resolver from a filepath and a sql string. This will generate a resolver in your web dashboard that can be queried, but will not output a .chalk.sql file.

The optional parameters are overrides for the comment key-value pairs at the top of the sql file resolver. Comment key-value pairs specify important resolver information such as the source, feature namespace to resolve, and other details. Note that these will override any values specified in the sql string. See Configuration for more information.

See SQL file resolvers for more information on SQL file resolvers.

Parameters

The name of your resolver

The sql string for your query.

Can either be a BaseSQLSource or a string. If a string is provided, it will be used to infer the source by first scanning for a source with the same name, then inferring the source if it is a type, e.g. snowflake if there is only one database of that type. Optional if source is specified in sql.

resolves: = None

Describes the feature namespace to which the outputs belong. Optional if resolves is specified in sql.

kind:
'online' | 'offline' | 'streaming' | None
= None

The type of resolver. If not specified, defaults to "online".

Other Parameters
Show All
count:
1 | 'one' | 'one_or_none' | None
= None
timeout: = None
cron: = None
owner: = None
tags: = None
unique_on:
Collection[FeatureReference] | None
= None
partitioned_by:
Collection[Any] | None
= None
from chalk import make_sql_file_resolver
from chalk.features import features
@features
class User:
    id: int
    name: str
make_sql_file_resolver(
    name="my_resolver",
    sql="SELECT user_id as id, name FROM users",
    source="snowflake",
    resolves=User,
    kind="offline",
)

Decorator to create a stream resolver.

Parameters

The streaming source, e.g. KafkaSource(...) or KinesisSource(...) or PubSubSource(...).

mode:
'continuous' | 'tumbling' | None
= None

This parameter is defined when the streaming resolver returns a windowed feature. Tumbling windows are fixed-size, contiguous and non-overlapping time intervals. You can think of tumbling windows as adjacently arranged bins of equal width. Tumbling windows are most often used alongside max_staleness to allow the features to be sent to the online store and offline store after each window period.

Continuous windows, unlike tumbling windows, are overlapping and exact. When you request the value of a continuous window feature, Chalk looks at all the messages received in the window and computes the value on-demand.

See more at Window modes

parse:
Callable[[T], V] | None
= None

A callable that will interpret an input prior to the invocation of the resolver. Parse functions can serve many purposes, including pre-parsing bytes, skipping unrelated messages, or supporting re-keying.

See more at Parsing

keys: = None

A mapping from input BaseModel attribute to Chalk feature attribute to support continuous streaming re-keying. This parameter is required for continuous resolvers. Features that are included here do not have to be explicitly returned in the stream resolver: the feature will automatically be set to the key value used for aggregation. See more at Keys

timestamp: = None

An optional string specifying an input attribute as the timestamp used for windowed aggregations. See more at Custom event timestamping

Other Parameters
Show All
owner: = None
Returns
type:

A callable function! You can unit-test stream resolvers as you would unit-test any other code.
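
For example, a minimal stream resolver might look like the sketch below; the Kafka integration name, the message model, and the User.last_login_ip feature are assumptions:

from pydantic import BaseModel
from chalk.features import Features
from chalk.streams import KafkaSource, stream

source = KafkaSource(name="user_events")

class LoginMessage(BaseModel):
    user_id: str
    ip: str

@stream(source=source)
def process_login(message: LoginMessage) -> Features[User.id, User.last_login_ip]:
    # Map each message onto feature values for the keyed User.
    return User(id=message.user_id, last_login_ip=message.ip)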

Decorator to create a sink. Read more at Sinks

Parameters
fn:
Callable[P, T] | None
= None

The function that you're decorating as a resolver.

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environment and no others

Read more at Environments

tags: = None

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.

buffer_size: = None

Count of updates to buffer.

upsert: = False
integration:
BaseSQLSourceProtocol | SinkIntegrationProtocol | None
= None
owner: = None

The individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

name: = None

An alternative short name for the resolver, to use instead of the function name.

Returns

A callable function! You can unit-test sinks as you would unit test any other code. Read more at Unit Tests

@sink
def process_updates(
    uid: User.id,
    email: User.email,
    phone: User.phone,
):
    user_service.update(
        uid=uid,
        email=email,
        phone=phone
    )
process_updates(123, "sam@chalk.ai", "555-555-5555")
Attributes

The URL of one of your Kafka brokers from which to fetch initial metadata about your Kafka cluster

The name of the topic to subscribe to.

An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.

An S3 or GCS URI that points to the certificate authority file that should be used to verify broker certificates. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.

security_protocol
'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'

Protocol used to communicate with brokers. Valid values are "PLAINTEXT", "SSL", "SASL_PLAINTEXT", and "SASL_SSL". Defaults to "PLAINTEXT".

sasl_mechanism
'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'

Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are "PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER". Defaults to "PLAIN".

Username for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication.

Password for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication.

The name of the integration, as configured in your Chalk Dashboard.

Messages older than this deadline will not be processed.

Kafka topic to send messages when message processing fails
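
For example, a Kafka source might be configured by integration name, or with explicit connection details. The parameter names shown here follow the attributes above but should be treated as a sketch; the broker address and topic are hypothetical.

from chalk.streams import KafkaSource

# Reference an integration configured in the Chalk Dashboard.
source = KafkaSource(name="user_events")

# Or configure the connection explicitly.
explicit_source = KafkaSource(
    bootstrap_server="kafka.mycompany.com:9092",
    topic="user-events",
)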

Functions
Attributes

The name of your stream. Either this or the stream_arn must be specified

The ARN of your stream. Either this or the stream_name must be specified

AWS region string, e.g. "us-east-2"

The name of the integration, as configured in your Chalk Dashboard.

Messages older than this deadline will not be processed.

Kinesis stream name to send messages when message processing fails

AWS access key id credential

AWS secret access key credential

AWS access key id credential

Optional endpoint for the Kinesis server.

Optional role ARN for the consumer to assume

Functions
Attributes

The project id of your pubsub source

The subscription id of your stream

The name of the integration, as configured in your Chalk Dashboard.

Messages older than this deadline will not be processed.

Pubsub topic id to send messages when message processing fails

Functions

Base class for all stream sources generated from @stream.

Attributes

e.g. 'kafka' or 'kinesis' or 'pubsub'

Identifier for the dead-letter queue (DLQ) for the stream. If not specified, failed messages will be dropped. Stream name for Kinesis, topic name for Kafka, subscription id for PubSub.

Identifier for the stream to consume. Stream name for Kinesis, topic name for Kafka, subscription id for PubSub

Functions

The ChalkClient is the primary Python interface for interacting with Chalk.

You can use it to query data, trigger resolver runs, gather offline data, and more.

Functions

Create a ChalkClient with the given credentials.

Parameters
client_id: = None

The client ID to use to authenticate. Can either be a service token id or a user token id.

The client secret to use to authenticate. Can either be a service token secret or a user token secret.

The ID or name of the environment to use for this client. Not necessary if your client_id and client_secret are for a service token scoped to a single environment. If not present, the client will use the environment variable CHALK_ENVIRONMENT.

api_server: = None

The API server to use for this client. Required if you are using a Chalk Dedicated deployment. If not present, the client will check for the presence of the environment variable CHALK_API_SERVER, and use that if found.

branch: = None

If specified, Chalk will route all requests from this client instance to the relevant branch. Some methods allow you to override this instance-level branch configuration by passing in a branch argument.

If specified, Chalk will route all requests from this client instance to the relevant tagged deployment. This cannot be used with the branch argument.

preview_deployment_id:
DeploymentId | None
= None

If specified, Chalk will route all requests from this client instance to the relevant preview deployment.

A requests.Session to use for all requests. If not provided, a new session will be created.

The query server to use for this client. Required if you are using a standalone Chalk query engine deployment. If not present, the client will default to the value of api_server.

A map of additional HTTP headers to pass with each request.

The default wait timeout, in seconds, to wait for long-running jobs to complete when accessing query results. Jobs will not time out if this timeout elapses. For no timeout, set to None. The default is no timeout.

The default wait timeout, in seconds, to wait for network requests to complete. If not specified, the default is no timeout.

local: = False

If True, point the client at a local version of the code.

ssl_context:
ssl.SSLContext | None
= None

An ssl.SSLContext that can be loaded with self-signed certificates so that requests to servers hosted with self-signed certificates succeed.
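
For example (a sketch; the credentials shown are placeholders):

from chalk.client import ChalkClient

# If omitted, the client falls back to environment variables such as CHALK_ENVIRONMENT.
client = ChalkClient(
    client_id="client-xxxx",
    client_secret="secret-xxxx",
    environment="production",
)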

Compute features values using online resolvers. See Chalk Clients for more information.

Parameters

The features for which there are known values, mapped to those values. For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}

Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].

If an empty sequence, the output will be set to all features on the namespace of the query. For example, if you pass as input {"user.id": 1234}, then the query is defined on the User namespace, and all features on the User namespace (excluding has-one and has-many relationships) will be used as outputs.

now: = None

The time at which to evaluate the query. If not specified, the current time will be used. This parameter is complex in the context of online_query since the online store only stores the most recent value of an entity's features. If now is in the past, it is extremely likely that None will be returned for cache-only features.

This parameter is primarily provided to support:

  • controlling the time window for aggregations over cached has-many relationships
  • controlling the time window for aggregations over has-many relationships loaded from an external database

If you are trying to perform an exploratory analysis of past feature values, prefer offline_query.

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.

query_name: = None

The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'. If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.

If query_name is specified, this specifies the version of the named query you're making. This is only useful if you want your query to use a NamedQuery with a specific name and a specific version. If a query_name has not been supplied, then this parameter is ignored.

Returns metadata about the query execution under OnlineQueryResult.meta. This could make the query slightly slower. For more information, see Chalk Clients.

explain: = False

Log the query execution plan. Requests using explain=True will be slower than requests using explain=False.

If True, 'include_meta' will be set to True as well.

If True, the output of each of the query plan stages will be stored. This option dramatically impacts the performance of the query, so it should only be used for debugging.

encoding_options:
FeatureEncodingOptions | None
= None

If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.

Other Parameters
Show All
meta: = None
planner_options: = None
headers: = None
Returns
type:
OnlineQueryResult

Wrapper around the output features and any query metadata and errors encountered while running the resolvers.

The output features and any query metadata.

Errors encountered while running the resolvers.

If no errors were encountered, this field is empty.

Metadata about the query execution. Only present if include_meta=True is passed to the relevant query method.

from chalk.client import ChalkClient
result = ChalkClient().query(
    input={
        User.name: "Katherine Johnson"
    },
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
result.get_feature_value(User.fico_score)

Execute multiple queries (represented by queries= argument) in a single request. This is useful if the queries are "rooted" in different @features classes -- i.e. if you want to load features for User and Merchant and there is no natural relationship object which is related to both of these classes, multi_query allows you to submit two independent queries.

Returns a BulkOnlineQueryResponse, which is functionally a list of query results. Each of these results can be accessed by index. Individual results can be further checked for errors and converted to pandas or polars DataFrames.

In contrast, query_bulk executes a single query with multiple inputs/outputs.

Parameters

A list of the OnlineQueries you'd like to execute.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

Other Parameters
Show All
correlation_id: = None
query_name: = None
meta: = None
compression: = 'uncompressed'
Returns

An output containing results: list[BulkOnlineQueryResult], where each result contains dataframes of the results of each query or any errors.

from chalk.client import ChalkClient, OnlineQuery
queries = [
    OnlineQuery(input={User.name: ['Katherine Johnson']}, output=[User.fico_score]),
    OnlineQuery(input={Merchant.name: ['Myrrh Chant']}, output=['Merchant.address']),
    OnlineQuery(input={'NonFeature.wrong': ['Wrong!']}, output=['NonFeature.wrong']),
]
result = ChalkClient().multi_query(
    queries=queries,
)
result[0].get_feature_value(User.fico_score)
queries_with_errors = [q for q, r in zip(queries, result) if r.errors is not None]

Compute features values for many rows of inputs using online resolvers. See Chalk Clients for more information on online query.

This method is similar to query, except it takes in list of inputs, and produces one output per row of inputs.

This method is appropriate if you want to fetch the same set of features for many different input primary keys.

This method contrasts with multi_query, which executes multiple fully independent queries.

This endpoint is not available in all environments.

Parameters

The features for which there are known values, mapped to a list of the values.

Outputs are the features that you'd like to compute from the inputs.

The time at which to evaluate the query. If not specified, the current time will be used. The length of this list must be the same as the length of the values in input.

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

Other Parameters
Show All
correlation_id: = None
query_name: = None
meta: = None
explain: = False
headers: = None
from chalk.client import ChalkClient
ChalkClient().query_bulk(
    input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)

Plan a query without executing it.

Parameters

The features for which there are known values, mapped to those values. For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}

Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

query_name: = None

The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'. If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.

If query_name is specified, this specifies the version of the named query you're making. This is only useful if you want your query to use a NamedQuery with a specific name and a specific version. If a query_name has not been supplied, then this parameter is ignored.

meta: = None

Arbitrary key:value pairs to associate with a query.

If true, the plan will store the intermediate values at each stage in the plan

explain: = False

If true, the plan will emit additional output to assist with debugging.

The number of input rows that this plan will be run with. If unknown, specify None.

headers: = None

Additional headers to provide with the request

from chalk.client import ChalkClient
result = ChalkClient().plan_query(
    input=[User.id],
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
result.rendered_plan
result.output_schema

Get a Chalk Dataset containing data from a previously created dataset.

If an offline query has been created with a dataset name, .get_dataset will return a Chalk Dataset. The Dataset wraps a lazily-loading Chalk DataFrame that enables us to analyze our data without loading all of it directly into memory. See Offline Queries for more information.

Parameters

The name of the Dataset to return. You must have previously supplied this dataset name when running an offline query. Dataset names are unique for each environment. If 'dataset_name' is provided, then 'job_id' should not be provided.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

dataset_id:
str | uuid.UUID | None
= None

A UUID returned in the Dataset object from an offline query. Dataset ids are unique for each environment. If 'dataset_id' is provided, then 'dataset_name' and 'revision_id' should not be provided.

revision_id:
str | uuid.UUID | None
= None

The unique id of the DatasetRevision to return. If a previously-created dataset did not have a name, you can look it up using its unique job id instead. If 'revision_id' is provided, then 'dataset_name' and 'dataset_id' should not be provided.

Other Parameters
Show All
job_id:
str | uuid.UUID | None
= None
Returns
type:

A Dataset that lazily loads your query data.

from chalk.client import ChalkClient
uids = [1, 2, 3, 4]
at = datetime.now(timezone.utc)
X = ChalkClient().offline_query(
    input={
        User.id: uids,
    },
    input_times=[at] * len(uids),
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset='my_dataset_name'
)

Some time later...

dataset = ChalkClient().get_dataset(
    dataset_name='my_dataset_name'
)
...

or

dataset = ChalkClient().get_dataset(
    job_id='00000000-0000-0000-0000-000000000000'
)
...

If memory allows:

df: pd.DataFrame = dataset.get_data_as_pandas()

Compute feature values from the offline store or by running offline/online resolvers. See Dataset for more information.

Parameters

The features for which there are known values. It can be a mapping of features to a list of values for each feature, or an existing DataFrame. Each element in the DataFrame or list of values represents an observation in line with the timestamp in input_times.

The time at which the given inputs should be observed for point-in-time correctness. If given a list of times, the list must match the length of the input lists. Each element of input_time corresponds with the feature values at the same index of the input lists. See Temporal Consistency for more information.

The features that you'd like to sample, if they exist. If an output feature was never computed for a sample (row) in the resulting DataFrame, its value will be None.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

A unique name that if provided will be used to generate and save a Dataset constructed from the list of features computed from the inputs.

If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.

You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.

max_samples: = None

The maximum number of samples to include in the DataFrame. If not specified, all samples will be returned.

wait: = False

Whether to wait for job completion.

If True, progress bars will be shown while the query is running. Primarily intended for use in a Jupyter-like notebook environment. This flag will also be propagated to the methods of the resulting Dataset.

timeout:
float | timedelta | ... | None
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Used to control whether resolvers are allowed to run in order to compute feature values.

If True, all output features will be recomputed by resolvers. If False, all output features will be sampled from the offline store. If a list, all output features in recompute_features will be recomputed, and all other output features will be sampled from the offline store.

A list of features that will always be sampled, and thus always excluded from recompute. Should not overlap with any features used in "recompute_features" argument.

If specified, the query will only be run on data observed after this timestamp. Accepts strings in ISO 8601 format.

If specified, the query will only be run on data observed before this timestamp. Accepts strings in ISO 8601 format.

If True, the output of each of the query plan stages will be stored in S3/GCS. This will dramatically impact the performance of the query, so it should only be used for debugging. These files will be visible in the web dashboard's query detail view, and can be downloaded in full by clicking on a plan node in the query plan visualizer.

explain: = False
tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.

A SQL query that will query your offline store and use the result as input. See Input for more information.

Override resource requests for processes with isolated resources, e.g., offline queries and cron jobs. See ResourceRequests for more information.

Boots a kubernetes job to run the queries in their own pods, separate from the engine and branch servers. This is useful for large datasets and jobs that require a long time to run.

If True, the output of the query will be stored in the online store.

If True, the output of the query will be stored in the offline store.

Returns
type:

A Chalk Dataset.

from chalk.client import ChalkClient
uids = [1, 2, 3, 4]
at = datetime.now(tz=timezone.utc)
dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
    },
    input_times=[at] * len(uids),
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset'
)
df = dataset.get_data_as_pandas()

Triggers a resolver to run. See Triggered Runs for more information.

Parameters

The fully qualified name of the resolver to trigger.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, the resolver will only ingest data observed before this timestamp. Accepts strings in ISO 8601 format.

If specified, the resolver will only ingest data observed after this timestamp. Accepts strings in ISO 8601 format.

If True, the resolver run output will be stored in the online store.

If True, the resolver run output will be stored in the offline store.

timestamping_mode:
'feature_time' | 'online_store_write_time'
= 'feature_time'
Returns
type:
ResolverRunResponse

Status of the resolver run and the run ID.

The ID of the resolver run.

status
ResolverRunStatus

The current status of the resolver run.

from chalk.client import ChalkClient
ChalkClient().trigger_resolver_run(
    resolver_fqn="mymodule.fn"
)

Retrieves the status of a resolver run. See Triggered Runs for more information.

Parameters

ID of the resolver run to check.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment.

Returns
type:
ResolverRunResponse

Status of the resolver run and the run ID.
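
A sketch, assuming the method is exposed on the client as get_run_status and takes the run id returned by trigger_resolver_run (the attribute names on the response are taken from the fields documented above):

from chalk.client import ChalkClient

client = ChalkClient()
run = client.trigger_resolver_run(resolver_fqn="mymodule.fn")
# Poll the run until it completes.
status = client.get_run_status(run_id=run.id)
status.status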