跳到主要内容

Server-Sent Events (SSE)

Server-Sent Events(服务端推送事件,简称 SSE)是一种轻量级的服务器向客户端推送实时数据的技术。与 WebSocket 不同,SSE 是单向的(仅服务器到客户端),基于标准 HTTP 协议,使用更加简单。

SSE 概述

SSE vs WebSocket

特性SSEWebSocket
通信方向单向(服务器→客户端)双向
协议HTTP/HTTPSWS/WSS
自动重连内置支持需手动实现
断点续传支持(通过 Last-Event-ID)不支持
复杂度简单较复杂
适用场景实时通知、日志流、进度更新聊天、协作编辑、游戏

SSE 适用场景

  • 实时通知:系统消息、提醒推送
  • 日志流:实时日志输出、调试信息
  • 进度更新:文件上传、任务执行进度
  • 实时数据流:股票行情、传感器数据
  • AI 流式输出:LLM 逐字输出响应

基本使用

FastAPI 0.135.0+ 新特性

FastAPI 0.135.0 内置了 EventSourceResponse,可以轻松创建 SSE 端点:

from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    # Item name; description is optional.
    name: str
    description: str | None = None


items = [
    Item(name="Plumbus", description="多功能家用设备"),
    Item(name="Portal Gun", description="传送门开启装置"),
    Item(name="Meeseeks Box", description="召唤 Meeseeks 的盒子"),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def sse_items() -> AsyncIterable[Item]:
    """Stream the item list as Server-Sent Events.

    Each yielded model is JSON-encoded automatically and sent
    as an SSE `data:` field.
    """
    for item in items:
        yield item

当你 yield 普通对象(字典、Pydantic 模型等)时,FastAPI 会自动将其 JSON 编码并以 SSE data: 字段格式发送。

客户端连接

使用浏览器内置的 EventSource API 连接:

const eventSource = new EventSource("http://localhost:8000/items/stream");

// Handle default "message" events pushed by the server.
eventSource.onmessage = function(event) {
    const data = JSON.parse(event.data);
    console.log("收到:", data);
};

// Handle transport errors and close the connection.
eventSource.onerror = function(error) {
    console.error("SSE 错误:", error);
    eventSource.close();
};

ServerSentEvent 模型

对于更精细的控制,可以使用 ServerSentEvent 模型:

from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name="商品A", price=32.99),
    Item(name="商品B", price=999.99),
    Item(name="商品C", price=49.99),
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items() -> AsyncIterable[ServerSentEvent]:
    """Stream the item list with full control over SSE fields."""
    # A comment event (useful for keep-alive or debugging).
    yield ServerSentEvent(comment="开始推送商品更新")

    for i, item in enumerate(items):
        yield ServerSentEvent(
            data=item,            # payload, JSON-encoded automatically
            event="item_update",  # event type for addEventListener
            id=str(i + 1),        # event ID, enables resumption
            retry=5000,           # client reconnect interval (ms)
        )

字段说明

字段类型说明
dataAny事件数据,自动 JSON 序列化
raw_datastr原始字符串数据,不进行 JSON 编码
eventstr事件类型名称,用于客户端 addEventListener
idstr事件 ID,用于断点续传
retryint重连间隔(毫秒)
commentstr注释行,以 ":" 开头随流发送;客户端 EventSource 会忽略它(不会触发事件),常用于保活或调试

监听自定义事件

const eventSource = new EventSource("http://localhost:8000/items/stream");

// Subscribe to the custom "item_update" event type.
eventSource.addEventListener("item_update", function(event) {
    const item = JSON.parse(event.data);
    console.log("商品更新:", item);
});

// Server-emitted "error" events carry a JSON payload.
eventSource.addEventListener("error", function(event) {
    const error = JSON.parse(event.data);
    console.error("错误:", error);
});

原始数据发送

使用 raw_data 发送非 JSON 数据:

from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/logs/stream", response_class=EventSourceResponse)
async def stream_logs() -> AsyncIterable[ServerSentEvent]:
    """Stream log lines as raw (non-JSON) SSE data."""
    logs = [
        "2025-01-01 INFO 应用启动",
        "2025-01-01 DEBUG 连接数据库成功",
        "2025-01-01 WARN 内存使用率过高",
        "2025-01-01 ERROR 服务异常",
    ]

    for log_line in logs:
        # raw_data skips JSON encoding, so each line is sent verbatim.
        yield ServerSentEvent(raw_data=log_line)

