OpenAPI Webhooks
Webhook 是一种事件通知机制,当你的应用发生某些事件时,会主动向用户指定的 URL 发送 HTTP 请求。FastAPI 支持在 OpenAPI 文档中声明 Webhook,让用户了解你的应用可能发送哪些请求。
什么是 Webhook
传统的 API 是用户向你的应用发送请求,而 Webhook 则相反——是你的应用向用户的服务发送请求。
Webhook 工作流程
Webhook 的典型场景
- 支付通知:支付成功后通知商户
- 订单状态:订单状态变更时推送
- 用户事件:用户注册、注销等事件
- 系统告警:异常情况主动通知
- 数据同步:数据变更后触发同步
Webhook 的实现方式
实现 Webhook 需要两部分:
- 定义 Webhook 结构:声明你的应用会发送什么样的请求(FastAPI/OpenAPI 负责)
- 实现业务逻辑:
- 用户在哪里配置接收 URL
- 何时发送请求
- 如何发送请求
FastAPI 只负责第一部分——在 OpenAPI 文档中声明 Webhook,让用户知道你的应用可能发送哪些类型的请求。
在 FastAPI 中定义 Webhook
基本使用
使用 app.webhooks 属性定义 Webhook,语法与定义路径操作类似:
from datetime import datetime
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr
app = FastAPI()
# Webhook 数据模型
class Subscription(BaseModel):
"""订阅事件数据"""
username: str
monthly_fee: float
start_date: datetime
class UserDeleted(BaseModel):
"""用户删除事件数据"""
user_id: int
email: EmailStr
deleted_at: datetime
# 定义 Webhook
@app.webhooks.post("new-subscription")
def new_subscription(body: Subscription):
"""
当用户订阅服务时,我们会向你在控制面板配置的 URL
发送 POST 请求,包含订阅详情。
"""
pass # 这里不需要实现,仅用于文档
@app.webhooks.delete("user-deleted")
def user_deleted(body: UserDeleted):
"""
当用户删除账号时发送此事件。
"""
pass
# 正常的 API 路由
@app.get("/users/")
def read_users():
return ["Rick", "Morty"]
关键点说明
- 使用
app.webhooks:这是一个APIRouter,用法与app.get/post类似 - 不定义路径:
@app.webhooks.post("new-subscription")中的字符串是事件名称,不是路径 - 函数体留空:仅用于文档生成,实际发送逻辑需要单独实现
为什么不定义路径
因为 URL 路径是由用户在配置面板中指定的,你的应用不知道具体路径,只知道事件名称和数据格式。
OpenAPI 文档效果
定义 Webhook 后,访问 /docs 可以看到:
- 常规路径操作:你的 API 接收的请求
- Webhooks 部分:你的应用可能发送的请求
用户可以根据这些文档,实现接收 Webhook 的接口。
OpenAPI Schema 示例
{
"openapi": "3.1.0",
"webhooks": {
"new-subscription": {
"post": {
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/Subscription"
}
}
}
},
"responses": {
"200": {
"description": "成功接收"
}
}
}
}
}
}
完整示例:订阅系统
from datetime import datetime
from typing import Annotated
import httpx
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr
app = FastAPI()
# ============ 模型定义 ============
class Subscription(BaseModel):
"""订阅事件"""
username: str
email: EmailStr
monthly_fee: float
start_date: datetime
class SubscriptionCancelled(BaseModel):
"""取消订阅事件"""
username: str
email: EmailStr
cancelled_at: datetime
reason: str | None = None
class WebhookConfig(BaseModel):
"""Webhook 配置"""
event: str
url: str
secret: str | None = None
# ============ Webhook 定义 ============
@app.webhooks.post("subscription.created")
def subscription_created(body: Subscription):
"""
新用户订阅时触发。
包含用户名、邮箱、月费和开始日期。
"""
pass
@app.webhooks.post("subscription.cancelled")
def subscription_cancelled(body: SubscriptionCancelled):
"""
用户取消订阅时触发。
包含取消原因和取消时间。
"""
pass
# ============ 模拟数据库 ============
# 存储 Webhook 配置(实际应用应使用数据库)
webhook_configs: dict[str, list[WebhookConfig]] = {
"subscription.created": [],
"subscription.cancelled": [],
}
# ============ API 路由 ============
@app.post("/webhooks/config")
async def configure_webhook(config: WebhookConfig):
"""配置 Webhook URL"""
if config.event not in webhook_configs:
raise HTTPException(400, "不支持的事件类型")
webhook_configs[config.event].append(config)
return {"message": "配置成功", "event": config.event}
@app.get("/webhooks/config")
async def list_webhooks():
"""列出所有 Webhook 配置"""
return webhook_configs
# ============ 发送 Webhook 的工具函数 ============
async def send_webhook(event: str, data: BaseModel):
"""发送 Webhook 到所有配置的 URL"""
configs = webhook_configs.get(event, [])
async with httpx.AsyncClient() as client:
for config in configs:
try:
response = await client.post(
config.url,
json=data.model_dump(mode="json"),
headers={
"X-Webhook-Event": event,
"X-Webhook-Secret": config.secret or "",
"Content-Type": "application/json"
},
timeout=10.0
)
print(f"Webhook {event} -> {config.url}: {response.status_code}")
except Exception as e:
print(f"Webhook 发送失败: {e}")
# ============ 业务路由 ============
@app.post("/subscribe")
async def subscribe(username: str, email: EmailStr, fee: float):
"""用户订阅"""
subscription = Subscription(
username=username,
email=email,
monthly_fee=fee,
start_date=datetime.now()
)
# 发送 Webhook
await send_webhook("subscription.created", subscription)
return {"message": "订阅成功", "username": username}
@app.post("/cancel")
async def cancel_subscription(username: str, reason: str | None = None):
"""取消订阅"""
cancelled = SubscriptionCancelled(
username=username,
email=f"{username}@example.com", # 实际应从数据库获取
cancelled_at=datetime.now(),
reason=reason
)
# 发送 Webhook
await send_webhook("subscription.cancelled", cancelled)
return {"message": "已取消订阅", "username": username}
安全最佳实践
1. 签名验证
发送 Webhook 时添加签名,让接收方验证请求来源:
import hmac
import hashlib
import json
def generate_signature(secret: str, payload: str) -> str:
"""生成签名"""
return hmac.new(
secret.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
async def send_webhook_with_signature(event: str, data: BaseModel, config: WebhookConfig):
"""带签名的 Webhook 发送"""
payload = data.model_dump_json()
signature = generate_signature(config.secret, payload)
async with httpx.AsyncClient() as client:
await client.post(
config.url,
content=payload,
headers={
"X-Webhook-Signature": signature,
"X-Webhook-Event": event,
"Content-Type": "application/json"
}
)
接收方验证:
from fastapi import Request, HTTPException
def verify_signature(request: Request, secret: str):
"""验证签名中间件"""
signature = request.headers.get("X-Webhook-Signature")
if not signature:
raise HTTPException(401, "缺少签名")
# 获取请求体并验证
body = await request.body()
expected = generate_signature(secret, body.decode())
if not hmac.compare_digest(signature, expected):
raise HTTPException(401, "签名验证失败")
2. 重试机制
import asyncio
from typing import Callable
async def send_with_retry(
url: str,
data: dict,
max_retries: int = 3,
backoff: float = 1.0
):
"""带重试的发送"""
async with httpx.AsyncClient() as client:
for attempt in range(max_retries):
try:
response = await client.post(url, json=data, timeout=10.0)
if response.status_code < 500:
return response
except httpx.RequestError:
pass
if attempt < max_retries - 1:
await asyncio.sleep(backoff * (2 ** attempt))
raise Exception(f"Webhook 发送失败: {url}")
3. 幂等性处理
使用事件 ID 避免重复处理:
import uuid
class WebhookPayload(BaseModel):
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
event_type: str
timestamp: datetime = Field(default_factory=datetime.now)
data: dict
# 接收方处理
processed_events: set[str] = set()
async def handle_webhook(payload: WebhookPayload):
if payload.event_id in processed_events:
return {"status": "already_processed"}
# 处理事件
# ...
processed_events.add(payload.event_id)
return {"status": "success"}
Webhook vs 其他方案
| 方案 | 适用场景 | 特点 |
|---|---|---|
| Webhook | 事件通知 | 单向推送,用户需要公网服务器 |
| WebSocket | 实时通信 | 双向,需要保持连接 |
| SSE | 实时数据流 | 单向,服务器推送 |
| 轮询 | 简单场景 | 客户端定期查询,效率低 |
Webhook 的优势:
- 用户无需保持连接
- 适合异步事件通知
- 用户可以灵活选择处理方式
Webhook 的局限:
- 需要用户有公网可访问的服务器
- 需要处理重试和失败情况
- 实时性依赖发送方的可靠性
小结
本章学习了 FastAPI 中的 OpenAPI Webhooks:
- Webhook 概念:应用主动向用户发送请求的事件通知机制
- 定义 Webhook:使用
app.webhooks声明事件结构 - OpenAPI 文档:Webhook 信息自动出现在 API 文档中
- 实现发送逻辑:需要自行实现配置存储和请求发送
- 安全考虑:签名验证、重试机制、幂等性处理
Webhook 的关键点:
- FastAPI 只负责文档声明,业务逻辑需要自行实现
- URL 由用户配置,不是硬编码的路径
- 事件名称用于区分不同类型的 Webhook
练习
- 创建一个电商订单系统的 Webhook 定义,包含订单创建、支付完成、发货等事件
- 实现带签名验证的 Webhook 接收端
- 设计一个重试队列,处理发送失败的 Webhook
- 实现一个简单的 Webhook 日志记录系统