跳到主要内容

OpenAPI Webhooks

Webhook 是一种事件通知机制,当你的应用发生某些事件时,会主动向用户指定的 URL 发送 HTTP 请求。FastAPI 支持在 OpenAPI 文档中声明 Webhook,让用户了解你的应用可能发送哪些请求。

什么是 Webhook

传统的 API 是用户向你的应用发送请求,而 Webhook 则相反——是你的应用向用户的服务发送请求。

Webhook 工作流程

Webhook 的典型场景

  • 支付通知:支付成功后通知商户
  • 订单状态:订单状态变更时推送
  • 用户事件:用户注册、注销等事件
  • 系统告警:异常情况主动通知
  • 数据同步:数据变更后触发同步

Webhook 的实现方式

实现 Webhook 需要两部分:

  1. 定义 Webhook 结构:声明你的应用会发送什么样的请求(FastAPI/OpenAPI 负责)
  2. 实现业务逻辑
    • 用户在哪里配置接收 URL
    • 何时发送请求
    • 如何发送请求

FastAPI 只负责第一部分——在 OpenAPI 文档中声明 Webhook,让用户知道你的应用可能发送哪些类型的请求。

在 FastAPI 中定义 Webhook

基本使用

使用 app.webhooks 属性定义 Webhook,语法与定义路径操作类似:

from datetime import datetime
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr

app = FastAPI()

# Webhook 数据模型
class Subscription(BaseModel):
"""订阅事件数据"""
username: str
monthly_fee: float
start_date: datetime

class UserDeleted(BaseModel):
"""用户删除事件数据"""
user_id: int
email: EmailStr
deleted_at: datetime

# 定义 Webhook
@app.webhooks.post("new-subscription")
def new_subscription(body: Subscription):
"""
当用户订阅服务时,我们会向你在控制面板配置的 URL
发送 POST 请求,包含订阅详情。
"""
pass # 这里不需要实现,仅用于文档

@app.webhooks.delete("user-deleted")
def user_deleted(body: UserDeleted):
"""
当用户删除账号时发送此事件。
"""
pass

# 正常的 API 路由
@app.get("/users/")
def read_users():
return ["Rick", "Morty"]

关键点说明

  1. 使用 app.webhooks:这是一个 APIRouter,用法与 app.get/post 类似
  2. 不定义路径@app.webhooks.post("new-subscription") 中的字符串是事件名称,不是路径
  3. 函数体留空:仅用于文档生成,实际发送逻辑需要单独实现

为什么不定义路径

因为 URL 路径是由用户在配置面板中指定的,你的应用不知道具体路径,只知道事件名称和数据格式。

OpenAPI 文档效果

定义 Webhook 后,访问 /docs 可以看到:

  • 常规路径操作:你的 API 接收的请求
  • Webhooks 部分:你的应用可能发送的请求

用户可以根据这些文档,实现接收 Webhook 的接口。

OpenAPI Schema 示例

{
"openapi": "3.1.0",
"webhooks": {
"new-subscription": {
"post": {
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/Subscription"
}
}
}
},
"responses": {
"200": {
"description": "成功接收"
}
}
}
}
}
}

完整示例:订阅系统

from datetime import datetime
from typing import Annotated
import httpx

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr

app = FastAPI()

# ============ 模型定义 ============

class Subscription(BaseModel):
"""订阅事件"""
username: str
email: EmailStr
monthly_fee: float
start_date: datetime

class SubscriptionCancelled(BaseModel):
"""取消订阅事件"""
username: str
email: EmailStr
cancelled_at: datetime
reason: str | None = None

class WebhookConfig(BaseModel):
"""Webhook 配置"""
event: str
url: str
secret: str | None = None

# ============ Webhook 定义 ============

@app.webhooks.post("subscription.created")
def subscription_created(body: Subscription):
"""
新用户订阅时触发。

包含用户名、邮箱、月费和开始日期。
"""
pass

@app.webhooks.post("subscription.cancelled")
def subscription_cancelled(body: SubscriptionCancelled):
"""
用户取消订阅时触发。

包含取消原因和取消时间。
"""
pass

# ============ 模拟数据库 ============

# 存储 Webhook 配置(实际应用应使用数据库)
webhook_configs: dict[str, list[WebhookConfig]] = {
"subscription.created": [],
"subscription.cancelled": [],
}

# ============ API 路由 ============

@app.post("/webhooks/config")
async def configure_webhook(config: WebhookConfig):
"""配置 Webhook URL"""
if config.event not in webhook_configs:
raise HTTPException(400, "不支持的事件类型")

webhook_configs[config.event].append(config)
return {"message": "配置成功", "event": config.event}

@app.get("/webhooks/config")
async def list_webhooks():
"""列出所有 Webhook 配置"""
return webhook_configs

# ============ 发送 Webhook 的工具函数 ============

async def send_webhook(event: str, data: BaseModel):
"""发送 Webhook 到所有配置的 URL"""
configs = webhook_configs.get(event, [])

