
Chalk API Reference

Features define the data that you want to compute and store. Your features are defined as Python classes that look like dataclasses. For example:

from chalk.features import features, DataFrame
from chalk import _

@features
class CreditCard:
    id: int
    user_id: "User.id"
    limit: float

@features
class User:
    id: int
    name: str
    email: str
    credit_cards: DataFrame[CreditCard]
    total_limit: float = _.credit_cards[_.limit].sum()

Features can be nested, as with credit_cards above. In this section, we'll dive into API components that make up the building blocks of features.

Chalk lets you spell out your features directly in Python.

Features are namespaced to a FeatureSet. To create a new FeatureSet, apply the @features decorator to a Python class with typed attributes. A FeatureSet is constructed and functions much like Python's own dataclass.

Parameters
owner: = None

The individual or team responsible for these features. The Chalk Dashboard will display this field, and alerts can be routed to owners.

tags: = None

Added metadata for features for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.

etl_offline_to_online:

When True, Chalk copies this feature into the online environment when it is computed in offline resolvers. Setting etl_offline_to_online on a feature class assigns it to all features on the class which do not explicitly specify etl_offline_to_online.

max_staleness:

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Assigning a max_staleness to the feature class assigns it to all features on the class which do not explicitly specify a max_staleness value of their own.

Other Parameters
cls:
Type[T] | None
= None
name: = None
singleton: = False
@features(
    owner="andy@chalk.ai",
    max_staleness="30m",
    etl_offline_to_online=True,
    tags="user-group",
)
class User:
    id: str
    # Comments here appear in the web!
    # :tags: pii
    name: str | None
    # :owner: userteam@mycompany.com
    location: LatLng

feature
Function

Add metadata and configuration to a feature.

Parameters
owner: = None

You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature. Read more at Owner

from chalk.features import features, feature
from datetime import date
@features
class User:
    id: str
    # :owner: user-team@company.com
    name: str
    dob: date = feature(owner="user-team@company.com")
tags: = None

Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team. Read more at Tags

from chalk.features import features, feature
@features
class User:
    id: str
    # :tags: pii
    name: str
    dob: date = feature(tags=["pii"])
version: = None

The maximum version for a feature. Versioned features can be referred to with the @ operator:

@features
class User:
    id: str
    score: int = feature(version=2)
str(User.score @ 2)
"user.score@2"

See more at Versioning

default_version: = 1

The default version for a feature. When you reference a versioned feature without the @ operator, you reference the default_version. Set to 1 by default.

@features
class User:
    id: str
    score: int = feature(version=2, default_version=2)
str(User.score)
"user.score"

See more at Default versions

max_staleness:
... | Duration | None
= ...

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Read more at Caching

cache_nulls: = True

When True (default), Chalk will cache all values, including nulls.

When False, Chalk will not update the null entry in the cache.

When "evict_nulls", Chalk will evict the entry that would have been null from the cache, if it exists.

Concretely, suppose the current state of a database is {a: 1, b: 2}, and you write a row {a: 2, b: None}. Here is the expected result in the db:

  • {a: 2, b: None} when cache_nulls=True (default)
  • {a: 2, b: 2} when cache_nulls=False
  • {a: 2} when cache_nulls="evict_nulls"

If cache_defaults is set, this will override the value of cache_nulls.

cache_defaults: = True

When True (default), Chalk will cache all values, including default values.

When False, Chalk will not update the default entry in the cache.

When "evict_defaults", Chalk will evict the entry that would have been a default value from the cache, if it exists.

Concretely, suppose the current state of a database is {a: 1, b: 2}, and you write a row {a: 2, b: "default"}, and the default value for feature b is "default". Here is the expected result in the db:

  • {a: 2, b: "default"} when cache_defaults=True
  • {a: 2, b: 2} when cache_defaults=False
  • {a: 2} when cache_defaults="evict_defaults"
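
For illustration, a minimal sketch combining max_staleness with the caching options above (the class and feature names here are hypothetical):

from chalk.features import features, feature

@features
class User:
    id: str
    # Cache for 30 minutes, but evict entries that would have cached a null
    email_domain: str | None = feature(
        max_staleness="30m",
        cache_nulls="evict_nulls",
    )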

etl_offline_to_online:

When True, Chalk copies this feature into the online environment when it is computed in offline resolvers. Read more at Reverse ETL

min:
_TRich | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

from chalk.features import features, feature
@features
class User:
    id: str
    fico_score: int = feature(min=300, max=850)
max:
_TRich | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

from chalk.features import features, feature
@features
class User:
    id: str
    fico_score: int = feature(min=300, max=850)
min_length: = None

If specified, when this feature is computed, Chalk will check that len(x) >= min_length.

from chalk.features import features, feature
@features
class User:
    id: str
    name: str = feature(min_length=1)
max_length: = None

If specified, when this feature is computed, Chalk will check that len(x) <= max_length.

from chalk.features import features, feature
@features
class User:
    id: str
    name: str = feature(max_length=1000)
strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

validations:

A list of validations to apply to this feature. Generally, max, min, max_length, and min_length are more convenient, but the strict parameter applies to all of them. Use this parameter if you want to mix strict and non-strict validations.

dtype: = None

The backing pyarrow.DataType for the feature. This parameter can be used to control the storage format of data. For example, if you have a lot of data that could be represented as smaller data types, you can use this parameter to save space.

import pyarrow as pa
@features
class WatchEvent:
    id: str
    duration_hours: float = feature(dtype=pa.float16())
default:
_TRich | ...
= ...

The default value of the feature if it otherwise can't be computed. If you don't need to specify other metadata, you can also assign a default in the same way you would assign a default to a dataclass:

from chalk.features import features
@features
class User:
    num_purchases: int = 0

expression:

An underscore expression for defining the feature. Typically, this value is assigned directly to the feature without needing to use the feature(...) function. However, if you want to define other properties, like a default or max_staleness, you'll want to use the expression keyword argument.

from chalk.features import features
from chalk import _
@features
class Receipt:
    subtotal: int
    tax: int = 0  # Default value, without other metadata
    total: int = feature(expression=_.subtotal + _.tax, default=0)

See more at Underscore

deprecated: = False

If True, this feature is considered deprecated, which impacts the dashboard, alerts, and warnings.

from chalk.features import features, feature
@features
class User:
    id: str
    name: str = feature(deprecated=True)
Other Parameters
description: = None
name: = None
primary: = None
encoder:
TEncoder[_TPrim, _TRich] | None
= None
decoder:
TDecoder[_TPrim, _TRich] | None
= None
offline_ttl:
... | Duration | None
= ...
Returns
type:
_TRich

The type of the input feature, given by _TRich.

from chalk.features import Primary, features, feature
@features
class User:
    uid: Primary[int]
    # Uses a default value of 0 when one cannot be computed.
    num_purchases: int = 0
    # Description of the name feature.
    # :owner: fraud@company.com
    # :tags: fraud, credit
    name: str = feature(
        max_staleness="10m",
        etl_offline_to_online=True
    )
    score: int = feature(
        version=2, default_version=2
    )

has_one
Function

Specify a feature that represents a one-to-one relationship.

This function allows you to explicitly specify a join condition between two @features classes. When there is only one way to join two classes, we recommend using the foreign-key definition instead of this has_one function. For example, if you have a User class and a Card class, and each user has one card, you can define the Card and User classes as follows:

@features
class User:
    id: str

@features
class Card:
    id: str
    user_id: User.id
    user: User

However, if User has two cards (say, a primary and secondary), the foreign key syntax cannot be used to define the relationship, and you should use the has_one function.

Read more at Has One

Parameters
f:

The join condition between @features classes. This argument is callable to allow for forward references to members of this class and the joined class.

from chalk.features import DataFrame, features, has_one

@features
class Card:
    id: str
    user_id: str
    balance: float

@features
class User:
    id: str
    card: Card = has_one(
        lambda: User.id == Card.user_id
    )

has_many
Function

Specify a feature that represents a one-to-many relationship.

Parameters
f:

The join condition between @features classes. This argument is callable to allow for forward references to members of this class and the joined class.

max_staleness:
Duration | None | ...
= ...

The maximum staleness of the joined feature. The items in the joined feature are aggregated, storing the latest values of the joined feature for each primary key in the joined feature.

from chalk.features import DataFrame, features, has_many

@features
class Card:
    id: str
    user_id: str
    balance: float

@features
class User:
    id: str
    cards: DataFrame[Card] = has_many(
        lambda: User.id == Card.user_id
    )

after
Function

The function after can be used with DataFrame to compute windowed features.

after filters a DataFrame relative to the current time in context, such that if the after filter is defined as now - {time_window}, the filter will include all features with timestamps t where now - {time_window} <= t <= now. This time could be in the past if you’re using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.

The parameters to after take many keyword arguments describing the time relative to the present.

Parameters
days_ago: = 0

Number of days ago.

hours_ago: = 0

Number of hours ago.

minutes_ago: = 0

Number of minutes ago.

seconds_ago: = 0

Number of seconds ago.

index: = None

The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.

Other Parameters
weeks_ago: = 0
Returns
type:

A filter for the DataFrame.

from chalk.features import DataFrame, features
@features
class Card:
    ...
@features
class User:
    cards: DataFrame[Card]
User.cards[after(hours_ago=1, minutes_ago=30)]

before
Function

The function before can be used with DataFrame to compute windowed features.

before filters a DataFrame relative to the current time in context such that if the before filter is defined as now - {time_window}, the filter will include all features with timestamps t where t <= now - {time_window}. This time could be in the past if you’re using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.

The parameters to before take many keyword arguments describing the time relative to the present.

Parameters
days_ago: = 0

Number of days ago.

hours_ago: = 0

Number of hours ago.

minutes_ago: = 0

Number of minutes ago.

seconds_ago: = 0

Number of seconds ago.

index: = None

The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.

Other Parameters
weeks_ago: = 0
Returns
type:

A filter for a DataFrame.

from chalk.features import DataFrame, features
@features
class Card:
    ...
@features
class User:
    cards: DataFrame[Card]
User.cards[before(hours_ago=1, minutes_ago=30)]

Windowed
Class

Declare a windowed feature.

Examples

@features
class User:
    failed_logins: Windowed[int] = windowed("10m", "24h")
Functions

windowed
Function

Create a windowed feature.

See more at Windowed

Parameters
buckets: = ()

The size of the buckets for the window function. Buckets are specified as strings in the format "1d", "2h", "1h30m", etc. You may also choose to specify the buckets using the days, hours, and minutes parameters instead. The buckets parameter is helpful if you want to use multiple units to express the bucket size, like "1h30m".

days:

Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".

hours:

Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".

minutes:

Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".

owner: = None

You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.

tags: = None

Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.

default:
TRich | ...
= ...

The default value of the feature if it otherwise can't be computed.

max_staleness:
Duration | ... | None
= ...

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver.

See more at Caching

offline_ttl:
Duration | ... | None
= ...

Sets a maximum age for values eligible to be retrieved from the offline store, defined in relation to the query's current point-in-time.
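
For example, a sketch (with hypothetical feature names) that limits how stale offline values may be:

from chalk import windowed, Windowed
from chalk.features import features

@features
class User:
    id: str
    login_count: Windowed[int] = windowed(
        "1d", "7d",
        offline_ttl="30d",
    )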

version: = None

Feature versions allow you to manage a feature as its definition changes over time.

The version keyword argument allows you to specify the maximum number of versions available for this feature.

See more at Versioning

etl_offline_to_online:

When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.

See more at Reverse ETL

min:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

min_length: = None

If specified, when this feature is computed, Chalk will check that len(x) >= min_length.

max_length: = None

If specified, when this feature is computed, Chalk will check that len(x) <= max_length.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

validations:

A list of Validations to apply to this feature.

See more at https://docs.chalk.ai/api-docs#Validation

expression:

The expression to compute the feature. This is an underscore expression, like _.transactions[_.amount].sum().

materialization:
MaterializationWindowConfig | True | None
= None

Configuration for aggregating data. Pass bucket_duration with a Duration to configure the bucket size for aggregation. If True, each of the windows will use a bucket duration equal to its window duration.

See more at Materialized window aggregations
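
As a sketch, assuming a dict literal is accepted for MaterializationWindowConfig and that User has a transactions has-many relationship (both assumptions, for illustration only):

from chalk import windowed, Windowed, _
from chalk.features import features

@features
class User:
    id: str
    # Materialize hourly buckets for this windowed aggregation
    transaction_count: Windowed[int] = windowed(
        "1d", "7d",
        expression=_.transactions.count(),
        materialization={"bucket_duration": "1h"},
    )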

Other Parameters
description: = None
name: = None
encoder:
TEncoder[TPrim, TRich] | None
= None
decoder:
TDecoder[TPrim, TRich] | None
= None
dtype: = None
Returns
type:
Windowed[TRich]

Metadata for the windowed feature, parameterized by TPrim (the primitive type of the feature) and TRich (the decoded type of the feature, if decoder is provided).

from chalk import windowed, Windowed
@features
class User:
    id: int
    email_count: Windowed[int] = windowed(days=range(1, 30))
    logins: Windowed[int] = windowed("10m", "1d", "30d")
User.email_count["7d"]

group_by_windowed
Function

Create a windowed feature with grouping.

See more at Grouped materialized window aggregations

Parameters
buckets: = ()

The size of the buckets for the window function. Buckets are specified as strings in the format "1d", "2h", "1h30m", etc. You may also choose to specify the buckets using the days, hours, and minutes parameters instead. The buckets parameter is helpful if you want to use multiple units to express the bucket size, like "1h30m".

expression:

The expression to compute the feature. This is an underscore expression, like _.transactions[_.amount].sum().

materialization:
MaterializationWindowConfig | True

Configuration for aggregating data. Pass bucket_duration with a Duration to configure the bucket size for aggregation.

See more at Materialized window aggregations

days:

Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".

hours:

Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".

minutes:

Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".

owner: = None

You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.

tags: = None

Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.

default:
TRich | ...
= ...

The default value of the feature if it otherwise can't be computed.

min:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

dtype: = None

The backing pyarrow.DataType for the feature. This parameter can be used to control the storage format of data. For example, if you have a lot of data that could be represented as smaller data types, you can use this parameter to save space.

import pyarrow as pa
from chalk.features import features
@features
class User:
    id: str
    email_count: Windowed[int] = windowed(
        "10m", "30m",
        dtype=pa.int16(),
    )
Other Parameters
description: = None
name: = None
Returns
type:
from chalk import group_by_windowed, DataFrame, _
from chalk.features import features

@features
class Email:
    id: int
    user_id: "User.id"
    category: str

@features
class User:
    id: int
    emails: DataFrame[Email]
    emails_by_category: DataFrame = group_by_windowed(
        "10m", "30m",
        expression=_.emails.group_by(_.category).count(),
    )
Primary
Class

Marks a feature as the primary feature for a feature class.

Features named id on feature classes without an explicit primary feature are declared primary keys by default, and don't need to be marked with Primary.

If you have a primary key feature with a name other than id, you can use this marker to indicate the primary key.

Examples

from chalk.features import features
from chalk import Primary
@features
class User:
    username: Primary[str]
Functions
Parameters
item:
typing.Union[Type, str, int]

The type of the feature value.

Returns

The type, with a special annotation indicating that it is a primary key.

from chalk.features import features
from chalk import Primary
@features
class User:
    username: Primary[str]

Validation
Class

Specify explicit data validation for a feature.

The feature() function can also specify these validations, but this class allows you to specify both strict and non-strict validations at the same time.

Functions

Set validation parameters for a feature.

Parameters
min:
T | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
T | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

min_length: = None

If specified, when this feature is computed, Chalk will check that len(x) >= min_length.

max_length: = None

If specified, when this feature is computed, Chalk will check that len(x) <= max_length.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

from chalk.features import features, feature
@features
class User:
    fico_score: int = feature(
        validations=[
            Validation(min=300, max=850, strict=True),
            Validation(min=300, max=320, strict=False),
            Validation(min=840, max=850, strict=False),
        ]
    )
    # If only one set of validations were needed,
    # you can use the feature function instead:
    first_name: str = feature(
        min_length=2, max_length=64, strict=True
    )
Vector
Class

The Vector class can be used as a type annotation to denote a Vector feature.

Instances of this class will be provided when working with raw vectors inside of resolvers. Generally, you do not need to construct instances of this class directly, as Chalk will automatically convert list-like features into Vector instances when working with a Vector annotation.

Parameters

data:
numpy.ndarray | list[float] | pyarrow.FixedSizeListScalar

The vector values.

Examples

from chalk.features import Vector, features
@features
class Document:
    embedding: Vector[1536]
Attributes
precision
'fp16' | 'fp32' | 'fp64'

The precision of the Vector

Functions

Convert a vector to a PyArrow array.

Returns
type:
pa.FixedSizeListScalar

The vector, as a PyArrow array.


Convert the vector to a Numpy array.

Parameters
writable: = False

Whether the numpy array should be writable. If so, an extra copy of the vector data will be made.

Returns
type:
np.ndarray

The vector, as a numpy array.

Convert the vector to a Python list.

Returns
type:

The vector, as a list of Python floats

Define a nearest neighbor relationship for performing Vector similarity search.

Parameters

The other vector feature. This vector must have the same dtype and dimensions.

metric:
'l2' | 'ip' | 'cos'
= 'l2'

The metric to use to compute distance between two vectors. L2 Norm ("l2"), Inner Product ("ip"), and Cosine ("cos") are supported. Defaults to "l2".
Returns
type:
Filter

A nearest neighbor relationship filter.

online
Decorator

Decorator to create an online resolver.

Parameters

environment: = None

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environments and no others

Read more at Environments
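
For instance, a minimal sketch (the resolver and feature names are hypothetical) restricting a resolver to non-production environments:

@online(environment=["staging", "dev"])
def get_mock_fraud_score(uid: User.id) -> User.fraud_score:
    # Return a constant instead of calling the vendor outside production
    return 0.0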

tags: = None

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

cron: = None

You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.

Cron can sample all examples, a subset of all examples, or a custom provided set of examples.

Read more at Scheduling

when: = None

Like tags, when can filter when a resolver is eligible to run. Unlike tags, when can use feature values, so that you can write resolvers like:

@online(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
    return ...
owner: = None

Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

timeout:

You can specify the maximum Duration to wait for the resolver's result. Once the resolver's runtime exceeds the specified duration, a timeout error will be returned along with each output feature.

Read more at Timeout.

resource_hint:
ResourceHint | None
= None

Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.
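
For example, a sketch (assuming "io" is an accepted ResourceHint value; the resolver and features are hypothetical) marking a network-bound resolver:

@online(resource_hint="io")
def get_credit_report(email: User.email) -> User.credit_report:
    # Placeholder for a slow vendor API call
    return vendor_client.fetch(email)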

static: = False

Whether this resolver should be invoked once during planning time to build a static computation graph. If True, all inputs will either be StaticOperators (for has-many and DataFrame relationships) or StaticExpressions (for individual features). The resolver must return a StaticOperator as output.

Other Parameters
fn:
Callable[P, T] | None
= None
name: = None
total: = False
unique_on:
Collection[Any] | None
= None
partitioned_by:
Collection[Any] | None
= None
Returns

A ResolverProtocol which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.

Read more at Unit Tests

@online
def name_match(
    name: User.full_name,
    account_name: User.bank_account.title
) -> User.account_name_match_score:
    if name.lower() == account_name.lower():
        return 1.
    return 0.

offline
Decorator

Decorator to create an offline resolver.

Parameters

environment: = None

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environments and no others

Read more at Environments

tags: = None

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

cron: = None

You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.

Cron can sample all examples, a subset of all examples, or a custom provided set of examples.

Read more at Scheduling

when: = None

Like tags, when can filter when a resolver is eligible to run. Unlike tags, when can use feature values, so that you can write resolvers like:

@offline(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
   ...
owner: = None

Allows you to specify an individual or team who is responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

timeout:

You can specify the maximum Duration to wait for the resolver's result. Once the resolver's runtime exceeds the specified duration, a timeout error will be raised.

Read more at Timeout.

resource_hint:
ResourceHint | None
= None

Whether this resolver is bound by CPU or I/O. Chalk uses the resource hint to optimize resolver execution.

static: = False

Whether this resolver should be invoked once during planning time to build a static computation graph. If True, all inputs will either be StaticOperators (for has-many and dataframe relationships) or StaticExpressions (for individual features). The resolver must return a StaticOperator as output.

total: = False

Whether this resolver returns all ids of a given namespace. To have this annotation, the resolver must take no arguments and return a DataFrame. Typically, this annotation would be used in a SQL-file resolver.

Other Parameters
fn:
Callable[P, T] | None
= None
name: = None
unique_on:
Collection[Any] | None
= None
partitioned_by:
Collection[Any] | None
= None
Returns

A ResolverProtocol which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.

Read more at Unit Tests

@offline(cron="1h")
def get_fraud_score(
    email: User.email,
    name: User.name,
) -> User.fraud_score:
    return socure.get_sigma_score(email, name)
Cron
Class

Detailed options for specifying the schedule and filtering functions for Chalk batch jobs.

Functions

Run an online or offline resolver on a schedule.

This class lets you add a filter or sample function to your cron schedule for a resolver. See the overloaded signatures for more information.

Parameters

schedule:

The period of the cron job. Can be either a crontab ("0 * * * *") or a Duration ("2h").

filter: = None

Optionally, a function to filter down the arguments to consider.

See Filtering examples for more information.

sample: = None

Explicitly provide the sample function for the cron job.

See Custom examples for more information.

Using a filter

def only_active_filter(v: User.active):
    return v
@online(cron=Cron(schedule="1d", filter=only_active_filter))
def score_user(d: User.signup_date) -> User.score:
    return ...

Using a sample function

def s() -> DataFrame[User.id]:
    return DataFrame.read_csv(...)
@offline(cron=Cron(schedule="1d", sample=s))
def fn(balance: User.account.balance) -> ...:
    return ...

ResolverProtocol
Class

A resolver, returned from the decorators @offline and @online.

Attributes

Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:
  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environments and no others

Read more at Environments

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

The docstring of the resolver.

The function name of the resolver.

The python module where the function is defined.

The type annotations for the resolver.

The filename in which the resolver is defined.

The name of the resolver, either given by the name of the function, or by the keyword argument name given to @offline or @online.

resource_hint
ResourceHint | None

Whether this resolver is bound by CPU or I/O

Whether the resolver is static. Static resolvers are "executed" once during planning time to produce a computation graph.

The fully qualified name for the resolver.

Functions

Returns the result of calling the function decorated with @offline or @online with the given arguments.

Parameters
args:
P.args
= ()

The arguments to pass to the decorated function. If one of the arguments is a DataFrame with a filter or projection applied, the resolver will only be called with the filtered or projected data. Read more at https://docs.chalk.ai/docs/unit-tests#data-frame-inputs

kwargs:
P.kwargs
= {}
Returns
type:
T_co

The result of calling the decorated function with args. Useful for unit-testing.

Read more at Unit Tests

@online
def get_num_bedrooms(
    rooms: Home.rooms[Room.name == 'bedroom']
) -> Home.num_bedrooms:
    return len(rooms)
rooms = [
    Room(id=1, name="bedroom"),
    Room(id=2, name="kitchen"),
    Room(id=3, name="bedroom"),
]
assert get_num_bedrooms(rooms) == 2
MachineType
Type Alias

The type of machine to use.

You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.

DataFrame
Class

Chalk includes a DataFrame class that models tabular data in much the same way that pandas does. However, there are some key differences that allow the Chalk DataFrame to increase type safety and performance.

Like pandas, Chalk's DataFrame is a two-dimensional data structure with rows and columns. You can perform operations like filtering, grouping, and aggregating on a DataFrame. However, there are two main differences.

  • Lazy implementation - Chalk's DataFrame is lazy and can be backed by multiple data sources, whereas a pandas.DataFrame executes eagerly in memory.
  • Use as a type - Chalk's DataFrame[...] can be used to represent a type of data with pre-defined filters.

Lazy Execution

Unlike pandas, the implementation of a Chalk DataFrame is lazy, and can be executed against many different backend sources of data. For example, in unit tests, a DataFrame uses an implementation backed by polars. But if your DataFrame was returned from a SQL source, filters and aggregations may be pushed down to the database for efficient execution.

Use as a Type

Each column of a Chalk DataFrame is typed by a Feature type. For example, you might have a resolver returning a DataFrame containing user ids and names:

@features
class User:
    id: int
    name: str
    email: str

@online
def get_users() -> DataFrame[User.id, User.name]:
    return DataFrame([
        User(id=1, name="Alice"),
        User(id=2, name="Bob")
    ])

Note that the DataFrame type is parameterized by the columns that it contains. In this case, the DataFrame contains two columns, User.id and User.name.

The specific operations available on a DataFrame are discussed below. For a higher-level discussion, see DataFrame.

Attributes
filters
ClassVar[tuple[Filter, ...]]
columns
tuple[Feature, ...]

The maximum number of rows to return.

shape

The shape of the DataFrame as a tuple of (num_rows, num_columns).

Examples

DataFrame({User.id: [1, 2, 3, 4, 5]}).shape
(5, 1)
Functions

Construct a Chalk DataFrame.

Parameters
data: = None

The data. Can be an existing pandas.DataFrame, polars.DataFrame or polars.LazyFrame, a sequence of feature instances, or a dict mapping a feature to a sequence of values.

missing_value_strategy:
MissingValueStrategy
= 'default_or_allow'

The strategy to use to handle missing values. A short sketch follows the list below.

A feature value is "missing" if it is an ellipsis (...), or it is None and the feature is not annotated as Optional[...].

The available strategies are:

  • 'error': Raise a TypeError if any missing values are found. Do not attempt to replace missing values with the default value for the feature.
  • 'default_or_error': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, raise a TypeError.
  • 'default_or_allow': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, leave it as None. This is the default strategy.
  • 'allow': Allow missing values to be stored in the DataFrame. This option may result in non-nullable features being assigned None values.
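
A minimal sketch of the strategies above (the User class and its default value are illustrative):

from chalk.features import DataFrame, features

@features
class User:
    id: int
    email: str = "unknown@example.com"

# The missing email is replaced by its default; with strategy 'error',
# the same construction would raise a TypeError instead.
df = DataFrame(
    [User(id=1)],
    missing_value_strategy="default_or_error",
)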

Row-wise construction

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

Column-wise construction

df = DataFrame({
    User.id: [1, 2],
    User.first: ["Sam", "Iris"],
    User.last: ["Wu", "Xi"]
})

Construction from polars.DataFrame

import polars
df = DataFrame(polars.DataFrame({
    "user.id": [1, 2],
    "user.first": ["Sam", "Iris"],
    "user.last": ["Wu", "Xi"]
}))

Filter the rows of a DataFrame or project out columns.

You can select columns out of a DataFrame from the set of columns already present to produce a new DataFrame scoped down to those columns.

Or, you can filter the rows of a DataFrame by using Python's built-in operations on feature columns.

Parameters

Filters and projections to apply to the DataFrame.

Returns

A DataFrame with the filters and projections in item applied.

df = DataFrame({
    User.age: [21, 22, 23],
    User.email: [...],
})

Filtering

df = df[
    User.age > 21 and
    User.email == "joe@chalk.ai"
]

Projecting

df[User.name]

Filtering & Projecting

df = df[
    User.age > 21 and
    User.email == "joe@chalk.ai",
    User.name
]

Aggregate the DataFrame by the specified columns.

Parameters
group:
dict[Feature | Any, Feature | Any]

A mapping from the desired column name in the resulting DataFrame to the name of the column in the source DataFrame.

agg:

A mapping from the desired column name in the resulting DataFrame to the aggregation operation to perform on the source DataFrame.

Returns
type:

The DataFrame with the specified aggregations applied.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 3        │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Compute a histogram with fixed width bins.

Parameters
nbins: = None

If supplied, will be used to compute the binwidth.

If not supplied, computed from the data (actual max and min values).

base: = None

The value of the first histogram bin. Defaults to the minimum value of column.

eps: = 1e-13

Allowed floating point epsilon for histogram base

column: = None

The column to compute the histogram on. If not supplied, the DataFrame is assumed to contain a single column.

descending: = False

If True, the histogram buckets will be sorted in descending order.

Returns
type:

A list of the counts in each bin.

DataFrame({
  Taco.price: list(range(100, 200)),
}).histogram_list(nbins=4, base=100)
[25, 25, 25, 25]

Group based on a time value (date or datetime).

The groups are defined by a time-based window, and optionally, other columns in the DataFrame. The "width" of the window is defined by the period parameter, and the spacing between the windows is defined by the every parameter. Note that if the every parameter is smaller than the period parameter, then the windows will overlap, and a single row may be assigned to multiple groups.

As an example, consider the following DataFrame:


val:    a  b    c   d e     f           g h
    ─────────●─────────●─────────●─────────●───────▶
time:        A         B         C         D
    ┌─────────┐
1   │   a  b  │                                    1: [a, b]
    └────┬────┴────┐
2   ◀───▶│ b    c  │                               2: [b, c]
    every└────┬────┴────┐
3   ◀────────▶│ c   d e │                          3: [c, d, e]
      period  └────┬────┴────┐
4                  │d e     f│                     4: [d, e, f]
                   └────┬────┴────┐
5                       │   f     │                5: [f]
                        └────┬────┴────┐
6                            │         │
                             └────┬────┴────┐
7                                 │     g h │      7: [g, h]
                                  └────┬────┴────┐
8                                      │g h      │ 8: [g, h]
                                       └─────────┘

In the above example, the sixth time bucket is empty, and will not be included in the resulting DataFrame.

Parameters
index:
Feature | Any

The column to use as the index for the time-based grouping.

agg:

A mapping from the desired column name in the resulting DataFrame to the aggregation operation to perform on the source DataFrame.

every:

The spacing between the time-based windows. This parameter can be specified as a str or a timedelta. If specified as a str, then it must be a valid Duration.

group:
dict[Feature | Any, Feature | Any] | None
= None

A mapping from the desired column name in the resulting DataFrame to the name of the column in the source DataFrame. This parameter is optional, and if not specified, then the resulting DataFrame groups will be determined by the index parameter alone.

period: = None

The width of the time-based window. This parameter can be specified as a str or a timedelta. If specified as a str, then it must be a valid Duration. If None it is equal to every.

Other Parameters
offset: = None
start_by:
'window' | 'datapoint' | 'monday' | 'tuesday' | 'wednesday' | 'thursday' | 'friday' | 'saturday' | 'sunday'
= 'window'
Returns
type:

A new DataFrame with the specified time-based grouping applied. The resulting DataFrame will have a column for each of the keys in group, for each of the keys in agg, and for the index parameter.

from chalk import DataFrame, op
from datetime import datetime
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
        User.ts: [datetime(2020, 1, 1), datetime(2020, 1, 1), datetime(2020, 1, 3)],
    },
).group_by_hopping(
     index=User.ts,
     group={User.id: User.id},
     agg={User.val: op.median(User.val)},
     period="1d",
)
╭─────────┬──────────┬──────────╮
│ User.id │ User.ts  │ User.val │
╞═════════╪══════════╪══════════╡
│ 1       │ 2020-1-1 │ 3        │
├─────────┼──────────┼──────────┤
│ 3       │ 2020-1-3 │ 10       │
╰─────────┴──────────┴──────────╯

Vertically stack the DataFrame with another DataFrame containing the same columns. The DataFrame other will be appended to the bottom of this DataFrame.

Parameters

The other DataFrame to stack with this DataFrame.

Returns
type:

The DataFrame with the other DataFrame stacked on the bottom.

from chalk.features import DataFrame
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
df.vstack(df)

Return the number of unique values in the specified column.

Parameters
column: = None

The column to compute the number of unique values for. If None, then the number of unique values in the entire DataFrame is returned.

Returns
type:

The number of unique values in the specified column.

from chalk.features import DataFrame
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
df.num_unique(User.id)
2

Rename columns in the DataFrame.

Parameters

A mapping from the current feature for a column to the desired feature for the column.

Returns
type:

The DataFrame with the specified columns renamed.

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
]).rename({User.last: User.family})

Add a column to the DataFrame.

Parameters

The name of the column to add.

The definition of the column to add. This could be a constant value (e.g. 1 or True), an expression (e.g. op.max(User.score_1, User.score_2)), or a list of values (e.g. [1, 2, 3]).

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
# Set the fraud score to 0 for all users
df.with_column(User.fraud_score, 0)
# Concatenation of first & last as full_name
df.with_column(
    User.full_name, op.concat(User.first, User.last)
)
# Alias a column name
df.with_column(
    User.first_name, User.first
)

Add columns to the DataFrame.

Parameters

A Mapping from the desired name of the column in the DataFrame to the definition of the new column.

Returns
type:

A new DataFrame with all the existing columns, plus those specified in this function.

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
# Set the fraud score to 0 for all users
df.with_columns({User.fraud_score: 0})
# Concatenation of first & last as full_name
df.with_columns({
    User.full_name: op.concat(User.first, User.last)
})
# Alias a column name
df.with_columns({
    User.first_name: User.first
})

Deprecated. Use DataFrame(...) instead.

Deprecated. Use DataFrame(...) instead.

Read a .csv file as a DataFrame.

Parameters

path:

The path to the .csv file. This may be an S3 or GCS storage url.

has_header:

Whether the .csv file has a header row as the first row.

columns: = None

A mapping of index to feature name.

Returns
type:

A DataFrame with the contents of the file loaded as features.

values = DataFrame.read_csv(
    "s3://...",
    columns={0: MyFeatures.id, 1: MyFeatures.name},
    has_header=False,
)

Read a .avro file as a DataFrame.

Parameters

path:

The path to the .avro file. This may be an S3 or GCS storage url.

Returns
type:

A DataFrame with the contents of the file loaded as features.

values = DataFrame.read_avro(
    "s3://...",
)

Compute the max value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the max value of each column.

Returns
type:

A DataFrame with the max value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).max()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 3       │ 10       │
╰─────────┴──────────╯

Compute the mean value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the mean value of each column.

Returns
type:

A DataFrame with the mean value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).mean()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 2       │ 5        │
╰─────────┴──────────╯

Compute the median value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the median value of each column.

Returns
type:

A DataFrame with the median value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).median()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 2       │ 4        │
╰─────────┴──────────╯

Compute the min value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the min value of each column.

Returns
type:

A DataFrame with the min value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).min()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 1        │
╰─────────┴──────────╯

Compute the standard deviation of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the standard deviation of each column.

Parameters
ddof: = 1

"Delta Degrees of Freedom": the divisor used in the calculation is N - ddof, where N represents the number of elements. By default, ddof is 1.

Returns
type:

A DataFrame with the standard deviation of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).std()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 4.5826   │
╰─────────┴──────────╯

Compute the sum of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the sum of each column.

Returns
type:

A DataFrame with the sum of each column.
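
Mirroring the examples for the other aggregations, a sketch:

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).sum()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 6       │ 15       │
╰─────────┴──────────╯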

Compute the variance of each of the columns in the DataFrame.

Parameters
ddof: = 1

"Delta Degrees of Freedom": the divisor used in the calculation is N - ddof, where N represents the number of elements. By default, ddof is 1.

Returns
type:

A DataFrame with the variance of each column.

Returns whether any of the values in the DataFrame are truthy. Requires the DataFrame to only contain boolean values.

Returns whether all the values in the DataFrame are truthy. Requires the DataFrame to only contain boolean values.

Returns the number of rows in the DataFrame.

Returns
type:

The number of rows in the DataFrame.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
)
len(df)
3

Returns the number of rows in the DataFrame.

Returns
type:

The number of rows in the DataFrame.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
)
len(df)
3
df.count()
3

Get the only item from the DataFrame. This method will raise an error if the DataFrame contains more than one row or more than one column.
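
A minimal sketch, assuming the method is named item:

from chalk.features import DataFrame
DataFrame({User.val: [42]}).item()
42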

Sort the DataFrame by the given columns.

Parameters
by:
str | Feature | Any

Feature(s) to sort by. Strings are parsed as feature names.

more_by:
str | Feature | Any
= ()

Additional columns to sort by, specified as positional arguments.

descending: = False

Sort in descending order. When sorting by multiple columns, can be specified per feature by passing a sequence of booleans.

nulls_last: = False

Place null values last.

Returns
type:

A new DataFrame with the rows sorted.

df = DataFrame({
    User.a: [1, 2, 3],
    User.b: [3, 2, 1],
})
df.sort(User.a)
      a  b
-----------
0     1  3
1     2  2
2     3  1

Get the underlying DataFrame as a polars.LazyFrame.

Parameters
prefixed: = True

Whether to prefix the column names with the feature namespace (i.e., if prefixed=True, user.name; if prefixed=False, name).

Returns

The underlying polars.LazyFrame.

Get the underlying DataFrame as a pyarrow.Table.

Parameters
prefixed: = True

Whether to prefix the column names with the feature namespace (i.e., if prefixed=True, user.name; if prefixed=False, name).

Returns
type:

The underlying pyarrow.Table. This format is the canonical representation of the data in Chalk.

Get the underlying DataFrame as a pandas.DataFrame.

Parameters

If True, use strings for column names. If False, use Feature objects.

prefixed: = True

Whether to prefix the column names with the feature namespace (i.e., if prefixed=True, user.name; if prefixed=False, name).
Returns

The data formatted as a pandas.DataFrame.

Get values in the DataFrame as Features instances.

df = DataFrame({
    SpaceShip.id: [1, 2],
    SpaceShip.volume: [4_000, 5_000]
})
df.to_features()
[
    SpaceShip(id=1, volume=4000),
    SpaceShip(id=2, volume=5000)
]

Slice the DataFrame.

Parameters
offset: = 0

The offset to start at.

length: = None

The number of rows in the slice. If None (the default), include all rows from offset to the end of the DataFrame.

Returns
type:

The dataframe with the slice applied.
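
A minimal sketch, assuming the method is named slice:

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3, 4],
    }
)
# Rows with User.id 2 and 3
df.slice(offset=1, length=2)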

op
Class

Operations for aggregations in DataFrame.

The class methods on this class are used to create aggregations for use in DataFrame.group_by.

Functions

Add together the values of col and *cols in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

There must be at least one column to aggregate.

cols:
Feature | FeatureWrapper | str | Any
= ()

Subsequent columns to aggregate.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.sum(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 4.5      │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Multiply together the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column to aggregate. Used in DataFrame.group_by.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
        User.active: [True, True, False],
    }
).group_by(
     group={User.id: User.id},
     agg={
        User.val: op.product(User.val),
        User.active: op.product(User.active),
     }
)
╭─────────┬──────────┬─────────────╮
│ User.id │ User.val │ User.active │
╞═════════╪══════════╪═════════════╡
│ 1       │ 2        │ 1           │
├─────────┼──────────┼─────────────┤
│ 3       │ 10       │ 0           │
╰─────────┴──────────┴─────────────╯

Find the maximum of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the maximum value.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.max(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 4        │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Find the minimum of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the minimum value.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.min(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 0.5      │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Find the median of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the median value. In the case of an even number of elements, the median is the mean of the two middle elements.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 3        │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Find the mean of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the mean value.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.mean(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 3        │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Find the standard deviation of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the standard deviation.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.std(User.val)}
)

Find the variance of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the variance.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.variance(User.val)}
)

Find the count of the values of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the count.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.count(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 2        │
├─────────┼──────────┤
│ 3       │ 1        │
╰─────────┴──────────╯

Concatenate the string values of col and col2 in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The first column to concatenate.

col2:
Feature | FeatureWrapper | str | Any

The column with which to concatenate col.

sep: = ''

The separator to use when concatenating col and col2.

from chalk.features import DataFrame
DataFrame(
    [
        User(id=1, val='a'),
        User(id=1, val='b'),
        User(id=3, val='c'),
        User(id=3, val='d'),
    ]
).group_by(
    group={User.id: User.id},
    agg={User.val: op.concat(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
1"ab"├─────────┼──────────┤
3"cd"╰─────────┴──────────╯

Deprecated. Use concat instead.

Find the last value of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the last value.

from chalk.features import DataFrame
DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=True,
).group_by(
    group={User.id: User.id},
    agg={User.val: op.last(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 1        │
├─────────┼──────────┤
│ 3       │ 5        │
╰─────────┴──────────╯

Find the first value of col in a DataFrame.

Parameters
col:
Feature | FeatureWrapper | str | Any

The column along which to find the first value.

from chalk.features import DataFrame
DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=False
).group_by(
    group={User.id: User.id},
    agg={User.val: op.first(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 1        │
├─────────┼──────────┤
│ 3       │ 5        │
╰─────────┴──────────╯

A class for refining an aggregation defined by op.

Attributes
filters
list[Filter]
Functions

Filter the aggregation to apply to only rows where all the filters in f are true. If no rows match the filter, the aggregation for the column will be null, and the resulting feature type must be a nullable type.

Parameters
f:
Filter | Any
= ()

A set of filters to apply to the aggregation. Each of the filters must be true to apply the aggregation.

Returns

The aggregation, allowing you to continue to chain methods.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.sum(User.val).where(User.val > 5)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ null     │
├─────────┼──────────┤
│ 3       │ 10       │
╰─────────┴──────────╯

Create a Snowflake data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a PostgreSQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
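
For example, a source backed by a named integration can be constructed with only that name (a minimal sketch; the integration name "RISK" is illustrative):

from chalk.sql import PostgreSQLSource

# Uses the PostgreSQL integration named "RISK" configured in the Chalk Dashboard.
risk_db = PostgreSQLSource(name="RISK")

# With exactly one PostgreSQL integration configured, no arguments are needed.
default_db = PostgreSQLSource()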

Create a MySQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a DynamoDB data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details. DynamoDBSources can be queried via PartiQL SQL resolvers.

You may override the ambient AWS credentials by providing either a client ID and secret, or a role ARN.

Create a BigQuery data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a Redshift data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a CloudSQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a SQLite source for a file.

Parameters
filename:
str | PathLike

The name of the file.

name: = None

The name to use in testing

Additional arguments to use when constructing the SQLAlchemy engine.

Additional arguments to use when constructing an async SQLAlchemy engine.

Returns

The SQL source for use in Chalk resolvers.

Testing SQL source.

If you have only one SQLiteInMemorySource integration, there's no need to provide a distinguishing name.

Parameters
name: = None

The name of the integration.

Additional arguments to use when constructing the SQLAlchemy engine.

Additional arguments to use when constructing an async SQLAlchemy engine.

Returns

The SQL source for use in Chalk resolvers.

source = SQLiteInMemorySource(name="RISK")

Create a Databricks data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a Spanner data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Incremental settings for Chalk SQL queries.

In "row" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:

"WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>"

In "group" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" that have changed since the last run of the query.

This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:

SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id

would be rewritten like this:

SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
    SELECT DISTINCT(user_id)
    FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id

In "parameter" mode: incremental_column WILL BE IGNORED.

This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query. Chalk will include a query parameter named "chalk_incremental_timestamp". Depending on your SQL dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp or %(chalk_incremental_timestamp)s. This will incrementalize your query using the timestamp of the latest row that has been ingested.

Chalk will also include another query parameter named "chalk_last_execution_timestamp" that can be used instead. This will incrementalize your query using the last time the query was executed.

incremental_timestamp:

If incremental_timestamp is "feature_time", we will incrementalize your query using the timestamp of the latest row that has been ingested. This is the default.

If incremental_timestamp is "resolver_execution_time", we will incrementalize your query using the last time the query was executed instead.

Attributes
mode
'row' | 'group' | 'parameter'

The amount of overlap to check for late-arriving rows.

The column on which to incrementalize.

incremental_timestamp
'feature_time' | 'resolver_execution_time'

The timestamp to set as the lower bound
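
A minimal construction sketch, assuming IncrementalSettings is imported from chalk.sql; the column name, field names, and lookback duration format are illustrative:

from chalk.sql import IncrementalSettings

# Row mode: only rows whose updated_at is newer than the previously observed
# maximum timestamp, minus a lookback window for late-arriving rows.
settings = IncrementalSettings(
    mode="row",
    incremental_column="updated_at",
    lookback_period="60m",
    incremental_timestamp="feature_time",
)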

Functions

Run a query from a SQL string.

Parameters

The query that you'd like to run.

fields:
dict[str, Feature | str | Any] | None
= None

A mapping from the column names selected to features.

args: = None

Any args in the sql string specified by query need to have corresponding value assignments in args.

Returns

A query that can be returned from a @online or @offline resolver.
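
For instance, a resolver might run a parameterized SQL string against a configured source and finalize it with .one() (a minimal sketch; the table, columns, and source configuration are illustrative):

from chalk import online
from chalk.features import Features
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@online
def get_user_contact(uid: User.id) -> Features[User.name, User.email]:
    # The :user_id parameter in the SQL string is supplied via `args`.
    return pg.query_string(
        "SELECT name, email FROM users WHERE id = :user_id",
        fields={"name": User.name, "email": User.email},
        args={"user_id": uid},
    ).one()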

Run a query from a SQL file.

This method allows you to query the SQL file within a Python resolver. However, Chalk can also infer resolvers from SQL files. See SQL file resolvers for more information.

Parameters
path:
str | bytes | PathLike

The path to the SQL file, relative to the caller's file or to the directory in which your chalk.yaml file lives.

fields:
dict[str, Feature | str | Any] | None
= None

A mapping from the column names selected to features.

args: = None

Any args in the sql file specified by path need to have corresponding value assignments in args.

Returns

A query that can be returned from a @online or @offline resolver.
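
A sketch of the file-based variant, assuming a get_user.sql file next to the resolver (the file name and columns are illustrative):

from chalk import online
from chalk.features import Features
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@online
def get_user_contact(uid: User.id) -> Features[User.name, User.email]:
    # The SQL in get_user.sql selects `name` and `email` and takes a :user_id parameter.
    return pg.query_sql_file(
        path="get_user.sql",
        fields={"name": User.name, "email": User.email},
        args={"user_id": uid},
    ).one()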

Query using a SQLAlchemy model.

Parameters

Arguments as they would normally be passed to a SQLAlchemy query.

Returns

A query that can be returned from a resolver.

Get an SQLAlchemy Engine. The engine will be created and cached on the first call of this method.

Returns

A SQLAlchemy engine.

Functions

Automatically ingest a table.

Parameters

The name of the table to ingest.

The feature class that this table should be mapping to, e.g. User.

Columns in the table that should be ignored, and not mapped to features, even if there is a matching name.

Features on the feature class that should be ignored, and not mapped to columns, even if there is a matching name.

Columns that must exist in the mapping.

Features that must exist in the mapping.

Explicit mapping of columns to features for names that do not match.

Settings for incrementally ingesting the table.

from chalk.sql import PostgreSQLSource
from chalk.features import features
PostgreSQLSource().with_table(
    name="users",
    features=User,
).with_table(
    name="accounts",
    features=Account,
    # Override one of the column mappings.
    column_to_feature={
        "acct_id": Account.id,
    },
)
Functions

Return the first result of this Query or None if the result doesn't contain any row.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return at most one result or raise an exception.

Returns None if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return exactly one result or raise an exception.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return the results represented by this Query as a DataFrame.

Returns
type:
DataframeFinalizedChalkQuery

A query that can be returned from a resolver.

Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.

previous_latest_row_timestamp will be set to the start of the query execution, or, if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.

In "row" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:

WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>

In "group" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" that have changed since the last run of the query.

This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:

SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id

would be rewritten like this:

SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
    SELECT DISTINCT(user_id)
    FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id

In "parameter" mode: incremental_column WILL BE IGNORED.

This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query. Chalk will include a query parameter named "chalk_incremental_timestamp". Depending on your SQL dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp or %(chalk_incremental_timestamp)s.

Parameters

Defaults to 0, which means we only return rows that are strictly newer than the last observed row.

mode:
'row' | 'group' | 'parameter'
= 'row'

Defaults to "row", which indicates that only rows newer than the last observed row should be considered. When set to "group", Chalk will only ingest features from groups which are newer than the last observation time. This requires that the query is grouped by a primary key.

incremental_column:
str | Feature | None
= None

This should reference a timestamp column in your underlying table, typically something like "updated_at", "created_at", "event_time", etc.

incremental_timestamp:
'feature_time' | 'resolver_execution_time'
= 'feature_time'

Defaults to "feature_time", which means that the timestamp associated with the last feature value will be used as the incremental time. Alternatively, setting this parameter to "resolver_execution_time" will use the timestamp of the resolver's last execution.

params: = None
Returns
type:
DataframeFinalizedChalkQuery

A query that can be returned from a resolver.
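
A sketch of row-mode incrementalization from a resolver (the Payment feature class, table, and column names are illustrative, not part of the API):

from chalk import offline
from chalk.features import DataFrame
from chalk.sql import PostgreSQLSource

pg = PostgreSQLSource()

@offline
def ingest_payments() -> DataFrame[Payment.id, Payment.amount, Payment.updated_at]:
    # On each run, only rows whose updated_at is newer than the previously
    # observed maximum (minus the lookback period) are returned.
    return pg.query_string(
        "SELECT id, amount, updated_at FROM payments",
        fields={
            "id": Payment.id,
            "amount": Payment.amount,
            "updated_at": Payment.updated_at,
        },
    ).incremental(incremental_column="updated_at", mode="row")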

Apply the given filtering criterion to a copy of this Query, using keyword expressions.

Parameters
kwargs: = {}

The column names assigned to the desired values (i.e. name="Maria").

Returns

A query that can be returned from a resolver or further filtered.

from chalk.sql import PostgreSQLSource
session = PostgreSQLSource()
session.query(UserFeatures(id=UserSQL.id)).filter_by(name="Maria")

Apply the given filtering criterion to a copy of this Query, using SQL expressions.

Parameters

SQLAlchemy filter criterion

Returns

A query that can be returned from a resolver or further filtered.

Apply one or more ORDER BY criteria to the query and return the newly resulting Query.

Parameters
clauses: = ()

SQLAlchemy columns.

Returns

A query that can be returned from a resolver or further filtered.
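
For example, chained onto the filter_by example above (UserSQL is the same illustrative SQLAlchemy model, and the ordering column is illustrative):

from chalk.sql import PostgreSQLSource
session = PostgreSQLSource()
session.query(
    UserFeatures(id=UserSQL.id)
).filter_by(
    name="Maria"
).order_by(
    UserSQL.id
)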

Materialize the query.

Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.

Note: this requires the usage of the fields={...} argument when used in conjunction with query_string or query_sql_file.

Returns

The raw result of executing the query. For .all(), returns a DataFrame. For .one() or .one_or_none(), returns a Features instance corresponding to the relevant feature class.

Functions

Materialize the query.

Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.

Returns
type:

A DataFrame with the results of the query.

Return at most one result or raise an exception.

Returns None if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return exactly one result or raise an exception.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return the results represented by this Query as a list.

Returns
type:
DataframeFinalizedChalkQuery

A query that can be returned from a resolver.

Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.

previous_latest_row_timestamp will be set to the start of the query execution, or, if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.

In "row" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:

WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>

In "group" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" that have changed since the last run of the query.

This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:

SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id

would be rewritten like this:

SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
    SELECT DISTINCT(user_id)
    FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id

In "parameter" mode: incremental_column WILL BE IGNORED.

This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query. Chalk will include a query parameter named "chalk_incremental_timestamp". Depending on your SQL dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp or %(chalk_incremental_timestamp)s.

Parameters

This should reference a timestamp column in your underlying table, typically something like "updated_at", "created_at", "event_time", etc.

Defaults to 0, which means we only return rows that are strictly newer than the last observed row.

mode:
'row' | 'group' | 'parameter'
= 'row'

Defaults to "row", which indicates that only rows newer than the last observed row should be considered. When set to "group", Chalk will only ingest features from groups which are newer than the last observation time. This requires that the query is grouped by a primary key.

incremental_timestamp:
'feature_time' | 'resolver_execution_time'
= 'feature_time'

Defaults to "feature_time", which means that the timestamp associated with the last feature value will be used as the incremental time. Alternatively, setting this parameter to "resolver_execution_time" will use the timestamp of the resolver's last execution.

Returns
type:
DataframeFinalizedChalkQuery

A query that can be returned from a resolver.
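
In "parameter" mode, the query references the bound chalk_incremental_timestamp parameter directly. A minimal sketch (the Payment feature class, table, and columns are illustrative):

from chalk import offline
from chalk.features import DataFrame
from chalk.sql import SnowflakeSource

sf = SnowflakeSource()

@offline
def ingest_recent_payments() -> DataFrame[Payment.id, Payment.amount]:
    # Chalk does not rewrite the query in "parameter" mode; it only binds
    # chalk_incremental_timestamp for use in the WHERE clause below.
    return sf.query_string(
        "SELECT id, amount FROM payments WHERE updated_at >= :chalk_incremental_timestamp",
        fields={"id": Payment.id, "amount": Payment.amount},
    ).incremental(mode="parameter")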

Generate a Chalk SQL file resolver from a filepath and a sql string. This will generate a resolver in your web dashboard that can be queried, but will not output a .chalk.sql file.

The optional parameters are overrides for the comment key-value pairs at the top of the sql file resolver. Comment key-value pairs specify important resolver information such as the source, feature namespace to resolve, and other details. Note that these will override any values specified in the sql string. See Configuration for more information.

See SQL file resolvers for more information on SQL file resolvers.

Parameters

The name of your resolver

The sql string for your query.

Can either be a BaseSQLSource or a string. If a string is provided, it will be used to infer the source by first scanning for a source with the same name, then inferring the source if it is a type, e.g. snowflake if there is only one database of that type. Optional if source is specified in sql.

resolves: = None

Describes the feature namespace to which the outputs belong. Optional if resolves is specified in sql.

kind:
'online' | 'offline' | 'streaming' | None
= None

The type of resolver. If not specified, defaults to "online".

Other Parameters
Show All
count:
1 | 'one' | 'one_or_none' | None
= None
timeout: = None
cron: = None
owner: = None
tags: = None
unique_on:
Collection[FeatureReference] | None
= None
partitioned_by:
Collection[Any] | None
= None
from chalk import make_sql_file_resolver
from chalk.features import features
@features
class User:
    id: int
    name: str
make_sql_file_resolver(
    name="my_resolver",
    sql="SELECT user_id as id, name FROM users",
    source="snowflake",
    resolves=User,
    kind="offline",
)

Decorator to create a stream resolver.

Parameters

The streaming source, e.g. KafkaSource(...) or KinesisSource(...) or PubSubSource(...).

mode:
'continuous' | 'tumbling' | None
= None

This parameter is defined when the streaming resolver returns a windowed feature. Tumbling windows are fixed-size, contiguous and non-overlapping time intervals. You can think of tumbling windows as adjacently arranged bins of equal width. Tumbling windows are most often used alongside max_staleness to allow the features to be sent to the online store and offline store after each window period.

Continuous windows, unlike tumbling windows, are overlapping and exact. When you request the value of a continuous window feature, Chalk looks at all the messages received in the window and computes the value on-demand.

See more at Window modes

parse:
Callable[[T], V] | None
= None

A callable that will interpret an input prior to the invocation of the resolver. Parse functions can serve many functions, including pre-parsing bytes, skipping unrelated messages, or supporting rekeying.

See more at Parsing

keys: = None

A mapping from input BaseModel attribute to Chalk feature attribute to support continuous streaming re-keying. This parameter is required for continuous resolvers. Features that are included here do not have to be explicitly returned in the stream resolver: the feature will automatically be set to the key value used for aggregation. See more at Keys

timestamp: = None

An optional string specifying an input attribute as the timestamp used for windowed aggregations. See more at Custom event timestamping

Other Parameters
Show All
owner: = None
Returns
type:

A callable function! You can unit-test stream resolvers as you would unit-test any other code.
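
A minimal sketch of a stream resolver over a Kafka source (the integration name, message model, and mapped features are illustrative):

from pydantic import BaseModel
from chalk.features import Features
from chalk.streams import stream, KafkaSource

source = KafkaSource(name="user_updates")

class UserUpdateMessage(BaseModel):
    user_id: str
    email: str

@stream(source=source)
def process_user_updates(message: UserUpdateMessage) -> Features[User.id, User.email]:
    # Map each incoming message onto feature values for the User namespace.
    return User(id=message.user_id, email=message.email)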

Decorator to create a sink. Read more at Sinks

Parameters

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environment and no others

Read more at Environments

tags: = None

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

buffer_size: = None

Count of updates to buffer.

owner: = None

The individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

Other Parameters
Show All
fn:
Callable[P, T] | None
= None
debounce: = None
max_delay: = None
upsert: = False
integration:
BaseSQLSourceProtocol | SinkIntegrationProtocol | None
= None
name: = None
Returns

A callable function! You can unit-test sinks as you would unit test any other code. Read more at Unit Tests

@sink
def process_updates(
    uid: User.id,
    email: User.email,
    phone: User.phone,
):
    user_service.update(
        uid=uid,
        email=email,
        phone=phone
    )
process_updates(123, "sam@chalk.ai", "555-555-5555")
Attributes

The URL of one of your Kafka brokers from which to fetch initial metadata about your Kafka cluster

The name of the topic to subscribe to.

An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.

An S3 or GCS URI that points to the certificate authority file that should be used to verify broker certificates. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.

security_protocol
'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'

Protocol used to communicate with brokers. Valid values are "PLAINTEXT", "SSL", "SASL_PLAINTEXT", and "SASL_SSL". Defaults to "PLAINTEXT".

sasl_mechanism
'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'

Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are "PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER". Defaults to "PLAIN".

Username for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication.

Password for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication.

The name of the integration, as configured in your Chalk Dashboard.

Messages older than this deadline will not be processed.

Kafka topic to send messages when message processing fails

Functions
Attributes

The name of your stream. Either this or the stream_arn must be specified

The ARN of your stream. Either this or the stream_name must be specified

AWS region string, e.g. "us-east-2"

The name of the integration, as configured in your Chalk Dashboard.

Messages older than this deadline will not be processed.

Kinesis stream name to send messages when message processing fails

AWS access key id credential

AWS secret access key credential

AWS session token credential

Optional endpoint to hit the Kinesis server

Optional role ARN for the consumer to assume

Functions
Attributes

The project id of your PubSub source

The subscription id of your PubSub topic from which you want to consume messages. To enable consuming from this subscription, ensure that the service account has the permissions 'pubsub.subscriptions.consume' and 'pubsub.subscriptions.get'.

The name of the integration, as configured in your Chalk Dashboard.

Messages older than this deadline will not be processed.

PubSub topic id to send messages when message processing fails. Add the permission 'pubsub.topics.publish' if this is set.

Functions

Base class for all stream sources generated from @stream.

Attributes

e.g. 'kafka' or 'kinesis' or 'pubsub'

Identifier for the dead-letter queue (DLQ) for the stream. If not specified, failed messages will be dropped. Stream name for Kinesis, topic name for Kafka, subscription id for PubSub.

Identifier for the stream to consume. Stream name for Kinesis, topic name for Kafka, subscription id for PubSub

Functions

The ChalkClient is the primary Python interface for interacting with Chalk.

You can use it to query data, trigger resolver runs, gather offline data, and more.

Functions

Create a ChalkClient with the given credentials.

Parameters
client_id: = None

The client ID to use to authenticate. Can either be a service token id or a user token id.

The client secret to use to authenticate. Can either be a service token secret or a user token secret.

The ID or name of the environment to use for this client. Not necessary if your client_id and client_secret are for a service token scoped to a single environment. If not present, the client will use the environment variable CHALK_ENVIRONMENT.

api_server: = None

The API server to use for this client. Required if you are using a Chalk Dedicated deployment. If not present, the client will check for the presence of the environment variable CHALK_API_SERVER, and use that if found.

branch: = None

If specified, Chalk will route all requests from this client instance to the relevant branch. Some methods allow you to override this instance-level branch configuration by passing in a branch argument.

If True, the client will pick up the branch from the current git branch.

If specified, Chalk will route all requests from this client instance to the relevant tagged deployment. This cannot be used with the branch argument.

preview_deployment_id:
DeploymentId | None
= None

If specified, Chalk will route all requests from this client instance to the relevant preview deployment.

A requests.Session to use for all requests. If not provided, a new session will be created.

The query server to use for this client. Required if you are using a standalone Chalk query engine deployment. If not present, the client will default to the value of api_server.

A map of additional HTTP headers to pass with each request.

The default wait timeout, in seconds, to wait for long-running jobs to complete when accessing query results. Jobs will not time out if this timeout elapses. For no timeout, set to None. The default is no timeout.

The default wait timeout, in seconds, to wait for network requests to complete. If not specified, the default is no timeout.

local: = False

If True, point the client at a local version of the code.

ssl_context:
ssl.SSLContext | None
= None

An ssl.SSLContext that can be loaded with self-signed certificates so that requests to servers hosted with self-signed certificates succeed.

Raises

If client_id or client_secret are not provided, there is no ~/.chalk.yml file with applicable credentials, and the environment variables CHALK_CLIENT_ID and CHALK_CLIENT_SECRET are not set.
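
A minimal construction sketch (the credentials, environment, and branch name are placeholders; in practice they are often supplied via CHALK_CLIENT_ID, CHALK_CLIENT_SECRET, or ~/.chalk.yml):

from chalk.client import ChalkClient

client = ChalkClient(
    client_id="<client-id>",
    client_secret="<client-secret>",
    environment="<environment-id-or-name>",
)

# Route all requests from this client instance to a branch deployment.
branch_client = ChalkClient(branch="my-feature-branch")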

Compute feature values using online resolvers. See Chalk Clients for more information.

Parameters

The features for which there are known values, mapped to those values. For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}

Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].

If an empty sequence, the output will be set to all features on the namespace of the query. For example, if you pass as input {"user.id": 1234}, then the query is defined on the User namespace, and all features on the User namespace (excluding has-one and has-many relationships) will be used as outputs.

now: = None

The time at which to evaluate the query. If not specified, the current time will be used. This parameter is complex in the context of online_query since the online store only stores the most recent value of an entity's features. If now is in the past, it is extremely likely that None will be returned for cache-only features.

This parameter is primarily provided to support:

  • controlling the time window for aggregations over cached has-many relationships
  • controlling the time window for aggregations over has-many relationships loaded from an external database

If you are trying to perform an exploratory analysis of past feature values, prefer offline_query.

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.

query_name: = None

The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'. If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.

If query_name is specified, this specifies the version of the named query you're making. This is only useful if you want your query to use a NamedQuery with a specific name and a specific version. If a query_name has not been supplied, then this parameter is ignored.

Returns metadata about the query execution under OnlineQueryResult.meta. This could make the query slightly slower. For more information, see Chalk Clients.

explain: = False

Log the query execution plan. Requests using explain=True will be slower than requests using explain=False.

If True, 'include_meta' will be set to True as well.

If True, the output of each of the query plan stages will be stored. This option dramatically impacts the performance of the query, so it should only be used for debugging.

encoding_options:
FeatureEncodingOptions | None
= None

If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.

Other Parameters
Show All
meta: = None
planner_options: = None
headers: = None
Returns
type:
OnlineQueryResult

Wrapper around the output features and any query metadata and errors encountered while running the resolvers.

The output features and any query metadata.

Errors encountered while running the resolvers.

If no errors were encountered, this field is empty.

Metadata about the query execution. Only present if include_meta=True is passed to the relevant query method.

from chalk.client import ChalkClient
result = ChalkClient().query(
    input={
        User.name: "Katherine Johnson"
    },
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
result.get_feature_value(User.fico_score)

Execute multiple queries (represented by queries= argument) in a single request. This is useful if the queries are "rooted" in different @features classes -- i.e. if you want to load features for User and Merchant and there is no natural relationship object which is related to both of these classes, multi_query allows you to submit two independent queries.

Returns a BulkOnlineQueryResponse, which is functionally a list of query results. Each of these result can be accessed by index. Individual results can be further checked for errors and converted to pandas or polars DataFrames.

In contrast, query_bulk executes a single query with multiple inputs/outputs.

Parameters

A list of the OnlineQuery objects you'd like to execute.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

Other Parameters
Show All
correlation_id: = None
query_name: = None
meta: = None
compression: = 'uncompressed'
Returns

An output containing results as a list[BulkOnlineQueryResult], where each result contains a DataFrame of the results of each query or any errors.

from chalk.client import ChalkClient, OnlineQuery
queries = [
    OnlineQuery(
        input={User.name: 'Katherine Johnson'},
        output=[User.fico_score],
    ),
    OnlineQuery(
        input={Merchant.name: 'Eight Sleep'},
        output=[Merchant.address],
    ),
]
result = ChalkClient().multi_query(queries)
result[0].get_feature_value(User.fico_score)

Compute feature values for many rows of inputs using online resolvers. See Chalk Clients for more information on online query.

This method is similar to query, except it takes in list of inputs, and produces one output per row of inputs.

This method is appropriate if you want to fetch the same set of features for many different input primary keys.

This method contrasts with multi_query, which executes multiple fully independent queries.

This endpoint is not available in all environments.

Parameters

The features for which there are known values, mapped to a list of the values.

Outputs are the features that you'd like to compute from the inputs.

The time at which to evaluate the query. If not specified, the current time will be used. The length of this list must be the same as the length of the values in input.

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

Other Parameters
Show All
correlation_id: = None
query_name: = None
meta: = None
explain: = False
headers: = None
Returns

An output containing results as a list[BulkOnlineQueryResult], where each result contains a DataFrame of the results of each query.

from chalk.client import ChalkClient
ChalkClient().query_bulk(
    input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)

Plan a query without executing it.

Parameters

The features for which there are known values, mapped to those values. For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}

Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

query_name: = None

The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'. If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.

If query_name is specified, this specifies the version of the named query you're making. This is only useful if you want your query to use a NamedQuery with a specific name and a specific version. If a query_name has not been supplied, then this parameter is ignored.

meta: = None

Arbitrary key:value pairs to associate with a query.

If true, the plan will store the intermediate values at each stage in the plan

explain: = False

If true, the plan will emit additional output to assist with debugging.

The number of input rows that this plan will be run with. If unknown, specify None.

headers: = None

Additional headers to provide with the request

Returns

The query plan, including the resolver execution order and the resolver execution plan for each resolver.

from chalk.client import ChalkClient
result = ChalkClient().plan_query(
    input=[User.id],
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
result.rendered_plan
result.output_schema

Check whether the expected results of a query match Chalk query outputs. This function should be used in integration tests. If you're using pytest, pytest.fail will be executed on an error. Otherwise, an AssertionError will be raised.

Parameters

A feature set or a mapping of {feature: value} of givens. All values will be encoded to the json representation.

A feature set or a mapping of {feature: value} of expected outputs. For values where you do not care about the result, use an ... for the feature value (i.e. when an error is expected).

A list of the features that you expect to be read from the online store, e.g.

cache_hits=[Actor.name, Actor.num_appearances]

A map from the expected feature name to the expected errors for that feature, e.g.

expected_feature_errors={
    User.id: [ChalkError(...), ChalkError(...)]
}
errors={
    "user.id": [ChalkError(...), ChalkError(...)]
}
query_errors:
Collection[ChalkError] | None
= None

A list of the expected query error.

now: = None

The time at which to evaluate the query. If not specified, the current time will be used. This parameter is complex in the context of online_query since the online store only stores the most recent value of an entity's features. If now is in the past, it is extremely likely that None will be returned for cache-only features.

This parameter is primarily provided to support:

  • controlling the time window for aggregations over cached has-many relationships
  • controlling the time window for aggregations over has-many relationships loaded from an external database

If you are trying to perform an exploratory analysis of past feature values, prefer offline_query.

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

query_name: = None

The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'. If your query name matches a NamedQuery, the query will automatically pull outputs and options specified in the matching NamedQuery.

If query_name is specified, this specifies the version of the named query you're making. This is only useful if you want your query to use a NamedQuery with a specific name and a specific version. If a query_name has not been supplied, then this parameter is ignored.

encoding_options:
FeatureEncodingOptions | None
= None

If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.

show_table: = False

Print the feature value table even if no errors were found.

The relative tolerance to allow for float equality. If you specify both float_rel_tolerance and float_abs_tolerance, the numbers will be considered equal if either tolerance is met. Equivalent to:

abs(a - b) <= float_rel_tolerance * max(abs(a), abs(b))

The absolute tolerance to allow for float equality. If you specify both float_rel_tolerance and float_abs_tolerance, the numbers will be considered equal if either tolerance is met. Equivalent to:

abs(a - b) <= float_abs_tolerance
prefix: = True

Whether to show the prefix for feature names in the table.

Other Parameters
Show All
planner_options: = None
headers: = None
from chalk.client import ChalkClient
result = ChalkClient().check(
    input={Actor.id: "nm0000001"},
    assertions={Actor.num_movies: 40},
)
Chalk Feature Value Mismatch
┏━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Kind   ┃ Name                 ┃ Value     ┃
┡━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Expect │ actor.id             │ nm0000001 │
│ Actual │ actor.id             │ nm0000001 │
│ Expect │ actor.num_appearanc… │ 40        │
│ Actual │ actor.num_appearanc… │ 41        │
└────────┴──────────────────────┴───────────┘

Get a Chalk Dataset containing data from a previously created dataset.

If an offline query has been created with a dataset name, .get_dataset will return a Chalk Dataset. The Dataset wraps a lazily-loading Chalk DataFrame that enables us to analyze our data without loading all of it directly into memory. See Offline Queries for more information.

Parameters

The name of the Dataset to return. Previously, you must have supplied a dataset name upon an offline query. Dataset names are unique for each environment. If 'dataset_name' is provided, then 'job_id' should not be provided.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

dataset_id:
str | uuid.UUID | None
= None

A UUID returned in the Dataset object from an offline query. Dataset ids are unique for each environment. If 'dataset_id' is provided, then 'dataset_name' and 'revision_id' should not be provided.

revision_id:
str | uuid.UUID | None
= None

The unique id of the DatasetRevision to return. If a previously-created dataset did not have a name, you can look it up using its unique job id instead. If 'revision_id' is provided, then 'dataset_name' and 'dataset_id' should not be provided.

Other Parameters
Show All
job_id:
str | uuid.UUID | None
= None
Returns
type:

A Dataset that lazily loads your query data.

from chalk.client import ChalkClient
uids = [1, 2, 3, 4]
at = datetime.now(timezone.utc)
X = ChalkClient().offline_query(
    input={
        User.id: uids,
    },
    input_times=[at] * len(uids),
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset='my_dataset_name'
)

Some time later...

dataset = ChalkClient().get_dataset(
    dataset_name='my_dataset_name'
)
...

or

dataset = ChalkClient().get_dataset(
    job_id='00000000-0000-0000-0000-000000000000'
)
...

If memory allows:

df: pd.DataFrame = dataset.get_data_as_pandas()

Compute feature values from the offline store or by running offline/online resolvers. See Dataset for more information.

Parameters

The features for which there are known values. It can be a mapping of features to a list of values for each feature, or an existing DataFrame. Each element in the DataFrame or list of values represents an observation in line with the timestamp in input_times.

The time at which the given inputs should be observed for point-in-time correctness. If given a list of times, the list must match the length of the input lists. Each element of input_time corresponds with the feature values at the same index of the input lists. See Temporal Consistency for more information.

The features that you'd like to sample, if they exist. If an output feature was never computed for a sample (row) in the resulting DataFrame, its value will be None.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

A unique name that if provided will be used to generate and save a Dataset constructed from the list of features computed from the inputs.

If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.

You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.

max_samples: = None

The maximum number of samples to include in the DataFrame. If not specified, all samples will be returned.

wait: = False

Whether to wait for job completion.

If True, progress bars will be shown while the query is running. Primarily intended for use in a Jupyter-like notebook environment. This flag will also be propagated to the methods of the resulting Dataset.

timeout:
float | timedelta | ... | None
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Used to control whether resolvers are allowed to run in order to compute feature values.

If True, all output features will be recomputed by resolvers. If False, all output features will be sampled from the offline store. If a list, all output features in recompute_features will be recomputed, and all other output features will be sampled from the offline store.

A list of features that will always be sampled, and thus always excluded from recompute. Should not overlap with any features used in "recompute_features" argument.

If specified, the query will only be run on data observed after this timestamp. Accepts strings in ISO 8601 format.

If specified, the query will only be run on data observed before this timestamp. Accepts strings in ISO 8601 format.

If True, the output of each of the query plan stages will be stored in S3/GCS. This will dramatically impact the performance of the query, so it should only be used for debugging. These files will be visible in the web dashboard's query detail view, and can be downloaded in full by clicking on a plan node in the query plan visualizer.

explain: = False
tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.

A SQL query that will query your offline store and use the result as input. See Input for more information.

Override resource requests for processes with isolated resources, e.g., offline queries and cron jobs. See ResourceRequests for more information.

Boots a kubernetes job to run the queries in their own pods, separate from the engine and branch servers. This is useful for large datasets and jobs that require a long time to run.

If True, the output of the query will be stored in the online store.

If True, the output of the query will be stored in the offline store.

num_shards: = None

If specified, the query will be run asynchronously, splitting the input across num_shards shards.

num_workers: = None

If specified, the query will be run asynchronously across a maximum num_workers pod workers at any time. This parameter is useful if you have a large number of shards and would like to limit the number of pods running at once.

Returns
type:

A Chalk Dataset.

from chalk.client import ChalkClient
uids = [1, 2, 3, 4]
at = datetime.now(tz=timezone.utc)
dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
    },
    input_times=[at] * len(uids),
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset'
)
df = dataset.get_data_as_pandas()

Triggers a resolver to run. See Triggered Runs for more information.

Parameters

The fully qualified name of the resolver to trigger.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, the resolver will only ingest data observed before this timestamp. Accepts strings in ISO 8601 format.

If specified, the resolver will only ingest data observed after this timestamp. Accepts strings in ISO 8601 format.

If True, the resolver run output will be stored in the online store.

If True, the resolver run output will be stored in the offline store.

timestamping_mode:
'feature_time' | 'online_store_write_time'
= 'feature_time'

If specified, the resolver run will be idempotent with respect to the key.

Returns
type:
ResolverRunResponse

Status of the resolver run and the run ID.

The ID of the resolver run.

status
ResolverRunStatus

The current status of the resolver run.

from chalk.client import ChalkClient
ChalkClient().trigger_resolver_run(
    resolver_fqn="mymodule.fn"
)

Retrieves the status of a resolver run. See Triggered Runs for more information.

Parameters

ID of the resolver run to check.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment.

Returns
type:
ResolverRunResponse

Status of the resolver run and the run ID.

The ID of the resolver run.

status
ResolverRunStatus

The current status of the resolver run.

from chalk.client import ChalkClient
ChalkClient().get_run_status(
    run_id="3",
)
ResolverRunResponse(
    id="3",
    status=ResolverRunStatus.SUCCEEDED
)

Checks the identity of your client.

Useful as a sanity test of your configuration.

Returns
type:
WhoAmIResponse

The identity of your client.

The ID of the user or service token making the query.

The environment under which the client's queries will be run, unless overridden

The team ID pertaining to the client

from chalk.client import ChalkClient
ChalkClient().whoami()
WhoAmIResponse(user="...", environment_id='...', team_id='...')

Targets feature observation values for deletion and performs deletion online and offline.

Parameters

The namespace in which the target features reside.

An optional list of the feature names of the features that should be deleted for the targeted primary keys. Not specifying this and not specifying the "tags" field will result in all features being targeted for deletion for the specified primary keys. Note that this parameter and the "tags" parameter are mutually exclusive.

An optional list of tags that specify features that should be targeted for deletion. If a feature has a tag in this list, its observations for the primary keys you listed will be targeted for deletion. Not specifying this and not specifying the "features" field will result in all features being targeted for deletion for the specified primary keys. Note that this parameter and the "features" parameter are mutually exclusive.

The primary keys of the observations that should be targeted for deletion.

Returns

Holds any errors (if any) that occurred during the deletion request. Deletion of a feature may partially succeed.

from chalk.client import ChalkClient
ChalkClient().delete_features(
    namespace="user",
    features=["name", "email", "age"],
    primary_keys=[1, 2, 3]
)

Performs a drop on features, which deletes all of their data (both online and offline). Once a feature is reset in this manner, its type can be changed.

Parameters

The namespace in which the target features reside.

A list of the feature names of the features that should be dropped.

Returns

Holds any errors (if any) that occurred during the drop request. Dropping a feature may partially succeed.

from chalk.client import ChalkClient
ChalkClient().drop_features(
    namespace="user",
    features=["name", "email", "age"],
)

Upload data to Chalk for use in offline resolvers or to prime a cache.

Parameters

The features for which there are known values, mapped to those values.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment

query_name: = None

Optionally associate this upload with a query name. See .query for more information.

Other Parameters
Show All
branch: = ...
correlation_id: = None
meta: = None
Returns

The errors encountered from uploading features.

from chalk.client import ChalkClient
ChalkClient().upload_features(
    input={
        User.id: 1,
        User.name: "Katherine Johnson"
    }
)

Upload data to Chalk for use in offline resolvers or to prime a cache.

Parameters

One of three types:

  • A list of mappings, each of which includes the features for which there are known values mapped to those values. Each mapping can have different keys, but each mapping must have the same root features class.
  • A mapping where each feature key is mapped to a list of the values for that feature. You can consider this a mapping that describes columns (keys, i.e. features) and rows (the list of values in the map for each feature). Each list must be the same length.
  • A pandas, polars, or chalk.DataFrame.

The environment under which to run the upload. API tokens can be scoped to an environment. If no environment is specified in the upload, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment

Other Parameters
Show All
correlation_id: = None
meta: = None
Returns

The errors encountered from uploading features.

from chalk.client import ChalkClient
ChalkClient().multi_upload_features(
    input=[
        {
            User.id: 1,
            User.name: "Katherine Johnson"
        },
        {
            User.id: 2,
            User.name: "Eleanor Roosevelt"
        }
    ]
)
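
The second input form described above (a mapping from each feature key to a list of values) uploads the same rows in a columnar layout. A minimal sketch, reusing the User features from the example above; every list must be the same length:

from chalk.client import ChalkClient
ChalkClient().multi_upload_features(
    input={
        User.id: [1, 2],
        User.name: ["Katherine Johnson", "Eleanor Roosevelt"],
    }
)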

Get the most recent feature values from the offline store.

See Offline Queries for more information.

Parameters

The features that you'd like to sample, if they exist. If an output feature was never computed for a sample (row) in the resulting DataFrame, its value will be None.

max_samples: = None

The maximum number of rows to return.

dataset: = None

The Dataset name under which to save the output.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

Other Parameters
Show All
output_id: = False
output_ts: = False
branch: = None
Returns

A pandas.DataFrame with columns equal to the names of the features in output, and values representing the value of the most recent observation.

from chalk.client import ChalkClient
sample_df = ChalkClient().sample(
    output=[
        Account.id,
        Account.title,
        Account.user.full_name
    ],
    max_samples=10
)

Create a new branch based off of a deployment from the server. By default, uses the latest live deployment.

Parameters

The name of the new branch to create.

create_only: = False

If True, will raise an error if a branch with the given name already exists. If False and the branch exists, then that branch will be deployed to.

switch: = True

If True, will switch the client to the newly created branch. Defaults to True.

The specific deployment ID to use for the branch. If not specified, the latest live deployment on the server will be used. You can see which deployments are available by clicking on the 'Deployments' tab on the project page in the Chalk dashboard.

The environment under which to create the branch. API tokens can be scoped to an environment. If no environment is specified in the query, the environment will be taken from the client's cached token.

Returns

A response object containing metadata about the branch.

from chalk.client import ChalkClient
client = ChalkClient()
client.create_branch("my-new-branch")

Lists the current branches for this environment.

Returns
type:

A list of the names of branches available on this environment.

from chalk.client import ChalkClient
ChalkClient().get_branches()
["testing", "feat/new-feature"]

Displays the current branch this client is pointed at.

If the current environment does not support branch deployments or no branch is set, this method returns None.

Returns
type:

The name of the current branch or None.

from chalk.client import ChalkClient
client = ChalkClient(branch="my-branch")
assert client.get_branch() == "my-branch"

Point the ChalkClient at the given branch. If branch_name is None, this points the client at the active non-branch deployment.

If the branch does not exist or if branch deployments are not enabled for the current environment, this method raises an error.

Parameters

The name of the branch to use, or None

from chalk.client import ChalkClient
client = ChalkClient()
client.create_branch("my-new-branch")
client.set_branch("my-new-branch")
client.set_branch(None)

Returns a BranchGraphSummary object that contains the state of the branch server: Which resolver/features are defined, and the history of live notebook updates on the server.

Parameters
branch:
BranchId | ...
= ...

The branch to query. If not specified, the branch is expected to be included in the constructor for ChalkClient.

Optionally override the environment under which to query the branch state.

Sets the incremental cursor for a resolver or scheduled query.

Parameters
resolver:
str | Resolver | None
= None

The resolver. Can be a function or the string name of a function. Exactly one of resolver and scheduled_query is required.

The name of the scheduled query. Exactly one of resolver and scheduled_query is required.

Set the maximum timestamp of the data ingested by the resolver.

Override the last execution timestamp of the resolver.

from chalk.client import ChalkClient
from datetime import datetime
client = ChalkClient()
client.set_incremental_cursor(
    resolver="my_resolver",
    max_ingested_timestamp=datetime.now(),
)

Gets the incremental cursor for a resolver or scheduled query.

Parameters
resolver:
str | Resolver | None
= None

The resolver. Can be a function or the string name of a function. Exactly one of resolver and scheduled_query is required.

If updating incremental status of a resolver in the context of a scheduled query, the name of the scheduled query. Exactly one of resolver and scheduled_query is required.

Returns

An object containing the max_ingested_timestamp and incremental_timestamp.

from chalk.client import ChalkClient
client = ChalkClient()
client.get_incremental_cursor(resolver="my_resolver")

Tests a streaming resolver and its ability to parse and resolve messages. See Streams for more information.

Parameters
resolver:
str | Resolver

The streaming resolver or its string name.

The number of messages to digest from the stream source. Because messages may not be arriving on the stream, this action may time out.

A filepath from which test messages will be ingested. This file should be newline-delimited JSON, as follows:

{"message_key": "my-key", "message_body": {"field1": "value1", "field2": "value2"}}
{"message_key": "my-key", "message_body": {"field1": "value1", "field2": "value2"}}

Each line may optionally contain a timestamp string as the value for the key "message_timestamp".

Alternatively, keys can be supplied in code along with the "test_message_bodies" argument. Both arguments must be the same length.

message_bodies:
list[str | bytes | pydantic.BaseModel] | None
= None

Message bodies can be supplied in code as strings, bytes, or Pydantic models along with the "test_message_keys" argument. Both arguments must be the same length.

Optionally, timestamps can be provided for each message.

Other Parameters
Show All
branch:
BranchId | ...
= ...
Returns

A simple wrapper around a status and optional error message. Inspecting StreamResolverTestResponse.features will return the test results, if they exist. Otherwise, check StreamResolverTestResponse.errors and StreamResolverTestResponse.message for errors.

from chalk.streams import stream, KafkaSource
from chalk.client import ChalkClient
from chalk.features import Features, features
import pydantic
# This code is an example of a simple streaming feature setup. Define the source
stream_source=KafkaSource(...)
# Define the features
@features(etl_offline_to_online=True, max_staleness="7d")
class StreamingFeature:
    id: str
    user_id: str
    card_id: str
# Define the streaming message model
class StreamingMessage(pydantic.BaseModel):
    card_id: str
    user_id: str
# Define the mapping resolver
@stream(source=stream_source)
def our_stream_resolver(
    m: StreamingMessage,
) -> Features[StreamingFeature.id, StreamingFeature.card_id, StreamingFeature.user_id]:
   return StreamingFeature(
       id=f"{m.card_id}-{m.user_id}",
       card_id=m.card_id,
       user_id=m.user_id,
   )
# Once you have done a `chalk apply`, you can test the streaming resolver with custom messages as follows
client = ChalkClient()
keys = ["my_key"] * 10
messages = [StreamingMessage(card_id="1", user_id=str(i)).json() for i in range(10)]
resp = client.test_streaming_resolver(
    resolver="our_stream_resolver",
    message_keys=keys,
    message_bodies=messages,
)
print(resp.features)
Attributes

The name of the feature requested, e.g. 'user.identity.has_voip_phone'.

The value of the requested feature. If an error was encountered in resolving this feature, this field will be empty.

The primary key of the resolved feature.

The error code encountered in resolving this feature. If no error occurred, this field is empty.

Whether the feature was resolved successfully.

The time at which this feature was computed. This value could be significantly in the past if you're using caching.

meta
FeatureResolutionMeta | None

Detailed information about how this feature was computed.

The ChalkError describes an error from running a resolver or from a feature that can't be validated.

Attributes

The type of the error.

The category of the error, given in the type field for the error codes. This will be one of "REQUEST", "NETWORK", or "FIELD".

A readable description of the error message.

A human-readable hint that can be used to identify the entity that this error is associated with.

If provided, can be used to add additional context to 'display_primary_key'.

The exception that caused the failure, if applicable.

The fully qualified name of the failing feature, e.g. user.identity.has_voip_phone.

The fully qualified name of the failing resolver, e.g. my.project.get_fraud_score.

Functions

Returns True if the error indicates an issue with the user's resolver, rather than an internal Chalk failure.

Class wrapper around revisions for Datasets.

Attributes
revision_id
uuid.UUID

UUID for the revision job.

UUID for the creator of the job.

Output features for the dataset revision.

Location of the givens stored for the dataset.

Status of the revision job.

Filters performed on the dataset.

Number of partitions for revision job.

Location of the outputs stored for the dataset.

Storage version of the outputs.

Number of bytes of the output, updated upon success.

Timestamp for creation of revision job.

Timestamp for start of revision job.

Timestamp for end of revision job.

Name of revision, if given.

dataset_id
uuid.UUID | None

ID of the dataset, if a name is given.

URL linking to the relevant dashboard page.

Number of computers this query ran on.

Name of branch

partitions
list[DatasetPartition]

The partitions that make up this dataset revision.

Functions

Loads a pl.DataFrame containing the output. Use .to_polars_lazyframe() if you want a LazyFrame instead, which allows local filtering of datasets that are larger than memory.

Parameters
output_id: = False

Whether to return the primary key feature in a column named "__chalk__.__id__" in the resulting pl.DataFrame.

output_ts: = False

Whether to return the input-time feature in a column named "__chalk__.CHALK_TS" in the resulting pl.DataFrame. If set to a non-empty str, used as the input-time column name.

Whether to ignore query errors upon fetching data

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Returns

A polars.DataFrame materializing query output data.

Loads a pl.LazyFrame containing the output. This method is appropriate for working with larger-than-memory datasets. Use .to_polars() if you want a DataFrame instead.

Parameters
output_id: = False

Whether to return the primary key feature in a column named "__chalk__.__id__" in the resulting pl.LazyFrame.

output_ts: = False

Whether to return the input-time feature in a column named "__chalk__.CHALK_TS" in the resulting pl.LazyFrame. If set to a non-empty str, used as the input-time column name.

Whether to ignore query errors upon fetching data

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Returns

A pl.LazyFrame materializing query output data.

Loads a pl.LazyFrame containing the output.

Parameters
output_id: = False

Whether to return the primary key feature in a column named "__chalk__.__id__" in the resulting pl.LazyFrame.

output_ts: = False

Whether to return the input-time feature in a column named "__chalk__.CHALK_TS" in the resulting pl.LazyFrame. If set to a non-empty str, used as the input-time column name.

Whether to ignore query errors upon fetching data

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Returns

A pl.LazyFrame materializing query output data.

Loads a pd.DataFrame containing the output.

Parameters
output_id: = False

Whether to return the primary key feature in a column named "__chalk__.__id__" in the resulting pd.DataFrame.

output_ts: = False

Whether to return the input-time feature in a column named "__chalk__.CHALK_TS" in the resulting pd.DataFrame. If set to a non-empty str, used as the input-time column name.

Whether to ignore query errors upon fetching data

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Returns

A pd.DataFrame materializing query output data.

Loads a Chalk DataFrame containing the output.

Parameters
output_id: = False

Whether to return the primary key feature in a column named "__chalk__.__id__" in the resulting DataFrame.

output_ts: = False

Whether to return the input-time feature in a column named "__chalk__.CHALK_TS" in the resulting DataFrame. If set to a non-empty str, used as the input-time column name.

Whether to ignore query errors upon fetching data

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Returns
type:

A DataFrame materializing query output data.

Returns a list of the output uris for the revision. Data will be stored in .Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.

Returns an object that loads the summary statistics of a dataset revision. The dataframe can be retrieved by calling to_polars() or to_pandas() on the return object. Data will be stored in .Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.

Returns an object that loads a preview of a dataset revision. The dataframe can be retrieved by calling to_polars() or to_pandas() on the return object. Data will be stored in .Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.

Waits for an offline query job to complete. Raises if the query is unsuccessful, otherwise returns itself on success.

Parameters
timeout:
float | timedelta | ... | None
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Whether to show a progress bar

Downloads output files pertaining to the revision to given path.

Datasets are stored in Chalk as sharded Parquet files. With this method, you can download those raw files into a directory for processing with other tools.

Parameters

A directory where the Parquet files from the dataset will be downloaded.

output_id: = False

Whether to return the primary key feature in a column named "__chalk__.__id__" in the resulting DataFrame.

output_ts: = False

Whether to return the input-time feature in a column named "__chalk__.CHALK_TS" in the resulting DataFrame. If set to a non-empty str, used as the input-time column name.

Whether to ignore query errors upon fetching data

executor:
ThreadPoolExecutor | None
= None

The executor to use for parallelizing downloads. If None, the default executor will be used.

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Loads a pl.LazyFrame containing the inputs.

Parameters

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Returns

A pl.LazyFrame materializing query input data.

Returns a URL linking to the offline query page in the Chalk dashboard and opens it in your browser. Must be logged in.

Parameters

If True, does not open url in browser. Default is False.

Returns
type:

A url redirecting to the Chalk dashboard.

Waits for the revision job to complete.

ChalkClient.offline_query returns a DatasetRevision instance immediately after submitting the revision job. This method can be used to wait for the revision job to complete.

Once the revision job is complete, the status attribute of the DatasetRevision instance will be updated to reflect the status of the revision job.

If the revision job was successful, you can then use methods such as get_data_as_pandas() without having to wait for the revision job to complete.

Parameters

Whether to show a progress bar

timeout:
float | timedelta | ... | None
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Saves this revision to Chalk's online and offline storage.

Parameters

Whether to store the revision in Chalk's online storage.

Whether to store the revision in Chalk's offline storage.

Downloads the resolver replay data for the given resolver in the revision, provided the revision had store_plan_stages enabled.

The replay data is functionally similar to viewing the intermediate results on the plan explorer.

If the resolver appears in only one stage of the plan, the resolver's replay data is returned directly. If the resolver instead appears in multiple stages of the plan, a mapping of the operation's ID to the replay data will be returned. If the resolver does not appear in the plan, an exception will be thrown.

Parameters

The resolver to download the replay data for, or its fqn.

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Dataset
Class

Wrapper around Offline Query results.

Datasets are obtained by invoking ChalkClient.offline_query(). Dataset instances store important metadata and enable the retrieval of offline query outputs.

Examples

from chalk.client import ChalkClient, Dataset
from datetime import datetime, timezone
uids = [1, 2, 3, 4]
at = datetime.now(tz=timezone.utc)
dataset: Dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
    },
    input_times=[at] * len(uids),
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset'
)
df = dataset.get_data_as_pandas()
dataset.recompute(features=[User.fraud_score], branch="feature/testing")
Attributes

Whether the most recent DatasetRevision is finished or still pending.

Storage version number of outputs.

A list of all DatasetRevision instances belonging to this dataset.

The unique name for this dataset, if given.

dataset_id
uuid.UUID | None

The unique UUID for this dataset.

A list of errors in loading the dataset, if they exist.

Functions

Loads a pl.DataFrame containing the output. Use .to_polars_lazyframe() if you want a LazyFrame instead, which allows local filtering of datasets that are larger than memory.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
ignore_errors: = False
show_progress: = True
timeout:
float | timedelta | None | ...
= ...
Returns

A pl.DataFrame materializing query output data.

Loads a pl.LazyFrame containing the output. This method is appropriate for working with larger-than-memory datasets. Use .to_polars() if you want a DataFrame instead.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
ignore_errors: = False
show_progress: = True
timeout:
float | timedelta | None | ...
= ...
Returns

A pl.LazyFrame materializing query output data.

Loads a pl.LazyFrame containing the output.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
ignore_errors: = False
show_progress: = True
timeout:
float | timedelta | None | ...
= ...
Returns

A pl.LazyFrame materializing query output data.

Loads a pd.DataFrame containing the output.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
ignore_errors: = False
show_progress: = True
timeout:
float | timedelta | None | ...
= ...
Returns

A pd.DataFrame materializing query output data.

Loads a Chalk DataFrame containing the output. Requires the pertinent Chalk features to be accessible via import

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
ignore_errors: = False
show_progress: = True
timeout:
float | timedelta | None | ...
= ...
Returns
type:

A DataFrame materializing query output data.

Loads a pd.DataFrame containing the output of the most recent revision.

Parameters
output_id: = False

Whether to return the primary key feature in a column named "__chalk__.__id__" in the resulting pd.DataFrame.

output_ts: = False

Whether to return the input-time feature in a column named "__chalk__.CHALK_TS" in the resulting pd.DataFrame. If set to a non-empty str, used as the input-time column name.

Whether to ignore query errors upon fetching data

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Returns

A pd.DataFrame materializing query output data.

Returns a list of the output uris for the revision. Data will be stored in .Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.

Parameters
output_id: = False

Whether to return the primary key feature in a column named "__chalk__.__id__" in the resulting pd.DataFrame.

output_ts: = False

Whether to return the input-time feature in a column named "__chalk__.CHALK_TS" in the resulting pd.DataFrame. If set to a non-empty str, used as the input-time column name.

Whether to ignore query errors upon fetching data

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Waits for an offline query job to complete. Returns a list of errors if unsuccessful, or None if successful.

Parameters
timeout:
float | timedelta | ... | None
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Whether to show a progress bar

Downloads output files pertaining to the revision to the given path.

Datasets are stored in Chalk as sharded Parquet files. With this method, you can download those raw files into a directory for processing with other tools.

Parameters

A directory where the Parquet files from the dataset will be downloaded.

executor:
ThreadPoolExecutor | None
= None

An executor to use to download the data in parallel. If not specified, the default executor will be used.

Whether to ignore query errors upon fetching data.

Whether to show a progress bar.

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

from chalk.client import ChalkClient, Dataset
from datetime import datetime, timezone
uids = [1, 2, 3, 4]
at = datetime.now(tz=timezone.utc)
dataset = ChalkClient().offline_query(
    input={User.id: uids},
    input_times=[at] * len(uids),
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset',
)
dataset.download_data('my_directory')

Returns an object that loads the summary statistics of a dataset revision. The dataframe can be retrieved by calling to_polars() or to_pandas() on the return object. Data will be stored in .Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.

Returns an object that loads a preview of a dataset revision. The dataframe can be retrieved by calling to_polars() or to_pandas() on the return object. Data will be stored in .Parquet format. The URIs should be considered temporary, and will expire after a server-defined time period.

Loads a pl.LazyFrame containing the inputs that were used to create the dataset.

Parameters

Whether to ignore query errors upon fetching data

Whether to show a progress bar

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Returns

A pl.LazyFrame materializing query input data.
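
A brief sketch of reading the inputs back, assuming the method is exposed on Dataset as get_input_dataframe (matching the description above) and that dataset came from ChalkClient().offline_query(...):

inputs = dataset.get_input_dataframe()  # pl.LazyFrame
inputs_df = inputs.collect()  # materialize as a pl.DataFrame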

Returns a URL linking to the offline query page in the Chalk dashboard and opens it in your browser. Must be logged in.

Parameters

If True, does not open url in browser. Default is False.

Returns
type:

A url redirecting to the Chalk dashboard.

Creates a new revision of this Dataset by recomputing the specified features.

Carries out the new computation on the branch specified when constructing the client.

Parameters

A list of specific features to recompute. Features that don't exist in the dataset will be added. Features that already exist in the dataset will be recomputed. If not provided, all the existing features in the dataset will be recomputed.

branch: = None

If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.

wait: = True

If True, progress bars will be shown while recomputation is running. This flag will also be propagated to the methods of the resulting Dataset.

If True, the output of each of the query plan stages will be stored in S3/GCS. This will dramatically impact the performance of the query, so it should only be used for debugging. These files will be visible in the web dashboard's query detail view, and can be downloaded in full by clicking on a plan node in the query plan visualizer.

You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.

explain: = False
tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, all required_resolver_tags must be present on a resolver for it to be considered eligible to execute. See Tags for more information.

Boots a Kubernetes job to run the queries in their own pods, separate from the engine and branch servers. This is useful for large datasets and jobs that require a long time to run. This must be specified as True to run this job asynchronously, even if the previous revision was run asynchronously.

timeout:
float | timedelta | None | ...
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

Raises
error:

If no branch was provided to the Chalk Client.

from chalk.client import ChalkClient
dataset = ChalkClient(branch="data_science").offline_query(...)
df = dataset.get_data_as_polars()
# make changes to resolvers in your project
dataset.recompute()
new_df = dataset.get_data_as_polars() # receive newly computed data

Saves the latest revision of this dataset to Chalk's online and offline storage.

Parameters

Whether to store the revision in Chalk's online storage.

Whether to store the revision in Chalk's offline storage.

Downloads the resolver replay data for the given resolver in the latest revision of the dataset.

The replay data is functionally similar to viewing the intermediate results on the plan explorer.

If the resolver appears in only one stage of the plan, the resolver's replay data is returned directly. If the resolver instead appears in multiple stages of the plan, a mapping of the operation's ID to the replay data will be returned. If the resolver does not appear in the plan, an exception will be thrown.

Parameters

The resolver to download the replay data for, or its fqn.

Whether to show progress bars

timeout:
float | timedelta | ... | None
= ...

How long to wait, in seconds, for job completion before raising a TimeoutError. Jobs will continue to run in the background if they take longer than this timeout. For no timeout, set to None. If no timeout is specified, the client's default timeout is used.

The category of an error.

For more detailed error information, see ErrorCode

Values

Request errors are raised before execution of your resolver code. They may occur due to invalid feature names in the input or a request that cannot be satisfied by the resolvers you have defined.

Field errors are raised while running a feature resolver for a particular field. For this type of error, you'll find a feature and resolver attribute in the error type. When a feature resolver crashes, you will receive a null value in the response. To differentiate between a resolver returning a null value and a failure in the resolver, you need to check the error schema.

Network errors are thrown outside your resolvers. For example, your request was unauthenticated, connection failed, or an error occurred within Chalk.

The detailed error code.

For a simpler category of error, see ErrorCodeCategory.

Values

The query contained features that do not exist.

A resolver that was required as part of running the dependency graph could not be found.

The query is invalid. All supplied features need to be rooted in the same top-level entity.

A feature value did not match the expected schema (e.g. incompatible type "int"; expected "str")

The resolver for a feature errored.

The resolver for a feature timed out.

A resolver that was needed to produce an input for this resolver crashed, and so this resolver could not run.

The request was submitted with an invalid authentication header.

The supplied credentials do not provide the right authorization to execute the request.

An unspecified error occurred.

The operation was cancelled, typically by the caller.

The deadline expired before the operation could complete.
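
These categories and codes can be inspected on each error returned with a query. A minimal sketch, assuming the response exposes its errors as a list on an errors attribute and that each ChalkError carries code, category, and message fields as described above:

from chalk.client import ChalkClient
result = ChalkClient().query(...)
for error in result.errors or []:
    # print the detailed code, the broader category, and the readable message
    print(error.code, error.category, error.message)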

Information about an exception from a resolver run.

Attributes

The name of the class of the exception.

The message taken from the exception.

The stacktrace produced by the code.

The stacktrace produced by the code, in full detail.

Override resource requests for processes with isolated resources, e.g., offline queries and cron jobs. Note that making these too large could prevent your job from being scheduled, so please test before using these in a recurring pipeline.

Attributes

CPU requests: Increasing this will make some Chalk operations that are parallel and CPU-bound faster. Default unit is physical CPU cores, i.e. "8" means 8 CPU cores, "0.5" means half of a CPU core. An alternative unit is "millicore", which is one-thousandth of a CPU core, i.e. 500m is half of a CPU core.

Memory requests: you can use these to give your pod more memory, i.e. to prevent especially large jobs from OOMing. Default unit is bytes, i.e. 1000000000 is 1 gigabyte of memory. You can also specify a suffix such as K, M, or G for kilobytes, megabytes, and gigabytes, respectively. It's also possible to use the power of two equivalents, such as Ki, Mi, and Gi.

Chalk can use this for spilling intermediate state of some large computations, i.e. joins, aggregations, and sorting. Default unit is bytes, i.e. 1000000000 is 1 gigabyte of memory. You can also specify a suffix such as K, M, or G for kilobytes, megabytes, and gigabytes, respectively. It's also possible to use the power of two equivalents, such as Ki, Mi, and Gi.

Ephemeral storage for miscellaneous file system access. Should probably not be below 1Gi to ensure there's enough space for the Docker image, etc. Should also not be too high or else the pod will not be scheduled.

Resource group to use for this job. If not specified, the default resource group will be used.
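
As a rough sketch of how these overrides might be supplied to an offline query: the class name ResourceRequests, its import path, and the resources parameter below are assumptions, so check your client version for the exact spelling.

from chalk.client import ChalkClient, ResourceRequests  # import path assumed
ChalkClient().offline_query(
    input={User.id: [1, 2, 3]},
    output=[User.email],
    # assumed parameter name; requests 8 CPU cores and 32 GiB of memory for this job
    resources=ResourceRequests(cpu="8", memory="32Gi"),
)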

Functions
Attributes

The output features and any query metadata.

Errors encountered while running the resolvers.

If no errors were encountered, this field is empty.

Metadata about the query execution. Only present if include_meta=True is passed to the relevant query method.

Functions

Convenience method for accessing feature result from the data response.

Parameters

The feature or its string representation.

Returns

The FeatureResult for the feature, if it exists.

from chalk.client import ChalkClient
data = ChalkClient().query(...)
data.get_feature(User.name).ts
datetime.datetime(2023, 2, 5, 23, 25, 26, 427605)
data.get_feature("user.name").meta.cache_hit
False

Convenience method for accessing feature values from the data response.

Parameters

The feature or its string representation.

Returns
type:

The value of the feature.

from chalk.client import ChalkClient
data = ChalkClient().query(...)
data.get_feature_value(User.name)
"Katherine Johnson"
data.get_feature_value("user.name")
"Katherine Johnson"

Converts the output features to a dictionary. Errors are not included in the dictionary.

Returns
type:

A dictionary of the output features.

from chalk.client import ChalkClient
result = ChalkClient().query(...)
result.to_dict()
{
    "user.name": "Katherine Johnson",
    "user.email": "katherine@nasa.com"
}
Functions

Create a named query.

Named queries are aliases for specific queries that can be used by API clients.

Parameters

A name for the named query—this can be versioned with the version parameter, but must otherwise be unique. The name of the named query shows up in the dashboard and is used to specify the outputs for a query.

version: = None

A string specifying the version of the named query: version is not required, but if specified it must be a valid "semantic version".

The features which will be provided by callers of this query. For example, [User.id]. Features can also be expressed as snakecased strings, e.g. ["user.id"].

Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].

If an empty sequence, the output will be set to all features on the namespace of the query. For example, if you pass as input {"user.id": 1234}, then the query is defined on the User namespace, and all features on the User namespace (excluding has-one and has-many relationships) will be used as outputs.

tags: = None

Allows selecting resolvers with these tags.

description: = None

A description of the query. Rendered in the Chalk UI and used for search indexing.

owner: = None

The owner of the query. This should be a Slack username or email address. This is used to notify the owner in case of incidents

meta: = None

Additional metadata for the query.

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

Dictionary of additional options to pass to the Chalk query engine. Values may be provided as part of conversations with Chalk support to enable or disable specific functionality.

from chalk import NamedQuery
# this query's name and version can be used to specify query outputs in an API request.
NamedQuery(
    name="fraud_model",
    version="1.0.0",
    input=[User.id],
    output=[User.age, User.fraud_score, User.credit_report.fico],
)
Attributes

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

Context in which to execute a query.

Attributes

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

The tags used to scope the resolvers. More information at Tags

Raised when constructing a ChalkClient without valid credentials.

When this exception is raised, no explicit client_id and client_secret were provided, there was no ~/.chalk.yml file with applicable credentials, and the environment variables CHALK_CLIENT_ID and CHALK_CLIENT_SECRET were not set.

You may need to run chalk login from your command line, or check that your working directory is set to the root of your project.

Duration
Type Alias
str | timedelta | Literal['infinity', 'all']

Duration is used to describe time periods in natural language. To specify using natural language, write the count of the unit you would like, followed by the representation of the unit.

Chalk supports the following units:

Signifier Meaning
w Weeks
d Days
h Hours
m Minutes
s Seconds
ms Milliseconds

As well as the special keywords "infinity" and "all".

Examples:

Signifier Meaning
"10h" 10 hours
"1w 2m" 1 week and 2 minutes
"1h 10m 2s" 1 hour, 10 minutes, and 2 seconds
"infinity" Unbounded time duration
CronTab
Type Alias

A schedule defined using the Unix-cron string format (* * * * *). Values are given in the order below:

Field Values
Minute 0-59
Hour 0-23
Day of Month 1-31
Month 1-12
Day of Week 0-6
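
A CronTab can be supplied wherever the API accepts a schedule. A minimal sketch, assuming a resolver decorator that accepts a cron argument (the resolver and features here are illustrative):

from chalk import offline
@offline(cron="0 0 * * *")  # run daily at midnight
def refresh_fraud_score(id: User.id) -> User.fraud_score:
    return 0.5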

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environments can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environments and no others

See more at Environments
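
For example, a resolver can be limited to particular environments through its decorator. A short sketch with illustrative names, assuming the decorator accepts an environment argument:

from chalk import online
@online(environment=["staging", "dev"])
def get_placeholder_score(id: User.id) -> User.fraud_score:
    # return a constant outside of production instead of calling the vendor
    return 0.0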

Tags
Type Alias

Tags allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

Like Environments, tags control when resolvers run based on the Online Context or Training Context matching the tags provided to the resolver decorator. Resolvers optionally take a keyword argument named tags that can take one of three types:

  • None (default) - The resolver will be a candidate to run for every set of tags.
  • str - The resolver will run only if this tag is provided.
  • list[str] - The resolver will run only if all of the specified tags match.

See more at Tags
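
For example, a short sketch with illustrative names, assuming the decorator accepts a tags argument:

from chalk import online
@online(tags=["fraud", "team:risk"])
def get_risk_score(id: User.id) -> User.fraud_score:
    return 0.42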

Get the owner for a feature, feature class, or resolver.

Parameters

A feature (User.email), feature class (User), or resolver (get_user)

Returns
type:

The owner for a feature or feature class, if it exists. Note that the owner of a feature could be inherited from the feature class.

Raises
error:

If the supplied variable is not a feature, feature class, or resolver.

@features(owner="ship")
class RocketShip:
    id: int
    software_version: str
owner(RocketShip.software_version)
'ship'

Get the tags for a feature, feature class, or resolver.

Parameters

A feature (User.email), feature class (User), or resolver (get_user)

Returns
type:

The tags for a feature, feature class, or resolver, if it exists. Note that the tags of a feature could be inherited from the feature class.

Raises
error:

If the supplied variable is not a feature, feature class, or resolver.

Feature tags

@features(tags="group:risk")
class User:
    id: str
    # :tags: pii
    email: str
tags(User.id)
['group:risk']

Feature class tags

tags(User)
['group:risk']

Feature + feature class tags

tags(User.email)
['pii', 'group:risk']

Get the description of a feature, feature class, or resolver.

Parameters

A feature (User.email), feature class (User), or resolver (get_user)

Returns
type:

The description for a feature, feature class, or resolver, if it exists.

Raises
error:

If the supplied variable is not a feature, feature class, or resolver.

@features
class RocketShip:
    # Comments above a feature become
    # descriptions for the feature!
    software_version: str
description(RocketShip.software_version)
'Comments above a feature become descriptions for the feature!'

Determine whether a feature is a primary key.

Parameters

A feature (i.e. User.email)

Returns
type:

True if f is primary and False otherwise.

Raises
error:

If f is not a feature.

from chalk.features import features
from chalk import Primary
@features
class User:
    uid: Primary[int]
    email: str
assert is_primary(User.uid)
assert not is_primary(User.email)

Determine whether a feature is a feature time. See Time for more details on FeatureTime.

Parameters

A feature (i.e. User.ts)

Returns
type:

True if the feature is a FeatureTime and False otherwise.

from chalk.features import features, feature_time
from datetime import datetime
@features
class User:
    id: str
    updated_at: datetime = feature_time()
assert is_feature_time(User.updated_at) is True
assert is_feature_time(User.id) is False
AnyDataclass
Type Alias

Any class decorated by @dataclass.

There isn't a base class for dataclass, so we use this TypeAlias to indicate any class decorated with @dataclass.

The base type for Chalk exceptions.

This exception makes error handling easier, as you can look only for this exception class.

Attributes

A message describing the specific type of exception raised.

A message that describes the specific type of exception raised and contains the readable representation of each error in the errors attribute.

Also includes the trace ID if one is available.
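
A short sketch of catching the base class around a query call; the import path and the message attribute shown are assumptions, so check where your client version exports them:

from chalk.client import ChalkClient
from chalk.client.exc import ChalkBaseException  # assumed import path
try:
    result = ChalkClient().query(...)
except ChalkBaseException as e:
    # assumed attribute: a readable summary of the underlying errors
    print(e.full_message)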

Use chalk.functions to apply common conversions to your features.

Return the first non-null entry

Parameters
vals: = ()

Expressions to coalesce. They can be a combination of underscores and literals, though types must be compatible (i.e. do not coalesce int and string).

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   nickname: str | None
   name: str | None
   name_or_nickname: str = F.coalesce(_.name, _.nickname, "")

Check if a value is null.

Parameters

The value to check if it is null.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   nickname: str | None
   missing_nickname: bool = F.is_null(_.nickname)

Create a conditional expression, roughly equivalent to

if condition:
    return if_true
else:
    return if_false

Unlike a Python if/else, all three inputs (condition, if_true, if_false) are evaluated in parallel for all rows, and then the correct side is selected based on the result of the condition expression.

from chalk import _
from chalk.features import features
@features
class Transaction:
   id: int
   amount: int
   risk_score: float = _.if_then_else(
       _.amount > 10_000,
       _.amount * 0.1,
       _.amount * 0.05,
   )

Map a key to a value in a dictionary.

Parameters

The dictionary to map the key to a value in.

The key to look up in the dictionary.

The default value to return if the key is not found in the dictionary.

import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: int
   merchant: str
   merchant_risk_score: float = F.map_dict(
       {"Amazon": 0.1, "Walmart": 0.08},
       _.merchant,
       default=0.,
   )

Build a conditional expression.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   age: float
   age_group: str = (
       F.when(_.age < 1)
        .then("baby")
        .when(_.age < 3)
        .then("toddler")
        .when(_.age < 13)
        .then("child")
        .when(_.age < 18)
        .then("teen")
        .otherwise(F.cast(F.cast(F.floor(_.age / 10), int), str) + "0s")
    )

Evaluates if the string ends with the specified suffix.

Parameters

The string to check against the suffix.

The suffix or feature to check if the string ends with.

import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   category: str
   is_food: bool = F.ends_with(_.category, "Food")

Compute the Levenshtein distance between two strings.

Parameters

The first string.

The second string.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   name: str
   email: str
   name_email_sim: int = F.levenshtein_distance(_.name, _.email)

Evaluates if the string matches the pattern.

Patterns can contain regular characters as well as wildcards. Wildcard characters can be escaped using the single character specified for the escape parameter. Matching is case-sensitive.

Note: The wildcard % represents 0, 1 or multiple characters and the wildcard _ represents exactly one character.

For example, the pattern John% will match any string that starts with John, such as John, JohnDoe, JohnSmith, etc.

The pattern John_ will match any string that starts with John and is followed by exactly one character, such as JohnD, JohnS, etc. but not John, JohnDoe, JohnSmith, etc.

Parameters

The string to check against the pattern.

The pattern to check the string against.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   name: str
   is_john: bool = F.like(_.name, "John%")

Convert a string to lowercase.

Parameters

The string to convert to lowercase.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   name: str
   normalized: str = F.trim(F.lower(_.name))

Finds the first occurrence of the regular expression pattern in the string and returns the capturing group number group.

Parameters

The string to check against the pattern.

The regular expression pattern to check the string against.

The number of the capturing group to extract from the string.

import chalk.functions as F
from chalk.features import _, features
@features
class HiddenNumber:
   id: str
   hidden_number: str = "O0OOO"
   number: str = F.regexp_extract(_.hidden_number, r"([0-9]+)", 1)

Finds all occurrences of the regular expression pattern in string and returns the capturing group number group.

Parameters

The string to check against the pattern.

The regular expression pattern to check the string against.

The number of the capturing group to extract from the string.

import chalk.functions as F
from chalk.features import _, features
@features
class Time:
   id: str
   time: str = "1y 342d 20h 60m 6s"
   processed_time: list[str] = F.regexp_extract_all(_.time, "([0-9]+)([ydhms])", 2)

Evaluates the regular expression pattern and determines if it is contained within string.

This function is similar to the like function, except that the pattern only needs to be contained within the string, rather than needing to match the entire string. In other words, this performs a contains operation rather than a match operation. You can match the entire string by anchoring the pattern using ^ and $.

Parameters

The string to check against the pattern.

The regular expression pattern to check the string against.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   name: str
   is_john: bool = F.regexp_like(_.name, "^John.*$")

Replace all occurrences of a substring in a string with another substring.

Parameters

The string to replace the substring in.

The substring to replace.

The substring to replace the old substring with.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   name: str
   normalized_name: str = F.replace(_.name, " ", "_")

Reverse the order of a string.

Parameters

The string to reverse.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   name: str
   reversed_name: str = F.reverse(_.name)

Splits string by delimiter and returns the index'th element (0-indexed). If the index is larger than the number of fields, returns None.

Parameters

The string to split.

The delimiter to split the string on.

The index of the split to return.

import chalk.functions as F
from chalk.features import _, features
@features
class CSVRow:
   id: str
   data: str
   first_element: str = F.split_part(_.data, delimiter=",", index=0)

Evaluates if the string starts with the specified prefix.

Parameters

The string to check against the prefix.

The prefix or feature to check if the string starts with.

import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   category: str
   is_food: bool = F.starts_with(_.category, "Food")

Extract a substring from a string.

Parameters

The string to extract the substring from.

The starting index of the substring (0-indexed).

length: = None

The length of the substring. If None, the substring will extend to the end of the string.

import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   category: str
   cat_first_three: str = F.substr(_.category, 0, 3)

Remove leading and trailing whitespace from a string.

Parameters

The string to trim.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   name: str
   trimmed_name: str = F.trim(_.name)

Convert a string to uppercase.

Parameters

The string to convert to uppercase.

import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   category: str
   normalized: str = F.trim(F.upper(_.category))

Convert bytes to a string using the specified encoding.

Parameters

A bytes feature to convert to a string.

encoding:
'utf-8' | 'hex' | 'base64'

The encoding to use when converting the bytes to a string.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   name: str
   hashed_name: bytes
   decoded_name: str = F.bytes_to_string(_.hashed_name, encoding="utf-8")

Decompress a GZIP-compressed bytes feature.

Parameters

The GZIP-compressed bytes feature to decompress.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   compressed_data: bytes
   decompressed_data: bytes = F.gunzip(_.compressed_data)

Extract a scalar from a JSON feature using a JSONPath expression. The value of the referenced path must be a JSON scalar (boolean, number, string).

Parameters

The JSON feature to query.

The JSONPath-like expression to extract the scalar from the JSON feature.

import chalk.functions as F
from chalk import JSON
from chalk.features import _, features
@features
class User:
   id: str
   profile: JSON
   favorite_color: str = F.json_value(_.profile, "$.prefs.color")

Compute the MD5 hash of some bytes.

Parameters

A bytes feature to hash.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   bytes_feature: bytes
   md5_bytes: bytes = F.md5(_.bytes_feature)

Convert a string to bytes using the specified encoding.

Parameters

An underscore expression for a feature to a string feature that should be converted to bytes.

encoding:
'utf-8' | 'hex' | 'base64'

The encoding to use when converting the string to bytes.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   name: str
   hashed_name: bytes = F.string_to_bytes(_.name, encoding="utf-8")

Compute the absolute value of a number.

Parameters

The number to compute the absolute value of.

import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   amount: float
   amount_abs: float = F.abs(_.amount)

Compute the arcsine of an angle in radians.

Parameters

The angle in radians.

import chalk.functions as F
from chalk.features import _, features
@features
class Triangle:
   id: str
   sin_angle: float
   angle: float = F.asin(_.sin_angle)

Compute the ceiling of a number.

Parameters

The number to compute the ceiling of.

import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   amount: float
   amount_ceil: float = F.ceil(_.amount)

Compute the cosine of an angle in radians.

Parameters

The angle in radians.

import chalk.functions as F
from chalk.features import _, features
@features
class Triangle:
   id: str
   angle: float
   cos_angle: float = F.cos(_.angle)

Returns Euler’s number raised to the power of x.

Parameters

The exponent to raise Euler's number to.

import chalk.functions as F
from chalk.features import _, features
@features
class Triangle:
   id: str
   x: float
   e_to_x: float = F.exp(_.x)

Compute the floor of a number.

Parameters

The number to compute the floor of.

import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   amount: float
   amount_floor: float = F.floor(_.amount)

Compute the haversine distance (in kilometers) between two points on the Earth.

Parameters

The latitude of the first point.

The longitude of the first point.

The latitude of the second point.

The longitude of the second point.

import chalk.functions as F
from chalk.features import _, features
@features
class Location:
   id: str
   lat1: float
   lon1: float
   lat2: float
   lon2: float
   distance: float = F.haversine(_.lat1, _.lon1, _.lat2, _.lon2)

Compute the natural logarithm of a number.

Parameters

The number to compute the natural logarithm of.

import chalk.functions as F
from chalk.features import _, features
@features
class Triangle:
   id: str
   hypotenuse: float
   log_hypotenuse: float = F.ln(_.hypotenuse)

Compute the remainder of a division.

Parameters

The dividend.

The divisor.

from datetime import datetime
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   date: datetime
   day_of_week_monday: int = F.day_of_week(_.date)
   day_of_week_sunday: int = F.mod(_.day_of_week_monday, 7) + 1

Raise a to the power of b. Alias for a ** b.

Parameters

The base.

The exponent.

import chalk.functions as F
from chalk.features import _, features
@features
class Merchant:
   id: str
   amount_std: float
   amount_var: float = F.power(_.amount_std, 2)

Compute the sigmoid of a number.

Parameters

The number to compute the sigmoid of.

import chalk.functions as F
from chalk.features import _, features
@features
class Sigmoid:
   id: str
   x: float
   sigmoid_of_x: float = F.sigmoid(_.x)

Compute the sine of an angle in radians.

Parameters

The angle in radians.

import chalk.functions as F
from chalk.features import _, features
@features
class Triangle:
   id: str
   angle: float
   sin_angle: float = F.sin(_.angle)

Compute the square root of a number.

Parameters

The number to compute the square root of.

import chalk.functions as F
from chalk.features import _, features
@features
class Merchant:
   id: str
   amount_var: float
   amount_std: float = F.sqrt(_.amount_var)

Extract the day of the month from a date.

The supported types for x are date and datetime.

Ranges from 1 to 31 inclusive.

Parameters

The date to extract the day of the month from.

from datetime import date
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   date: date
   day: int = F.day_of_month(_.date)

Returns the ISO day of the week from x. The value ranges from 1 (start_of_week, default MONDAY) to 7 (start_of_week + 6, default SUNDAY).

Parameters

The date to extract the day of the week from.

start_of_week: = DayOfWeek.MONDAY

The day of the week that the week starts on. Defaults to Monday.

from datetime import date
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   date: date
   day: int = F.day_of_week(_.date)

Extract the day of the year from a date.

The value ranges from 1 to 366.

Parameters

The date to extract the day of the year from.

from datetime import date
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   date: date
   day: int = F.day_of_year(_.date)

Extract the hour of the day from a datetime.

The value ranges from 0 to 23.

Parameters

The datetime to extract the hour of the day from.

from datetime import datetime
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   date: datetime
   hour: int = F.hour_of_day(_.date)

Extract the month of the year from a date.

The value ranges from 1 to 12.

Parameters

The date to extract the month of the year from.

from datetime import date
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   date: date
   month: int = F.month_of_year(_.date)

Compute the total number of seconds covered in a duration.

Parameters

The duration to convert to seconds.

from datetime import date
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   signup: date
   last_login: date
   signup_to_last_login_days: float = F.total_seconds(_.last_login - _.signup) / (60 * 60 * 24)

Extract the number of milliseconds since the Unix epoch. Returned as a float.

Parameters

The datetime to extract the number of milliseconds since the Unix epoch from.

from datetime import datetime
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   date: datetime
   unix_milliseconds: float = F.unix_milliseconds(_.date)

Extract the number of seconds since the Unix epoch. Returned as a float.

Parameters

The datetime to extract the number of seconds since the Unix epoch from.

from datetime import datetime
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   date: datetime
   unix_seconds: float = F.unix_seconds(_.date)

Extract the week of the year from a date.

The value ranges from 1 to 53.

Parameters

The date to extract the week of the year from.

from datetime import date
import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   date: date
   week: int = F.week_of_year(_.date)

Extract the host from a URL.

For example, the host of https://www.google.com/cats is www.google.com.

import chalk.functions as F
from chalk.features import _, features
@features
class Company:
    id: int
    website: str
    host: str = F.url_extract_host(_.website)

Extract the path from a URL.

For example, the path of https://www.google.com/cats is /cats.

import chalk.functions as F
from chalk.features import _, features
@features
class Company:
    id: int
    website: str
    path: str = F.url_extract_path(_.website)

Extract the protocol from a URL.

For example, the protocol of https://www.google.com/cats is https.

Parameters

The URL to extract the protocol from.

import chalk.functions as F
from chalk.features import _, features
@features
class Company:
    id: int
    website: str
    protocol: str = F.url_extract_protocol(_.website)

Extract a single-column DataFrame into a list of values for that column.

Parameters

The expression to extract into a list.

from datetime import datetime
import chalk.functions as F
from chalk import DataFrame
from chalk.features import _, features
@features
class Merchant:
    id: str
    events: "DataFrame[FraudEvent]"
    fraud_codes: list[str] = F.array_agg(_.events[_.is_fraud == True, _.tag])
@features
class FraudEvent:
    id: int
    tag: str
    is_fraud: bool
    mer_id: Merchant.id

Returns the first n items from a DataFrame or has-many.

Parameters

The has-many or DataFrame from which the first n items are taken.

The number of items to take.

from datetime import datetime
import chalk.functions as F
from chalk import windowed, DataFrame, Windowed
from chalk.features import _, features, has_many, Primary
@features
class Merchant:
    id: str
@features
class ConfirmedFraud:
    id: int
    trn_dt: datetime
    is_fraud: int
    mer_id: Merchant.id
@features
class MerchantFraud:
    mer_id: Primary[Merchant.id]
    merchant: Merchant
    confirmed_fraud: DataFrame[ConfirmedFraud] = has_many(
        lambda: ConfirmedFraud.mer_id == MerchantFraud.mer_id,
    )
    first_five_merchant_window_fraud: Windowed[list[int]] = windowed(
        "1d",
        "30d",
        expression=F.head(_.confirmed_fraud[_.trn_dt > _.chalk_window, _.id, _.is_fraud == 1], 5)
    )

Returns a subset of the original array.

Parameters

The array to slice.

Starting index of the slice (0-indexed). If negative, the slice starts from the end of the array.

The length of the slice.

from datetime import datetime
import chalk.functions as F
from chalk.features import _, features
@features
class Wordle:
   id: str
   words: list[str] = ["crane", "kayak", "plots", "fight", "exact", "zebra", "hello", "world"]
   three_most_recent_words: list[str] = F.slice(_.words, -3, 3) # computes ["zebra", "hello", "world"]

Returns whether the array contains the value.

Parameters

The array to check for the value.

The value to check for in the array.

import chalk.functions as F
from chalk.features import _, features
@features
class APIRequest:
   id: str
   headers: list[str]
   has_user_agent: bool = F.contains(_.headers, "User-Agent")

Cast an expression to a different type.

Parameters

The expression to cast.

The type to cast the expression to.

import chalk.functions as F
from chalk.features import _, features
@features
class Transaction:
   id: str
   user_id: "User.id"
   merchant_id: "Merchant.id"
   user_merchant_id: "UserMerchant.id" = (
       F.cast(_.user_id, str) + "_" +
       F.cast(_.merchant_id, str)
   )

Runs a SageMaker prediction on the specified endpoint, passing in the serialized bytes feature as the input.

Parameters

Bytes feature to be passed as the serialized input to the SageMaker endpoint.

The name of the SageMaker endpoint.

The content type of the input data. If not specified, the content type will be inferred from the endpoint.

An optional argument which specifies the target model for the prediction. This should only be used for multi-model SageMaker endpoints.

An optional argument which specifies the target variant for the prediction. This should only be used for multi-variant SageMaker endpoints.

An optional argument which specifies the AWS access key ID to use for the prediction.

An optional argument which specifies the AWS secret access key to use for the prediction.

An optional argument which specifies the AWS session token to use for the prediction.

An optional argument which specifies the AWS region to use for the prediction.

An optional argument which specifies the AWS profile name to use for the prediction.

import chalk.functions as F
from chalk.features import _, features
@features
class User:
   id: str
   encoded_sagemaker_data: bytes
   prediction: float = F.sagemaker_predict(
       _.encoded_sagemaker_data,
       endpoint="prediction-model_1.0.1_2024-09-16",
       target_model="model_v2.tar.gz",
       target_variant="blue"
   )
Functions

Create an offline query which runs on a schedule.

Scheduled queries do not produce datasets, but persist their results in the online and/or offline feature stores.

By default, scheduled queries use incrementalization to only ingest data that has been updated since the last run.

Parameters

A unique name for the scheduled query. The name of the scheduled query will show up in the dashboard and will be used to set the incrementalization metadata.

A cron schedule or a Duration object representing the interval at which the query should run.

The features that this query will compute. Namespaces are exploded into all features in the namespace.

Whether to recompute all features or load from the feature store. If True, all features will be recomputed. If False, all features will be loaded from the feature store. If a list of features, only those features will be recomputed, and the rest will be loaded from the feature store.

max_samples: = None

The maximum number of samples to compute.

A hard-coded lower bound for the query. If set, the query will not use incrementalization.

A hard-coded upper bound for the query. If set, the query will not use incrementalization.

tags:
Collection[str] | None
= None

Allows selecting resolvers with these tags.

required_resolver_tags:
Collection[str] | None
= None

Requires that resolvers have these tags.

Whether to store the results of this query in the online store.

Whether to store the results of this query in the offline store.

incremental_resolvers:
Collection[str] | None
= None

If set to None, Chalk will incrementalize resolvers in the query's root namespaces. If set to a list of resolvers, this set will be used for incrementalization. Incremental resolvers must return a feature time in their output and must return a DataFrame. Most commonly, this will be the name of a SQL file resolver. Chalk will ingest all new data from these resolvers and propagate changes to values in the root namespace.

Returns

A scheduled query object.

from chalk.features import ScheduledQuery
# this scheduled query will automatically run every 5 minutes after `chalk apply`
ScheduledQuery(
    name="ingest_users",
    schedule="*/5 * * * *",
    output=[User],
    store_online=True,
    store_offline=True,
)
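
For a pipeline driven by a specific SQL file resolver, a sketch like the following restricts incrementalization to that resolver and caps the number of samples; the resolver name "get_users" is hypothetical.

from chalk.features import ScheduledQuery
# Nightly ingest that incrementalizes only on a single SQL file resolver.
ScheduledQuery(
    name="ingest_users_nightly",
    schedule="0 2 * * *",  # every day at 02:00 UTC
    output=[User],
    max_samples=100_000,
    incremental_resolvers=["get_users"],  # hypothetical SQL file resolver name
    store_online=True,
    store_offline=True,
)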
Chart
Class

Class describing a single visual metric.

Attributes
registry
ClassVar[set[Chart]]
Functions

Create a chart for monitoring or alerting on the Chalk dashboard.

Parameters

The name of the chart. If a name is not provided, the chart will be named according to the series and formulas it contains.

The length of the window, e.g. "20m" or "1h".

Other Parameters
Show All
keep: = False
Returns

A chart for viewing in the Chalk dashboard.

from chalk.monitoring import Chart, Series
Chart(name="Request count").with_trigger(
    Series
        .feature_null_ratio_metric()
        .where(feature=User.fico_score) > 0.2,
)

Override the name of a chart.

Parameters

A new name for a chart.

Returns
type:

A copy of your Chart with the new name.

Change the window period for a Chart.

Parameters

A new window period for a chart, e.g. "20m" or "1h".

Returns
type:

A copy of your Chart with the new window period.

Attaches a Series to your Chart instance.

Parameters
series:
SeriesBase

A Series instance to attach to the Chart. A Chart can have any number of Series.

Returns
type:

A copy of your chart with the series attached.
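
A minimal sketch of attaching a series explicitly; the method name with_series is an assumption based on the description above, and feature_null_ratio_metric is the series constructor shown in the example earlier in this document.

from chalk.monitoring import Chart, Series
# Attach a null-ratio series to a chart without creating a trigger.
Chart(name="Fico null ratio").with_series(
    Series.feature_null_ratio_metric().where(feature=User.fico_score)
)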

Get a Series from your Chart by series name.

It is advised to use different series names within your charts.

Parameters

The name of the Series.

Returns
type:
SeriesBase

The first series added to your Chart with the given series name.

Attaches a Trigger to your Chart. Your Chart may optionally have one Trigger.

Parameters
expression:
ThresholdFunction

Triggers are applied when a certain series is above or below a given value. The expression specifies the series, operand, and value as follows:

  • the left-hand side of the expression must be a Series instance.
  • the operand must be < or >.
  • the right-hand side must be an int or float.

Thus, if we have a Series instance series1, expression=series1 > 0.5 will result in an alert when series1 is greater than 0.5.

The name for the new trigger.

severity:
AlertSeverityKind
= AlertSeverityKind.INFO

The severity of the trigger.

  • critical
  • error
  • warning
  • info

The owner or email of the trigger.

description: = None

A description to your Trigger. Descriptions provided here will be included in the alert message in Slack or PagerDuty.

For Slack alerts, you can use the mrkdwn syntax described here: https://api.slack.com/reference/surfaces/formatting#basics

Returns
type:

A copy of your Chart with the new trigger.
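
A sketch of a trigger with an explicit severity and description; the left-hand side of the threshold expression is a Series, the right-hand side is a float, and passing the severity as a plain string is an assumption (the parameter is typed AlertSeverityKind above).

from chalk.monitoring import Chart, Series
# Alert when more than 20% of fico_score values are null in the chart window.
Chart(name="Fico null ratio").with_trigger(
    Series.feature_null_ratio_metric().where(feature=User.fico_score) > 0.2,
    severity="error",  # assumption: string form of the AlertSeverityKind value
    description="More than 20% of FICO scores are null.",
)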

Explicitly link a Chart to a feature. This chart will then be visible on the webpage for this feature. Charts may only be linked to one entity.

Parameters

A Chalk feature.

Returns
type:

A copy of your chart linked to the feature.
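
A short sketch of linking a chart to the feature it monitors; the method names with_series and with_feature_link are assumptions based on the descriptions above.

from chalk.monitoring import Chart, Series
# The chart will appear on the User.fico_score feature page in the dashboard.
Chart(name="Fico null ratio").with_series(
    Series.feature_null_ratio_metric().where(feature=User.fico_score)
).with_feature_link(User.fico_score)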

Explicitly link a chart to a resolver. This chart will then be visible on the webpage for this resolver. Charts may only be linked to one entity.

Parameters

A Chalk resolver.

Returns
type:

A copy of your chart linked to the resolver.

Explicitly link a chart to a query. This chart will then be visible on the webpage for this query. Charts may only be linked to one entity.

Parameters

The name of a Chalk query.

Returns
type:

A copy of your chart linked to the query.

Designates that this chart and all of its descendants will be registered.

Returns
type:

The same chart.

Retrieve a series or formula by name from a chart.

Parameters

The name of the series or formula to retrieve.

Returns
type:
SeriesBase | _Formula
Series
Class

Class describing a series of data in two dimensions, as in a line chart. Series should be instantiated with one of the classmethods that specifies the metric to be tracked.

Functions

Creates a Series of metric kind FeatureRequestCount.

Parameters
name: = None

A name for your new feature_request_count Series. If no name is provided, one will be created.

Returns
type:
FeatureRequestCountSeries

A new FeatureRequestCountSeries instance that inherits from the Series class.

Creates a Series of metric kind FeatureStaleness.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new feature_staleness Series. If not provided, a name will be generated for you.

Returns
type:
FeatureStalenessSeries

A new FeatureStalenessSeries instance that inherits from the Series class.

Creates a Series of metric kind FeatureValue.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new feature_value Series. If not provided, a name will be generated for you.

Returns
type:
FeatureValueSeries

A new FeatureValueSeries instance that inherits from the Series class.

Creates a Series of metric kind FeatureNullRatio.

Parameters
name: = None

A name for your new feature_null_ratio Series. If no name is provided, one will be created.

Returns
type:
FeatureNullRatioSeries

A new FeatureNullRatioSeries instance that inherits from the Series class.

Creates a Series of metric kind ResolverRequestCount.

Parameters
name: = None

A name for your new resolver_request_count Series. If no name is provided, one will be created.

Returns
type:
ResolverRequestCountSeries

A new ResolverRequestCountSeries instance that inherits from the Series class.

Creates a Series of metric kind ResolverLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new resolver_latency Series. If not provided, a name will be generated for you.

Returns
type:
ResolverLatencySeries

A new ResolverLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind ResolverSuccessRatio.

Parameters
name: = None

A name for your new resolver_success_ratio Series. If no name is provided, one will be created.

Returns
type:
ResolverSuccessRatioSeries

A new ResolverSuccessRatioSeries instance that inherits from the Series class.

Creates a Series of metric kind QueryCount.

Parameters
name: = None

A name for your new query_count Series. If no name is provided, one will be created.

Returns
type:
QueryCountSeries

A new QueryCountSeries instance that inherits from the Series class.

Creates a Series of metric kind QueryLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new query_latency Series. If not provided, a name will be generated for you.

Returns
type:
QueryLatencySeries

A new QueryLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind QuerySuccessRatio.

Parameters
name: = None

A name for your new query_success_ratio Series. If no name is provided, one will be created.

Returns
type:
QuerySuccessRatioSeries

A new QuerySuccessRatioSeries instance that inherits from the Series class.

Creates a Series of metric kind CronCount.

Parameters
name: = None

A name for your new cron_count Series. If no name is provided, one will be created.

Returns
type:
CronCountSeries

A new CronCountSeries instance that inherits from the Series class.

Creates a Series of metric kind CronLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new cron_latency Series. If not provided, a name will be generated for you.

Returns
type:
CronLatencySeries

A new CronLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind StreamMessageLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new stream_message_latency Series. If not provided, a name will be generated for you.

Returns
type:
StreamMessageLatencySeries

A new StreamMessageLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind StreamMessagesProcessed.

Parameters
name: = None

A name for your new stream_messages_processed Series. If no name is provided, one will be created.

Returns
type:
StreamMessagesProcessedSeries

A new StreamMessagesProcessedSeries instance that inherits from the Series class.

Creates a Series of metric kind StreamWindowsProcessed.

Parameters
name: = None

A name for your new stream_windows_processed Series. If no name is provided, one will be created.

Returns
type:
StreamWindowsProcessedSeries

A new StreamWindowsProcessedSeries instance that inherits from the Series class.

Creates a Series of metric kind StreamWindowLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new stream_window_latency Series. If not provided, a name will be generated for you.

Returns
type:
StreamWindowLatencySeries

A new StreamWindowLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind StreamLag.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new stream_lag Series. If not provided, a name will be generated for you.

Returns
type:
StreamLagSeries

A new StreamLagSeries instance that inherits from the Series class.
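
A sketch of combining one of these constructors with a Chart; the name feature_value_metric is an assumption following the *_metric pattern of feature_null_ratio_metric shown earlier, and the window_function value comes from the list above.

from chalk.monitoring import Chart, Series
# Track the mean of User.total_limit and alert if it drops below 1,000.
values = Series.feature_value_metric(window_function="mean").where(feature=User.total_limit)
Chart(name="Average total limit").with_trigger(values < 1_000)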

Given two DataFrames, left and right, check that left == right, and raise if they are not equal.

Parameters

The DataFrame to compare.

The DataFrame to compare with.

If False, allows the assert/test to succeed if the required columns are present, irrespective of the order in which they appear.

If False, allows the assert/test to succeed if the required rows are present, irrespective of the order in which they appear; because this requires sorting, it cannot be set to False on frames that contain unsortable columns.

Raises

If left does not equal right.

error:
MissingDependencyException

If chalkpy[runtime] is not installed.
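
A minimal sketch under assumptions: the function is referred to here as assert_frame_equal with an import from chalk.testing, and the keyword name check_row_order mirrors the row-order flag described above.

from chalk.features import DataFrame
from chalk.testing import assert_frame_equal  # assumption: import path
left = DataFrame([User(id="u1", name="Ada"), User(id="u2", name="Grace")])
right = DataFrame([User(id="u2", name="Grace"), User(id="u1", name="Ada")])
# Same rows in a different order: passes only because row order is ignored.
assert_frame_equal(left, right, check_row_order=False)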

Functions

Freezes the 'now' value used to evaluate filters like after(days_ago=30).

Parameters

The time to freeze to. Must be timezone aware.

from datetime import datetime, timedelta, timezone
from chalk import DataFrame, online
# Card and freeze_time are assumed to be defined/imported elsewhere.
@online
def get_average_spend_30d(
    spend: User.cards[after(days_ago=30)],
) -> User.average_spend_30d:
    return spend.mean()
with freeze_time(datetime(2021, 1, 1, tzinfo=timezone.utc)):
    now = datetime.now(tz=timezone.utc)
    get_average_spend_30d(
        spend=DataFrame([
            Card(spend=10, ts=now - timedelta(days=31)),
            Card(spend=20, ts=now - timedelta(days=29)),
            Card(spend=30, ts=now - timedelta(days=28)),
        ])
    )

Returns the current time that filters will use.

Returns
type:

The current time that filters will use.

with freeze_time(datetime(2021, 1, 1, tzinfo=timezone.utc)) as ft:
    assert ft.time() == datetime(2021, 1, 1, tzinfo=timezone.utc)

The freeze_time class is a context manager, so it can be used with the with statement. The __enter__ method is called when entering the context manager.

The freeze_time class is a context manager, so it can be used with the with statement. The __exit__ method is automatically called when exiting the context manager.