跳到主要内容

流式输出详解

流式输出(Streaming)是现代 LLM 应用的核心能力之一。与传统的请求-响应模式不同,流式输出让模型能够逐 token(或逐块)返回结果,大幅提升用户体验。本章将深入介绍 LangChain 中的流式输出机制。

为什么需要流式输出?

传统模式下,用户发送请求后需要等待模型生成完整响应,对于长文本可能需要等待数十秒。流式输出解决了这个问题:

  • 即时反馈:用户立即看到第一个字,感知延迟大幅降低
  • 更好体验:内容逐步呈现,类似人类说话的自然感
  • 可中断:用户可以在生成过程中停止,节省成本
  • 进度感知:对于长任务,用户能看到处理进度

流式输出的基本原理

流式输出的核心是将模型响应拆分成多个小块(chunk),每块包含一部分生成的文本。客户端持续接收这些块并实时显示。

传统模式:
请求 → [等待 30 秒] → 完整响应(500 字)

流式模式:
请求 → [0.1 秒] "你" → [0.2 秒] "好" → [0.3 秒] "!" → ... → 完成

基础流式输出

单次流式调用

最简单的流式输出方式:

from langchain.chat_models import init_chat_model

model = init_chat_model("openai:gpt-4o-mini")

# 流式输出
for chunk in model.stream("请用 200 字介绍 Python 编程语言"):
print(chunk.content, end="", flush=True)

print() # 换行

理解输出内容

stream() 方法返回的每个 chunk 是一个 AIMessageChunk 对象:

for chunk in model.stream("讲一个故事"):
print(f"类型: {type(chunk).__name__}")
print(f"内容: {chunk.content}")
print("---")

输出示例:

类型: AIMessageChunk
内容: 从
---
类型: AIMessageChunk
内容: 前
---
类型: AIMessageChunk
内容: 有
---
...

累加流式块

流式块可以累加得到完整消息:

from langchain.chat_models import init_chat_model

model = init_chat_model("openai:gpt-4o-mini")

# 累加所有块
full_message = None
for chunk in model.stream("介绍机器学习"):
if full_message is None:
full_message = chunk
else:
full_message = full_message + chunk # 块可以相加

# 实时显示
print(chunk.content, end="", flush=True)

print(f"\n\n完整消息长度: {len(full_message.content)}")
print(f"响应元数据: {full_message.response_metadata}")

块相加的工作原理

AIMessageChunk 实现了 __add__ 方法,相加时会:

  • 合并文本内容
  • 合并工具调用
  • 合并响应元数据

带历史的流式输出

from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

model = init_chat_model("openai:gpt-4o-mini")

# 对话历史
messages = [
SystemMessage(content="你是一个专业的 Python 编程助手"),
HumanMessage(content="什么是装饰器?"),
AIMessage(content="装饰器是 Python 的一种语法糖..."),
HumanMessage(content="给我举个例子"),
]

# 流式输出
for chunk in model.stream(messages):
print(chunk.content, end="", flush=True)

在 LCEL 链中使用流式输出

LCEL 链天然支持流式输出,无需额外配置:

from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser

model = init_chat_model("openai:gpt-4o-mini")

# 构建链
chain = (
ChatPromptTemplate.from_template("用 {language} 语言写一首关于 {topic} 的诗")
| model
| StrOutputParser()
)

# 流式输出
for chunk in chain.stream({"language": "中文", "topic": "春天"}):
print(chunk, end="", flush=True)

StrOutputParser 在流式中的作用

  • 直接调用模型时,流式返回 AIMessageChunk
  • 经过 StrOutputParser 后,流式返回纯字符串

复杂链的流式输出

多步骤链同样支持流式:

from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain.output_parsers import StrOutputParser

model = init_chat_model("openai:gpt-4o-mini")

# 复杂链
chain = (
{
"context": lambda x: "Python 是一种高级编程语言...",
"question": RunnablePassthrough(),
}
| ChatPromptTemplate.from_template("""
基于上下文回答问题:

上下文:{context}
问题:{question}

请详细回答:
""")
| model
| StrOutputParser()
)

# 流式输出
for chunk in chain.stream("Python 有什么特点?"):
print(chunk, end="", flush=True)

异步流式输出

对于 I/O 密集型应用,使用异步流式输出可以提升性能:

import asyncio
from langchain.chat_models import init_chat_model

async def async_streaming_example():
model = init_chat_model("openai:gpt-4o-mini")

