错误处理与重试
构建生产级 LLM 应用需要健壮的错误处理机制。LLM 调用可能因网络问题、API 限流、模型错误等多种原因失败,合理的重试策略和错误处理能够显著提升应用的可靠性。本章将详细介绍 LangChain 中的错误处理和重试机制。
常见错误类型
LLM 应用中的错误大致可分为以下几类:
1. 网络相关错误
from langchain.chat_models import init_chat_model
from langchain_core.exceptions import LangChainException

model = init_chat_model("openai:gpt-4o-mini")

try:
    response = model.invoke("你好")
except ConnectionError as e:
    # The network connection could not be established
    print(f"网络错误: {e}")
except TimeoutError as e:
    # The request did not complete in time
    print(f"超时错误: {e}")
典型场景:
- 服务器无响应
- DNS 解析失败
- 防火墙阻断
- 代理配置错误
2. API 相关错误
from openai import AuthenticationError, RateLimitError, APIError

try:
    response = model.invoke("你好")
except AuthenticationError:
    # The API key is invalid or expired
    print("认证失败:请检查 API Key")
except RateLimitError as e:
    # Too many requests — usually wait a while, then retry
    print(f"限流错误: {e}")
except APIError as e:
    # Server-side error reported by the API
    print(f"API 错误: {e}")
HTTP 状态码对应:
| 状态码 | 错误类型 | 说明 | 处理建议 |
|---|---|---|---|
| 401 | AuthenticationError | 认证失败 | 检查 API Key |
| 403 | PermissionError | 权限不足 | 检查账户权限 |
| 429 | RateLimitError | 请求过多 | 等待后重试 |
| 500 | APIError | 服务内部错误 | 重试或联系支持 |
| 502/503 | ServiceUnavailableError | 服务不可用 | 等待后重试 |
3. 输入输出错误
from langchain_core.exceptions import OutputParserException
from pydantic import ValidationError

try:
    # Structured-output parsing may fail when the model's reply
    # does not match the expected schema
    result = structured_model.invoke("提取信息")
except OutputParserException as e:
    print(f"输出解析失败: {e}")
except ValidationError as e:
    print(f"数据验证失败: {e}")
4. 上下文长度错误
from openai import BadRequestError

try:
    response = model.invoke("很长的输入...")
except BadRequestError as e:
    # The OpenAI SDK reports context overflow as an HTTP 400 BadRequestError;
    # inspect the message to distinguish it from other bad requests.
    if "maximum context length" in str(e):
        print("上下文长度超限,需要缩短输入或使用更大上下文的模型")
内置重试机制
LangChain 的模型默认内置了重试机制:
默认重试配置
from langchain.chat_models import init_chat_model
# Default configuration — the retry count comes from the provider SDK
# (NOTE(review): current langchain-openai defaults to max_retries=2; older
# openai SDKs used 6 — verify against your installed version).
model = init_chat_model("openai:gpt-4o-mini")
# Error types retried automatically by the provider SDK:
# - network errors (ConnectionError, TimeoutError)
# - 429 responses (RateLimitError)
# - 5xx responses (APIError)
自定义重试参数
from langchain.chat_models import init_chat_model

# Override the provider defaults when constructing the model
model = init_chat_model(
    "openai:gpt-4o-mini",
    max_retries=10,  # retry up to 10 times
    timeout=120,     # per-request timeout, in seconds
)
重试参数说明:
| 参数 | 说明 | 默认值 |
|---|---|---|
max_retries | 最大重试次数 | 取决于具体集成与版本(如 langchain-openai 当前默认 2,旧版 OpenAI SDK 为 6)
timeout | 单次请求超时时间(秒) | 无限制 |
request_timeout | 请求超时(包括重试) | 无限制 |
RunnableRetry 高级重试
使用 RunnableRetry 可以更精细地控制重试行为:
基本用法
from langchain.chat_models import init_chat_model
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.retry import RunnableRetry

model = init_chat_model("openai:gpt-4o-mini")

chain = (
    ChatPromptTemplate.from_template("回答:{question}")
    | model
    | StrOutputParser()
)