async with httpx.AsyncClient() as client:
for config in configs:
try:
response = await client.post(
config.url,
json=data.model_dump(mode="json"),
headers={
"X-Webhook-Event": event,
"X-Webhook-Secret": config.secret or "",
"Content-Type": "application/json"
},
timeout=10.0
)
print(f"Webhook {event} -> {config.url}: {response.status_code}")
except Exception as e:
print(f"Webhook 发送失败: {e}")

# ============ 业务路由 ============

@app.post("/subscribe")
async def subscribe(username: str, email: EmailStr, fee: float):
"""用户订阅"""
subscription = Subscription(
username=username,
email=email,
monthly_fee=fee,
start_date=datetime.now()
)

# 发送 Webhook
await send_webhook("subscription.created", subscription)

return {"message": "订阅成功", "username": username}

@app.post("/cancel")
async def cancel_subscription(username: str, reason: str | None = None):
"""取消订阅"""
cancelled = SubscriptionCancelled(
username=username,
email=f"{username}@example.com", # 实际应从数据库获取
cancelled_at=datetime.now(),
reason=reason
)

# 发送 Webhook
await send_webhook("subscription.cancelled", cancelled)

return {"message": "已取消订阅", "username": username}

安全最佳实践

1. 签名验证

发送 Webhook 时添加签名,让接收方验证请求来源:

import hmac
import hashlib
import json

def generate_signature(secret: str, payload: str) -> str:
"""生成签名"""
return hmac.new(
secret.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()

async def send_webhook_with_signature(event: str, data: BaseModel, config: WebhookConfig):
"""带签名的 Webhook 发送"""
payload = data.model_dump_json()
signature = generate_signature(config.secret, payload)

async with httpx.AsyncClient() as client:
await client.post(
config.url,
content=payload,
headers={
"X-Webhook-Signature": signature,
"X-Webhook-Event": event,
"Content-Type": "application/json"
}
)

接收方验证:

from fastapi import Request, HTTPException

def verify_signature(request: Request, secret: str):
"""验证签名中间件"""
signature = request.headers.get("X-Webhook-Signature")
if not signature:
raise HTTPException(401, "缺少签名")

# 获取请求体并验证
body = await request.body()
expected = generate_signature(secret, body.decode())

if not hmac.compare_digest(signature, expected):
raise HTTPException(401, "签名验证失败")

2. 重试机制

import asyncio
from typing import Callable

async def send_with_retry(
url: str,
data: dict,
max_retries: int = 3,
backoff: float = 1.0
):
"""带重试的发送"""
async with httpx.AsyncClient() as client:
for attempt in range(max_retries):
try:
response = await client.post(url, json=data, timeout=10.0)
if response.status_code < 500:
return response
except httpx.RequestError:
pass

if attempt < max_retries - 1:
await asyncio.sleep(backoff * (2 ** attempt))

raise Exception(f"Webhook 发送失败: {url}")

3. 幂等性处理

使用事件 ID 避免重复处理:

import uuid

class WebhookPayload(BaseModel):
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
event_type: str
timestamp: datetime = Field(default_factory=datetime.now)
data: dict

# 接收方处理
processed_events: set[str] = set()

async def handle_webhook(payload: WebhookPayload):
if payload.event_id in processed_events:
return {"status": "already_processed"}

# 处理事件
# ...

processed_events.add(payload.event_id)
return {"status": "success"}

Webhook vs 其他方案

方案适用场景特点
Webhook事件通知单向推送,用户需要公网服务器
WebSocket实时通信双向,需要保持连接
SSE实时数据流单向,服务器推送
轮询简单场景客户端定期查询,效率低

Webhook 的优势:

  • 用户无需保持连接
  • 适合异步事件通知
  • 用户可以灵活选择处理方式

Webhook 的局限:

  • 需要用户有公网可访问的服务器
  • 需要处理重试和失败情况
  • 实时性依赖发送方的可靠性

小结

本章学习了 FastAPI 中的 OpenAPI Webhooks:

  1. Webhook 概念:应用主动向用户发送请求的事件通知机制
  2. 定义 Webhook:使用 app.webhooks 声明事件结构
  3. OpenAPI 文档:Webhook 信息自动出现在 API 文档中
  4. 实现发送逻辑:需要自行实现配置存储和请求发送
  5. 安全考虑:签名验证、重试机制、幂等性处理

Webhook 的关键点:

  • FastAPI 只负责文档声明,业务逻辑需要自行实现
  • URL 由用户配置,不是硬编码的路径
  • 事件名称用于区分不同类型的 Webhook

练习

  1. 创建一个电商订单系统的 Webhook 定义,包含订单创建、支付完成、发货等事件
  2. 实现带签名验证的 Webhook 接收端
  3. 设计一个重试队列,处理发送失败的 Webhook
  4. 实现一个简单的 Webhook 日志记录系统