# 异步流式输出
async for chunk in model.astream("讲一个简短的故事"):
print(chunk.content, end="", flush=True)

print()

# 运行异步函数
asyncio.run(async_streaming_example())

异步链的流式输出

import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser

async def async_chain_streaming():
model = init_chat_model("openai:gpt-4o-mini")

chain = (
ChatPromptTemplate.from_template("翻译成英文:{text}")
| model
| StrOutputParser()
)

# 异步流式输出
async for chunk in chain.astream({"text": "机器学习正在改变世界"}):
print(chunk, end="", flush=True)

print()

asyncio.run(async_chain_streaming())

并发流式请求

import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser

async def stream_single(model, prompt_text, name):
"""单个流式请求"""
chain = prompt_text | model | StrOutputParser()

print(f"\n=== {name} ===")
async for chunk in chain.astream({"topic": "人工智能"}):
print(chunk, end="", flush=True)
print()

async def concurrent_streaming():
model = init_chat_model("openai:gpt-4o-mini")

# 不同的提示模板
prompts = {
"简洁版": ChatPromptTemplate.from_template("用一句话解释:{topic}"),
"详细版": ChatPromptTemplate.from_template("详细解释:{topic}"),
"趣味版": ChatPromptTemplate.from_template("用有趣的方式解释:{topic}"),
}

# 并发执行
tasks = [
stream_single(model, prompt, name)
for name, prompt in prompts.items()
]

await asyncio.gather(*tasks)

asyncio.run(concurrent_streaming())

astream_events 详细事件流

astream_events 提供比 astream 更详细的事件信息,包括每个组件的开始、结束和中间状态:

import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser

async def stream_events_example():
model = init_chat_model("openai:gpt-4o-mini")

chain = (
ChatPromptTemplate.from_template("解释 {concept}")
| model
| StrOutputParser()
)

# 使用 astream_events 获取详细事件
async for event in chain.astream_events(
{"concept": "量子计算"},
version="v2" # 使用 v2 版本的事件格式
):
# 事件类型
event_type = event["event"]

if event_type == "on_chat_model_stream":
# 模型流式输出
chunk = event["data"]["chunk"]
if chunk.content:
print(chunk.content, end="", flush=True)

elif event_type == "on_prompt_start":
print("\n[Prompt 开始]")

elif event_type == "on_prompt_end":
print("\n[Prompt 结束]")

elif event_type == "on_chat_model_start":
print("\n[模型开始生成]")

elif event_type == "on_chat_model_end":
print("\n[模型生成完成]")

asyncio.run(stream_events_example())

事件类型详解

astream_events 提供的事件类型:

事件触发时机包含信息
on_prompt_startPrompt 模板开始处理输入变量
on_prompt_endPrompt 模板处理完成格式化后的消息
on_chat_model_start模型开始调用输入消息
on_chat_model_stream模型流式输出输出块
on_chat_model_end模型调用完成完整输出
on_parser_start解析器开始工作输入数据
on_parser_stream解析器流式输出解析结果
on_parser_end解析器完成最终结果
on_chain_start链开始执行链输入
on_chain_end链执行完成链输出
on_tool_start工具开始执行工具输入
on_tool_end工具执行完成工具输出

追踪完整链路

import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain.output_parsers import StrOutputParser

async def trace_chain_execution():
model = init_chat_model("openai:gpt-4o-mini")

chain = (
{
"context": RunnablePassthrough(),
"question": RunnablePassthrough(),
}
| ChatPromptTemplate.from_template("""
上下文:{context}
问题:{question}
""")
| model
| StrOutputParser()
)

async for event in chain.astream_events(
{"context": "AI 相关信息", "question": "什么是深度学习?"},
version="v2"
):
kind = event["event"]
name = event["name"]

if kind == "on_chain_start":
print(f"🔄 开始执行: {name}")
elif kind == "on_chain_end":
print(f"✅ 执行完成: {name}")
elif kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="", flush=True)

asyncio.run(trace_chain_execution())

输出示例:

🔄 开始执行: RunnableSequence
🔄 开始执行: RunnableParallel<context,question>
✅ 执行完成: RunnableParallel<context,question>
🔄 开始执行: ChatPromptTemplate
✅ 执行完成: ChatPromptTemplate
🔄 开始执行: ChatOpenAI
深度学习是机器学习的一个子集...✅ 执行完成: ChatOpenAI
✅ 执行完成: RunnableSequence