# Wrap the whole chain with retries.
# NOTE: StrOutputParser lives in langchain_core.output_parsers — the
# `langchain.output_parsers` path used previously does not export it.
chain_with_retry = RunnableRetry(
    bound=chain,
    max_attempt_number=5,          # at most 5 attempts in total
    wait_exponential_jitter=True,  # exponential backoff with jitter
)

result = chain_with_retry.invoke({"question": "什么是 AI?"})
使用 with_retry 方法
# 更简洁的写法
chain_with_retry = chain.with_retry(
stop_after_attempt=3, # 最多重试 3 次
wait_exponential_jitter=True, # 指数退避
retry_on=(ConnectionError, TimeoutError), # 只重试这些错误
)
自定义等待策略
from tenacity import (
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_fixed,
    wait_incrementing,
    wait_random_exponential,
)

# NOTE: Runnable.with_retry() only understands `retry_if_exception_type`,
# `wait_exponential_jitter` and `stop_after_attempt` — it does NOT accept
# tenacity `stop=` / `wait=` strategy objects. For fully custom strategies,
# apply tenacity's @retry decorator around the invocation instead.

# Exponential backoff with random jitter (recommended)
@retry(
    stop=stop_after_attempt(5),
    wait=wait_random_exponential(multiplier=1, min=2, max=60),
)
def invoke_with_backoff(inputs):
    return chain.invoke(inputs)

# Fixed wait: 5 seconds between attempts
@retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
def invoke_with_fixed_wait(inputs):
    return chain.invoke(inputs)

# Incrementing wait: 1s, 3s, 5s, ...
@retry(stop=stop_after_attempt(4), wait=wait_incrementing(start=1, increment=2))
def invoke_with_incrementing_wait(inputs):
    return chain.invoke(inputs)

# Combined stop condition: at most 5 attempts OR 60 seconds in total
@retry(stop=stop_after_attempt(5) | stop_after_delay(60))
def invoke_with_deadline(inputs):
    return chain.invoke(inputs)
条件重试
只对特定错误进行重试:
from openai import RateLimitError, APIError

# Retry only on rate-limit errors.
# NOTE: with_retry() takes the exception types via `retry_if_exception_type`
# (`retry_on` is not a valid keyword).
chain_with_retry = chain.with_retry(
    stop_after_attempt=5,
    retry_if_exception_type=(RateLimitError,),
)

# Retry on several error types
chain_with_retry = chain.with_retry(
    stop_after_attempt=3,
    retry_if_exception_type=(
        ConnectionError,
        TimeoutError,
        RateLimitError,
    ),
)
# 使用条件函数
def should_retry(exception: BaseException) -> bool:
    """Custom retry predicate: True when the failure is worth retrying."""
    # Always retry transient network failures.
    if isinstance(exception, (ConnectionError, TimeoutError)):
        return True
    # Rate limiting: some limits are permanent (e.g. quota exhausted) and may
    # be flagged non-retryable by the caller via a `retryable` attribute.
    if isinstance(exception, RateLimitError):
        return getattr(exception, 'retryable', True)
    return False

def invoke_with_predicate(inputs):
    """Invoke `chain`, retrying according to should_retry().

    NOTE: Runnable.with_retry() only accepts exception *types* via
    `retry_if_exception_type` — it cannot take a predicate function. For
    conditional retry logic, drop down to tenacity directly.
    """
    from tenacity import retry, retry_if_exception, stop_after_attempt

    @retry(stop=stop_after_attempt(5), retry=retry_if_exception(should_retry))
    def _invoke():
        return chain.invoke(inputs)

    return _invoke()
重试回调
监控重试过程:
import logging

logger = logging.getLogger(__name__)

