# Server-Sent Events (SSE)
Server-Sent Events (SSE) is a lightweight technique for pushing real-time data from the server to the client. Unlike WebSocket, SSE is unidirectional (server to client only), runs over standard HTTP, and is considerably simpler to use.
## SSE Overview

### SSE vs WebSocket
| Feature | SSE | WebSocket |
|---|---|---|
| Direction | One-way (server → client) | Two-way |
| Protocol | HTTP/HTTPS | WS/WSS |
| Automatic reconnection | Built in | Must be implemented manually |
| Resumable streams | Supported (via Last-Event-ID) | Not supported |
| Complexity | Simple | More involved |
| Typical uses | Notifications, log streams, progress updates | Chat, collaborative editing, games |
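Much of SSE's simplicity comes from its wire format: each event is a block of plain-text field: value lines terminated by a blank line. A minimal sketch of that format (standard library only, independent of any framework):

```python
import json


def format_sse(data, event=None, id=None, retry=None):
    """Serialize one event in the SSE wire format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if id is not None:
        lines.append(f"id: {id}")
    if retry is not None:
        lines.append(f"retry: {retry}")
    # Non-string payloads are JSON-encoded, mirroring how frameworks
    # typically serialize the data field.
    payload = data if isinstance(data, str) else json.dumps(data)
    # A multi-line payload becomes several "data:" lines.
    for part in payload.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"  # a blank line terminates the event


print(format_sse({"msg": "hi"}, event="update", id="1"))
```

A client reading this stream reassembles events by splitting on blank lines, which is exactly what the browser's EventSource API does internally.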
### When to Use SSE

- Real-time notifications: system messages, alert pushes
- Log streaming: live log output, debug information
- Progress updates: file uploads, task execution progress
- Live data feeds: stock quotes, sensor data
- AI streaming output: token-by-token LLM responses
## Basic Usage

### New in FastAPI 0.135.0+

FastAPI 0.135.0 ships with a built-in EventSourceResponse, which makes creating an SSE endpoint straightforward:
```python
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None = None


items = [
    Item(name="Plumbus", description="All-purpose home device"),
    Item(name="Portal Gun", description="Device that opens portals"),
    Item(name="Meeseeks Box", description="Box that summons a Meeseeks"),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    """Stream the item list."""
    for item in items:
        yield item
```
When you yield a plain object (a dict, a Pydantic model, and so on), FastAPI automatically JSON-encodes it and sends it as an SSE data: field.
### Client Connection

Connect using the browser's built-in EventSource API:
```javascript
const eventSource = new EventSource("http://localhost:8000/items/stream");

// Listen for messages
eventSource.onmessage = function(event) {
    const data = JSON.parse(event.data);
    console.log("Received:", data);
};

// Handle errors
eventSource.onerror = function(error) {
    console.error("SSE error:", error);
    eventSource.close();
};
```
## The ServerSentEvent Model

For finer-grained control, use the ServerSentEvent model:
```python
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="Item A", price=32.99),
    Item(name="Item B", price=999.99),
    Item(name="Item C", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    """Stream items with the full set of SSE fields."""
    # Send a comment (useful for keep-alive or debugging)
    yield ServerSentEvent(comment="Starting item updates")
    for i, item in enumerate(items):
        yield ServerSentEvent(
            data=item,            # payload
            event="item_update",  # event type
            id=str(i + 1),        # event ID
            retry=5000,           # reconnection interval (ms)
        )
```
### Field Reference

| Field | Type | Description |
|---|---|---|
| data | Any | Event payload, JSON-serialized automatically |
| raw_data | str | Raw string payload, sent without JSON encoding |
| event | str | Event type name, matched by the client's addEventListener |
| id | str | Event ID, used for resuming streams |
| retry | int | Reconnection interval in milliseconds |
| comment | str | Comment line; sent on the wire but not surfaced as an event by EventSource clients |
### Listening for Custom Events
```javascript
const eventSource = new EventSource("http://localhost:8000/items/stream");

// Listen for a custom event type
eventSource.addEventListener("item_update", function(event) {
    const item = JSON.parse(event.data);
    console.log("Item update:", item);
});

eventSource.addEventListener("error", function(event) {
    const error = JSON.parse(event.data);
    console.error("Error:", error);
});
```
## Sending Raw Data

Use raw_data to send non-JSON payloads:
```python
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    """Stream log lines."""
    logs = [
        "2025-01-01 INFO Application started",
        "2025-01-01 DEBUG Database connection established",
        "2025-01-01 WARN Memory usage is high",
        "2025-01-01 ERROR Service failure",
    ]
    for log_line in logs:
        yield ServerSentEvent(raw_data=log_line)
```
The difference between data and raw_data:

```python
# data: JSON-serialized
yield ServerSentEvent(data="hello")
# sends: data: "hello" (quoted)

yield ServerSentEvent(data={"msg": "hello"})
# sends: data: {"msg":"hello"}

# raw_data: raw string
yield ServerSentEvent(raw_data="hello")
# sends: data: hello (unquoted)
```
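The quoting difference comes down to JSON encoding: json.dumps wraps a bare string in quotes, while raw data is emitted verbatim. A quick standard-library check of what ends up after the data: prefix:

```python
import json

# The data= path JSON-encodes the value before placing it after "data:"
print("data: " + json.dumps("hello"))           # quoted string
print("data: " + json.dumps({"msg": "hello"}))  # JSON object

# The raw_data= path uses the string verbatim
print("data: " + "hello")                       # bare string
```

This matters for clients: a consumer of data= payloads should always JSON.parse the value, while raw_data payloads are consumed as plain text.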
## Resumable Streams

When a client reconnects after a dropped connection, the stream can resume based on the Last-Event-ID request header:
```python
from collections.abc import AsyncIterable
from typing import Annotated

from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name=f"Item {i}", price=i * 10.0)
    for i in range(100)
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header(alias="Last-Event-ID")] = None,
) -> AsyncIterable[ServerSentEvent]:
    """A stream that supports resuming."""
    # Continue from where the client left off
    start = last_event_id + 1 if last_event_id is not None else 0
    for i, item in enumerate(items):
        if i < start:
            continue  # skip events already sent
        yield ServerSentEvent(
            data=item,
            id=str(i),  # set the event ID
            event="item",
        )
```
How it works:

- While connected, the server sets an id on every event
- When the connection drops, the browser remembers the last event ID it received
- On reconnect, the browser automatically sends the Last-Event-ID request header
- The server uses it to resume from the point of disconnection
## POST Request Support

SSE also works with POST requests, which is useful when the stream needs a request body:
```python
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Prompt(BaseModel):
    text: str


@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
    """Streaming chat endpoint (simulated AI output)."""
    # Emit one word at a time
    words = prompt.text.split()
    for word in words:
        yield ServerSentEvent(
            data={"token": word},
            event="token",
        )
    # Signal completion
    yield ServerSentEvent(
        raw_data="[DONE]",
        event="done",
    )
```
Since the browser's EventSource API only supports GET, the client uses the fetch API instead:
```javascript
const response = await fetch("http://localhost:8000/chat/stream", {
    method: "POST",
    headers: {
        "Content-Type": "application/json",
    },
    body: JSON.stringify({ text: "Hello, world" }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const chunk = decoder.decode(value);
    console.log("Received:", chunk);
}
```
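The fetch reader above receives raw chunks, and a single read may end in the middle of an event, so a real client buffers until it sees the blank line that terminates each event block. The buffering logic, sketched in Python for brevity (the same approach applies in JavaScript):

```python
def feed(buffer: str, chunk: str):
    """Append a chunk to the buffer; return (new_buffer, completed_events)."""
    buffer += chunk
    events = []
    # A blank line ("\n\n") terminates one event block.
    while "\n\n" in buffer:
        block, buffer = buffer.split("\n\n", 1)
        event = {"event": "message", "data": []}
        for line in block.split("\n"):
            if line.startswith("data:"):
                event["data"].append(line[5:].lstrip())
            elif line.startswith("event:"):
                event["event"] = line[6:].strip()
        event["data"] = "\n".join(event["data"])
        events.append(event)
    return buffer, events


# A chunk boundary can fall mid-event; the partial tail stays buffered.
buf, evs = feed("", "event: token\ndata: hi\n\nda")
print(evs)
```

This sketch handles only the data and event fields; a complete parser would also track id, retry, and comment lines per the SSE specification.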
POST-based SSE is a good fit for:

- Streaming AI chat (e.g. LLM APIs)
- Streams that need complex request parameters
- Protocols such as the Model Context Protocol (MCP)
## Synchronous Generators

Besides async generators, plain generators work too:
```python
from collections.abc import Iterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None = None


@app.get("/items/stream", response_class=EventSourceResponse)
def sse_items() -> Iterable[Item]:
    """Synchronous generator example."""
    items = [
        Item(name="Item A", description="Description A"),
        Item(name="Item B", description="Description B"),
    ]
    for item in items:
        yield item
```
## Practical Examples

### Real-Time Notification System
```python
import asyncio
from collections.abc import AsyncIterable
from datetime import datetime

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()

# Active connections
active_connections: list[asyncio.Queue] = []


@app.get("/notifications/stream", response_class=EventSourceResponse)
async def stream_notifications() -> AsyncIterable[ServerSentEvent]:
    """Real-time notification stream."""
    queue: asyncio.Queue = asyncio.Queue()
    active_connections.append(queue)
    try:
        while True:
            # Wait for the next notification
            notification = await queue.get()
            yield ServerSentEvent(
                data=notification,
                event="notification",
            )
    finally:
        # Runs when the client disconnects and the generator is cancelled
        active_connections.remove(queue)


async def send_notification(message: str):
    """Broadcast a notification to every connection."""
    notification = {
        "message": message,
        "timestamp": datetime.now().isoformat(),
    }
    for queue in active_connections:
        await queue.put(notification)


@app.post("/notifications")
async def create_notification(message: str):
    """Create a new notification."""
    await send_notification(message)
    return {"status": "sent"}
```
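The fan-out pattern above (one asyncio.Queue per subscriber, a broadcast that puts the message on every queue) can be exercised without FastAPI at all. A standard-library sketch of the same in-process state:

```python
import asyncio

# One queue per connected subscriber, as in the endpoint above
subscribers: list[asyncio.Queue] = []


async def broadcast(message: dict):
    """Deliver the message to every subscriber's queue."""
    for queue in subscribers:
        await queue.put(message)


async def main():
    a, b = asyncio.Queue(), asyncio.Queue()
    subscribers.extend([a, b])
    await broadcast({"message": "hello"})
    # Each subscriber receives its own copy, independently
    return await a.get(), await b.get()


got_a, got_b = asyncio.run(main())
print(got_a, got_b)
```

Because each subscriber drains its own queue, a slow client only delays itself; in production you would also bound the queues so a stalled consumer cannot grow memory without limit.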
### Task Progress Tracking
```python
import asyncio
import uuid
from collections.abc import AsyncIterable

from fastapi import BackgroundTasks, FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()

# Task progress storage
task_progress: dict[str, int] = {}
task_queues: dict[str, asyncio.Queue] = {}


async def process_task(task_id: str):
    """Simulate a long-running task."""
    for i in range(101):
        task_progress[task_id] = i
        if task_id in task_queues:
            await task_queues[task_id].put({
                "progress": i,
                "status": "processing" if i < 100 else "completed",
            })
        await asyncio.sleep(0.1)  # simulate work


@app.post("/tasks/start")
async def start_task(background_tasks: BackgroundTasks):
    """Start a background task."""
    task_id = str(uuid.uuid4())
    task_queues[task_id] = asyncio.Queue()
    task_progress[task_id] = 0
    background_tasks.add_task(process_task, task_id)
    return {"task_id": task_id}


@app.get("/tasks/{task_id}/progress", response_class=EventSourceResponse)
async def stream_progress(task_id: str) -> AsyncIterable[ServerSentEvent]:
    """Stream a task's progress."""
    queue = task_queues.get(task_id)
    if not queue:
        yield ServerSentEvent(
            data={"error": "Task not found"},
            event="error",
        )
        return
    try:
        while True:
            progress = await queue.get()
            yield ServerSentEvent(
                data=progress,
                event="progress",
            )
            if progress.get("status") == "completed":
                break
    finally:
        del task_queues[task_id]
```
### AI Streaming Output
```python
import asyncio
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class ChatRequest(BaseModel):
    message: str


@app.post("/ai/chat/stream", response_class=EventSourceResponse)
async def chat_stream(request: ChatRequest) -> AsyncIterable[ServerSentEvent]:
    """Simulate streaming AI output."""
    # Simulated AI reply
    response_text = (
        f"Got your message: {request.message}. "
        "This is a simulated AI reply demonstrating streaming output."
    )
    # Emit one character at a time
    for char in response_text:
        yield ServerSentEvent(
            data={"token": char},
            event="token",
        )
        await asyncio.sleep(0.02)  # simulate generation latency
    # Signal completion
    yield ServerSentEvent(
        raw_data="[DONE]",
        event="done",
    )
```
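Because the handler above is just an async generator, the token-then-[DONE] sequence can be checked by iterating it directly, without running a server. A stripped-down sketch of the same shape:

```python
import asyncio
from collections.abc import AsyncIterable


async def token_stream(text: str) -> AsyncIterable[dict]:
    """Yield one event per character, then a terminal done event."""
    for char in text:
        yield {"event": "token", "data": {"token": char}}
    yield {"event": "done", "data": "[DONE]"}


async def collect(text: str) -> list[dict]:
    """Drain the stream into a list, the way a test would."""
    return [event async for event in token_stream(text)]


events = asyncio.run(collect("hi"))
print(events)
```

Driving streaming endpoints this way (consume the generator, assert on the event sequence) is a convenient pattern for unit-testing SSE handlers in isolation.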
## Automatic Keep-Alive

FastAPI automatically sends a keep-alive ping every 15 seconds to prevent idle-connection timeouts, so there is no need to send comment events by hand:
```python
# FastAPI sends keep-alives automatically; no manual handling needed
@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[ServerSentEvent]:
    while True:
        data = await get_data()
        yield ServerSentEvent(data=data)
        # keep-alive pings are handled by FastAPI
```
If you do want to send a comment manually:

```python
yield ServerSentEvent(comment="keep-alive heartbeat")
```
## Error Handling

### Server-Side Errors
```python
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[ServerSentEvent]:
    try:
        for i in range(10):
            if i == 5:
                raise ValueError("simulated error")
            yield ServerSentEvent(data={"count": i})
    except Exception as e:
        # Emit an error event instead of dropping the connection abruptly
        yield ServerSentEvent(
            data={"error": str(e)},
            event="error",
        )
```
### Client-Side Error Handling
```javascript
const eventSource = new EventSource("http://localhost:8000/stream");

// Listen for application-level error events
eventSource.addEventListener("error", function(event) {
    // Connection failures also dispatch "error" but carry no data
    if (event.data === undefined) return;
    const error = JSON.parse(event.data);
    console.error("Server error:", error);
    eventSource.close();
});

// Handle connection errors
eventSource.onerror = function(error) {
    console.error("Connection error:", error);
    // EventSource reconnects automatically
};
```
## Production Considerations

### Connection Management

Storing connections in memory works for a single process; with multiple workers or servers, use Redis or a similar broker for cross-process fan-out:
```python
import asyncio
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


class ConnectionManager:
    def __init__(self):
        self.connections: dict[str, list[asyncio.Queue]] = {}

    async def connect(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        if channel not in self.connections:
            self.connections[channel] = []
        self.connections[channel].append(queue)
        return queue

    def disconnect(self, channel: str, queue: asyncio.Queue):
        if channel in self.connections:
            self.connections[channel].remove(queue)

    async def broadcast(self, channel: str, message: dict):
        if channel in self.connections:
            for queue in self.connections[channel]:
                await queue.put(message)


manager = ConnectionManager()


@app.get("/stream/{channel}", response_class=EventSourceResponse)
async def stream(channel: str) -> AsyncIterable[ServerSentEvent]:
    queue = await manager.connect(channel)
    try:
        while True:
            message = await queue.get()
            yield ServerSentEvent(data=message)
    finally:
        # Runs when the client disconnects and the generator is cancelled
        manager.disconnect(channel, queue)
```
### Proxy Configuration

Make sure any reverse proxy in front of the app supports long-lived connections:
```nginx
# Nginx configuration
location /stream {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 86400s;
}
```
### Resource Limits
```python
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[ServerSentEvent]:
    max_events = 1000
    count = 0
    while count < max_events:
        data = await get_data()
        yield ServerSentEvent(data=data)
        count += 1
    # Emit a terminal event
    yield ServerSentEvent(
        data={"message": "stream finished"},
        event="end",
    )
```
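The cap above is just a counter wrapped around the generator, so the same bound can be expressed framework-free; in this sketch the data source is passed in as an iterable rather than coming from the endpoint's get_data placeholder:

```python
def bounded_stream(source, max_events: int):
    """Yield at most max_events items, then a terminal end marker."""
    for count, data in enumerate(source):
        if count >= max_events:
            break
        yield {"event": "message", "data": data}
    yield {"event": "end", "data": {"message": "stream finished"}}


events = list(bounded_stream(iter(range(10)), max_events=3))
print(events)
```

Emitting an explicit end event lets the client call close() deliberately; otherwise EventSource treats the server closing the stream as an error and keeps reconnecting.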
## Summary

This chapter covered Server-Sent Events (SSE) in FastAPI:

- SSE basics: one-way server push over plain HTTP
- EventSourceResponse: built into FastAPI 0.135.0+
- ServerSentEvent: fine-grained control over event fields
- Resumable streams: recovery via Last-Event-ID
- POST support: suited to AI streaming output and similar scenarios
- Practical applications: notification systems, progress tracking, AI streaming output

SSE is a simple, efficient real-time delivery mechanism, especially well suited to server-push scenarios. When two-way communication is needed, consider WebSocket instead.
## Exercises

- Build a live log viewer with keyword filtering
- Create a task progress tracker that supports multiple concurrent tasks
- Implement a simple streaming AI chat endpoint
- Add resumable-stream support and test recovery after a client reconnect