Chalk API Reference

Features define the data that you want to compute and store. Your features are defined as Python classes that look like dataclasses. For example:

@features
class User:
   id: int
   name: str
   email: str
   credit_card: CreditCard = has_one(
       lambda: CreditCard.user_id == User.id
   )
   logins: DataFrame[Login] = has_many(
       lambda: Login.user_id == User.id
   )

Features can be nested, as with credit_card and logins above. In this section, we'll dive into API components that make up the building blocks of features.

features
Decorator

Chalk lets you spell out your features directly in Python.

Features are namespaced to a FeatureSet. To create a new FeatureSet, apply the @features decorator to a Python class with typed attributes. A FeatureSet is constructed and functions much like Python's own dataclass.

Parameters
owner: = None

The individual or team responsible for these features. The Chalk Dashboard will display this field, and alerts can be routed to owners.

tags: = None

Added metadata for features for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.

etl_offline_to_online:

When True, Chalk copies this feature into the online environment when it is computed in offline resolvers. Setting etl_offline_to_online on a feature class assigns it to all features on the class which do not explicitly specify etl_offline_to_online.

max_staleness:

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Assigning a max_staleness to the feature class assigns it to all features on the class which do not explicitly specify a max_staleness value of their own.

singleton: = False
Other Parameters
cls:
Type[T] | None
= None
name: = None
@features(
    owner="andy@chalk.ai",
    max_staleness="30m",
    etl_offline_to_online=True,
    tags="user-group",
)
class User:
    id: str
    # Comments here appear in the web!
    # :tags: pii
    name: str | None
    # :owner: userteam@mycompany.com
    location: LatLng

feature
Function

Add metadata and configuration to a feature.

Parameters
owner: = None

You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature. Read more at Owner

tags: = None

Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team. Read more at Tags

version: = None

The maximum version for a feature. Versioned features can be referred to with the @ operator:

@features
class User:
    id: str
    score: int = feature(version=2)
str(User.score @ 2)
"user.score@2"

See more at Versioning

default_version: = 1

The default version for a feature. When you reference a versioned feature without the @ operator, you reference the default_version. Set to 1 by default.

@features
class User:
    id: str
    score: int = feature(version=2, default_version=2)
str(User.score)
"user.score"

See more at Default versions

max_staleness:
... | Duration | None
= ...

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver. Read more at Caching

etl_offline_to_online:

When True, Chalk copies this feature into the online environment when it is computed in offline resolvers. Read more at Reverse ETL

min:
_TRich | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
_TRich | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

min_length: = None

If specified, when this feature is computed, Chalk will check that len(x) >= min_length.

max_length: = None

If specified, when this feature is computed, Chalk will check that len(x) <= max_length.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

validations:

A list of validations to apply to this feature. Generally, max, min, max_length, and min_length are more convenient, but the parameter strict applies to all of those parameters. If you want to mix strict and non-strict validations, you can use this parameter.

default:
_TRich | ...
= ...

The default value of the feature if it otherwise can't be computed. If you don't need to specify other metadata, you can also assign a default in the same way you would assign a default to a dataclass:

from chalk.features import features
@features
class User:
    num_purchases: int = 0
Other Parameters
description: = None
name: = None
primary: = None
encoder:
TEncoder[_TPrim, _TRich] | None
= None
decoder:
TDecoder[_TPrim, _TRich] | None
= None
dtype: = None
offline_ttl:
... | Duration | None
= ...
Returns
type:
_TRich

The type of the input feature, given by _TRich.

from chalk.features import Primary
@features
class User:
    uid: Primary[int]
    # Uses a default value of 0 when one cannot be computed.
    num_purchases: int = 0
    # Description of the name feature.
    # :owner: fraud@company.com
    # :tags: fraud, credit
    name: str = feature(
        max_staleness="10m",
        etl_offline_to_online=True
    )
    score: int = feature(
        version=2, default_version=2
    )

has_one
Function

Specify a feature that represents a one-to-one relationship.

Read more at Has One

Parameters
f:

The join condition between @features classes. This argument is callable to allow for forward references to members of this class and the joined class.

from chalk.features import DataFrame, features
@features
class Card:
    id: str
    balance: float
@features
class User:
    id: str
    card: Card = has_one(
        lambda: User.id == Card.user_id
    )

has_many
Function

Specify a feature that represents a one-to-many relationship.

Parameters
f:

The join condition between @features classes. This argument is callable to allow for forward references to members of this class and the joined class.

max_staleness:
Duration | None | ...
= ...

The maximum staleness of the joined feature. The items in the joined feature aggregate, storing the latest values of the joined feature for each primary key in the joined feature.

from chalk.features import DataFrame, features
@features
class Card:
    id: str
    balance: float
@features
class User:
    id: str
    cards: DataFrame[Card] = has_many(
        lambda: User.id == Card.user_id
    )

after
Function

The function after can be used with DataFrame to compute windowed features.

After filters a DataFrame relative to the current time in context. This time could be in the past if you’re using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.

The parameters to after are keyword arguments describing the time relative to the present.

Parameters

days_ago: = 0

Number of days ago.

hours_ago: = 0

Number of hours ago.

minutes_ago: = 0

Number of minutes ago.

seconds_ago: = 0

Number of seconds ago.

index: = None

The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.

Other Parameters
weeks_ago: = 0
Returns
type:

A filter for the DataFrame.

from chalk.features import DataFrame, features
@features
class Card:
    ...
@features
class User:
    cards: DataFrame[Card]
User.cards[after(hours_ago=1, minutes_ago=30)]

before
Function

The function before can be used with DataFrame to compute windowed features.

Before filters a DataFrame relative to the current time in context. This time could be in the past if you’re using an offline resolver. Using window functions ensures that you maintain point-in-time correctness.

The parameters to before are keyword arguments describing the time relative to the present.

Parameters

days_ago: = 0

Number of days ago.

hours_ago: = 0

Number of hours ago.

minutes_ago: = 0

Number of minutes ago.

seconds_ago: = 0

Number of seconds ago.

index: = None

The feature to use for the filter. By default, index is the FeatureTime of the referenced feature class.

Other Parameters
weeks_ago: = 0
Returns
type:

A filter for a DataFrame.

from chalk.features import DataFrame, features
@features
class Card:
    ...
@features
class User:
    cards: DataFrame[Card]
User.cards[before(hours_ago=1, minutes_ago=30)]
Primary
Class

Marks a feature as the primary feature for a feature class.

Features named id on feature classes without an explicit primary feature are declared primary keys by default, and don't need to be marked with Primary.

If you have a primary key feature with a name other than id, you can use this marker to indicate the primary key.

Examples

@features
class User:
    uid: Primary[int]
Functions
Parameters

The type of the feature value.

Returns

The type, with a special annotation indicating that it is a primary key.

Validation
Class

Specify explicit data validation for a feature.

The feature() function can also specify these validations, but this class allows you to specify both strict and non-strict validations at the same time.

Functions

Set validation parameters for a feature.

Parameters
min:
T | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
T | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

min_length: = None

If specified, when this feature is computed, Chalk will check that len(x) >= min_length.

max_length: = None

If specified, when this feature is computed, Chalk will check that len(x) <= max_length.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

from chalk.features import features, feature
@features
class User:
    fico_score: int = feature(
        validations=[
            Validation(min=300, max=850, strict=True),
            Validation(min=300, max=320, strict=False),
            Validation(min=840, max=850, strict=False),
        ]
    )
    # If only one set of validations were needed,
    # you can use the feature function instead:
    first_name: str = feature(
        min_length=2, max_length=64, strict=True
    )

online
Decorator

Decorator to create an online resolver.

Parameters

environment: = None

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environment and no others

Read more at Environments
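
For illustration, a minimal sketch of a resolver scoped to specific environments; the environment names and the User.email_domain feature are placeholders, not part of the class defined above:

from chalk import online

@online(environment=["prod", "staging"])
def get_email_domain(email: User.email) -> User.email_domain:
    # Runs only when the deployment's environment is "prod" or "staging".
    return email.split("@")[-1]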

tags: = None

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

cron: = None

You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.

Cron can sample all examples, a subset of all examples, or a custom provided set of examples.

Read more at Scheduling

when: = None

Like tags, when can filter when a resolver is eligible to run. Unlike tags, when can use feature values, so that you can write resolvers like:

@online(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
    ...
owner: = None

Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

timeout: = None

You can specify the maximum duration to wait for the resolver's result. Once the resolver's runtime exceeds the specified duration, a timeout error will be returned along with each output feature.

Please use supported Chalk durations 'w', 'd', 'h', 'm', 's', and/or 'ms'.

Read more at Timeout and Duration
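
For illustration, a sketch that bounds a resolver's runtime, assuming the parameter is named timeout; get_credit_score and User.credit_score are placeholders:

@online(timeout="200ms")
def resolve_credit_score(uid: User.id) -> User.credit_score:
    # If the vendor call exceeds 200ms, a timeout error is returned instead.
    return get_credit_score(uid)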

Other Parameters
fn:
Callable[P, T] | None
= None
Returns

A ResolverProtocol which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.

Read more at Unit Tests

@online
def name_match(
    name: User.full_name,
    account_name: User.bank_account.title
) -> User.account_name_match_score:
    if name.lower() == account_name.lower():
        return 1.
    return 0.

offline
Decorator

Decorator to create an offline resolver.

Parameters

environment: = None

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environment and no others

Read more at Environments

tags: = None

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

cron: = None

You can schedule resolvers to run on a pre-determined schedule via the cron argument to resolver decorators.

Cron can sample all examples, a subset of all examples, or a custom provided set of examples.

Read more at Scheduling

when: = None

Like tags, when can filter when a resolver is eligible to run. Unlike tags, when can use feature values, so that you can write resolvers like:

@offline(when=User.risk_profile == "low" or User.is_employee)
def resolver_fn(...) -> ...:
   ...
owner: = None

Allows you to specify an individual or team who is responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

timeout: = None

You can specify the maximum duration to wait for the resolver's result. Once the resolver's runtime exceeds the specified duration, a timeout error will be raised.

Please use supported Chalk durations 'w', 'd', 'h', 'm', 's', and/or 'ms'.

Read more at Timeout and Duration

Other Parameters
fn:
Callable[P, T] | None
= None
Returns

A ResolverProtocol which can be called as a normal function! You can unit-test resolvers as you would unit-test any other code.

Read more at Unit Tests

@offline(cron="1h")
def get_fraud_score(
    email: User.email,
    name: User.name,
) -> User.fraud_score:
    return socure.get_sigma_score(email, name)
Cron
Class

Detailed options for specifying the schedule and filtering functions for Chalk batch jobs.

Functions

Run an online or offline resolver on a schedule.

This class lets you add a filter or sample function to your cron schedule for a resolver. See the overloaded signatures for more information.

Parameters

schedule:

The period of the cron job. Can be either a crontab ("0 * * * *") or a Duration ("2h").

filter: = None

Optionally, a function to filter down the arguments to consider.

See Filtering examples for more information.

sample: = None

Explicitly provide the sample function for the cron job.

See Custom examples for more information.

Using a filter

def fltr(v: Account.active):
    return v
@online(cron=Cron(schedule="1d", filter=fltr))
def fn(balance: Account.balance) -> ...:
    ...

Using a sample function

def s() -> DataFrame[User.id]:
    return DataFrame.read_csv(...)
@offline(cron=Cron(schedule="1d", sample=s))
def fn(balance: User.account.balance) -> ...:
    ...

ResolverProtocol
Class

A resolver, returned from the decorators @offline and @online.

Attributes

The content of the resolver as a string.

Individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:
- [`None`](https://docs.python.org/3/library/constants.html#None) (default) - candidate to run in every environment
- [`str`](https://docs.python.org/3/library/stdtypes.html#str) - run only in this environment
- `list[str]` - run in any of the specified environment and no others

Read more at Environments

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

The docstring of the resolver.

The docstring of the resolver.

The filename in which the resolver is defined.

The name of the resolver, either given by the name of the function, or by the keyword argument name given to @offline or @online.

Functions

Returns the result of calling the function decorated with @offline or @online with the given arguments.

Parameters
args:
P.args
= ()

The arguments to pass to the decorated function. If one of the arguments is a DataFrame with a filter or projection applied, the resolver will only be called with the filtered or projected data. Read more at https://docs.chalk.ai/docs/unit-tests#data-frame-inputs

kwargs:
P.kwargs
= {}
Returns
type:
T_co

The result of calling the decorated function with args. Useful for unit-testing.

Read more at Unit Tests

@online
def get_num_bedrooms(
    rooms: Home.rooms[Room.name == 'bedroom']
) -> Home.num_bedrooms:
    return len(rooms)
rooms = [
    Room(id=1, name="bedroom"),
    Room(id=2, name="kitchen"),
    Room(id=3, name="bedroom"),
]
assert get_num_bedrooms(rooms) == 2
MachineType
Type Alias

The type of machine to use.

You can optionally specify that resolvers need to run on a machine other than the default. Must be configured in your deployment.
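
As a sketch only, a resolver pinned to a non-default machine; the machine_type keyword, the "large" machine name, and the User.name_embedding feature are assumptions that depend on your own deployment configuration:

@offline(machine_type="large")  # keyword and machine name are assumed here
def compute_heavy_feature(df: DataFrame[User.id, User.name]) -> DataFrame[User.id, User.name_embedding]:
    # Placeholder body; the point is the machine_type argument above.
    ...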

DataFrame
Class

Chalk includes a DataFrame class that models tabular data in much the same way that pandas does. However, there are some key differences that allow the Chalk DataFrame to increase type safety and performance.

Like pandas, Chalk's DataFrame is a two-dimensional data structure with rows and columns. You can perform operations like filtering, grouping, and aggregating on a DataFrame. However, there are two main differences.

  • Lazy implementation - Chalk's DataFrame is lazy and can be backed by multiple data sources, where a pandas.DataFrame executes eagerly in memory.
  • Use as a type - Chalk's DataFrame[...] can be used to represent a type of data with pre-defined filters.

Lazy Execution

Unlike pandas, the implementation of a Chalk DataFrame is lazy, and can be executed against many different backend sources of data. For example, in unit tests, a DataFrame uses an implementation backed by polars. But if your DataFrame was returned from a SQL source, filters and aggregations may be pushed down to the database for efficient execution.

Use as a Type

Each column of a Chalk DataFrame is typed by a Feature type. For example, you might have a resolver returning a DataFrame containing user ids and names:

@features
class User:
   id: int
   name: str
   email: str

@online
def get_users() -> DataFrame[User.id, User.name]:
   return DataFrame([
       User(id=1, name="Alice"),
       User(id=2, name="Bob")
   ])

Note that the DataFrame type is parameterized by the columns that it contains. In this case, the DataFrame contains two columns, User.id and User.name.

The specific operations available on a DataFrame are discussed below. For a higher-level discussion, see DataFrame.

Attributes
filters
ClassVar[tuple[Filter, ...]]
columns
tuple[Feature, ...]

The maximum number of rows to return

The shape of the DataFrame as a tuple of (num_rows, num_columns).

Examples

DataFrame({User.id: [1, 2, 3, 4, 5]}).shape
(5, 1)
Functions

Construct a Chalk DataFrame.

Parameters
data: = None

The data. Can be an existing pandas.DataFrame, polars.DataFrame or polars.LazyFrame, a sequence of feature instances, or a dict mapping a feature to a sequence of values.

missing_value_strategy:
MissingValueStrategy
= 'default_or_allow'

The strategy to use to handle missing values.

A feature value is "missing" if it is an ellipsis (...), or it is None and the feature is not annotated as Optional[...].

The available strategies are:

  • 'error': Raise a TypeError if any missing values are found. Do not attempt to replace missing values with the default value for the feature.
  • 'default_or_error': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, raise a TypeError.
  • 'default_or_allow': If the feature has a default value, then replace missing values with the default value for the feature. Otherwise, leave it as None. This is the default strategy.
  • 'allow': Allow missing values to be stored in the DataFrame. This option may result in non-nullable features being assigned None values.
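
For example, a sketch of opting into the stricter 'error' strategy at construction time (this assumes User.name has no default value):

# Raises a TypeError, because User(id=2) is missing a value for
# User.name and the 'error' strategy forbids falling back to a default.
DataFrame(
    [User(id=1, name="Alice"), User(id=2)],
    missing_value_strategy="error",
)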

Row-wise construction

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])

Column-wise construction

df = DataFrame({
    User.id: [1, 2],
    User.first: ["Sam", "Iris"],
    User.last: ["Wu", "Xi"]
})

Construction from polars.DataFrame

import polars
df = DataFrame(polars.DataFrame({
    "user.id": [1, 2],
    "user.first": ["Sam", "Iris"],
    "user.last": ["Wu", "Xi"]
}))

Filter the rows of a DataFrame or project out columns.

You can select columns out of a DataFrame from the set of columns already present to produce a new DataFrame scoped down to those columns.

Or, you can filter the rows of a DataFrame by using Python's built-in operations on feature columns.

Parameters

Filters and projections to apply to the DataFrame.

Returns

A DataFrame with the filters and projections in item applied.

df = DataFrame({
    User.age: [21, 22, 23],
    User.email: [...],
})

Filtering

df = df[
    User.age > 21 and
    User.email == "joe@chalk.ai"
]

Projecting

df[User.name]

Filtering & Projecting

df = df[
    User.age > 21 and
    User.email == "joe@chalk.ai",
    User.name
]

Aggregate the DataFrame by the specified columns.

Parameters
group:
dict[Feature | Any, Feature | Any]

A mapping from the desired column name in the resulting DataFrame to the name of the column in the source DataFrame.

agg:

A mapping from the desired column name in the resulting DataFrame to the aggregation operation to perform on the source DataFrame.

Returns
type:

The DataFrame with the specified aggregations applied.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ 3        │
├─────────┼──────────┤
│  3      │ 10       │
╰─────────┴──────────╯

Compute a histogram with fixed width bins.

Parameters
nbins: = None

If supplied, will be used to compute the binwidth.

If not supplied, computed from the data (actual max and min values).

base: = None

The value of the first histogram bin. Defaults to the minimum value of column.

eps: = 1e-13

Allowed floating point epsilon for histogram base

column: = None

The column to compute the histogram on. If not supplied, the DataFrame is assumed to contain a single column.

descending: = False

If True, the histogram buckets will be sorted in descending order.

Returns
type:

A list of the counts in each bin.

DataFrame({
  Taco.price: list(range(100, 200)),
}).histogram_list(nbins=4, base=100)
[25, 25, 25, 25]

Group based on a time value (date or datetime).

The groups are defined by a time-based window, and optionally, other columns in the DataFrame. The "width" of the window is defined by the period parameter, and the spacing between the windows is defined by the every parameter. Note that if the every parameter is smaller than the period parameter, then the windows will overlap, and a single row may be assigned to multiple groups.

As an example, consider the following DataFrame:


val:    a  b    c   d e     f           g h
    ─────────●─────────●─────────●─────────●───────▶
time:        A         B         C         D
    ┌─────────┐
1   │   a  b  │                                    1: [a, b]
    └────┬────┴────┐
2   ◀───▶│ b    c  │                               2: [b, c]
    every└────┬────┴────┐
3   ◀────────▶│ c   d e │                          3: [c, d, e]
      period  └────┬────┴────┐
4                  │d e     f│                     4: [d, e, f]
                   └────┬────┴────┐
5                       │   f     │                5: [f]
                        └────┬────┴────┐
6                            │         │
                             └────┬────┴────┐
7                                 │     g h │      7: [g, h]
                                  └────┬────┴────┐
8                                      │g h      │ 8: [g, h]
                                       └─────────┘

In the above example, the sixth time bucket is empty, and will not be included in the resulting DataFrame.

Parameters
index:
Feature | Any

The column to use as the index for the time-based grouping.

agg:

A mapping from the desired column name in the resulting DataFrame to the aggregation operation to perform on the source DataFrame.

every:

The spacing between the time-based windows. This parameter can be specified as a str or a timedelta. If specified as a str, then it must be a valid Duration.

group:
dict[Feature | Any, Feature | Any] | None
= None

A mapping from the desired column name in the resulting DataFrame to the name of the column in the source DataFrame. This parameter is optional, and if not specified, then the resulting DataFrame groups will be determined by the index parameter alone.

period: = None

The width of the time-based window. This parameter can be specified as a str or a timedelta. If specified as a str, then it must be a valid Duration. If None it is equal to every.

Other Parameters
offset: = None
start_by:
'window' | 'datapoint' | 'monday' | 'tuesday' | 'wednesday' | 'thursday' | 'friday' | 'saturday' | 'sunday'
= 'window'
Returns
type:

A new DataFrame with the specified time-based grouping applied. The resulting DataFrame will have a column for each of the keys in group, a column for each of the keys in agg, and a column for the index parameter.

from datetime import datetime
from chalk import DataFrame, op
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
        User.ts: [datetime(2020, 1, 1), datetime(2020, 1, 1), datetime(2020, 1, 3)],
    },
).group_by_hopping(
     index=User.ts,
     group={User.id: User.id},
     agg={User.val: op.median(User.val)},
     period="1d",
)
╭─────────┬──────────┬──────────╮
│ User.id │ User.ts  │ User.val │
╞═════════╪══════════╪══════════╡
│  1      │ 2020-1-1 │ 3        │
├─────────┼──────────┼──────────┤
│  3      │ 2020-1-3 │ 10       │
╰─────────┴──────────┴──────────╯

Vertically stack the DataFrame with another DataFrame containing the same columns. The DataFrame other will be appended to the bottom of this DataFrame.

Parameters

The other DataFrame to stack with this DataFrame.

Returns
type:

The DataFrame with the other DataFrame stacked on the bottom.

from chalk.features import DataFrame
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
df.vstack(df)

Return the number of unique values in the specified column.

Parameters
column: = None

The column to compute the number of unique values for. If None, then the number of unique values in the entire DataFrame is returned.

Returns
type:

The number of unique values in the specified column.

from chalk.features import DataFrame
df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
df.num_unique(User.id)
2

Rename columns in the DataFrame.

Parameters

A mapping from the current feature for a column to the desired feature for the column.

Returns
type:

The DataFrame with the specified columns renamed.

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
]).rename({User.last: User.family})

Add a column to the DataFrame.

Parameters

The name of the column to add.

The definition of the column to add. This could be a constant value (e.g. 1 or True), an expression (e.g. op.max(User.score_1, User.score_2)), or a list of values (e.g. [1, 2, 3]).

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
# Set the fraud score to 0 for all users
df.with_column(User.fraud_score, 0)
# Concatenation of first & last as full_name
df.with_column(
    User.full_name, op.concat(User.first, User.last)
)
# Alias a column name
df.with_column(
    User.first_name, User.first
)

Add columns to the DataFrame.

Parameters

A Mapping from the desired name of the column in the DataFrame to the definition of the new column.

Returns
type:

A new DataFrame with all the existing columns, plus those specified in this function.

df = DataFrame([
    User(id=1, first="Sam", last="Wu"),
    User(id=2, first="Iris", last="Xi")
])
# Set the fraud score to 0 for all users
df.with_columns({User.fraud_score: 0})
# Concatenation of first & last as full_name
df.with_columns({
    User.full_name: op.concat(User.first, User.last)
})
# Alias a column name
df.with_columns({
    User.first_name: User.first
})

Deprecated. Use DataFrame(...) instead.

Deprecated. Use DataFrame(...) instead.

Read a .csv file as a DataFrame.

Parameters

The path to the .csv file. This may be an S3 or GCS storage url.

Whether the .csv file has a header row as the first row.

columns: = None

A mapping of index to feature name.

Returns
type:
Self

A DataFrame with the contents of the file loaded as features.

values = DataFrame.read_csv(
    "s3://...",
    columns={0: MyFeatures.id, 1: MyFeatures.name},
    has_header=False,
)

Read a .avro file as a DataFrame.

Parameters

The path to the .avro file. This may be an S3 or GCS storage url.

Returns
type:
Self

A DataFrame with the contents of the file loaded as features.

values = DataFrame.read_avro(
    "s3://...",
)

Compute the max value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the max value of each column.

Returns
type:
Self

A DataFrame with the max value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).max()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 3       │ 10       │
╰─────────┴──────────╯

Compute the mean value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the mean value of each column.

Returns
type:
Self

A DataFrame with the mean value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).mean()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 2       │ 5        │
╰─────────┴──────────╯

Compute the median value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the median value of each column.

Returns
type:
Self

A DataFrame with the median value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).median()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 2       │ 4        │
╰─────────┴──────────╯

Compute the min value of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the min value of each column.

Returns
type:
Self

A DataFrame with the min value of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).min()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 1        │
╰─────────┴──────────╯

Compute the standard deviation of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the standard deviation of each column.

Parameters
ddof: = 1

"Delta Degrees of Freedom": the divisor used in the calculation is N - ddof, where N represents the number of elements. By default, ddof is 1.

Returns
type:
Self

A DataFrame with the standard deviation of each column.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).std()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 4.5826   │
╰─────────┴──────────╯

Compute the sum of each of the columns in the DataFrame. The resulting DataFrame will have a single row with the sum of each column.

Returns
type:
Self

A DataFrame with the sum of each column.
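
A sketch mirroring the examples above (the method name sum() is inferred from the max/mean/min pattern):

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).sum()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 6       │ 15       │
╰─────────┴──────────╯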

Compute the variance of each of the columns in the DataFrame.

Parameters
ddof: = 1

"Delta Degrees of Freedom": the divisor used in the calculation is N - ddof, where N represents the number of elements. By default, ddof is 1.

Returns
type:
Self

A DataFrame with the variance of each column.
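
A sketch mirroring the std() example above (ddof=1; the method name var() is assumed to follow the same pattern):

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
).var()
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│ 1       │ 21       │
╰─────────┴──────────╯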

Returns whether any of the values in the DataFrame are truthy. Requires the DataFrame to only contain boolean values.

Returns whether all the values in the DataFrame are truthy. Requires the DataFrame to only contain boolean values.
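
A small sketch of both checks (the method names any() and all() are inferred, and User.active is illustrative):

from chalk.features import DataFrame
df = DataFrame({User.active: [True, False, True]})
df.any()
True
df.all()
False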

Returns the number of rows in the DataFrame.

Returns
type:

The number of rows in the DataFrame.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
)
len(df)
3

Returns the number of rows in the DataFrame.

Returns
type:

The number of rows in the DataFrame.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 2, 3],
        User.val: [1, 4, 10],
    }
)
len(df)
3
df.count()
3

Get the only item from the DataFrame. This method will raise an error if the DataFrame contains more than one row or more than one column.
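
A sketch (the method name item() is inferred from the description):

from chalk.features import DataFrame
DataFrame({User.val: [42]}).item()
42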

Sort the DataFrame by the given columns.

Parameters
by:
str | Feature | Any

Feature(s) to sort by. Strings are parsed as feature names.

more_by:
str | Feature | Any
= ()

Additional columns to sort by, specified as positional arguments.

descending: = False

Sort in descending order. When sorting by multiple columns, can be specified per feature by passing a sequence of booleans.

nulls_last: = False

Place null values last.

Returns
type:
Self

A new DataFrame with the rows sorted.

df = DataFrame({
    User.a: [1, 2, 3],
    User.b: [3, 2, 1],
})
df.sort(User.a)
a  b
-----------
0     1  3
1     2  2
2     3  1

Get the underlying DataFrame as a polars.LazyFrame.

Parameters
prefixed: = True

Whether to prefix the column names with the feature namespace (e.g. if prefixed=True, user.name; if prefixed=False, name).

Returns

The underlying polars.LazyFrame.
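
A sketch (the method name to_polars() is inferred from the description); collect() is the standard polars call for materializing a LazyFrame:

df = DataFrame({User.id: [1, 2]})
lazy = df.to_polars()
lazy.collect()  # materializes the polars.LazyFrame into a polars.DataFrame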

Get the underlying DataFrame as a pyarrow.Table.

Parameters
prefixed: = True

Whether to prefix the column names with the feature namespace (e.g. if prefixed=True, user.name; if prefixed=False, name).

Returns
type:

The underlying pyarrow.Table. This format is the canonical representation of the data in Chalk.

Get the underlying DataFrame as a pandas.DataFrame.

Parameters

If True, use strings for column names. If False, use Feature objects.

prefixed: = True

Whether to prefix the column names with the feature namespace (e.g. if prefixed=True, user.name; if prefixed=False, name).

Returns

The data formatted as a pandas.DataFrame.

Get values in the DataFrame as Features instances.

df = DataFrame({
    SpaceShip.id: [1, 2],
    SpaceShip.volume: [4_000, 5_000]
})
df.to_features()
[
    SpaceShip(id=1, volume=4000),
    SpaceShip(id=2, volume=5000)
]

Slice the DataFrame.

Parameters
offset: = 0

The offset to start at.

length: = None

The number of rows in the slice. If None (the default), include all rows from offset to the end of the DataFrame.

Returns
type:
Self

The DataFrame with the slice applied.
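
A sketch (the method name slice() is inferred from the description):

from chalk.features import DataFrame
df = DataFrame({User.id: [1, 2, 3, 4]})
df.slice(offset=1, length=2)  # keeps the rows with User.id 2 and 3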

op
Class

Operations for aggregations in DataFrame.

The class methods on this class are used to create aggregations for use in DataFrame.group_by.

Functions

Add together the values of col and *cols in a DataFrame.

Parameters

There must be at least one column to aggregate.

cols = ()

Subsequent columns to aggregate.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.sum(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ 4.5      │
├─────────┼──────────┤
│  3      │ 10       │
╰─────────┴──────────╯

Multiply together the values of col in a DataFrame.

Parameters

The column to aggregate. Used in DataFrame.group_by.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
        User.active: [True, True, False],
    }
).group_by(
     group={User.id: User.id},
     agg={
        User.val: op.product(User.val),
        User.active: op.product(User.active),
     }
)
╭─────────┬──────────┬─────────────╮
│ User.id │ User.val │ User.active │
╞═════════╪══════════╪═════════════╡
│  1      │ 2        │ 1           │
├─────────┼──────────┼─────────────┤
│  3      │ 10       │ 0           │
╰─────────┴──────────┴─────────────╯

Find the maximum of the values of col in a DataFrame.

Parameters

The column along which to find the maximum value.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.max(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ 4        │
├─────────┼──────────┤
│  3      │ 10       │
╰─────────┴──────────╯

Find the minimum of the values of col in a DataFrame.

Parameters

The column along which to find the minimum value.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.min(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ 0.5      │
├─────────┼──────────┤
│  3      │ 10       │
╰─────────┴──────────╯

Find the median of the values of col in a DataFrame.

Parameters

The column along which to find the median value. In the case of an even number of elements, the median is the mean of the two middle elements.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.median(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ 3        │
├─────────┼──────────┤
│  3      │ 10       │
╰─────────┴──────────╯

Find the mean of the values of col in a DataFrame.

Parameters

The column along which to find the mean value.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.mean(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ 3        │
├─────────┼──────────┤
│  3      │ 6.5      │
╰─────────┴──────────╯

Find the standard deviation of the values of col in a DataFrame.

Parameters

The column along which to find the standard deviation.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.std(User.val)}
)

Find the variance of the values of col in a DataFrame.

Parameters

The column along which to find the variance.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.variance(User.val)}
)

Find the count of the values of col in a DataFrame.

Parameters

The column along which to find the count.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [1, 5, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.count(User.val)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ 2        │
├─────────┼──────────┤
│  3      │ 1        │
╰─────────┴──────────╯

Concatenate the string values of col and col2 in a DataFrame.

Parameters

The first column to concatenate.

The column with which to concatenate col.

sep: = ''

The separator to use when concatenating col and col2.

from chalk.features import DataFrame
DataFrame(
    [
        User(id=1, val='a'),
        User(id=1, val='b'),
        User(id=3, val='c'),
        User(id=3, val='d'),
    ]
).group_by(
    group={User.id: User.id},
    agg={User.val: op.concat(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ "ab"     │
├─────────┼──────────┤
│  3      │ "cd"     │
╰─────────┴──────────╯

Deprecated. Use concat instead.

Find the last value of col in a DataFrame.

Parameters

The column along which to find the last value.

from chalk.features import DataFrame
DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=True,
).group_by(
    group={User.id: User.id},
    agg={User.val: op.last(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ 1        │
├─────────┼──────────┤
│  3      │ 5        │
╰─────────┴──────────╯

Find the first value of col in a DataFrame.

Parameters

The column along which to find the first value.

from chalk.features import DataFrame
DataFrame(
    [
        User(id=1, val=1),
        User(id=1, val=3),
        User(id=3, val=7),
        User(id=3, val=5),
    ]
).sort(
    User.val, descending=False
).group_by(
    group={User.id: User.id},
    agg={User.val: op.first(User.val)},
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ 1        │
├─────────┼──────────┤
│  3      │ 5        │
╰─────────┴──────────╯

A class for refining an aggregation defined by op.

Attributes
filters
list[Filter]
Functions

Filter the aggregation to apply to only rows where all the filters in f are true. If no rows match the filter, the aggregation for the column will be null, and the resulting feature type must be a nullable type.

Parameters
f:
Filter | Any
= ()

A set of filters to apply to the aggregation. Each of the filters must be true to apply the aggregation.

Returns

The aggregation, allowing you to continue to chain methods.

from chalk.features import DataFrame
df = DataFrame(
    {
        User.id: [1, 1, 3],
        User.val: [0.5, 4, 10],
    }
).group_by(
     group={User.id: User.id},
     agg={User.val: op.sum(User.val).where(User.val > 5)}
)
╭─────────┬──────────╮
│ User.id │ User.val │
╞═════════╪══════════╡
│  1      │ null     │
├─────────┼──────────┤
│  3      │ 10       │
╰─────────┴──────────╯

Create a Snowflake data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a PostgreSQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a MySQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a BigQuery data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a Redshift data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Create a CloudSQL data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.
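
For example, a sketch of the name= pattern, assuming an integration named "RISK" is configured in your Chalk Dashboard:

from chalk.sql import PostgreSQLSource

# Uses the configuration for the integration named "RISK".
pg = PostgreSQLSource(name="RISK")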

Create a SQLite source for a file.

Parameters
filename:
str | PathLike

The name of the file.

name: = None

The name to use in testing

Additional arguments to use when constructing the SQLAlchemy engine.

Additional arguments to use when constructing an async SQLAlchemy engine.

Returns

The SQL source for use in Chalk resolvers.
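
A sketch; the class name SQLiteFileSource is assumed here, mirroring SQLiteInMemorySource below, and the file path is illustrative:

from chalk.sql import SQLiteFileSource  # class name assumed

source = SQLiteFileSource(filename="data/users.db", name="LOCAL_USERS")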

Testing SQL source.

If you have only one SQLiteInMemorySource integration, there's no need to provide a distinguishing name.

Parameters
name: = None

The name of the integration.

Additional arguments to use when constructing the SQLAlchemy engine.

Additional arguments to use when constructing an async SQLAlchemy engine.

Returns

The SQL source for use in Chalk resolvers.

source = SQLiteInMemorySource(name="RISK")

Create a Databricks data source. SQL-based data sources created without arguments assume a configuration in your Chalk Dashboard. Those created with the name= keyword argument will use the configuration for the integration with the given name. And finally, those created with explicit arguments will use those arguments to configure the data source. See the overloaded signatures for more details.

Incremental settings for Chalk SQL queries.

In "row" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:

"WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>"

In "group" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.

This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:

SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id

would be rewritten like this:

SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
    SELECT DISTINCT(user_id)
    FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id

In "parameter" mode: incremental_column WILL BE IGNORED.

This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query. Chalk will include a query parameter named "chalk_incremental_timestamp". Depending on your SQL dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp or %(chalk_incremental_timestamp)s. This will incrementalize your query using the timestamp of the latest row that has been ingested.

Chalk will also include another query parameter named "chalk_last_execution_timestamp" that can be used instead. This will incrementalize your query using the last time the query was executed.

incremental_timestamp:

If incremental_timestamp is "feature_time", we will incrementalize your query using the timestamp of the latest row that has been ingested. This is the default.

If incremental_timestamp is "resolver_execution_time", we will incrementalize your query using the last time the query was executed instead.

Attributes
mode
'row' | 'group' | 'parameter'

lookback_period

The amount of overlap to check for late-arriving rows.

incremental_column

The column on which to incrementalize.

incremental_timestamp
'feature_time' | 'resolver_execution_time'

The timestamp to set as the lower bound
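
A sketch of constructing these settings for a table ingest; the import path, the incremental= keyword on with_table, and the payments table and Payment feature class are assumptions for illustration:

from chalk.sql import IncrementalSettings, PostgreSQLSource

PostgreSQLSource().with_table(
    name="payments",
    features=Payment,
    incremental=IncrementalSettings(
        mode="row",
        incremental_column="updated_at",
        lookback_period="60m",
    ),
)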

Functions

Run a query from a SQL string.

Parameters

The query that you'd like to run.

fields:
dict[str, Feature | str | Any] | None
= None

A mapping from the column names selected to features.

args: = None

Any args in the sql string specified by query need to have corresponding value assignments in args.

Returns

A query that can be returned from a @online or @offline resolver.
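
For example, a sketch of using query_string inside a resolver; pg is a SQL source defined elsewhere, and the users table and column names are illustrative:

@online
def get_user_name(uid: User.id) -> User.name:
    return pg.query_string(
        query="SELECT name FROM users WHERE id = :user_id",
        fields={"name": User.name},
        args={"user_id": uid},
    ).one()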

Run a query from a SQL file.

This method allows you to query the SQL file within a Python resolver. However, Chalk can also infer resolvers from SQL files. See SQL File Resolvers for more information.

Parameters
path:
str | bytes | PathLike

The path to the SQL file, relative to the caller's file, or to the directory that your chalk.yaml file lives in.

fields:
dict[str, Feature | str | Any] | None
= None

A mapping from the column names selected to features.

args: = None

Any args in the sql file specified by path need to have corresponding value assignments in args.

Returns

A query that can be returned from a @online or @offline resolver.

Query using a SQLAlchemy model.

Parameters

Arguments as would normally be passed to a SQLAlchemy query.

Returns

A query that can be returned from a resolver.

Get an SQLAlchemy Engine. The engine will be created and cached on the first call of this method.

Returns

A SQLAlchemy engine.

Execute a query to a pa.Table.

Parameters
finalized_query:
FinalizedChalkQuery

The query to execute

columns_to_converters:
Callable[[list[str]], dict[str, FeatureConverter]]

A function that, given the list of column names returned from the database, produces a mapping of columns to converters. Columns not in the resulting dictionary should be ignored.

connection:
Connection | None
= None

A connection to execute the query under. If not specified, then a new connection is acquired.

The executor will attempt to avoid the use of SQLAlchemy and use more efficient methods to retrieve data.

Returns
type:

A pa.Table containing the results.

Execute a query to a pa.Table.

Parameters
finalized_query:
FinalizedChalkQuery

The query to execute

columns_to_converters:
Callable[[list[str]], dict[str, FeatureConverter]]

A function that, given the list of column names returned from the database, produces a mapping of columns to converters. Columns not in the resulting dictionary should be ignored.

connection:
sqlalchemy.ext.asyncio.AsyncConnection | None
= None

A connection to execute the query under. If not specified, then a new connection is acquired.

Returns
type:

A pa.Table containing the results.

Returns dialect for sqlglot operations. Also happens to be the name of the source.

Returns
type:

The dialect if applicable to sqlglot

Functions

Automatically ingest a table.

Parameters

The name of the table to ingest.

The feature class that this table should be mapping to, e.g. User.

Columns in the table that should be ignored, and not mapped to features, even if there is a matching name.

Features on the feature class that should be ignored, and not mapped to columns, even if there is a matching name.

Columns that must exist in the mapping.

Features that must exist in the mapping.

Explicit mapping of columns to features for names that do not match.

Settings for incrementally ingesting the table.

from chalk.sql import PostgreSQLSource
from chalk.features import features
PostgreSQLSource().with_table(
    name="users",
    features=User,
).with_table(
    name="accounts",
    features=Account,
    # Override one of the column mappings.
    column_to_feature={
        "acct_id": Account.id,
    },
)
Functions

Return the first result of this Query or None if the result doesn't contain any row.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return at most one result or raise an exception.

Returns None if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return exactly one result or raise an exception.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return the results represented by this Query as a DataFrame.

Returns
type:
DataframeFinalizedChalkQuery

A query that can be returned from a resolver.

Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.

previous_latest_row_timestamp will be set to the start of the query execution, or if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.

In "row" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:

WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>

In "group" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return results from "groups" which have changed since the last run of the query.

This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:

SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id

would be rewritten like this:

SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
    SELECT DISTINCT(user_id)
    FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id

In "parameter" mode: incremental_column WILL BE IGNORED.

This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query. Chalk will include a query parameter named "chalk_incremental_timestamp". Depending on your SQL dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp or %(chalk_incremental_timestamp)s.

Parameters

lookback_period: = 0

Defaults to 0, which means we only return rows that are strictly newer than the last observed row.

mode:
'row' | 'group' | 'parameter'
= 'row'

Defaults to "row", which indicates that only rows newer than the last observed row should be considered. When set to "group", Chalk will only ingest features from groups which are newer than the last observation time. This requires that the query is grouped by a primary key.

incremental_column:
str | Feature | None
= None

This should reference a timestamp column in your underlying table, typically something like "updated_at", "created_at", "event_time", etc.

incremental_timestamp:
'feature_time' | 'resolver_execution_time'
= 'feature_time'

Defaults to "feature_time", which means that the timestamp associated with the last feature value will be used as the incremental time. Alternatively, setting this parameter to "resolver_execution_time" will use last literal timestamp that the resolver ran.

params: = None
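
For example, a sketch of an incremental ingest; pg is a SQL source defined elsewhere, and the payments table and Payment feature class are illustrative:

@offline
def ingest_payments() -> DataFrame[Payment.id, Payment.user_id, Payment.amount]:
    return pg.query_string(
        query="SELECT id, user_id, amount, updated_at FROM payments",
        fields={
            "id": Payment.id,
            "user_id": Payment.user_id,
            "amount": Payment.amount,
            "updated_at": Payment.updated_at,
        },
    ).incremental(incremental_column="updated_at", mode="row")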

Apply the given filtering criterion to a copy of this Query, using keyword expressions.

Parameters
kwargs: = {}

The column names assigned to the desired values (i.e. name="Maria").

Returns

A query that can be returned from a resolver or further filtered.

from chalk.sql import PostgreSQLSource
session = PostgreSQLSource()
session.query(UserFeatures(id=UserSQL.id)).filter_by(name="Maria")

Apply the given filtering criterion to a copy of this Query, using SQL expressions.

Parameters

SQLAlchemy filter criterion

Returns

A query that can be returned from a resolver or further filtered.
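
For example, a sketch mirroring the filter_by example above, but with a SQLAlchemy expression (UserSQL and UserFeatures are assumed to be defined as in that example):

from chalk.sql import PostgreSQLSource
session = PostgreSQLSource()
session.query(UserFeatures(id=UserSQL.id)).filter(UserSQL.name == "Maria")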

Apply one or more ORDER BY criteria to the query and return the newly resulting Query.

Parameters
clauses: = ()

SQLAlchemy columns.

Returns

A query that can be returned from a resolver or further filtered.
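
For example, a sketch that orders results by a SQLAlchemy column (UserSQL.created_at is an assumed column on the mapped table):

from chalk.sql import PostgreSQLSource
session = PostgreSQLSource()
session.query(UserFeatures(id=UserSQL.id)).order_by(UserSQL.created_at.desc())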

Materialize the query.

Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.

Note: this requires the usage of the fields={...} argument when used in conjunction with query_string or query_sql_file.

Returns

The raw result of executing the query. For .all(), returns a DataFrame. For .one() or .one_or_none(), returns a Features instance corresponding to the relevant feature class.

Functions

Materialize the query.

Chalk queries are lazy, which allows Chalk to perform performance optimizations like push-down filters. Instead of calling execute, consider returning this query from a resolver as an intermediate feature, and processing that intermediate feature in a different resolver.

Returns
type:

A DataFrame with the results of the query.

Return at most one result or raise an exception.

Returns None if the query selects no rows. Raises if multiple object identities are returned, or if multiple rows are returned for a query that returns only scalar values as opposed to full identity-mapped entities.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return exactly one result or raise an exception.

Returns
type:
SingletonFinalizedChalkQuery

A query that can be returned from a resolver.

Return the results represented by this Query as a list.

Returns
type:
DataframeFinalizedChalkQuery

A query that can be returned from a resolver.

Operates like .all(), but tracks previous_latest_row_timestamp between query executions in order to limit the amount of data returned.

previous_latest_row_timestamp will be set to the start of the query execution, or, if you return a FeatureTime-mapped column, Chalk will update previous_latest_row_timestamp to the maximum observed FeatureTime value.

In "row" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to only return "new" results, by adding a clause that looks like:

WHERE <incremental_column> >= <previous_latest_row_timestamp> - <lookback_period>

In "group" mode: incremental_column MUST be set.

Returns the results represented by this query as a list (like .all()), but modifies the query to return only results from "groups" which have changed since the last run of the query.

This works by (1) parsing your query, (2) finding the "group keys", (3) selecting only changed groups. Concretely:

SELECT user_id, sum(amount) as sum_amount
FROM payments
GROUP BY user_id

would be rewritten like this:

SELECT user_id, sum(amount) as sum_amount
FROM payments
WHERE user_id in (
    SELECT DISTINCT(user_id)
    FROM payments WHERE created_at >= <previous_latest_row_timestamp> - <lookback_period>
)
GROUP BY user_id

In "parameter" mode: incremental_column WILL BE IGNORED.

This mode is for cases where you want full control of incrementalization. Chalk will not manipulate your query. Chalk will include a query parameter named "chalk_incremental_timestamp". Depending on your SQL dialect, you can use this value to incrementalize your query with :chalk_incremental_timestamp or %(chalk_incremental_timestamp)s.

Parameters

This should reference a timestamp column in your underlying table, typically something like "updated_at", "created_at", "event_time", etc.

Defaults to 0, which means we only return rows that are strictly newer than the last observed row.

mode:
'row' | 'group' | 'parameter'
= 'row'

Defaults to "row", which indicates that only rows newer than the last observed row should be considered. When set to "group", Chalk will only ingest features from groups which are newer than the last observation time. This requires that the query is grouped by a primary key.

incremental_timestamp:
'feature_time' | 'resolver_execution_time'
= 'feature_time'

Defaults to "feature_time", which means that the timestamp associated with the last feature value will be used as the incremental time. Alternatively, setting this parameter to "resolver_execution_time" will use last literal timestamp that the resolver ran.

Decorator to create a stream resolver.

Parameters

The streaming source, e.g. KafkaSource(...) or KinesisSource(...).

mode:
'continuous' | 'tumbling' | None
= None

This parameter is defined when the streaming resolver returns a windowed feature. Tumbling windows are fixed-size, contiguous and non-overlapping time intervals. You can think of tumbling windows as adjacently arranged bins of equal width. Tumbling windows are most often used alongside max_staleness to allow the features to be sent to the online store and offline store after each window period.

Continuous windows, unlike tumbling windows, are overlapping and exact. When you request the value of a continuous window feature, Chalk looks at all the messages received in the window and computes the value on-demand.

See more at Window Modes

parse:
Callable[[T], V] | None
= None

A callable that will interpret an input prior to the invocation of the resolver. Parse functions can serve many purposes, including pre-parsing bytes, skipping unrelated messages, or supporting rekeying.

See more at Parsing

keys: = None

A mapping from input BaseModel attribute to Chalk feature attribute to support continuous streaming re-keying. This parameter is required for continuous resolvers. Features that are included here do not have to be explicitly returned in the stream resolver: the feature will automatically be set to the key value used for aggregation. See more at Keys

timestamp: = None

An optional string specifying an input attribute as the timestamp used for windowed aggregations. See more at Custom Event Timestamping

Other Parameters
Show All
owner: = None
Returns
type:

A callable function! You can unit-test stream resolvers as you would unit-test any other code.
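
As an illustrative sketch, the tumbling-window resolver below counts failed logins per user. The KafkaSource name, the LoginMessage model, and the User feature class are assumptions; the general shape follows the windowed-feature and streaming examples elsewhere in this reference:

import pydantic
from chalk import windowed, Windowed
from chalk.features import Features, features
from chalk.streams import KafkaSource, stream

source = KafkaSource(name="logins")

class LoginMessage(pydantic.BaseModel):
    user_id: str
    failed: bool

@features(max_staleness="1d", etl_offline_to_online=True)
class User:
    id: str
    failed_logins: Windowed[int] = windowed("10m", "1h")

@stream(source=source, mode="tumbling")
def count_failed_logins(
    messages: list[LoginMessage],
) -> Features[User.id, User.failed_logins]:
    # Each invocation receives the messages that arrived in one window bucket.
    return User(
        id=messages[0].user_id,
        failed_logins=sum(1 for m in messages if m.failed),
    )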

Decorator to create a sink. Read more at Sinks

Parameters

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environment can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environments and no others

Read more at Environments

tags: = None

Allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

You might consider using tags, for example, to change out whether you want to use a sandbox environment for a vendor, or to bypass the vendor and return constant values in a staging environment.

Read more at Tags

buffer_size: = None

Count of updates to buffer.

owner: = None

The individual or team responsible for this resolver. The Chalk Dashboard will display this field, and alerts can be routed to owners.

Other Parameters
Show All
fn:
Callable[P, T] | None
= None
debounce: = None
max_delay: = None
upsert: = None
integration:
BaseSQLSourceProtocol | SinkIntegrationProtocol | None
= None
Returns

A callable function! You can unit-test sinks as you would unit test any other code. Read more at Unit Tests

@sink
def process_updates(
    uid: User.id,
    email: User.email,
    phone: User.phone,
):
    user_service.update(
        uid=uid,
        email=email,
        phone=phone
    )
process_updates(123, "sam@chalk.ai", "555-555-5555")

Declare a windowed feature.

Examples

@features
class User:
    failed_logins: Windowed[int] = windowed("10m", "24h")
Functions

Create a windowed feature.

See more at Windowed Features

Parameters
buckets: = ()

The size of the buckets for the window function. Buckets are specified as strings in the format "1d", "2h", "1h30m", etc. You may also choose to specify the buckets using the days, hours, and minutes parameters instead. The buckets parameter is helpful if you want to use multiple units to express the bucket size, like "1h30m".

days: = ()

Convenience parameter for specifying the buckets in days. Using this parameter is equivalent to specifying the buckets parameter with a string like "1d".

hours: = ()

Convenience parameter for specifying the buckets in hours. Using this parameter is equivalent to specifying the buckets parameter with a string like "1h".

minutes: = ()

Convenience parameter for specifying the buckets in minutes. Using this parameter is equivalent to specifying the buckets parameter with a string like "1m".

owner: = None

You may also specify which person or group is responsible for a feature. The owner tag will be available in Chalk's web portal. Alerts that do not otherwise have an owner will be assigned to the owner of the monitored feature.

tags: = None

Add metadata to a feature for use in filtering, aggregations, and visualizations. For example, you can use tags to assign features to a team and find all features for a given team.

default:
TRich | ...
= ...

The default value of the feature if it otherwise can't be computed.

max_staleness:
Duration | ... | None
= ...

When a feature is expensive or slow to compute, you may wish to cache its value. Chalk uses the terminology "maximum staleness" to describe how recently a feature value needs to have been computed to be returned without re-running a resolver.

Read more at Caching

offline_ttl:
Duration | ... | None
= ...
version: = None

Feature versions allow you to manage a feature as its definition changes over time.

The version keyword argument allows you to specify the maximum number of versions available for this feature.

See more at Versioning

When True, Chalk copies this feature into the online environment when it is computed in offline resolvers.

Read more at Reverse ETL

min:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x >= min.

max:
TRich | None
= None

If specified, when this feature is computed, Chalk will check that x <= max.

min_length: = None

If specified, when this feature is computed, Chalk will check that len(x) >= min_length.

max_length: = None

If specified, when this feature is computed, Chalk will check that len(x) <= max_length.

strict: = False

If True, if this feature does not meet the validation criteria, Chalk will not persist the feature value and will treat it as failed.

Other Parameters
Show All
description: = None
name: = None
encoder:
TEncoder[TPrim, TRich] | None
= None
decoder:
TDecoder[TPrim, TRich] | None
= None
dtype: = None
Returns
type:
Windowed[TRich]

Metadata for the windowed feature, parameterized by TPrim (the primitive type of the feature) and TRich (the decoded type of the feature, if decoder is provided).

from chalk import windowed, Windowed
@features
class User:
    id: int
    email_count: Windowed[int] = windowed(days=range(1, 30))
    logins: Windowed[int] = windowed("10m", "1d", "30d")
User.email_count["7d"]
Attributes

The URL of one of your Kafka brokers from which to fetch initial metadata about your Kafka cluster

The name of the topic to subscribe to.

An S3 or GCS URI that points to the keystore file that should be used for brokers. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.

An S3 or GCS URI that points to the certificate authority file that should be used to verify broker certificates. You must configure the appropriate AWS or GCP integration in order for Chalk to be able to access these files.

security_protocol
'PLAINTEXT' | 'SSL' | 'SASL_PLAINTEXT' | 'SASL_SSL'

Protocol used to communicate with brokers. Valid values are "PLAINTEXT", "SSL", "SASL_PLAINTEXT", and "SASL_SSL". Defaults to "PLAINTEXT".

sasl_mechanism
'PLAIN' | 'GSSAPI' | 'SCRAM-SHA-256' | 'SCRAM-SHA-512' | 'OAUTHBEARER'

Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are "PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER". Defaults to "PLAIN".

Username for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication.

Password for SASL PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 authentication.

The name of the integration, as configured in your Chalk Dashboard.

Messages older than this deadline will not be processed.

Kafka topic to send messages when message processing fails
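
Putting these attributes together, a hedged sketch of a Kafka source configured with SASL authentication (the keyword argument names mirror the attributes above and should be checked against your Chalk version):

import os
from chalk.streams import KafkaSource

source = KafkaSource(
    bootstrap_server="broker-1.mycompany.com:9092",
    topic="transactions",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_username="chalk-consumer",
    sasl_password=os.environ["KAFKA_SASL_PASSWORD"],
)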

Attributes

The name of your stream. Either this or the stream_arn must be specified

The ARN of your stream. Either this or the stream_name must be specified

AWS region string, e.g. "us-east-2"

The name of the integration, as configured in your Chalk Dashboard.

Messages older than this deadline will not be processed.

Kinesis stream name to send messages when message processing fails

AWS access key id credential

AWS secret access key credential

AWS session token credential

Optional endpoint to use when connecting to the Kinesis server.

Optional role ARN for the consumer to assume
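
For example, a hedged sketch of a Kinesis source that assumes an IAM role for consumption (the keyword argument names mirror the attributes above and should be checked against your Chalk version):

from chalk.streams import KinesisSource

source = KinesisSource(
    stream_name="transaction-events",
    region_name="us-east-2",
    consumer_role_arn="arn:aws:iam::123456789012:role/chalk-kinesis-consumer",
)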

Base class for all stream sources generated from @stream.

Attributes

e.g. 'kafka' or 'kinesis'

stream name for kinesis, topic for kafka

Kafka topic name or Kinesis stream name

The ChalkClient is the primary Python interface for interacting with Chalk.

You can use it to query data, trigger resolver runs, gather offline data, and more.

Functions

Create a ChalkClient. Clients can be created with explicit credentials, from environment variables, or from credentials in ~/.chalk.yml.

The __init__ method specifies overloads for this purpose. See the overloaded methods for details.

from chalk.client import ChalkClient
client = ChalkClient(branch="testing")
client.query(
    input={
        User.name: "Katherine Johnson"
    },
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)

Compute features values using online resolvers. See Overview for more information.

Parameters

The features for which there are known values, mapped to those values. For example, {User.id: 1234}. Features can also be expressed as snakecased strings, e.g. {"user.id": 1234}

Outputs are the features that you'd like to compute from the inputs. For example, [User.age, User.name, User.email].

now: = None

The time at which to evaluate the query. If not specified, the current time will be used. This parameter is complex in the context of online_query since the online store only stores the most recent value of an entity's features. If now is in the past, it is extremely likely that None will be returned for cache-only features.

This parameter is primarily provided to support:

  • controlling the time window for aggregations over cached has-many relationships
  • controlling the time window for aggregations over has-many relationships loaded from an external database

If you are trying to perform an exploratory analysis of past feature values, prefer offline_query.

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.

query_name: = None

The semantic name for the query you're making, for example, "loan_application_model". Typically, each query that you make from your application should have a name. Chalk will present metrics and dashboard functionality grouped by 'query_name'.

explain:
bool | 'only'
= False

Log the query execution plan. Requests using explain=True will be slower than requests using explain=False. If "only", the query will not be executed, and only the query plan will be returned.

If True, 'include_meta' will be set to True as well.

If True, the output of each of the query plan stages will be stored. This option dramatically impacts the performance of the query, so it should only be used for debugging.

Other Parameters
Show All
meta: = None
Returns
type:
OnlineQueryResult

Wrapper around the output features and any query metadata, plus errors encountered while running the resolvers.

The output features and any query metadata.

Errors encountered while running the resolvers.

If no errors were encountered, this field is empty.

Metadata about the query execution. Only present if include_meta=True is passed to the relevant query method.

from chalk.client import ChalkClient
result = ChalkClient().query(
    input={
        User.name: "Katherine Johnson"
    },
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)
result.get_feature_value(User.fico_score)

Execute multiple queries (represented by the queries= argument) in a single request. This is useful if the queries are "rooted" in different @features classes. For example, if you want to load features for User and Merchant and there is no natural relationship object related to both of these classes, multi_query allows you to submit two independent queries.

In contrast, query_bulk executes a single query with multiple inputs/outputs.
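
For example, a sketch of two independent queries submitted in one request. The OnlineQuery wrapper and the User and Merchant feature classes are assumptions for illustration:

from chalk.client import ChalkClient, OnlineQuery

result = ChalkClient().multi_query(
    queries=[
        OnlineQuery(
            input={User.id: 1234},
            output=[User.fico_score],
        ),
        OnlineQuery(
            input={Merchant.id: "m_123"},
            output=[Merchant.risk_score],
        ),
    ],
)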

Compute features values for many rows of inputs using online resolvers. See Overview for more information on online query.

This method is similar to query, except it takes in list of inputs, and produces one output per row of inputs.

This method is appropriate if you want to fetch the same set of features for many different input primary keys.

This method contrasts with multi_query, which executes multiple fully independent queries.

This endpoint is not available in all environments.

Parameters

The features for which there are known values, mapped to a list of the values.

Outputs are the features that you'd like to compute from the inputs.

The time at which to evaluate the query. If not specified, the current time will be used. The length of this list must be the same as the length of the values in input.

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

Other Parameters
Show All
correlation_id: = None
query_name: = None
meta: = None
Returns

An output containing results: list[BulkOnlineQueryResult], where each result contains dataframes of the results of each query.

from chalk.client import ChalkClient
ChalkClient().query_bulk(
    input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)

Compute features values for many inputs using online resolvers. See Overview for more information.

This endpoint is not available in all environments.

Parameters

The features for which there are known values, mapped to a list of the values.

Outputs are the features that you'd like to compute from the inputs.

Maximum staleness overrides for any output features or intermediate features. See Caching for more information.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

If specified, Chalk will route your request to the relevant preview deployment.

If specified, Chalk will route your request to the relevant branch.

Other Parameters
Show All
correlation_id: = None
query_name: = None
meta: = None
Returns
type:
OnlineQueryResult

The outputs features and any query metadata, plus errors encountered while running the resolvers.

The output features and any query metadata.

Errors encountered while running the resolvers.

If no errors were encountered, this field is empty.

Metadata about the query execution. Only present if include_meta=True is passed to the relevant query method.

from chalk.client import ChalkClient
ChalkClient().query(
    input={User.name: ["Katherine Johnson", "Eleanor Roosevelt"]},
    output=[User.fico_score],
    staleness={User.fico_score: "10m"},
)

Compute feature values from the offline store or by running offline/online resolvers. See Dataset for more information.

Parameters

The features for which there are known values. It can be a mapping of features to a list of values for each feature, or an existing DataFrame. Each element in the DataFrame or list of values represents an observation in line with the timestamp in input_times.

A list of the times of the observations from input.

The features that you'd like to sample, if they exist. If an output feature was never computed for a sample (row) in the resulting DataFrame, its value will be None.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

A unique name that if provided will be used to generate and save a Dataset constructed from the list of features computed from the inputs.

If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.

You can specify a correlation ID to be used in logs and web interfaces. This should be globally unique, i.e. a uuid or similar. Logs generated during the execution of your query will be tagged with this correlation id.

max_samples: = None

The maximum number of samples to include in the DataFrame. If not specified, all samples will be returned.

wait: = False

If True, this method will block until the query is finished.

If True, progress bars will be shown while the query is running. Primarily intended for use in a Jupyter-like notebook environment. This flag will also be propagated to the methods of the resulting Dataset.

Used to control whether or not resolvers are allowed to run in order to compute feature values.

If True, all output features will be recomputed by resolvers. If False, all output features will be sampled from the offline store. If a list, all output features in recompute_features will be recomputed, and all other output features will be sampled from the offline store.

If specified, the query will only be run on data observed after this timestamp

If specified, the query will only be run on data observed before this timestamp

If True, the output of each of the query plan stages will be stored in S3/GCS. This will dramatically impact the performance of the query, so it should only be used for debugging. These files will be visible in the web dashboard's query detail view, and can be downloaded in full by clicking on a plan node in the query plan visualizer.

explain:
bool | 'only'
= False
tags: = None

The tags used to scope the resolvers. See Tags for more information.

Returns
type:

A Chalk Dataset.

from chalk.client import ChalkClient
uids = [1, 2, 3, 4]
at = datetime.now()
dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
        User.ts: [at] * len(uids),
    },
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset'
)
df = dataset.get_data_as_pandas()

Get a Chalk Dataset containing data from a previously created dataset.

If an offline query has been created with a dataset name, .get_dataset will return a Chalk Dataset. The Dataset wraps a lazily-loading Chalk DataFrame that enables us to analyze our data without loading all of it directly into memory. See Overview for more information.

Parameters

The name of the Dataset to return. Previously, you must have supplied a dataset name upon an offline query. Dataset names are unique for each environment.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

Returns
type:

A Dataset that lazily loads your query data.

from chalk.client import ChalkClient
uids = [1, 2, 3, 4]
at = datetime.now()
X = ChalkClient().offline_query(
    input={
        User.id: uids,
        User.ts: [at] * len(uids),
    },
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset='my_dataset_name'
)

Some time later...

dataset = ChalkClient().get_dataset(
    dataset_name='my_dataset_name'
)
...

If memory allows:

df: pd.DataFrame = dataset.get_data_as_pandas()

Triggers a resolver to run. See Triggered Runs for more information.

Parameters

The fully qualified name of the resolver to trigger.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment.

Returns
type:
ResolverRunResponse

Status of the resolver run and the run ID.

The ID of the resolver run.

status
ResolverRunStatus

The current status of the resolver run.

from chalk.client import ChalkClient
ChalkClient().trigger_resolver_run(
    resolver_fqn="mymodule.fn"
)

Retrieves the status of a resolver run. See Triggered Runs for more information.

Parameters

ID of the resolver run to check.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment.

Returns
type:
ResolverRunResponse

Status of the resolver run and the run ID.

The ID of the resolver run.

status
ResolverRunStatus

The current status of the resolver run.

from chalk.client import ChalkClient
ChalkClient().get_run_status(
    run_id="3",
)
ResolverRunResponse(
    id="3",
    status=ResolverRunStatus.SUCCEEDED
)

Checks the identity of your client.

Useful as a sanity test of your configuration.

Returns
type:
WhoAmIResponse

The identity of your client.

The ID of the user or service token making the query.

from chalk.client import ChalkClient
ChalkClient().whoami()
WhoAmIResponse(user="44")

Targets feature observation values for deletion and performs deletion online and offline.

Parameters

The namespace in which the target features reside.

An optional list of the feature names of the features that should be deleted for the targeted primary keys. Not specifying this and not specifying the "tags" field will result in all features being targeted for deletion for the specified primary keys. Note that this parameter and the "tags" parameter are mutually exclusive.

An optional list of tags that specify features that should be targeted for deletion. If a feature has a tag in this list, its observations for the primary keys you listed will be targeted for deletion. Not specifying this and not specifying the "features" field will result in all features being targeted for deletion for the specified primary keys. Note that this parameter and the "features" parameter are mutually exclusive.

The primary keys of the observations that should be targeted for deletion.

Returns

Holds any errors (if any) that occurred during the drop request. Deletion of a feature may partially-succeed.

from chalk.client import ChalkClient
ChalkClient().delete_features(
    namespace="user",
    features=["name", "email", "age"],
    primary_keys=[1, 2, 3]
)

Performs a drop on features, which deletes all of their data (both online and offline). Once a feature is reset in this manner, its type can be changed.

Parameters

The namespace in which the target features reside.

A list of the feature names of the features that should be dropped.

Returns

Holds any errors (if any) that occurred during the drop request. Dropping a feature may partially-succeed.

from chalk.client import ChalkClient
ChalkClient().drop_features(
    namespace="user",
    features=["name", "email", "age"],
)

Upload data to Chalk for use in offline resolvers or to prime a cache.

Parameters

The features for which there are known values, mapped to those values.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment

query_name: = None

Optionally associate this upload with a query name. See .query for more information.

Other Parameters
Show All
branch: = ...
correlation_id: = None
meta: = None
Returns

The errors encountered from uploading features.

from chalk.client import ChalkClient
ChalkClient().upload_features(
    input={
        User.id: 1,
        User.name: "Katherine Johnson"
    }
)

Upload data to Chalk for use in offline resolvers or to prime a cache.

Parameters

The input data, in one of three forms:

  • A list of mappings, each of which includes the features for which there are known values mapped to those values. Each mapping can have different keys, but each mapping must have the same root features class.
  • A mapping where each feature key is mapped to a list of the values for that feature. You can consider this a mapping that describes columns (keys, i.e. features) and rows (the list of values in the map for each feature). Each list must be the same length.
  • A pandas, polars, or Chalk DataFrame.

The environment under which to run the upload. API tokens can be scoped to an environment. If no environment is specified in the upload, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

If specified, Chalk will route your request to the relevant preview deployment

Other Parameters
Show All
correlation_id: = None
meta: = None
Returns

The errors encountered from uploading features.

from chalk.client import ChalkClient
ChalkClient().multi_upload_features(
    input=[
        {
            User.id: 1,
            User.name: "Katherine Johnson"
        },
        {
            User.id: 2,
            User.name: "Eleanor Roosevelt"
        }
    ]
)

Get the most recent feature values from the offline store.

See Overview for more information.

Parameters

The features that you'd like to sample, if they exist. If an output feature was never computed for a sample (row) in the resulting DataFrame, its value will be None.

max_samples: = None

The maximum number of rows to return.

dataset: = None

The Dataset name under which to save the output.

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

tags: = None

The tags used to scope the resolvers. See Tags for more information.

Other Parameters
Show All
output_id: = False
output_ts: = False
branch: = None
Returns

A pandas.DataFrame with columns equal to the names of the features in output, and values representing the value of the most recent observation.

from chalk.client import ChalkClient
sample_df = ChalkClient().sample(
    output=[
        Account.id,
        Account.title,
        Account.user.full_name
    ],
    max_samples=10
)

Create a new branch based off of a deployment from the server. By default, uses the latest live deployment.

Parameters

The name of the new branch to create.

create_only: = False

If True, will raise an error if a branch with the given name already exists. If False and the branch exists, then that branch will be deployed to.

switch: = True

If True, will switch the client to the newly created branch. Defaults to True.

The specific deployment ID to use for the branch. If not specified, the latest live deployment on the server will be used. You can see which deployments are available by clicking on the 'Deployments' tab on the project page in the Chalk dashboard.

The environment under which to create the branch. API tokens can be scoped to an environment. If no environment is specified in the query, the environment will be taken from the client's cached token.

Returns

A response object containing metadata about the branch.

from chalk.client import ChalkClient
client = ChalkClient()
client.create_branch("my-new-branch")

Lists the current branches for this environment.

Returns
type:

A list of the names of branches available on this environment.

from chalk.client import ChalkClient
ChalkClient().get_branches()
["testing", "feat/new-feature"]

Displays the current branch this client is pointed at.

If the current environment does not support branch deployments or no branch is set, this method returns None.

Returns
type:

The name of the current branch or None.

from chalk.client import ChalkClient
client = ChalkClient(branch="my-branch")
assert client.get_branch() == "my-branch"

Point the ChalkClient at the given branch.

If the branch does not exist or if branch deployments are not enabled for the current environment, this method raises an error.

Parameters

The name of the branch to use

from chalk.client import ChalkClient
client = ChalkClient()
client.create_branch("my-new-branch")
client.set_branch("my-new-branch")

Returns a BranchGraphSummary object that contains the state of the branch server: Which resolver/features are defined, and the history of live notebook updates on the server.

Parameters
branch:
BranchId | ...
= ...

The branch to query. If not specified, the branch is expected to be included in the constructor for ChalkClient.

Optionally override the environment under which to query the branch state.

Tests a streaming resolver and its ability to parse and resolve messages. See Streams for more information.

Parameters
resolver:
str | Resolver

The streaming resolver or its string name.

The number of messages to digest from the stream source. As messages may not be incoming into the stream, this action may time out.

A filepath from which test messages will be ingested. This file should be newline delimited json as follows:

{"message_key": "my-key", "message_body": {"field1": "value1", "field2": "value2"}}
{"message_key": "my-key", "message_body": {"field1": "value1", "field2": "value2"}}

Each line may optionally contain a timezone string as a value to the key "message_timestamp".

Alternatively, keys can be supplied in code along with the "test_message_bodies" argument. Both arguments must be the same length.

Message bodies can be supplied in code as strings or bytes along with the "test_message_keys" argument. Both arguments must be the same length.

Optionally, timestamps can be provided for each message.

Other Parameters
Show All
branch:
BranchId | ...
= ...
Returns

A simple wrapper around a status and optional error message. Calling .features() will return the test results, if they exist. Otherwise, check .errors and .message for errors.

from chalk.streams import stream, KafkaSource
from chalk.client import ChalkClient
from chalk.features import Features, features
import pydantic
>>>
# This code is an example of a simple streaming feature setup.
stream_source=KafkaSource(...)
>>>
@features(etl_offline_to_online=True, max_staleness="7d")
class StreamingFeature:
    id: str
    user_id: str
    card_id: str
>>>
class StreamingMessage(pydantic.BaseModel):
    card_id: str
    user_id: str
>>>
@stream(source=stream_source)
def our_stream_resolver(
    m: StreamingMessage,
) -> Features[StreamingFeature.id, StreamingFeature.card_id, StreamingFeature.user_id]:
   return StreamingFeature(
       id=f"{m.card_id}-{m.user_id}",
       card_id=m.card_id,
       user_id=m.user_id,
   )
>>>
# Once you have done a `chalk apply`, you can test the streaming resolver with custom messages as follows
client = ChalkClient()
keys = ["my_key"] * 10
messages = [StreamingMessage(card_id="1", user_id=str(i)).json() for i in range(10)]
resp = client.test_streaming_resolver(
    resolver="our_stream_resolver",
    message_keys=keys,
    message_bodies=messages,
)
print(resp.features)
Attributes

The name of the feature requested, e.g. 'user.identity.has_voip_phone'.

The value of the requested feature. If an error was encountered in resolving this feature, this field will be empty.

The primary key of the resolved feature.

The error code encountered in resolving this feature. If no error occurred, this field is empty.

The time at which this feature was computed. This value could be significantly in the past if you're using caching.

meta
FeatureResolutionMeta | None

Detailed information about how this feature was computed.

The ChalkError describes an error from running a resolver or from a feature that can't be validated.

Attributes

The type of the error.

The category of the error, given in the type field for the error codes. This will be one of "REQUEST", "NETWORK", or "FIELD".

A readable description of the error message.

A human-readable hint that can be used to identify the entity that this error is associated with.

If provided, can be used to add additional context to 'display_primary_key'.

The exception that caused the failure, if applicable.

The fully qualified name of the failing feature, e.g. user.identity.has_voip_phone.

The fully qualified name of the failing resolver, e.g. my.project.get_fraud_score.

Functions

Class wrapper around revisions for Datasets.

Attributes
revision_id
uuid.UUID

UUID for the revision job.

UUID for the creator of the job.

Output features for the dataset revision.

Location of the givens stored for the dataset.

Status of the revision job.

Filters performed on the dataset.

Number of partitions for revision job.

Location of the outputs stored for the dataset.

Storage version of the outputs.

Number of bytes of the output, updated upon success.

Timestamp for creation of revision job.

Timestamp for start of revision job.

Timestamp for end of revision job.

Name of revision, if given.

dataset_id
uuid.UUID | None

ID of revision, if name is given.

url linking to relevant dashboard page

Functions

Loads a pl.DataFrame containing the output. Use .to_polars_lazyframe() if you want a LazyFrame instead, which allows local filtering of datasets that are larger than memory.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns

A pl.DataFrame materializing query output data.

Loads a pl.LazyFrame containing the output. This method is appropriate for working with larger-than-memory datasets. Use .to_polars() if you want a DataFrame instead.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns

A pl.LazyFrame materializing query output data.
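
For instance, a larger-than-memory dataset can be filtered locally before collecting. This sketch assumes a dataset returned by ChalkClient.offline_query() and a user.fico_score output column:

import polars as pl

lf: pl.LazyFrame = dataset.to_polars_lazyframe()
# Filtering is part of the lazy plan; only matching rows are materialized on collect().
high_scores: pl.DataFrame = lf.filter(pl.col("user.fico_score") > 700).collect()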

Loads a pl.LazyFrame containing the output.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns

A pl.LazyFrame materializing query output data.

Loads a pd.DataFrame containing the output.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns

A pd.DataFrame materializing query output data.

Loads a Chalk DataFrame containing the output.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns
type:

A DataFrame materializing query output data.

Downloads output files pertaining to the revision to given path.

Datasets are stored in Chalk as sharded Parquet files. With this method, you can download those raw files into a directory for processing with other tools.

Parameters

A directory where the Parquet files from the dataset will be downloaded.

Other Parameters
Show All
output_id: = False
output_ts: = False
num_executors: = None

Loads a pl.LazyFrame containing the inputs.

Returns

A pl.LazyFrame materializing query input data.

Returns and opens a URL for the offline query page in the Chalk dashboard. You must be logged in.

Parameters

If True, does not open url in browser. Default is False.

Returns
type:

A url redirecting to the Chalk dashboard.

Waits for the revision job to complete.

ChalkClient.offline_query returns a DatasetRevision instance immediately after submitting the revision job. This method can be used to wait for the revision job to complete.

Once the revision job is complete, the status attribute of the DatasetRevision instance will be updated to reflect the status of the revision job.

If the revision job was successful, you can then use methods such as get_data_as_pandas() without having to wait for the revision job to complete.

Parameters

Whether to display progress bars while waiting for the revision job.

Downloads the resolver replay data for the given resolver in the revision, provided the revision had store_plan_stages enabled.

The replay data is functionally similar to viewing the intermediate results on the plan explorer.

If the resolver appears in only one stage of the plan, the resolver's replay data is returned directly. If the resolver instead appears in multiple stages of the plan, a mapping of the operation's ID to the replay data will be returned. If the resolver does not appear in the plan, an exception will be thrown.

Parameters

The resolver to download the replay data for, or its fqn.

lazy:

Whether to return a pl.LazyFrame or a pl.DataFrame. If True, a pl.LazyFrame will be returned.

Dataset
Class

Wrapper around Offline Query results.

Datasets are obtained by invoking ChalkClient.offline_query(). Dataset instances store important metadata and enable the retrieval of offline query outputs.

Examples

from chalk.client import ChalkClient, Dataset
uids = [1, 2, 3, 4]
at = datetime.now()
dataset: Dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
        User.ts: [at] * len(uids),
    },
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset'
)
df = dataset.get_data_as_pandas()
df.recompute(features=[User.fraud_score], branch="feature/testing")
Attributes

Whether the most recent DatasetRevision is finished or still pending.

Storage version number of outputs.

A list of all DatasetRevision instances belonging to this dataset.

The unique name for this dataset, if given.

dataset_id
uuid.UUID | None

The unique UUID for this dataset.

A list of errors in loading the dataset, if they exist.

Functions

Loads a pl.DataFrame containing the output. Use .to_polars_lazyframe() if you want a LazyFrame instead, which allows local filtering of datasets that are larger than memory.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns

A pl.DataFrame materializing query output data.

Loads a pl.LazyFrame containing the output. This method is appropriate for working with larger-than-memory datasets. Use .to_polars() if you want a DataFrame instead.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns

A pl.LazyFrame materializing query output data.

Loads a pl.LazyFrame containing the output.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns

A pl.LazyFrame materializing query output data.

Loads a pd.DataFrame containing the output.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns

A pd.DataFrame materializing query output data.

Loads a Chalk DataFrame containing the output. Requires the pertinent Chalk features to be accessible via import

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns
type:

A DataFrame materializing query output data.

Loads a pd.DataFrame containing the output of the most recent revision.

Parameters
None
Other Parameters
Show All
output_id: = False
output_ts: = False
Returns

A pd.DataFrame materializing query output data.

Downloads output files pertaining to the revision to the given path.

Datasets are stored in Chalk as sharded Parquet files. With this method, you can download those raw files into a directory for processing with other tools.

Parameters

A directory where the Parquet files from the dataset will be downloaded.

Other Parameters
Show All
num_executors: = None
from chalk.client import ChalkClient, Dataset
uids = [1, 2, 3, 4]
at = datetime.now()
dataset: Dataset = ChalkClient().offline_query(
    input={
        User.id: uids,
        User.ts: [at] * len(uids),
    },
    output=[
        User.id,
        User.fullname,
        User.email,
        User.name_email_match_score,
    ],
    dataset_name='my_dataset'
)
dataset.download_data('my_directory')

Loads a pl.LazyFrame containing the inputs that were used to create the dataset.

Returns

A pl.LazyFrame materializing query input data.

Returns and opens a URL for the offline query page in the Chalk dashboard. You must be logged in.

Parameters

If True, does not open url in browser. Default is False.

Returns
type:

A url redirecting to the Chalk dashboard.

Creates a new revision of this Dataset by recomputing the specified features.

Carries out the new computation on the branch specified when constructing the client.

Parameters

A list of specific features to recompute. Features that don't exist in the dataset will be added. If not provided, all the existing features in the dataset will be recomputed.

branch: = None

If specified, Chalk will route your request to the relevant branch. If None, Chalk will route your request to a non-branch deployment. If not specified, Chalk will use the current client's branch info.

wait: = False

If True, this method will block until recomputation is finished.

If True, progress bars will be shown while recomputation is running. This flag will also be propagated to the methods of the resulting Dataset.

Raises
error:

If no branch was provided to the Chalk Client.

from chalk.client import ChalkClient
dataset = ChalkClient(branch="data_science").offline_query(...)
df = dataset.get_data_as_polars()
# make changes to resolvers in your project
dataset.recompute()
new_df = dataset.get_data_as_polars() # receive newly computed data

Downloads the resolver replay data for the given resolver in the latest revision of the dataset.

The replay data is functionally similar to viewing the intermediate results on the plan explorer.

If the resolver appears in only one stage of the plan, the resolver's replay data is returned directly. If the resolver instead appears in multiple stages of the plan, a mapping of the operation's ID to the replay data will be returned. If the resolver does not appear in the plan, an exception will be thrown.

Parameters
resolver:
Resolver

The resolver to download the replay data for, or its fqn.

The category of an error.

For more detailed error information, see ErrorCode

Values

Request errors are raised before execution of your resolver code. They may occur due to invalid feature names in the input or a request that cannot be satisfied by the resolvers you have defined.

Field errors are raised while running a feature resolver for a particular field. For this type of error, you'll find a feature and resolver attribute in the error type. When a feature resolver crashes, you will receive a null value in the response. To differentiate between a resolver returning a null value and a resolver failure, check the error schema.

Network errors are thrown outside your resolvers. For example, your request was unauthenticated, connection failed, or an error occurred within Chalk.

The detailed error code.

For a simpler category of error, see ErrorCodeCategory.

Values

The query contained features that do not exist.

A resolver was required as part of running the dependency graph that could not be found.

The query is invalid. All supplied features need to be rooted in the same top-level entity.

A feature value did not match the expected schema (e.g. incompatible type "int"; expected "str")

The resolver for a feature errored.

The resolver for a feature timed out.

A resolver that was to produce an input for this resolver crashed, and so the resolver could not run.

The request was submitted with an invalid authentication header.

The supplied credentials do not provide the right authorization to execute the request.

An unspecified error occurred.

The operation was cancelled, typically by the caller.

The deadline expired before the operation could complete.

Information about an exception from a resolver run.

Attributes

The name of the class of the exception.

The message taken from the exception.

The stacktrace produced by the code.

The stacktrace produced by the code, full detail.

Attributes

The output features and any query metadata.

Errors encountered while running the resolvers.

If no errors were encountered, this field is empty.

Metadata about the query execution. Only present if include_meta=True is passed to the relevant query method.

Functions

Convenience method for accessing feature result from the data response.

Parameters

The feature or its string representation.

Returns

The FeatureResult for the feature, if it exists.

from chalk.client import ChalkClient
data = ChalkClient().query(...)
data.get_feature(User.name).ts
datetime.datetime(2023, 2, 5, 23, 25, 26, 427605)
data.get_feature("user.name").meta.cache_hit
False

Convenience method for accessing feature values from the data response.

Parameters

The feature or its string representation.

Returns
type:

The value of the feature.

from chalk.client import ChalkClient
data = ChalkClient().query(...)
data.get_feature_value(User.name)
"Katherine Johnson"
data.get_feature_value("user.name")
"Katherine Johnson"
Attributes

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

Context in which to execute a query.

Attributes

The environment under which to run the resolvers. API tokens can be scoped to an environment. If no environment is specified in the query, but the token supports only a single environment, then that environment will be taken as the scope for executing the request.

The tags used to scope the resolvers. More information at Tags

Raised when constructing a ChalkClient without valid credentials.

When this exception is raised, no explicit client_id and client_secret were provided, there was no ~/.chalk.yml file with applicable credentials, and the environment variables CHALK_CLIENT_ID and CHALK_CLIENT_SECRET were not set.

You may need to run chalk login from your command line, or check that your working directory is set to the root of your project.

Duration
Type Alias
str | timedelta | 'infinity'

Duration is used to describe time periods in natural language. To specify using natural language, write the count of the unit you would like, followed by the representation of the unit.

Chalk supports the following units:

Signifier Meaning
w Weeks
d Days
h Hours
m Minutes
s Seconds
ms Milliseconds

As well as the special keyword "infinity"

Examples

Signifier Meaning
"10h" 10 hours
"1w 2m" 1 week and 2 minutes
"1h 10m 2s" 1 hour, 10 minutes, and 2 seconds
"infinity" Unbounded time duration

Read more at Duration
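
For instance, both the string and the timedelta spellings below describe the same staleness window (the feature names are illustrative):

from datetime import timedelta
from chalk.features import feature, features

@features
class User:
    id: str
    fico_score: int = feature(max_staleness="1d 12h")
    avg_txn_amount: float = feature(max_staleness=timedelta(days=1, hours=12))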

CronTab
Type Alias

A schedule defined using the Unix-cron string format (* * * * *). Values are given in the order below:

Field Values
Minute 0-59
Hour 0-23
Day of Month 1-31
Month 1-12
Day of Week 0-6
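
CronTab values are typically passed to a resolver's cron parameter to schedule it. A minimal sketch, assuming a User feature class and a scheduled offline resolver:

from chalk import offline
from chalk.features import DataFrame

@offline(cron="0 * * * *")  # run at minute 0 of every hour
def ingest_users() -> DataFrame[User]:
    ...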

Environments are used to trigger behavior in different deployments such as staging, production, and local development. For example, you may wish to interact with a vendor via an API call in the production environment, and opt to return a constant value in a staging environment.

Environments can take one of three types:

  • None (default) - candidate to run in every environment
  • str - run only in this environment
  • list[str] - run in any of the specified environments and no others

See more at Environments

Tags
Type Alias

Tags allow you to scope requests within an environment. Both tags and environment need to match for a resolver to be a candidate to execute.

Like Environments, tags control when resolvers run based on the Online Context or Training Context matching the tags provided to the resolver decorator. Resolvers optionally take a keyword argument named tags that can take one of three types:

  • None (default) - The resolver will be a candidate to run for every set of tags.
  • str - The resolver will run only if this tag is provided.
  • list[str] - The resolver will run only if all of the specified tags match.

See more at Tags
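
A brief sketch of both scoping mechanisms applied to a resolver (the User feature class and the "fraud-model" tag are assumptions):

from chalk import online

@online(environment="staging", tags="fraud-model")
def get_fraud_score_stub(name: User.name) -> User.fraud_score:
    # Only a candidate to run when the deployment environment is "staging"
    # and the request carries the "fraud-model" tag.
    return 0.0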

Get the owner for a feature, feature class, or resolver.

Parameters

A feature (User.email), feature class (User), or resolver (get_user)

Returns
type:

The owner for a feature or feature class, if it exists. Note that the owner of a feature could be inherited from the feature class.

Raises
error:

If the supplied variable is not a feature, feature class, or resolver.

@features(owner="ship")
class RocketShip:
    id: int
    software_version: str
owner(RocketShip.software_version)
'ship'

Get the tags for a feature, feature class, or resolver.

Parameters

A feature (User.email), feature class (User), or resolver (get_user)

Returns
type:

The tags for a feature, feature class, or resolver, if it exists. Note that the tags of a feature could be inherited from the feature class.

Raises
error:

If the supplied variable is not a feature, feature class, or resolver.

Feature tags

@features(tags="group:risk")
class User:
    id: str
    # :tags: pii
    email: str
tags(User.id)
['group:risk']

Feature class tags

tags(User)
['group:risk']

Feature + feature class tags

tags(User.email)
['pii', 'group:risk']

Get the description of a feature, feature class, or resolver.

Parameters

A feature (User.email), feature class (User), or resolver (get_user)

Returns
type:

The description for a feature, feature class, or resolver, if it exists.

Raises
error:

If the supplied variable is not a feature, feature class, or resolver.

@features
class RocketShip:
    # Comments above a feature become
    # descriptions for the feature!
    software_version: str
description(RocketShip.software_version)
'Comments above a feature become descriptions for the feature!'

Determine whether a feature is a primary key.

Parameters

A feature (i.e. User.email)

Returns
type:

True if f is primary and False otherwise.

Raises
error:

If f is not a feature.

from chalk.features import features
@features
class User:
    uid: Primary[int]
    email: str
assert is_primary(User.uid)
assert not is_primary(User.email)

Determine whether a feature is a feature time. See Time for more details on FeatureTime.

Parameters

A feature (i.e. User.ts)

Returns
type:

True if the feature is a FeatureTime and False otherwise.

from chalk.features import features
@features
class User:
    id: str
    updated_at: datetime = feature_time()
assert is_feature_time(User.updated_at) is True
assert is_feature_time(User.id) is False
AnyDataclass
Type Alias

Any class decorated by @dataclass.

There isn't a base class for dataclass, so we use this TypeAlias to refer to any class decorated with @dataclass.
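
For example, a plain dataclass such as the LatLng type used earlier satisfies AnyDataclass:

from dataclasses import dataclass

@dataclass
class LatLng:
    lat: float
    lng: float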

The base type for Chalk exceptions.

This exception makes error handling easier, as you can look only for this exception class.

Attributes

A message describing the specific type of exception raised.

A message that describes the specific type of exception raised and contains the readable representation of each error in the errors attribute.

Also includes the trace ID if one is available.
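
A minimal sketch of catching the base exception around a query, assuming it is exported as ChalkBaseException from chalk.client and that its attributes are named message and full_message:

from chalk.client import ChalkClient, ChalkBaseException

client = ChalkClient()
try:
    client.query(input={User.id: "u_123"}, output=[User.email])
except ChalkBaseException as e:
    # full_message includes each underlying error and the trace ID
    # when one is available (attribute names assumed).
    print(e.full_message)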

Chart
Class

Class describing a single visual metric.

Functions

Create a chart for monitoring or alerting on the Chalk dashboard.

Parameters

The name of the chart. If a name is not provided, the chart will be named according to the series and formulas it contains.

The length of the window, e.g. "20m" or "1h".

Other Parameters
Show All
keep: = False
Returns

A chart for viewing in the Chalk dashboard.

from chalk.monitoring import Chart, Series
Chart(name="Request count").with_trigger(
    Series
        .feature_null_ratio_metric()
        .where(feature=User.fico_score) > 0.2,
)

Override the name of a chart.

Parameters

A new name for a chart.

Returns
type:

A copy of your Chart with the new name.

Change the window period for a Chart.

Parameters

A new window period for a chart, e.g. "20m" or "1h".

Returns
type:

A copy of your Chart with the new window period.

Attaches a Series to your Chart instance.

Parameters
series:
SeriesBase

A Series instance to attach to the Chart. A Chart can have any number of Series.

Returns
type:

A copy of your Chart with the new series.
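
For example (a sketch; feature_request_count_metric follows the metric-classmethod pattern documented in the Series section below):

from chalk.monitoring import Chart, Series

chart = Chart(name="Feature requests").with_series(
    Series.feature_request_count_metric().where(feature=User.email)
)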

Get a Series from your Chart by series name.

It is advised to use different series names within your charts.

Parameters

The name of the Series.

Returns
type:
SeriesBase

The first series added to your Chart with the given series name.

Attaches a Trigger to your Chart. Your Chart may optionally have one Trigger.

Parameters
expression:
ThresholdFunction

Triggers are applied when a certain series is above or below a given value. The expression specifies the series, operand, and value as follows:

  • the left-hand side of the expression must be a Series instance
  • the operand must be < or >
  • the right-hand side must be an int or float

Thus, if we have a Series instance series1, expression=series1 > 0.5 will result in an alert when series1 is greater than 0.5.

The name for the new trigger.

severity:
AlertSeverityKind
= AlertSeverityKind.INFO

The severity of the trigger.

  • critical
  • error
  • warning
  • info

The owner or email of the trigger.

description: = None

A description to your Trigger. Descriptions provided here will be included in the alert message in Slack or PagerDuty.

For Slack alerts, you can use the mrkdwn syntax described here: https://api.slack.com/reference/surfaces/formatting#basics

Returns
type:

A copy of your Chart with the new trigger.
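
Putting it together, a sketch of attaching a trigger with a description; only the expression and description parameters shown here are taken from the documentation above:

from chalk.monitoring import Chart, Series

Chart(name="Null fico scores").with_trigger(
    Series.feature_null_ratio_metric().where(feature=User.fico_score) > 0.2,
    description=(
        "More than 20% of fico scores were null over the window. "
        "Check the upstream credit vendor."
    ),
)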

Explicitly link a Chart to a feature. This chart will then be visible on the webpage for this feature. Charts may only be linked to one entity.

Parameters

A Chalk feature

Returns
type:

A copy of your chart linked to the feature.

Explicitly link a chart to a resolver. This chart will then be visible on the webpage for this resolver. Charts may only be linked to one entity.

Parameters

A Chalk resolver.

Returns
type:

A copy of your chart linked to the resolver.

Explicitly link a chart to a query. This chart will then be visible on the webpage for this query. Charts may only be linked to one entity.

Parameters

A name of a Chalk query

Returns
type:

A copy of your chart linked to the query.

Designates that this chart and all of its descendants will be registered.

Returns
type:

The same chart.

Retrieve a series or formula by name from a chart.

Parameters

The name of the series or formula to retrieve.

Returns
type:
SeriesBase | _Formula
Series
Class

Class describing a series of data in two dimensions, as in a line chart. Series should be instantiated with one of the classmethods that specifies the metric to be tracked.

Functions

Creates a Series of metric kind FeatureRequestCount.

Parameters
name: = None

A name for your new feature_request_count Series. If no name is provided, one will be created.

Returns
type:
FeatureRequestCountSeries

A new FeatureRequestCountSeries instance that inherits from the Series class.

Creates a Series of metric kind FeatureStaleness.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new feature_staleness Series. If not provided, a name will be generated for you.

Returns
type:
FeatureStalenessSeries

A new FeatureStalenessSeries instance that inherits from the Series class.
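
For instance (a sketch; feature_staleness_metric follows the same *_metric naming pattern as the earlier feature_null_ratio_metric example):

from chalk.monitoring import Series

staleness = Series.feature_staleness_metric(window_function="max").where(
    feature=User.fico_score
)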

Creates a Series of metric kind FeatureValue.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new feature_value Series. If not provided, a name will be generated for you.

Returns
type:
FeatureValueSeries

A new FeatureValueSeries instance that inherits from the Series class.

Creates a Series of metric kind FeatureNullRatio.

Parameters
name: = None

A name for your new feature_null_ratio Series. If no name is provided, one will be created.

Returns
type:
FeatureNullRatioSeries

A new FeatureNullRatioSeries instance that inherits from the Series class.

Creates a Series of metric kind ResolverRequestCount.

Parameters
name: = None

A name for your new resolver_request_count Series. If no name is provided, one will be created.

Returns
type:
ResolverRequestCountSeries

A new ResolverRequestCountSeries instance that inherits from the Series class.

Creates a Series of metric kind ResolverLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new resolver_latency Series. If not provided, a name will be generated for you.

Returns
type:
ResolverLatencySeries

A new ResolverLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind ResolverSuccessRatio.

Parameters
name: = None

A name for your new resolver_success_ratio Series. If no name is provided, one will be created.

Returns
type:
ResolverSuccessRatioSeries

A new ResolverSuccessRatioSeries instance that inherits from the Series class.

Creates a Series of metric kind QueryCount.

Parameters
name: = None

A name for your new query_count Series. If no name is provided, one will be created.

Returns
type:
QueryCountSeries

A new QueryCountSeries instance that inherits from the Series class.

Creates a Series of metric kind QueryLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new query_latency Series. If not provided, a name will be generated for you.

Returns
type:
QueryLatencySeries

A new QueryLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind QuerySuccessRatio.

Parameters
name: = None

A name for your new query_success_ratio Series. If no name is provided, one will be created.

Returns
type:
QuerySuccessRatioSeries

A new QuerySuccessRatioSeries instance that inherits from the Series class.

Creates a Series of metric kind CronCount.

Parameters
name: = None

A name for your new cron_count Series. If no name is provided, one will be created.

Returns
type:
CronCountSeries

A new CronCountSeries instance that inherits from the Series class.

Creates a Series of metric kind CronLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new cron_latency Series. If not provided, a name will be generated for you.

Returns
type:
CronLatencySeries

A new CronLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind StreamMessageLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new stream_message_latency Series. If not provided, a name will be generated for you.

Returns
type:
StreamMessageLatencySeries

A new StreamMessageLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind StreamMessagesProcessed.

Parameters
name: = None

A name for your new stream_messages_processed Series. If no name is provided, one will be created.

Returns
type:
StreamMessagesProcessedSeries

A new StreamMessagesProcessedSeries instance that inherits from the Series class.

Creates a Series of metric kind StreamWindowsProcessed.

Parameters
name: = None

A name for your new stream_windows_processed Series. If no name is provided, one will be created.

Returns
type:
StreamWindowsProcessedSeries

A new StreamWindowsProcessedSeries instance that inherits from the Series class.

Creates a Series of metric kind StreamWindowLatency.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new stream_window_latency Series. If not provided, a name will be generated for you.

Returns
type:
StreamWindowLatencySeries

A new StreamWindowLatencySeries instance that inherits from the Series class.

Creates a Series of metric kind StreamLag.

Parameters
window_function:
'mean' | 'max' | '99%' | '95%' | '75%' | '50%' | '25%' | '5%' | 'min' | 'all'

The time window to calculate the metric over.

name: = None

A name for your new stream_lag Series. If not provided, a name will be generated for you.

Returns
type:
StreamLagSeries

A new StreamLagSeries instance that inherits from the Series class.

Given two DataFrames, left and right, check if left == right, and raise otherwise.

Parameters

The DataFrame to compare.

The DataFrame to compare with.

If False, allows the assert/test to succeed if the required columns are present, irrespective of the order in which they appear.

If False, allows the assert/test to succeed if the required rows are present, irrespective of the order in which they appear; as this requires sorting, it cannot be set to False for frames that contain un-sortable columns.

Raises

If left does not equal right

error:
MissingDependencyException

If chalkpy[runtime] is not installed.
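
A sketch of how this check might be used in a test, assuming the helper is importable as assert_frame_equal from chalk.testing:

from chalk.features import DataFrame
from chalk.testing import assert_frame_equal

def test_users_match():
    left = DataFrame([User(id="1", email="a@example.com")])
    right = DataFrame([User(id="1", email="a@example.com")])
    # Raises if the two frames are not equal.
    assert_frame_equal(left, right)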

Functions

Used to freeze the 'now' value used to execute filters like after(days_ago=30).

Parameters

The time to freeze to. Must be timezone aware.

@online
def get_average_spend_30d(
    spend: User.cards[after(days_ago=30)],
) -> User.average_spend_30d:
    return spend.mean()
with freeze_time(datetime(2021, 1, 1, tzinfo=timezone.utc)):
    now = datetime.now(tz=timezone.utc)
    get_average_spend_30d(
        spend=DataFrame([
            Card(spend=10, ts=now - timedelta(days=31)),
            Card(spend=20, ts=now - timedelta(days=29)),
            Card(spend=30, ts=now - timedelta(days=28)),
        ])
    )

Returns the current time that filters will use.

Returns
type:

The current time that filters will use.

with freeze_time(datetime(2021, 1, 1, tzinfo=timezone.utc)) as ft:
    assert ft.time() == datetime(2021, 1, 1, tzinfo=timezone.utc)

The freeze_time class is a context manager, so it can be used with the with statement. The __enter__ method is called when entering the context manager.

The freeze_time class is a context manager, so it can be used with the with statement. The __exit__ method is automatically called when exiting the context manager.