def invoke_with_retry_logging(inputs):
    """Invoke `chain` with retries, logging each retry via tenacity hooks.

    NOTE: Runnable.with_retry() exposes no logging hooks; `before_sleep` and
    `after` are parameters of tenacity's @retry decorator, so tenacity is
    used directly here.
    """
    from tenacity import (
        after_log,
        before_sleep_log,
        retry,
        stop_after_attempt,
        wait_random_exponential,
    )

    @retry(
        stop=stop_after_attempt(5),
        wait=wait_random_exponential(min=1, max=30),
        before_sleep=before_sleep_log(logger, logging.WARNING),  # log before each backoff sleep
        after=after_log(logger, logging.INFO),                   # log after every attempt
    )
    def _invoke():
        return chain.invoke(inputs)

    return _invoke()
Fallback 回退机制
当主链失败时,自动切换到备用方案:
基本用法
from langchain.chat_models import init_chat_model
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableWithFallbacks

# Primary and fallback models
primary_model = init_chat_model("openai:gpt-4o-mini")
fallback_model = init_chat_model("anthropic:claude-sonnet-4-5-20250929")

# Primary chain
primary_chain = (
    ChatPromptTemplate.from_template("回答:{question}")
    | primary_model
    | StrOutputParser()
)

# Fallback chain
fallback_chain = (
    ChatPromptTemplate.from_template("回答:{question}")
    | fallback_model
    | StrOutputParser()
)

# Combine with a fallback.
# NOTE: the class is RunnableWithFallbacks (there is no `RunnableFallback`),
# and its constructor takes `runnable=`, not `bound=`.
robust_chain = RunnableWithFallbacks(
    runnable=primary_chain,
    fallbacks=[fallback_chain],
)

# Or, more idiomatically, use the with_fallbacks() helper
robust_chain = primary_chain.with_fallbacks([fallback_chain])

# If primary_chain fails, fallback_chain is invoked automatically
result = robust_chain.invoke({"question": "什么是机器学习?"})
多级回退
设置多个备用方案:
from langchain_community.chat_models import ChatOllama

# Model priority: cloud APIs first, local model as the last resort
api_model = init_chat_model("openai:gpt-4o-mini")
backup_api_model = init_chat_model("anthropic:claude-sonnet-4-5-20250929")
local_model = ChatOllama(model="llama3")

# One chain per model, sharing the same prompt
prompt = ChatPromptTemplate.from_template("{question}")
api_chain = prompt | api_model
backup_chain = prompt | backup_api_model
local_chain = prompt | local_model

# Three-level fallback; on failure execution proceeds in order:
#   1. api_chain (OpenAI)
#   2. backup_chain (Claude)     — if OpenAI fails
#   3. local_chain (local model) — if both cloud APIs fail
robust_chain = api_chain.with_fallbacks([backup_chain, local_chain])
条件回退
只在特定错误时触发回退:
from openai import RateLimitError, AuthenticationError

# Trigger the fallback only on rate-limit errors
robust_chain = primary_chain.with_fallbacks(
    [fallback_chain],
    exceptions_to_handle=(RateLimitError,),
)
# Authentication errors deliberately do NOT trigger the fallback here —
# the backup model needs valid credentials too, so the configuration
# above leaves AuthenticationError unhandled.
回退降级策略
from langchain.chat_models import init_chat_model
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda

# Chains of decreasing sophistication:

# High-quality chain (stronger model — may fail or be rate-limited).
# NOTE: StrOutputParser is imported from langchain_core.output_parsers;
# the previous `langchain.output_parsers` path does not export it.
high_quality_chain = (
    ChatPromptTemplate.from_template("""
请详细分析以下问题,提供深入见解:
{question}
""")
    | init_chat_model("openai:gpt-4o")
    | StrOutputParser()
)

# Simplified chain (smaller, more stable model)
simple_chain = (
    ChatPromptTemplate.from_template("简洁回答:{question}")
    | init_chat_model("openai:gpt-4o-mini")
    | StrOutputParser()
)

def cached_response(question):
    """Canned last-resort reply used when every model is unavailable."""
    return "抱歉,服务暂时繁忙,请稍后再试。"

# Cache chain (most stable — never touches a model)
cache_chain = RunnableLambda(lambda x: cached_response(x["question"]))

