Chalk home page
Docs
SDK
CLI
  1. Streaming
  2. Webhooks

Webhook resolvers function almost exactly like stream resolvers.

The first step is to create a webhook source:

from chalk.webhooks import WebhookSource

source = WebhookSource(id="my_webhook_id")

You can optionally also specify a Pydantic Model for any of the body, headers, and query parameters. Chalk will validate the parameters against the model. If the model doesn’t validate, Chalk will return a 400 with the Pydantic error message to the user.

from pydantic import BaseModel

class WebhookBody(BaseModel):
    username: str
    friends: list[int]
    ...

class QueryParameters(BaseModel):
    environment: str
    ...

class Headers(BaseModel):
    ...

source = WebhookSource(
    id="my_webhook_id",
    body=WebhookBody,
    query=QueryParameters,
    headers=Headers
)

Once you create the source and deploy your change, Chalk will host a URL for your third-party service to hit.

Webhooks will be exposed at the url

https://webhooks.chalk.ai/<client_id>/<environment>/<id>

After creating your webhook source, you can start processing messages and creating feature values. The webhook object above named source has an attribute named Message that gives a type you can use in your resolver, described below:

webhook.Message

Attributes
bodyBaseModel | str
The body of the webhook. If you provided a Pydantic model in describing the webhook, this will be a model. Otherwise, it will be the string contents of the body.
argsdict[str, str | list[str]]
The query parameters attached to the URL. If the same query arg is used more than once, the value in the dict will be a list of strings.
headersmap[string, duration]
Request headers from the webhook.

Once you’ve defined your webhook source, you can write a resolver for processing webhook updates with the @webhook decorator:

from chalk.webhooks import webhook

@webhook
def fn(message: webhook.Message):
    ...

Full example

from pydantic import BaseModel, Field
from chalk.webhooks import webhook, WebhookSource

class WebhookBody(BaseModel):
    webhook_type: str
    item_id: str


class Headers(BaseModel):
    plaid_verification: str = Field(alias="plaid-verification")

source = WebhookSource(
    id="plaid_transactions",
    body=WebhookBody,
    headers=Headers
)

@online
def get_transactions(account: Account.item_id) -> DataFrame[PlaidTransaction]:
    ...

@webhook
def fn(message: source.Message):
    verify_webhook(message.headers.plaid_verification)

    if message.body.webhook_type == "TRANSACTIONS":
        return get_transactions(message.body.item_id)