Testing
Unit tests for Chalk resolvers
Chalk lets you specify your feature pipelines using idiomatic Python. You can unit test individual resolvers and combinations of resolvers as you would normal Python functions.
Read more about how to write integration tests for resolvers here.
Consider the following features and resolvers:
from chalk.features import features, Features
from chalk import online


@features
class Home:
    id: str
    address: str
    price: int
    sq_ft: int


@online
def get_address(hid: Home.id) -> Home.address:
    return "Bridge Street" if hid == 1 else "Filbert Street"


@online
def get_home_data(
    hid: Home.id,
) -> Features[Home.price, Home.sq_ft]:
    return Home(
        price=200_000,
        sq_ft=2_000,
    )
You can test them just like normal Python functions using any unit testing framework:
def test_single_output():
    assert get_address(2) == "Filbert Street"


def test_multiple_output():
    result = get_home_data(2)
    assert result.price == 200_000
    assert result.sq_ft == 2_000
    assert result == Home(
        price=200_000,
        sq_ft=2_000,
    )
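As a minimal sketch of the "any unit testing framework" point, the same kind of check can be written table-style with the standard library's unittest module. To keep the block runnable without Chalk installed, get_address here is a plain stand-in with the @online decorator and feature annotations removed (an assumption for illustration); the case table and the run_tests helper are likewise hypothetical.

```python
import unittest


def get_address(hid: int) -> str:
    # Stand-in for the resolver above: same body, but without the Chalk
    # decorator and feature annotations, so this sketch runs on its own.
    return "Bridge Street" if hid == 1 else "Filbert Street"


class GetAddressTest(unittest.TestCase):
    def test_known_ids(self):
        # Hypothetical table of (input id, expected address) cases.
        cases = [
            (1, "Bridge Street"),
            (2, "Filbert Street"),
            (3, "Filbert Street"),
        ]
        for hid, expected in cases:
            # subTest reports each failing case separately.
            with self.subTest(hid=hid):
                self.assertEqual(get_address(hid), expected)


def run_tests() -> unittest.TestResult:
    # Load and run the test case programmatically instead of calling
    # unittest.main(), which would exit the interpreter.
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(GetAddressTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

With a real resolver, the test body would stay the same and only the import of get_address would change.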
If you specify projections or filters in the DataFrame arguments of a resolver, Chalk will automatically project columns and filter rows in the input data.
Suppose we extend the Home class to include a rooms field:
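from chalk.features import DataFrame, features, has_many
from chalk import online


@features
class Room:
    id: str
    # Foreign key used by the has_many join on Home.rooms
    home_id: str
    name: str


@features
class Home:
    id: str
    address: str
    price: int
    sq_ft: int
    rooms: DataFrame[Room] = has_many(
        lambda: Room.home_id == Home.id
    )
    num_bedrooms: int


@online
def get_num_bedrooms(
    rooms: Home.rooms[Room.name == 'bedroom']
) -> Home.num_bedrooms:
    # `rooms` contains only the rows that pass the filter
    return len(rooms)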
The get_num_bedrooms resolver filters the rooms argument to only include bedrooms. We can test this by passing in a list of rooms, some of which are bedrooms and some of which are not:
def test_get_num_rooms():
    # Rooms is automatically converted to a `DataFrame`
    rooms = [
        Room(id=1, name="bedroom"),
        Room(id=2, name="kitchen"),
        Room(id=3, name="bedroom"),
    ]
    # The kitchen room is filtered out
    assert get_num_bedrooms(rooms) == 2
    # `get_num_bedrooms` also works with a `DataFrame`
    assert get_num_bedrooms(DataFrame(rooms)) == 2
Note that although we passed in a list of three rooms, only two of them were bedrooms, so the resolver returns 2.
Furthermore, we didn't need to convert the list of rooms to a DataFrame. In this case, we passed in a list of Room objects, and Chalk automatically converted it to a DataFrame for us. We could also have passed in a polars.DataFrame or any other valid constructor argument for DataFrame.
Chalk will also automatically project the input data as specified by the DataFrame argument. So, if you declare which columns you need in the resolver signature, but accidentally use an extra column in the resolver body, your unit tests will fail.
For example, suppose Room also had a sq_ft feature and we wrote a resolver that sums the square footage of all rooms in a home using Room.sq_ft, but accidentally left the Room.sq_ft column out of the DataFrame argument:
@online
def get_sqft(
    rooms: Home.rooms[Room.name]
) -> Home.sq_ft:
    # This will fail because we used `Room.sq_ft`
    return rooms[Room.sq_ft].sum()
Then our unit tests will fail:
def test_get_sqft():
    rooms = [
        Room(id=1, name="bedroom", sq_ft=100),
        Room(id=2, name="kitchen", sq_ft=200),
        Room(id=3, name="bedroom", sq_ft=300),
    ]
    # This will fail because we used `Room.sq_ft`,
    # which is excluded by the `Home.rooms[Room.name]`
    # argument
    assert get_sqft(rooms) == 600
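To make the filter and projection behavior concrete, here is a small plain-Python model of it. This is only a conceptual sketch, not Chalk's implementation: rows are dictionaries rather than feature classes, and the select, project, and total_sq_ft helpers are hypothetical names introduced for illustration. The error Chalk itself raises for a projected-out column is not specified here; in this model it surfaces as a KeyError.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]

# Sample rows, modeled as dictionaries instead of Chalk feature classes.
ROOMS: List[Row] = [
    {"id": 1, "name": "bedroom", "sq_ft": 100},
    {"id": 2, "name": "kitchen", "sq_ft": 200},
    {"id": 3, "name": "bedroom", "sq_ft": 300},
]


def select(rows: List[Row], predicate: Callable[[Row], bool]) -> List[Row]:
    """Model of a row filter: keep only rows matching the predicate."""
    return [row for row in rows if predicate(row)]


def project(rows: List[Row], columns: List[str]) -> List[Row]:
    """Model of a projection: keep only the named columns in each row."""
    return [{column: row[column] for column in columns} for row in rows]


def total_sq_ft(rows: List[Row]) -> int:
    """Sum the sq_ft column; fails if that column was projected out."""
    return sum(row["sq_ft"] for row in rows)


def sq_ft_is_available(rows: List[Row]) -> bool:
    """Report whether total_sq_ft can read the column it needs."""
    try:
        total_sq_ft(rows)
    except KeyError:
        return False
    return True
```

In this model, select(ROOMS, lambda r: r["name"] == "bedroom") keeps two rows, and sq_ft_is_available(project(ROOMS, ["name"])) is False because the body reads a column the projection dropped. Adding "sq_ft" to the projected columns makes the sum succeed.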
Some DataFrame filters are implicitly resolved relative to "now". In offline_query and online_query, this value is controlled by the input_times= and now= parameters, respectively. In unit tests, the value defaults to datetime.now(), but can be explicitly set with the chalk.freeze_time context manager:
from datetime import datetime, timedelta, timezone

from chalk import freeze_time, online
from chalk.features import DataFrame, after

# `User` and `Transaction` are feature classes assumed to be defined elsewhere.


@online
def get_p30d_transactions_count(
    txns: User.transactions[after(days_ago=30)],
) -> User.recent_txn_count_30d:
    """
    Count the number of transactions that have occurred in the past 30 days
    """
    return txns.count()


NOW = datetime.now(tz=timezone.utc)
transactions = DataFrame([
    Transaction(id=1, created_at=NOW),
    Transaction(id=2, created_at=NOW - timedelta(days=30)),
    Transaction(id=3, created_at=NOW - timedelta(days=32)),
])

# There is a single transaction in the range (30, 0] days ago
assert get_p30d_transactions_count(transactions) == 1

# There are two transactions in the range (45, 15] days ago
with freeze_time(at=NOW - timedelta(days=15)):
    assert get_p30d_transactions_count(transactions) == 2
Note that the at= parameter must be timezone-aware.
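The following standard-library sketch models how a "last 30 days" window moves when "now" is changed, and how to check that a datetime is timezone-aware before passing it as at=. It is an illustration under stated assumptions, not Chalk's implementation: is_aware and count_in_window are hypothetical helpers, and the window is assumed to exclude its older bound and include "now", matching the interval notation in the comments above.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable


def is_aware(dt: datetime) -> bool:
    """Return True if dt carries a usable UTC offset (per the datetime docs)."""
    return dt.tzinfo is not None and dt.utcoffset() is not None


def count_in_window(timestamps: Iterable[datetime], now: datetime, days: int) -> int:
    """Count timestamps in the window (now - days, now].

    Assumption: the older bound is exclusive and `now` is inclusive.
    """
    if not is_aware(now):
        raise ValueError("now must be timezone-aware")
    start = now - timedelta(days=days)
    return sum(1 for ts in timestamps if start < ts <= now)


# A fixed, timezone-aware reference time keeps the example deterministic.
NOW = datetime(2024, 1, 31, tzinfo=timezone.utc)
TIMESTAMPS = [
    NOW,
    NOW - timedelta(days=30),
    NOW - timedelta(days=32),
]

# Relative to NOW, only the first timestamp is inside the window.
count_now = count_in_window(TIMESTAMPS, NOW, days=30)

# Shifting "now" back 15 days moves the window to cover the other two.
count_shifted = count_in_window(TIMESTAMPS, NOW - timedelta(days=15), days=30)
```

A naive value such as datetime(2024, 1, 31) fails the is_aware check; attaching tzinfo=timezone.utc, or calling datetime.now(tz=timezone.utc), produces an aware value.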