# Graceful degradation: high quality -> simple -> canned response
graceful_chain = high_quality_chain.with_fallbacks([simple_chain, cache_chain])
结构化输出的错误处理
结构化输出是 LLM 应用中容易出错的地方,LangChain 提供了专门的处理机制:
自动重试解析
from langchain.chat_models import init_chat_model
from pydantic import BaseModel, Field
from typing import Literal

class ProductReview(BaseModel):
    # Score between 1 and 5 inclusive, enforced by pydantic validators
    rating: int = Field(description="评分 1-5", ge=1, le=5)
    sentiment: Literal["positive", "negative", "neutral"]
    summary: str = Field(description="一句话总结")

model = init_chat_model("openai:gpt-4o-mini")

# Structured output re-prompts automatically when the reply
# does not conform to the schema.
structured_model = model.with_structured_output(ProductReview)
result = structured_model.invoke("这个产品太棒了,我非常喜欢!")
自定义解析错误处理
from langchain.chat_models import init_chat_model
from langchain.output_parsers import PydanticOutputParser, OutputFixingParser
from pydantic import BaseModel, Field

class Person(BaseModel):
    name: str
    age: int
    occupation: str

parser = PydanticOutputParser(pydantic_object=Person)

# A fixing parser sends malformed output back to an LLM for repair
fixing_parser = OutputFixingParser.from_llm(
    parser=parser,
    llm=init_chat_model("openai:gpt-4o-mini"),
)

# If the text does not parse cleanly, the LLM is invoked to fix it
result = fixing_parser.parse("张三,28岁,软件工程师")
RetryWithErrorOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser, RetryWithErrorOutputParser
from pydantic import BaseModel

class Task(BaseModel):
    name: str
    priority: int
    deadline: str

parser = PydanticOutputParser(pydantic_object=Task)
prompt = ChatPromptTemplate.from_template("{query}\n\n{format_instructions}")
model = init_chat_model("openai:gpt-4o-mini")  # was previously undefined in this example

# Parser that re-asks the LLM with the error message when parsing fails
retry_parser = RetryWithErrorOutputParser.from_llm(
    parser=parser,
    llm=init_chat_model("openai:gpt-4o-mini"),
    max_retries=3,
)

inputs = {
    "query": "创建任务:完成报告,优先级高,明天截止",
    "format_instructions": parser.get_format_instructions(),
}
chain = prompt | model
response = chain.invoke(inputs)

# parse_with_prompt() expects the *formatted* PromptValue that produced the
# completion — not the prompt template object itself.
prompt_value = prompt.format_prompt(**inputs)
try:
    result = retry_parser.parse_with_prompt(response.content, prompt_value)
except Exception as e:
    print(f"解析失败: {e}")
全局异常处理
FastAPI 中的统一错误处理
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from openai import RateLimitError, AuthenticationError, APIError
from langchain_core.exceptions import OutputParserException

app = FastAPI()

@app.exception_handler(RateLimitError)
async def rate_limit_handler(request: Request, exc: RateLimitError):
    """Map upstream rate limiting to HTTP 429 with a retry hint."""
    payload = {
        "error": "rate_limit_exceeded",
        "message": "请求过于频繁,请稍后再试",
        "retry_after": getattr(exc, 'retry_after', 60),
    }
    return JSONResponse(status_code=429, content=payload)

@app.exception_handler(AuthenticationError)
async def auth_error_handler(request: Request, exc: AuthenticationError):
    """Map upstream authentication failures to HTTP 401."""
    payload = {
        "error": "authentication_failed",
        "message": "API 认证失败,请检查配置",
    }
    return JSONResponse(status_code=401, content=payload)

@app.exception_handler(OutputParserException)
async def parser_error_handler(request: Request, exc: OutputParserException):
    """Map output-parsing failures to HTTP 422 with the parser details."""
    payload = {
        "error": "output_parsing_failed",
        "message": "输出格式解析失败",
        "details": str(exc),
    }
    return JSONResponse(status_code=422, content=payload)