Agent 流式输出

Agent 的流式输出比普通链更复杂,因为包含工具调用等中间步骤:

import asyncio
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.tools import tool

@tool
def search(query: str) -> str:
"""搜索信息"""
return f"搜索结果:{query}"

async def stream_agent():
model = init_chat_model("openai:gpt-4o-mini")

agent = create_agent(
model=model,
tools=[search],
system_prompt="你是一个有帮助的助手"
)

# 流式输出 Agent 执行
async for event in agent.astream_events(
{"messages": [{"role": "user", "content": "搜索 Python 教程"}]},
version="v2"
):
event_type = event["event"]

# 模型输出
if event_type == "on_chat_model_stream":
chunk = event["data"].get("chunk")
if chunk and hasattr(chunk, "content") and chunk.content:
print(chunk.content, end="", flush=True)

# 工具调用
elif event_type == "on_tool_start":
tool_input = event["data"].get("input")
print(f"\n🔧 调用工具: {event['name']}")
print(f" 参数: {tool_input}")

elif event_type == "on_tool_end":
tool_output = event["data"].get("output")
print(f"\n📤 工具结果: {tool_output}")

asyncio.run(stream_agent())

使用 stream 方法

对于简单的 Agent 流式输出:

from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.tools import tool

@tool
def get_weather(city: str) -> str:
"""获取天气"""
return f"{city}:晴,25°C"

model = init_chat_model("openai:gpt-4o-mini")
agent = create_agent(
model=model,
tools=[get_weather],
)

# 使用 stream 方法
for event in agent.stream({"messages": [{"role": "user", "content": "北京天气怎么样?"}]}):
# 注意:节点名是 "model" 而不是 "agent"
if "model" in event:
for msg in event["model"]["messages"]:
if hasattr(msg, "content") and msg.content:
print(msg.content)

elif "tools" in event:
for msg in event["tools"]["messages"]:
print(f"[工具结果]: {msg.content}")

Web 框架集成

FastAPI 流式响应

将流式输出集成到 FastAPI:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
from pydantic import BaseModel

app = FastAPI()

model = init_chat_model("openai:gpt-4o-mini")
chain = (
ChatPromptTemplate.from_template("回答:{question}")
| model
| StrOutputParser()
)

class QueryRequest(BaseModel):
question: str

@app.post("/stream")
async def stream_answer(request: QueryRequest):
"""流式返回答案"""
async def generate():
async for chunk in chain.astream({"question": request.question}):
# SSE 格式
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"

return StreamingResponse(
generate(),
media_type="text/event-stream"
)

# 非流式端点用于对比
@app.post("/complete")
async def complete_answer(request: QueryRequest):
"""完整返回答案"""
result = await chain.ainvoke({"question": request.question})
return {"answer": result}

客户端调用

// JavaScript 前端
async function streamQuery(question) {
const response = await fetch('/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ question })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
const { done, value } = await reader.read();
if (done) break;

const chunk = decoder.decode(value);
// 解析 SSE 格式
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
console.log('流式输出完成');
} else {
// 显示内容
document.getElementById('output').textContent += data;
}
}
}
}
}

流式输出的前端体验优化

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
import json

app = FastAPI()
model = init_chat_model("openai:gpt-4o-mini")

chain = (
ChatPromptTemplate.from_template("回答:{question}")
| model
| StrOutputParser()
)

@app.post("/stream-with-metadata")
async def stream_with_metadata(request: dict):
"""带元数据的流式响应"""
async def generate():
chunk_count = 0
async for chunk in chain.astream({"question": request["question"]}):
chunk_count += 1
# 返回 JSON 格式的块
data = {
"content": chunk,
"chunk_id": chunk_count,
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"

# 完成信号
yield f"data: {json.dumps({'done': True, 'total_chunks': chunk_count})}\n\n"

return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
}
)

流式输出最佳实践

1. 处理连接断开

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

@app.post("/stream")
async def stream_with_disconnect_handling(request: dict):
"""处理客户端断开连接"""
async def generate():
try:
async for chunk in chain.astream({"question": request["question"]}):
# 检查连接是否还活跃
if await request.app.state.is_disconnected():
print("客户端已断开,停止生成")
break
yield f"data: {chunk}\n\n"
except asyncio.CancelledError:
print("请求被取消")
raise
except Exception as e:
yield f"data: ERROR: {str(e)}\n\n"

return StreamingResponse(generate(), media_type="text/event-stream")

