流式输出详解
流式输出(Streaming)是现代 LLM 应用的核心能力之一。与传统的请求-响应模式不同,流式输出让模型能够逐 token(或逐块)返回结果,大幅提升用户体验。本章将深入介绍 LangChain 中的流式输出机制。
为什么需要流式输出?
传统模式下,用户发送请求后需要等待模型生成完整响应,对于长文本可能需要等待数十秒。流式输出解决了这个问题:
- 即时反馈:用户立即看到第一个字,感知延迟大幅降低
- 更好体验:内容逐步呈现,类似人类说话的自然感
- 可中断:用户可以在生成过程中停止,节省成本
- 进度感知:对于长任务,用户能看到处理进度
流式输出的基本原理
流式输出的核心是将模型响应拆分成多个小块(chunk),每块包含一部分生成的文本。客户端持续接收这些块并实时显示。
传统模式:
请求 → [等待 30 秒] → 完整响应(500 字)
流式模式:
请求 → [0.1 秒] "你" → [0.2 秒] "好" → [0.3 秒] "!" → ... → 完成
基础流式输出
单次流式调用
最简单的流式输出方式:
from langchain.chat_models import init_chat_model
model = init_chat_model("openai:gpt-4o-mini")
# 流式输出
for chunk in model.stream("请用 200 字介绍 Python 编程语言"):
print(chunk.content, end="", flush=True)
print() # 换行
理解输出内容:
stream() 方法返回的每个 chunk 是一个 AIMessageChunk 对象:
for chunk in model.stream("讲一个故事"):
print(f"类型: {type(chunk).__name__}")
print(f"内容: {chunk.content}")
print("---")
输出示例:
类型: AIMessageChunk
内容: 从
---
类型: AIMessageChunk
内容: 前
---
类型: AIMessageChunk
内容: 有
---
...
累加流式块
流式块可以累加得到完整消息:
from langchain.chat_models import init_chat_model
model = init_chat_model("openai:gpt-4o-mini")
# 累加所有块
full_message = None
for chunk in model.stream("介绍机器学习"):
if full_message is None:
full_message = chunk
else:
full_message = full_message + chunk # 块可以相加
# 实时显示
print(chunk.content, end="", flush=True)
print(f"\n\n完整消息长度: {len(full_message.content)}")
print(f"响应元数据: {full_message.response_metadata}")
块相加的工作原理:
AIMessageChunk 实现了 __add__ 方法,相加时会:
- 合并文本内容
- 合并工具调用
- 合并响应元数据
带历史的流式输出
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
model = init_chat_model("openai:gpt-4o-mini")
# 对话历史
messages = [
SystemMessage(content="你是一个专业的 Python 编程助手"),
HumanMessage(content="什么是装饰器?"),
AIMessage(content="装饰器是 Python 的一种语法糖..."),
HumanMessage(content="给我举个例子"),
]
# 流式输出
for chunk in model.stream(messages):
print(chunk.content, end="", flush=True)
在 LCEL 链中使用流式输出
LCEL 链天然支持流式输出,无需额外配置:
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
model = init_chat_model("openai:gpt-4o-mini")
# 构建链
chain = (
ChatPromptTemplate.from_template("用 {language} 语言写一首关于 {topic} 的诗")
| model
| StrOutputParser()
)
# 流式输出
for chunk in chain.stream({"language": "中文", "topic": "春天"}):
print(chunk, end="", flush=True)
StrOutputParser 在流式中的作用:
- 直接调用模型时,流式返回
AIMessageChunk - 经过
StrOutputParser后,流式返回纯字符串
复杂链的流式输出
多步骤链同样支持流式:
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain.output_parsers import StrOutputParser
model = init_chat_model("openai:gpt-4o-mini")
# 复杂链
chain = (
{
"context": lambda x: "Python 是一种高级编程语言...",
"question": RunnablePassthrough(),
}
| ChatPromptTemplate.from_template("""
基于上下文回答问题:
上下文:{context}
问题:{question}
请详细回答:
""")
| model
| StrOutputParser()
)
# 流式输出
for chunk in chain.stream("Python 有什么特点?"):
print(chunk, end="", flush=True)
异步流式输出
对于 I/O 密集型应用,使用异步流式输出可以提升性能:
import asyncio
from langchain.chat_models import init_chat_model
async def async_streaming_example():
model = init_chat_model("openai:gpt-4o-mini")
# 异步流式输出
async for chunk in model.astream("讲一个简短的故事"):
print(chunk.content, end="", flush=True)
print()
# 运行异步函数
asyncio.run(async_streaming_example())
异步链的流式输出
import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
async def async_chain_streaming():
model = init_chat_model("openai:gpt-4o-mini")
chain = (
ChatPromptTemplate.from_template("翻译成英文:{text}")
| model
| StrOutputParser()
)
# 异步流式输出
async for chunk in chain.astream({"text": "机器学习正在改变世界"}):
print(chunk, end="", flush=True)
print()
asyncio.run(async_chain_streaming())
并发流式请求
import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
async def stream_single(model, prompt_text, name):
"""单个流式请求"""
chain = prompt_text | model | StrOutputParser()
print(f"\n=== {name} ===")
async for chunk in chain.astream({"topic": "人工智能"}):
print(chunk, end="", flush=True)
print()
async def concurrent_streaming():
model = init_chat_model("openai:gpt-4o-mini")
# 不同的提示模板
prompts = {
"简洁版": ChatPromptTemplate.from_template("用一句话解释:{topic}"),
"详细版": ChatPromptTemplate.from_template("详细解释:{topic}"),
"趣味版": ChatPromptTemplate.from_template("用有趣的方式解释:{topic}"),
}
# 并发执行
tasks = [
stream_single(model, prompt, name)
for name, prompt in prompts.items()
]
await asyncio.gather(*tasks)
asyncio.run(concurrent_streaming())
astream_events 详细事件流
astream_events 提供比 astream 更详细的事件信息,包括每个组件的开始、结束和中间状态:
import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
async def stream_events_example():
model = init_chat_model("openai:gpt-4o-mini")
chain = (
ChatPromptTemplate.from_template("解释 {concept}")
| model
| StrOutputParser()
)
# 使用 astream_events 获取详细事件
async for event in chain.astream_events(
{"concept": "量子计算"},
version="v2" # 使用 v2 版本的事件格式
):
# 事件类型
event_type = event["event"]
if event_type == "on_chat_model_stream":
# 模型流式输出
chunk = event["data"]["chunk"]
if chunk.content:
print(chunk.content, end="", flush=True)
elif event_type == "on_prompt_start":
print("\n[Prompt 开始]")
elif event_type == "on_prompt_end":
print("\n[Prompt 结束]")
elif event_type == "on_chat_model_start":
print("\n[模型开始生成]")
elif event_type == "on_chat_model_end":
print("\n[模型生成完成]")
asyncio.run(stream_events_example())
事件类型详解
astream_events 提供的事件类型:
| 事件 | 触发时机 | 包含信息 |
|---|---|---|
on_prompt_start | Prompt 模板开始处理 | 输入变量 |
on_prompt_end | Prompt 模板处理完成 | 格式化后的消息 |
on_chat_model_start | 模型开始调用 | 输入消息 |
on_chat_model_stream | 模型流式输出 | 输出块 |
on_chat_model_end | 模型调用完成 | 完整输出 |
on_parser_start | 解析器开始工作 | 输入数据 |
on_parser_stream | 解析器流式输出 | 解析结果 |
on_parser_end | 解析器完成 | 最终结果 |
on_chain_start | 链开始执行 | 链输入 |
on_chain_end | 链执行完成 | 链输出 |
on_tool_start | 工具开始执行 | 工具输入 |
on_tool_end | 工具执行完成 | 工具输出 |
追踪完整链路
import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain.output_parsers import StrOutputParser
async def trace_chain_execution():
model = init_chat_model("openai:gpt-4o-mini")
chain = (
{
"context": RunnablePassthrough(),
"question": RunnablePassthrough(),
}
| ChatPromptTemplate.from_template("""
上下文:{context}
问题:{question}
""")
| model
| StrOutputParser()
)
async for event in chain.astream_events(
{"context": "AI 相关信息", "question": "什么是深度学习?"},
version="v2"
):
kind = event["event"]
name = event["name"]
if kind == "on_chain_start":
print(f"🔄 开始执行: {name}")
elif kind == "on_chain_end":
print(f"✅ 执行完成: {name}")
elif kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="", flush=True)
asyncio.run(trace_chain_execution())
输出示例:
🔄 开始执行: RunnableSequence
🔄 开始执行: RunnableParallel<context,question>
✅ 执行完成: RunnableParallel<context,question>
🔄 开始执行: ChatPromptTemplate
✅ 执行完成: ChatPromptTemplate
🔄 开始执行: ChatOpenAI
深度学习是机器学习的一个子集...✅ 执行完成: ChatOpenAI
✅ 执行完成: RunnableSequence
Agent 流式输出
Agent 的流式输出比普通链更复杂,因为包含工具调用等中间步骤:
import asyncio
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.tools import tool
@tool
def search(query: str) -> str:
"""搜索信息"""
return f"搜索结果:{query}"
async def stream_agent():
model = init_chat_model("openai:gpt-4o-mini")
agent = create_agent(
model=model,
tools=[search],
system_prompt="你是一个有帮助的助手"
)
# 流式输出 Agent 执行
async for event in agent.astream_events(
{"messages": [{"role": "user", "content": "搜索 Python 教程"}]},
version="v2"
):
event_type = event["event"]
# 模型输出
if event_type == "on_chat_model_stream":
chunk = event["data"].get("chunk")
if chunk and hasattr(chunk, "content") and chunk.content:
print(chunk.content, end="", flush=True)
# 工具调用
elif event_type == "on_tool_start":
tool_input = event["data"].get("input")
print(f"\n🔧 调用工具: {event['name']}")
print(f" 参数: {tool_input}")
elif event_type == "on_tool_end":
tool_output = event["data"].get("output")
print(f"\n📤 工具结果: {tool_output}")
asyncio.run(stream_agent())
使用 stream 方法
对于简单的 Agent 流式输出:
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.tools import tool
@tool
def get_weather(city: str) -> str:
"""获取天气"""
return f"{city}:晴,25°C"
model = init_chat_model("openai:gpt-4o-mini")
agent = create_agent(
model=model,
tools=[get_weather],
)
# 使用 stream 方法
for event in agent.stream({"messages": [{"role": "user", "content": "北京天气怎么样?"}]}):
# 注意:节点名是 "model" 而不是 "agent"
if "model" in event:
for msg in event["model"]["messages"]:
if hasattr(msg, "content") and msg.content:
print(msg.content)
elif "tools" in event:
for msg in event["tools"]["messages"]:
print(f"[工具结果]: {msg.content}")
Web 框架集成
FastAPI 流式响应
将流式输出集成到 FastAPI:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
from pydantic import BaseModel
app = FastAPI()
model = init_chat_model("openai:gpt-4o-mini")
chain = (
ChatPromptTemplate.from_template("回答:{question}")
| model
| StrOutputParser()
)
class QueryRequest(BaseModel):
question: str
@app.post("/stream")
async def stream_answer(request: QueryRequest):
"""流式返回答案"""
async def generate():
async for chunk in chain.astream({"question": request.question}):
# SSE 格式
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
# 非流式端点用于对比
@app.post("/complete")
async def complete_answer(request: QueryRequest):
"""完整返回答案"""
result = await chain.ainvoke({"question": request.question})
return {"answer": result}
客户端调用:
// JavaScript 前端
async function streamQuery(question) {
const response = await fetch('/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ question })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
// 解析 SSE 格式
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
console.log('流式输出完成');
} else {
// 显示内容
document.getElementById('output').textContent += data;
}
}
}
}
}
流式输出的前端体验优化
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
import json
app = FastAPI()
model = init_chat_model("openai:gpt-4o-mini")
chain = (
ChatPromptTemplate.from_template("回答:{question}")
| model
| StrOutputParser()
)
@app.post("/stream-with-metadata")
async def stream_with_metadata(request: dict):
"""带元数据的流式响应"""
async def generate():
chunk_count = 0
async for chunk in chain.astream({"question": request["question"]}):
chunk_count += 1
# 返回 JSON 格式的块
data = {
"content": chunk,
"chunk_id": chunk_count,
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
# 完成信号
yield f"data: {json.dumps({'done': True, 'total_chunks': chunk_count})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
}
)
流式输出最佳实践
1. 处理连接断开
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
@app.post("/stream")
async def stream_with_disconnect_handling(request: dict):
"""处理客户端断开连接"""
async def generate():
try:
async for chunk in chain.astream({"question": request["question"]}):
# 检查连接是否还活跃
if await request.app.state.is_disconnected():
print("客户端已断开,停止生成")
break
yield f"data: {chunk}\n\n"
except asyncio.CancelledError:
print("请求被取消")
raise
except Exception as e:
yield f"data: ERROR: {str(e)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
2. 超时控制
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/stream-with-timeout")
async def stream_with_timeout(request: dict):
"""带超时的流式输出"""
async def generate():
try:
async for chunk in chain.astream({"question": request["question"]}):
yield f"data: {chunk}\n\n"
await asyncio.sleep(0) # 让出控制权,检查取消
except asyncio.TimeoutError:
yield "data: ERROR: 请求超时\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
# 或使用 asyncio.wait_for
async def stream_with_limit():
try:
async for chunk in asyncio.wait_for(
chain.astream({"question": "test"}),
timeout=60.0
):
yield chunk
except asyncio.TimeoutError:
yield "超时"
3. 错误恢复
import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
async def resilient_streaming(chain, input_data, max_retries=3):
"""带错误恢复的流式输出"""
for attempt in range(max_retries):
try:
full_content = []
async for chunk in chain.astream(input_data):
full_content.append(chunk)
yield chunk
# 成功完成,退出重试循环
return
except Exception as e:
if attempt < max_retries - 1:
print(f"尝试 {attempt + 1} 失败: {e},重试中...")
await asyncio.sleep(2 ** attempt) # 指数退避
else:
yield f"\n[错误] 多次重试后失败: {str(e)}"
# 使用
model = init_chat_model("openai:gpt-4o-mini")
chain = ChatPromptTemplate.from_template("{question}") | model | StrOutputParser()
async def safe_stream():
async for chunk in resilient_streaming(chain, {"question": "介绍 Python"}):
print(chunk, end="", flush=True)
asyncio.run(safe_stream())
4. 缓冲优化
对于某些场景,可以缓冲输出以优化网络效率:
import asyncio
from collections import deque
async def buffered_stream(chain, input_data, buffer_size=10, buffer_timeout=0.1):
"""带缓冲的流式输出"""
buffer = deque()
async def fill_buffer():
async for chunk in chain.astream(input_data):
buffer.append(chunk)
# 启动填充任务
fill_task = asyncio.create_task(fill_buffer())
last_yield = asyncio.get_event_loop().time()
while not fill_task.done() or buffer:
# 缓冲满了或超时,输出
current_time = asyncio.get_event_loop().time()
if len(buffer) >= buffer_size or (buffer and current_time - last_yield >= buffer_timeout):
while buffer:
yield buffer.popleft()
last_yield = current_time
await asyncio.sleep(0.01)
# 输出剩余内容
while buffer:
yield buffer.popleft()
调试流式输出
启用详细日志
import logging
# 启用 LangChain 调试日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("langchain")
logger.setLevel(logging.DEBUG)
# 或使用全局调试
from langchain.globals import set_debug
set_debug(True)
# 现在流式输出会显示详细日志
for chunk in model.stream("测试"):
print(chunk.content, end="")
使用 LangSmith 追踪
import os
# 启用 LangSmith
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = "ls-xxx"
from langchain.chat_models import init_chat_model
model = init_chat_model("openai:gpt-4o-mini")
# 流式输出会自动追踪到 LangSmith
for chunk in model.stream("介绍人工智能"):
print(chunk.content, end="")
# 访问 smith.langchain.com 查看详细的流式追踪
常见问题
1. 流式输出中断
问题:流式输出在中间停止,没有完成
解决方案:
- 检查网络连接稳定性
- 添加重试机制
- 记录已输出内容,支持断点续传
2. 流式输出不显示
问题:后端正常返回,前端没有显示
解决方案:
- 检查前端是否正确处理 SSE 格式
- 确认 HTTP 响应头设置正确
- 禁用代理缓冲(如 Nginx)
3. 编码问题
问题:中文显示为乱码
解决方案:
# 确保使用 UTF-8 编码
from fastapi.responses import StreamingResponse
async def generate():
async for chunk in chain.astream(input_data):
# 确保 chunk 是字符串
if isinstance(chunk, bytes):
chunk = chunk.decode('utf-8')
yield f"data: {chunk}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream; charset=utf-8"
)
4. 内存泄漏
问题:长时间运行后内存增长
解决方案:
# 及时清理已完成的事件
async def safe_stream():
try:
async for event in chain.astream_events(input_data, version="v2"):
yield event
finally:
# 清理资源
pass