Streaming
Handling webhooks with Chalk.
Webhook resolvers function almost exactly like stream resolvers.
The first step is to create a webhook source:
from chalk.webhooks import WebhookSource
source = WebhookSource(id="my_webhook_id")
You can optionally also specify a Pydantic Model for any of the body, headers, and query parameters. Chalk will validate the parameters against the model. If the model doesn’t validate, Chalk will return a 400 with the Pydantic error message to the user.
from pydantic import BaseModel
class WebhookBody(BaseModel):
username: str
friends: list[int]
...
class QueryParameters(BaseModel):
environment: str
...
class Headers(BaseModel):
...
source = WebhookSource(
id="my_webhook_id",
body=WebhookBody,
query=QueryParameters,
headers=Headers
)
Once you create the source and deploy your change, Chalk will host a URL for your third-party service to hit.
Webhooks will be exposed at the url
https://webhooks.chalk.ai/<client_id>/<environment>/<id>
After creating your webhook source,
you can start processing messages and creating feature values.
The webhook object above named source
has an attribute
named Message
that gives a type you can use in your resolver,
described below:
bodyBaseModel | str
argsdict[str, str | list[str]]
dict
will be a list of strings.headersmap[string, duration]
Once you’ve defined your webhook source,
you can write a resolver for processing
webhook updates with the @webhook
decorator:
from chalk.webhooks import webhook
@webhook
def fn(message: webhook.Message):
...
from pydantic import BaseModel, Field
from chalk.webhooks import webhook, WebhookSource
class WebhookBody(BaseModel):
webhook_type: str
item_id: str
class Headers(BaseModel):
plaid_verification: str = Field(alias="plaid-verification")
source = WebhookSource(
id="plaid_transactions",
body=WebhookBody,
headers=Headers
)
@online
def get_transactions(account: Account.item_id) -> DataFrame[PlaidTransaction]:
...
@webhook
def fn(message: source.Message):
verify_webhook(message.headers.plaid_verification)
if message.body.webhook_type == "TRANSACTIONS":
return get_transactions(message.body.item_id)