流式 JSON Lines
FastAPI 0.134.0 新增了对流式 JSON Lines 的原生支持。通过使用 yield 语法,可以高效地逐条发送 JSON 数据,适用于 AI LLM 服务、日志流、遥测数据等场景。
什么是流式传输
流式传输(Streaming)意味着应用程序开始向客户端发送数据项,而无需等待整个数据序列准备完毕。服务器发送第一条数据,客户端接收并开始处理,同时服务器可能仍在生成下一条数据。
这种方式甚至可以用于无限流,持续不断地发送数据。
JSON Lines 格式
JSON Lines(也称 NDJSON)是一种文件格式,每行一个 JSON 对象。响应的 Content-Type 为 application/jsonl(而非 application/json),格式如下:
{"name": "Plumbus", "description": "多功能家用设备"}
{"name": "Portal Gun", "description": "传送门开启装置"}
{"name": "Meeseeks Box", "description": "召唤 Meeseeks 的盒子"}
与 JSON 数组的区别:
- JSON 数组:包裹在
[]中,元素间用,分隔 - JSON Lines:每行一个独立的 JSON 对象,用换行符分隔
重要提示:由于每个 JSON 对象用换行符分隔,内容中不能包含字面换行符,但可以包含转义的换行符(\n),这是 JSON 标准的一部分。
适用场景
流式 JSON Lines 特别适合以下场景:
- AI LLM 服务:流式输出 AI 生成的内容
- 日志流:实时推送日志数据
- 遥测数据:传感器数据、监控指标
- 大数据导出:逐条输出大量数据,避免内存溢出
基本使用
使用 yield 流式返回数据
在路径操作函数中使用 yield 代替 return,逐条生成数据:
from collections.abc import AsyncIterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
"""商品模型"""
name: str
description: str | None = None
# 模拟数据源
items = [
Item(name="Plumbus", description="多功能家用设备"),
Item(name="Portal Gun", description="传送门开启装置"),
Item(name="Meeseeks Box", description="召唤 Meeseeks 的盒子"),
]
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
"""流式返回商品列表"""
for item in items:
yield item
返回类型声明
声明返回类型的好处:
| 功能 | 说明 |
|---|---|
| 数据验证 | FastAPI 验证每个 yielded 项是否符合模型 |
| OpenAPI 文档 | 自动生成正确的文档 |
| 数据过滤 | 自动过滤未声明的字段 |
| 性能优化 | Pydantic 使用 Rust 进行序列化,性能更高 |
异步 vs 同步函数
FastAPI 智能处理异步和同步生成器:
from collections.abc import AsyncIterable, Iterable
# 异步生成器(推荐)
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# 同步生成器
@app.get("/items/stream-sync")
def stream_items_sync() -> Iterable[Item]:
for item in items:
yield item
同步函数会由 FastAPI 在线程池中执行,不会阻塞事件循环。
不声明返回类型
如果省略返回类型,FastAPI 使用 jsonable_encoder 进行转换:
@app.get("/items/stream-auto")
async def stream_items_auto():
# 自动转换,但性能较低
for item in items:
yield item
不推荐这种方式,因为:
- 性能较低(不使用 Pydantic Rust 序列化)
- 缺少数据验证
- OpenAPI 文档不完整
实际应用示例
AI 流式输出
模拟 AI 逐字输出:
import asyncio
from collections.abc import AsyncIterable
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Token(BaseModel):
"""AI 输出的 token"""
text: str
index: int
@app.get("/ai/generate")
async def generate_text() -> AsyncIterable[Token]:
"""模拟 AI 流式输出"""
response = "你好,这是一个模拟的 AI 流式输出示例。"
for i, char in enumerate(response):
await asyncio.sleep(0.05) # 模拟生成延迟
yield Token(text=char, index=i)
客户端接收:
{"text": "你", "index": 0}
{"text": "好", "index": 1}
{"text": ",", "index": 2}
...
实时日志流
from collections.abc import AsyncIterable
from datetime import datetime
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class LogEntry(BaseModel):
"""日志条目"""
timestamp: datetime
level: str
message: str
@app.get("/logs/stream")
async def stream_logs() -> AsyncIterable[LogEntry]:
"""实时日志流"""
logs = [
("INFO", "应用启动"),
("DEBUG", "连接数据库成功"),
("INFO", "加载配置完成"),
("WARN", "内存使用率较高"),
]
for level, message in logs:
yield LogEntry(
timestamp=datetime.now(),
level=level,
message=message
)
数据库大数据导出
from collections.abc import AsyncIterable
from typing import Annotated
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from sqlmodel import Session, select
app = FastAPI()
class UserExport(BaseModel):
"""用户导出模型"""
id: int
username: str
email: str
# 注意:不包含敏感字段如 password
@app.get("/users/export")
async def export_users(session: SessionDep) -> AsyncIterable[UserExport]:
"""流式导出用户数据"""
# 使用流式查询,避免一次性加载所有数据
statement = select(User)
for user in session.exec(statement):
yield UserExport(
id=user.id,
username=user.username,
email=user.email
)
无限数据流
import asyncio
import random
from collections.abc import AsyncIterable
from datetime import datetime
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class SensorData(BaseModel):
"""传感器数据"""
timestamp: datetime
temperature: float
humidity: float
@app.get("/sensors/stream")
async def stream_sensor_data() -> AsyncIterable[SensorData]:
"""无限传感器数据流"""
while True:
yield SensorData(
timestamp=datetime.now(),
temperature=round(random.uniform(20, 30), 2),
humidity=round(random.uniform(40, 60), 2)
)
await asyncio.sleep(1) # 每秒一条数据
与其他流式方案对比
| 方案 | 适用场景 | 特点 |
|---|---|---|
| JSON Lines | 结构化数据流 | 每行独立 JSON,易于解析 |
| SSE | 服务器推送通知 | 带事件类型和 ID,支持断点续传 |
| WebSocket | 双向实时通信 | 全双工,适合聊天、协作 |
| StreamingResponse | 二进制流 | 视频、音频、文件 |
客户端处理
JavaScript 客户端
async function consumeStream() {
const response = await fetch("/items/stream");
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 按行分割处理
const lines = buffer.split("\n");
buffer = lines.pop() || ""; // 保留不完整的行
for (const line of lines) {
if (line.trim()) {
const item = JSON.parse(line);
console.log("收到:", item);
}
}
}
}
consumeStream();
Python 客户端
import httpx
import json
async def consume_stream():
async with httpx.AsyncClient() as client:
async with client.stream("GET", "http://localhost:8000/items/stream") as response:
async for line in response.aiter_lines():
if line.strip():
item = json.loads(line)
print(f"收到: {item}")
# 运行
import asyncio
asyncio.run(consume_stream())
性能考虑
内存效率
流式传输的最大优势是内存效率:
# ❌ 错误:一次性返回大量数据
@app.get("/items/all")
async def get_all_items() -> list[Item]:
return load_all_items() # 可能导致内存溢出
# ✅ 正确:流式返回
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in load_items_lazily(): # 懒加载
yield item
声明返回类型的性能优势
# ✅ 高性能:Pydantic Rust 序列化
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
for item in items:
yield item
# ⚠️ 较低性能:使用 jsonable_encoder
@app.get("/items/stream-slow")
async def stream_items_slow():
for item in items:
yield item
声明返回类型后,Pydantic 会使用 Rust 实现的序列化器,性能可提升 2 倍以上。
注意事项
连接中断处理
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
try:
for item in items:
yield item
except Exception as e:
print(f"流中断: {e}")
# 可以记录日志或清理资源
响应头设置
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
@app.get("/items/stream")
async def stream_items() -> AsyncIterable[Item]:
# FastAPI 自动设置 Content-Type: application/jsonl
# 如需自定义响应头,使用 response_class
pass
# 或完全控制响应
@app.get("/items/custom-stream")
async def custom_stream():
async def generate():
for item in items:
yield item.model_dump_json() + "\n"
return StreamingResponse(
generate(),
media_type="application/jsonl",
headers={"X-Custom-Header": "value"}
)
小结
本章学习了 FastAPI 的流式 JSON Lines 功能:
- 流式传输概念:逐条发送数据,无需等待全部生成
- JSON Lines 格式:每行一个 JSON 对象
- 使用 yield:在路径操作函数中使用 yield 生成数据
- 返回类型声明:获得验证、文档和性能优化
- 实际应用:AI 输出、日志流、数据导出
流式 JSON Lines 的优势:
- 内存高效:逐条处理,避免内存溢出
- 实时响应:客户端可以立即开始处理数据
- 性能优化:Pydantic Rust 序列化
练习
- 创建一个流式输出 Fibonacci 数列的接口
- 实现一个模拟股票价格实时推送的 API
- 创建大数据导出接口,使用流式传输避免内存问题
- 编写客户端代码,处理流式 JSON Lines 数据