data 与 raw_data 的区别:

# data: JSON-serialized
yield ServerSentEvent(data="hello")
# Sent as: data: "hello" (with quotes)

yield ServerSentEvent(data={"msg": "hello"})
# Sent as: data: {"msg":"hello"}

# raw_data: raw string, no JSON encoding
yield ServerSentEvent(raw_data="hello")
# Sent as: data: hello (no quotes)

断点续传

当客户端断开连接后重连时,可以通过 Last-Event-ID 请求头恢复流:

from collections.abc import AsyncIterable
from typing import Annotated
from fastapi import FastAPI, Header
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


items = [
    Item(name=f"商品{i}", price=i * 10.0)
    for i in range(100)
]


@app.get("/items/stream", response_class=EventSourceResponse)
async def stream_items(
    last_event_id: Annotated[int | None, Header(alias="Last-Event-ID")] = None,
) -> AsyncIterable[ServerSentEvent]:
    """Streaming endpoint that resumes from the Last-Event-ID header.

    The browser sends Last-Event-ID automatically on reconnect, so the
    stream restarts right after the last event the client received.
    """
    # Resume one past the last acknowledged event, or from the start.
    start = last_event_id + 1 if last_event_id is not None else 0

    for i, item in enumerate(items):
        if i < start:
            continue  # skip events already delivered

        yield ServerSentEvent(
            data=item,
            id=str(i),  # event ID recorded by the client for resumption
            event="item",
        )

工作原理:

  1. 客户端连接时,服务器为每个事件设置 id
  2. 客户端断开时,浏览器自动记录最后收到的事件 ID
  3. 客户端重连时,自动发送 Last-Event-ID 请求头
  4. 服务器据此从断开位置继续发送

POST 请求支持

SSE 同样支持 POST 请求,适用于需要请求体的流式场景:

from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class Prompt(BaseModel):
    text: str


@app.post("/chat/stream", response_class=EventSourceResponse)
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
    """Streaming chat endpoint (simulated AI output)."""
    # Echo the prompt back one word at a time.
    words = prompt.text.split()

    for word in words:
        yield ServerSentEvent(
            data={"token": word},
            event="token",
        )

    # Completion signal, mirroring the common LLM-API convention.
    yield ServerSentEvent(
        raw_data="[DONE]",
        event="done",
    )

客户端使用 fetch API:

const response = await fetch("http://localhost:8000/chat/stream", {
    method: "POST",
    headers: {
        "Content-Type": "application/json",
    },
    body: JSON.stringify({ text: "你好,世界" }),
});

// Read the body manually — EventSource cannot issue POST requests.
const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    console.log("收到:", chunk);
}

POST 方式的 SSE 适用于:

  • AI 流式对话(如 LLM API)
  • 需要复杂请求参数的流式场景
  • Model Context Protocol (MCP) 等协议

同步生成器

除了异步生成器,也可以使用普通生成器:

from collections.abc import Iterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: str | None = None


@app.get("/items/stream", response_class=EventSourceResponse)
def sse_items() -> Iterable[Item]:
    """Plain (synchronous) generator endpoint — no async required."""
    items = [
        Item(name="商品A", description="描述A"),
        Item(name="商品B", description="描述B"),
    ]

    for item in items:
        yield item

实际应用示例

实时通知系统

import asyncio
from collections.abc import AsyncIterable
from datetime import datetime
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()

# Active subscriber queues (in-memory: single-process only).
active_connections: list = []


@app.get("/notifications/stream", response_class=EventSourceResponse)
async def stream_notifications() -> AsyncIterable[ServerSentEvent]:
    """Push notifications to this client as they are broadcast."""
    queue: asyncio.Queue = asyncio.Queue()
    active_connections.append(queue)

    try:
        while True:
            # Block until a new notification arrives for this subscriber.
            notification = await queue.get()
            yield ServerSentEvent(
                data=notification,
                event="notification",
            )
    finally:
        # `finally` (rather than catching CancelledError) guarantees the
        # queue is unregistered on ANY exit and does not swallow the
        # cancellation raised when the client disconnects.
        active_connections.remove(queue)