@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Catch-all: log the unexpected error and return a generic HTTP 500."""
    import logging
    logging.error(f"未处理的异常: {exc}", exc_info=True)
    payload = {
        "error": "internal_error",
        "message": "服务器内部错误",
    }
    return JSONResponse(status_code=500, content=payload)
上下文管理器模式
from contextlib import contextmanager
from typing import Optional

@contextmanager
def llm_error_handler(default_response: Optional[str] = None):
    """Context manager mapping common LLM errors to friendlier handling.

    CAVEAT: a generator-based context manager cannot hand a replacement
    value back to the ``with`` body — returning from the generator merely
    *suppresses* the in-flight exception (PEP 343). The previous
    ``return default_response`` implied the caller would receive the
    default, which is impossible here. When ``default_response`` is given,
    the error is swallowed and the caller must supply its own fallback
    (e.g. by pre-assigning it before entering the block).
    """
    try:
        yield
    except RateLimitError:
        print("警告:API 限流,请稍后重试")
        if default_response is not None:
            return  # suppresses the exception; the value cannot reach the caller
        raise
    except AuthenticationError:
        # Authentication failures are not transient — surface as config errors.
        raise ValueError("API Key 无效或已过期")
    except TimeoutError:
        print("警告:请求超时")
        if default_response is not None:
            return  # suppresses the exception; the value cannot reach the caller
        raise
    except Exception as e:
        print(f"未知错误: {e}")
        raise
# Usage: when a handled error occurs and default_response is set, the `with`
# body is aborted and the exception suppressed — `result` keeps its previous
# value, so pre-assign the fallback before entering the block.
result = "抱歉,服务暂时不可用"
with llm_error_handler(default_response="抱歉,服务暂时不可用"):
    result = model.invoke("你好")
最佳实践
1. 分层错误处理
import asyncio

from langchain.chat_models import init_chat_model
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

model = init_chat_model("openai:gpt-4o-mini")

# Layer 1: base chain.
# NOTE: StrOutputParser comes from langchain_core.output_parsers; the
# previous `langchain.output_parsers` path does not export it.
base_chain = (
    ChatPromptTemplate.from_template("回答:{question}")
    | model
    | StrOutputParser()
)

# Layer 2: retries (these keywords are genuine with_retry() parameters)
retry_chain = base_chain.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True,
)

# Layer 3: fallback to a different provider
fallback_model = init_chat_model("anthropic:claude-sonnet-4-5-20250929")
fallback_chain = (
    ChatPromptTemplate.from_template("回答:{question}")
    | fallback_model
    | StrOutputParser()
)
robust_chain = retry_chain.with_fallbacks([fallback_chain])

# Layer 4: an overall deadline around the async invocation
async def safe_invoke(question: str, timeout: float = 30.0) -> str:
    """Invoke the robust chain with a hard timeout and friendly fallbacks."""
    try:
        return await asyncio.wait_for(
            robust_chain.ainvoke({"question": question}),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        return "抱歉,响应超时,请简化问题后重试"
    except Exception as e:
        return f"服务暂时不可用: {str(e)}"
2. 错误日志与监控
import functools
import logging
from datetime import datetime
from typing import Callable, Optional

logger = logging.getLogger("llm_errors")

class ErrorMonitor:
    """In-memory error statistics for LLM calls."""

    def __init__(self):
        # error type name -> occurrence count
        self.error_counts = {}
        # most recent errors, oldest first (capped at 100 entries)
        self.last_errors = []

    def record_error(self, error_type: str, error: Exception, context: Optional[dict] = None):
        """Count the error, remember its details, and log it."""
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
        self.last_errors.append({
            "type": error_type,
            "message": str(error),
            "timestamp": datetime.now().isoformat(),
            "context": context,
        })
        # Keep only the 100 most recent entries
        self.last_errors = self.last_errors[-100:]
        logger.error(f"LLM 错误 [{error_type}]: {error}", extra={"context": context})

    def get_stats(self):
        """Return aggregate counts plus the 10 most recent errors."""
        return {
            "error_counts": self.error_counts,
            "recent_errors": self.last_errors[-10:],
        }

monitor = ErrorMonitor()

def with_monitoring(func: Callable):
    """Decorator recording any exception raised by an async function."""
    @functools.wraps(func)  # preserve the wrapped function's name/docstring
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            error_type = type(e).__name__
            monitor.record_error(error_type, e, {"args": str(args)[:100]})
            raise
    return wrapper
3. 优雅降级
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import StrOutputParser
class GracefulLLMService:
"""优雅降级的 LLM 服务"""
def __init__(self):
self.primary_model = init_chat_model("openai:gpt-4o")
self.fallback_model = init_chat_model("openai:gpt-4o-mini")
# 预定义的降级响应
self.cache_responses = {
"greeting": "你好!有什么我可以帮助你的吗?",
"farewell": "再见!祝你愉快!",
}
def _match_cache(self, question: str) -> str | None:
"""匹配缓存响应"""
question_lower = question.lower()
if any(word in question_lower for word in ["你好", "hello", "hi"]):
return self.cache_responses["greeting"]
if any(word in question_lower for word in ["再见", "bye", "goodbye"]):
return self.cache_responses["farewell"]
return None
async def ask(self, question: str) -> str:
"""带降级的问答"""
# 第一优先级:缓存响应
cached = self._match_cache(question)
if cached:
return cached
# 第二优先级:主模型
try:
chain = ChatPromptTemplate.from_template("{question}") | self.primary_model
result = await chain.ainvoke({"question": question})
return result.content
except Exception as e:
print(f"主模型失败: {e}")
# 第三优先级:备用模型
try:
chain = ChatPromptTemplate.from_template("{question}") | self.fallback_model
result = await chain.ainvoke({"question": question})
return result.content
except Exception as e:
print(f"备用模型失败: {e}")
# 最后降级:默认响应
return "抱歉,我暂时无法回答这个问题,请稍后再试。"
# Usage — `await` at module level is a SyntaxError in a plain script
# (it only works in notebooks/REPLs), so drive it with asyncio.run().
import asyncio

async def _demo() -> None:
    service = GracefulLLMService()
    answer = await service.ask("你好")
    print(answer)

asyncio.run(_demo())
4. 断路器模式
import time
from enum import Enum
from typing import Optional
class CircuitState(Enum):
    """States of the circuit breaker."""
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # tripped: requests are rejected
    HALF_OPEN = "half_open"  # probing: a single trial request is allowed

class CircuitBreaker:
    """Trips after repeated failures and probes for recovery after a cooldown."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None

    def record_success(self):
        """A successful call closes the circuit and clears the failure tally."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        """Tally a failure; trip the breaker once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def should_allow_request(self) -> bool:
        """Decide whether a request may proceed in the current state."""
        if self.state is CircuitState.OPEN:
            elapsed = time.time() - self.last_failure_time
            if elapsed < self.recovery_timeout:
                return False
            # Cooldown elapsed: move to HALF_OPEN and let one probe through.
            self.state = CircuitState.HALF_OPEN
            return True
        # CLOSED and HALF_OPEN both admit the request.
        return True
# Using the circuit breaker
circuit_breaker = CircuitBreaker()

async def call_with_circuit_breaker(chain, input_data):
    """Invoke a chain only when the circuit breaker admits the request."""
    if not circuit_breaker.should_allow_request():
        raise Exception("服务暂时不可用(断路器打开)")
    try:
        result = await chain.ainvoke(input_data)
    except Exception:
        circuit_breaker.record_failure()
        raise
    else:
        circuit_breaker.record_success()
        return result
错误处理检查清单
构建生产级应用时,确保处理了以下场景:
- 网络错误(连接失败、超时)
- API 认证错误
- 请求限流
- 服务端错误(5xx)
- 上下文长度超限
- 输出解析失败
- 结构化输出验证失败
- 工具调用失败
- 内存不足
- 请求超时
下一步
参考资源
- [LangChain Error Handling](https://python.langchain.com/docs/how_to/)
- Tenacity 重试库文档
- OpenAI API 错误码