2. 超时控制

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/stream-with-timeout")
async def stream_with_timeout(request: dict):
"""带超时的流式输出"""
async def generate():
try:
async for chunk in chain.astream({"question": request["question"]}):
yield f"data: {chunk}\n\n"
await asyncio.sleep(0) # 让出控制权,检查取消
except asyncio.TimeoutError:
yield "data: ERROR: 请求超时\n\n"

return StreamingResponse(
generate(),
media_type="text/event-stream"
)

# 或使用 asyncio.wait_for
async def stream_with_limit():
try:
async for chunk in asyncio.wait_for(
chain.astream({"question": "test"}),
timeout=60.0
):
yield chunk
except asyncio.TimeoutError:
yield "超时"

3. 错误恢复

import asyncio
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser

async def resilient_streaming(chain, input_data, max_retries=3):
"""带错误恢复的流式输出"""
for attempt in range(max_retries):
try:
full_content = []
async for chunk in chain.astream(input_data):
full_content.append(chunk)
yield chunk

# 成功完成,退出重试循环
return

except Exception as e:
if attempt < max_retries - 1:
print(f"尝试 {attempt + 1} 失败: {e},重试中...")
await asyncio.sleep(2 ** attempt) # 指数退避
else:
yield f"\n[错误] 多次重试后失败: {str(e)}"

# 使用
model = init_chat_model("openai:gpt-4o-mini")
chain = ChatPromptTemplate.from_template("{question}") | model | StrOutputParser()

async def safe_stream():
async for chunk in resilient_streaming(chain, {"question": "介绍 Python"}):
print(chunk, end="", flush=True)

asyncio.run(safe_stream())

4. 缓冲优化

对于某些场景,可以缓冲输出以优化网络效率:

import asyncio
from collections import deque

async def buffered_stream(chain, input_data, buffer_size=10, buffer_timeout=0.1):
"""带缓冲的流式输出"""
buffer = deque()

async def fill_buffer():
async for chunk in chain.astream(input_data):
buffer.append(chunk)

# 启动填充任务
fill_task = asyncio.create_task(fill_buffer())

last_yield = asyncio.get_event_loop().time()

while not fill_task.done() or buffer:
# 缓冲满了或超时,输出
current_time = asyncio.get_event_loop().time()

if len(buffer) >= buffer_size or (buffer and current_time - last_yield >= buffer_timeout):
while buffer:
yield buffer.popleft()
last_yield = current_time

await asyncio.sleep(0.01)

# 输出剩余内容
while buffer:
yield buffer.popleft()

调试流式输出

启用详细日志

import logging

# 启用 LangChain 调试日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("langchain")
logger.setLevel(logging.DEBUG)

# 或使用全局调试
from langchain.globals import set_debug
set_debug(True)

# 现在流式输出会显示详细日志
for chunk in model.stream("测试"):
print(chunk.content, end="")

使用 LangSmith 追踪

import os

# 启用 LangSmith
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = "ls-xxx"

from langchain.chat_models import init_chat_model

model = init_chat_model("openai:gpt-4o-mini")

# 流式输出会自动追踪到 LangSmith
for chunk in model.stream("介绍人工智能"):
print(chunk.content, end="")

# 访问 smith.langchain.com 查看详细的流式追踪

常见问题

1. 流式输出中断

问题:流式输出在中间停止,没有完成

解决方案

  • 检查网络连接稳定性
  • 添加重试机制
  • 记录已输出内容,支持断点续传

2. 流式输出不显示

问题:后端正常返回,前端没有显示

解决方案

  • 检查前端是否正确处理 SSE 格式
  • 确认 HTTP 响应头设置正确
  • 禁用代理缓冲(如 Nginx)

3. 编码问题

问题:中文显示为乱码

解决方案

# 确保使用 UTF-8 编码
from fastapi.responses import StreamingResponse

async def generate():
async for chunk in chain.astream(input_data):
# 确保 chunk 是字符串
if isinstance(chunk, bytes):
chunk = chunk.decode('utf-8')
yield f"data: {chunk}\n\n"

return StreamingResponse(
generate(),
media_type="text/event-stream; charset=utf-8"
)

4. 内存泄漏

问题:长时间运行后内存增长

解决方案

# 及时清理已完成的事件
async def safe_stream():
try:
async for event in chain.astream_events(input_data, version="v2"):
yield event
finally:
# 清理资源
pass

下一步

参考资源