Chalk home page
  1. Streaming
  2. Webhooks

Webhook resolvers function almost exactly like stream resolvers.

The first step is to create a webhook source:

from chalk.webhooks import WebhookSource

source = WebhookSource(id="my_webhook_id")

You can optionally also specify a Pydantic Model for any of the body, headers, and query parameters. Chalk will validate the parameters against the model. If the model doesn’t validate, Chalk will return a 400 with the Pydantic error message to the user.

from pydantic import BaseModel

class WebhookBody(BaseModel):
    username: str
    friends: list[int]

class QueryParameters(BaseModel):
    environment: str

class Headers(BaseModel):

source = WebhookSource(

Once you create the source and deploy your change, Chalk will host a URL for your third-party service to hit.

Webhooks will be exposed at the url<client_id>/<environment>/<id>

After creating your webhook source, you can start processing messages and creating feature values. The webhook object above named source has an attribute named Message that gives a type you can use in your resolver, described below:


bodyBaseModel | str
The body of the webhook. If you provided a Pydantic model in describing the webhook, this will be a model. Otherwise, it will be the string contents of the body.
argsdict[str, str | list[str]]
The query parameters attached to the URL. If the same query arg is used more than once, the value in the dict will be a list of strings.
headersmap[string, duration]
Request headers from the webhook.

Once you’ve defined your webhook source, you can write a resolver for processing webhook updates with the @webhook decorator:

from chalk.webhooks import webhook

def fn(message: webhook.Message):

Full example

from pydantic import BaseModel, Field
from chalk.webhooks import webhook, WebhookSource

class WebhookBody(BaseModel):
    webhook_type: str
    item_id: str

class Headers(BaseModel):
    plaid_verification: str = Field(alias="plaid-verification")

source = WebhookSource(

def get_transactions(account: Account.item_id) -> DataFrame[PlaidTransaction]:

def fn(message: source.Message):

    if message.body.webhook_type == "TRANSACTIONS":
        return get_transactions(message.body.item_id)