async def send_notification(message: str):
    """Broadcast a notification to every connected client."""
    notification = {
        "message": message,
        "timestamp": datetime.now().isoformat(),
    }

    for queue in active_connections:
        await queue.put(notification)


@app.post("/notifications")
async def create_notification(message: str):
    """Create a new notification and broadcast it."""
    await send_notification(message)
    return {"status": "sent"}

任务进度追踪

import asyncio
import uuid
from collections.abc import AsyncIterable

from fastapi import FastAPI, BackgroundTasks
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()

# In-memory task state (single-process only).
task_progress: dict[str, int] = {}
task_queues: dict[str, asyncio.Queue] = {}


async def process_task(task_id: str):
    """Simulate a long-running task, publishing progress 0..100."""
    for i in range(101):
        task_progress[task_id] = i

        # Push the update to the subscriber, if one is still listening.
        if task_id in task_queues:
            await task_queues[task_id].put({
                "progress": i,
                "status": "processing" if i < 100 else "completed",
            })

        await asyncio.sleep(0.1)  # simulate work


@app.post("/tasks/start")
async def start_task(background_tasks: BackgroundTasks):
    """Start a background task and return its ID for progress polling."""
    task_id = str(uuid.uuid4())
    task_queues[task_id] = asyncio.Queue()
    task_progress[task_id] = 0

    background_tasks.add_task(process_task, task_id)

    return {"task_id": task_id}


@app.get("/tasks/{task_id}/progress", response_class=EventSourceResponse)
async def stream_progress(task_id: str) -> AsyncIterable[ServerSentEvent]:
    """Stream progress events for a task until it completes."""
    queue = task_queues.get(task_id)

    if not queue:
        yield ServerSentEvent(
            data={"error": "任务不存在"},
            event="error",
        )
        return

    try:
        while True:
            progress = await queue.get()
            yield ServerSentEvent(
                data=progress,
                event="progress",
            )

            if progress.get("status") == "completed":
                break
    finally:
        # pop() instead of `del`: safe even if the entry was already
        # removed (e.g. by a concurrent disconnect).
        task_queues.pop(task_id, None)

AI 流式输出

import asyncio  # required for asyncio.sleep below (was missing)
from collections.abc import AsyncIterable

from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent
from pydantic import BaseModel

app = FastAPI()


class ChatRequest(BaseModel):
    message: str


@app.post("/ai/chat/stream", response_class=EventSourceResponse)
async def chat_stream(request: ChatRequest) -> AsyncIterable[ServerSentEvent]:
    """Simulated streaming AI chat response."""
    # Canned reply used to demonstrate token-by-token streaming.
    response_text = f"收到你的消息:{request.message}。这是一个模拟的 AI 回复,展示流式输出效果。"

    # Emit one character per event.
    for char in response_text:
        yield ServerSentEvent(
            data={"token": char},
            event="token",
        )
        await asyncio.sleep(0.02)  # simulate generation latency

    # Completion signal.
    yield ServerSentEvent(
        raw_data="[DONE]",
        event="done",
    )

自动保活

FastAPI 自动每 15 秒发送保活 ping,防止连接超时。无需手动发送注释事件:

# FastAPI sends keep-alive pings automatically; no manual handling needed.
@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[ServerSentEvent]:
    while True:
        data = await get_data()  # NOTE: get_data is assumed defined elsewhere
        yield ServerSentEvent(data=data)
        # Keep-alive comments are injected by FastAPI behind the scenes.

如果需要手动发送注释:

yield ServerSentEvent(comment="保活心跳")

错误处理

服务端错误

from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[ServerSentEvent]:
    """Demonstrate converting a server-side exception into an SSE event."""
    try:
        for i in range(10):
            if i == 5:
                raise ValueError("模拟错误")

            yield ServerSentEvent(data={"count": i})

    except Exception as e:
        # Report the failure to the client as a dedicated "error" event
        # instead of abruptly closing the stream.
        yield ServerSentEvent(
            data={"error": str(e)},
            event="error",
        )

