Chalk home page
Docs
API
CLI
  1. Testing
  2. Unit Tests

Chalk lets you specify your feature pipelines using idiomatic Python. You can unit test individual resolvers and combinations of resolvers as you would normal Python functions.


Testing resolvers

Consider the following features and resolvers:

example.py
from chalk.features import features
from chalk import online, Features

@features
class Home:
    id: str
    address: str
    price: int
    sq_ft: int

@online
def get_address(hid: Home.id) -> Home.address:
    return "Bridge Street" if hid == 1 else "Filbert Street"

@online
def get_home_data(
    hid: Home.id,
) -> Features[Home.price, Home.sq_ft]:
    return Home(
        price=200_000,
        sq_ft=2_000,
    )

You can test them just like normal Python functions using any unit testing framework:

test_example.py
def test_single_output(self):
    assert get_address(2) == "Filbert Street"

def test_multiple_output(self):
    result = get_home_data(2)
    assert result.price == 200_000
    assert result.sq_ft == 2_000
    assert result == Home(
        price=200_000,
        sq_ft=2_000,
    )

DataFrame inputs

If you specify projections or filters in DataFrame arguments of resolvers, Chalk will automatically project out columns and filter rows in the input data.

Filters

Consider if we extend the Home class to include a rooms field:

example.py
@features
class Room:
    id: str
    name: str

@features
class Home:
  id: str
  address: str
  price: int
  sq_ft: int
  rooms: DataFrame[Room] = has_many(
      lambda: Room.home_id == Home.id
  )
  num_bedrooms: int

@online
def get_num_bedrooms(
    rooms: Home.rooms[Room.name == 'bedroom']
) -> Home.num_bedrooms:
    return len(rooms)

The get_num_bedrooms resolver filters the rooms argument to only include bedrooms. We can test this by passing in a list of rooms, some of which are bedrooms and some of which are not:

test_example.py
def test_get_num_rooms():
    # Rooms is automatically converted to a `DataFrame`
    rooms = [
        Room(id=1, name="bedroom"),
        Room(id=2, name="kitchen"),
        Room(id=3, name="bedroom"),
    ]

    # The kitchen room is filtered out
    assert get_num_bedrooms(rooms) == 2

    # `get_num_bedrooms` also works with a `DataFrame`
    assert get_num_bedrooms(DataFrame(rooms)) == 2

Note that although we passed in a list of three rooms, only two of them were bedrooms, so the resolver returns 2.

Furthermore, we didn’t need to convert the list of rooms to a DataFrame. In this case, we passed in a list of Room objects, but Chalk automatically converts it to a DataFrame for us. We also could have passed in a polars.DataFrame or any other valid constructor for DataFrame.

Projections

Chalk will also automatically project the input data as specified by the DataFrame argument. So, if you specify which columns you need in the resolver body, but accidentally use an extra column in your resolver body, your unit tests will fail.

For example, if we wrote a resolver for summing the square footage of all rooms in a home using the Room.sq_ft feature, but accidentally excluded the Room.sq_ft column in the DataFrame argument:

example.py
@online
def get_sqft(
    rooms: Home.rooms[Room.name]
) -> Home.sq_ft:
    # This will fail because we used `Room.sq_ft`
    return rooms[Room.sq_ft].sum()

Then our unit tests will fail:

test_example.py
def test_get_sqft():
    rooms = [
        Room(id=1, name="bedroom", sq_ft=100),
        Room(id=2, name="kitchen", sq_ft=200),
        Room(id=3, name="bedroom", sq_ft=300),
    ]

    # This will fail because we used `Room.sq_ft`,
    # which is excluded by the `Home.rooms[Room.name]`
    # argument
    assert get_sqft(rooms) == 400

after(...)/before(...) and time filtering

Some dataframe filters are implicitly resolved relative to “now”. In offline_query and online_query, this value is controlled by input_times= and now= parameters, respectively. In unit tests, the value defaults to datetime.now(), but can be explicitly set with the chalk.freeze_time context manager:

from chalk import freeze_time

@online
def get_p30d_transactions_count(txns: User.transactions[after(days_ago=30)]) -> User.recent_txn_count_30d:
    """
    Count the number of transactions that have occurred in the past 30 days
    """
    return txns.count()

NOW = datetime.now(tz=timezone.utc)

transactions = DataFrame([
    Transaction(id=1, created_at=NOW),
    Transaction(id=2, created_at=NOW - timedelta(days_ago=30)),
    Transaction(id=3, created_at=NOW - timedelta(days_ago=32))
])

# There is a single transaction in the range (30, 0] days ago
assert get_p30d_transactions_count(transactions) == 1

# There are two transactions in the range (45, 15] days ago
with freeze_time(at=NOW - timedelta(days_ago=15)):
    assert get_p30d_transactions_count(transactions) == 2

Note that the at= parameter must be timezone-aware.