客户端错误处理

const eventSource = new EventSource("http://localhost:8000/stream");

// Application-level "error" events emitted by the server (JSON payload).
eventSource.addEventListener("error", function(event) {
    const error = JSON.parse(event.data);
    console.error("服务器错误:", error);
    eventSource.close();
});

// Transport-level connection errors.
eventSource.onerror = function(error) {
    console.error("连接错误:", error);
    // EventSource reconnects automatically.
};

生产环境注意事项

连接管理

内存存储连接适用于单进程,多进程/多服务器需要使用 Redis 等:

import asyncio
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()

class ConnectionManager:
    """Tracks per-channel subscriber queues (in-memory, single-process)."""

    def __init__(self):
        # channel name -> list of subscriber queues
        self.connections: dict[str, list[asyncio.Queue]] = {}

    async def connect(self, channel: str) -> asyncio.Queue:
        """Register and return a new subscriber queue for *channel*."""
        queue: asyncio.Queue = asyncio.Queue()
        self.connections.setdefault(channel, []).append(queue)
        return queue

    def disconnect(self, channel: str, queue: asyncio.Queue):
        """Unregister a subscriber queue.

        Tolerates unknown channels and already-removed queues so a
        double disconnect cannot raise ValueError.
        """
        queues = self.connections.get(channel)
        if queues and queue in queues:
            queues.remove(queue)

    async def broadcast(self, channel: str, message: dict):
        """Deliver *message* to every subscriber of *channel*."""
        for queue in self.connections.get(channel, []):
            await queue.put(message)

manager = ConnectionManager()


@app.get("/stream/{channel}", response_class=EventSourceResponse)
async def stream(channel: str) -> AsyncIterable[ServerSentEvent]:
    """Relay broadcast messages for *channel* to this client."""
    queue = await manager.connect(channel)

    try:
        while True:
            message = await queue.get()
            yield ServerSentEvent(data=message)
    finally:
        # `finally` (not `except CancelledError`) guarantees cleanup on
        # any exit and does not swallow the cancellation raised when the
        # client disconnects.
        manager.disconnect(channel, queue)

代理配置

确保代理服务器支持长连接:

# Nginx configuration: long-lived, unbuffered proxying for SSE
location /stream {
proxy_pass http://backend;
# HTTP/1.1 with an empty Connection header keeps the upstream connection alive
proxy_http_version 1.1;
proxy_set_header Connection "";
# Disable buffering/caching so events reach the client immediately
proxy_buffering off;
proxy_cache off;
# Allow day-long idle streams before the proxy times out the read
proxy_read_timeout 86400s;
}

资源限制

import asyncio
from collections.abc import AsyncIterable
from fastapi import FastAPI
from fastapi.sse import EventSourceResponse, ServerSentEvent

app = FastAPI()


@app.get("/stream", response_class=EventSourceResponse)
async def stream() -> AsyncIterable[ServerSentEvent]:
    """Stream at most `max_events` events, then signal the end."""
    max_events = 1000  # cap per-connection events to bound resource use

    for _ in range(max_events):
        data = await get_data()  # NOTE: get_data is assumed defined elsewhere
        yield ServerSentEvent(data=data)

    # Tell the client the stream is over.
    yield ServerSentEvent(
        data={"message": "流结束"},
        event="end",
    )

小结

本章学习了 FastAPI 中的 Server-Sent Events (SSE):

  1. SSE 基础:单向服务器推送,基于 HTTP 协议
  2. EventSourceResponse:FastAPI 0.135.0+ 内置支持
  3. ServerSentEvent:精细控制事件字段
  4. 断点续传:通过 Last-Event-ID 恢复流
  5. POST 支持:适用于 AI 流式输出等场景
  6. 实际应用:通知系统、进度追踪、AI 流式输出

SSE 是简单高效的实时通信方案,特别适合服务器推送场景。对于需要双向通信的场景,应考虑使用 WebSocket。

练习

  1. 实现一个实时日志查看器,支持过滤关键字
  2. 创建一个任务进度追踪系统,支持多个并发任务
  3. 实现一个简单的 AI 流式对话接口
  4. 添加断点续传支持,测试客户端重连恢复