中间件系统
中间件(Middleware)是 LangChain 1.0 提供的核心扩展机制,它允许你在 Agent 执行的不同阶段插入自定义逻辑。通过中间件,你可以实现状态处理、错误处理、日志记录、动态模型选择等高级功能。
什么是中间件?
中间件是一种设计模式,它允许你在不修改 Agent 核心逻辑的情况下,扩展或修改其行为。在 LangChain 1.0 中,中间件可以在以下关键点介入:
- 模型调用前:处理状态、注入上下文
- 模型调用后:修改响应、添加护栏
- 工具调用包装:处理错误、添加日志
- 动态提示词生成:根据状态生成系统提示
为什么需要中间件?
传统方式下,如果想在 Agent 执行过程中添加自定义逻辑,需要:
- 修改 Agent 源代码(难以维护)
- 创建自定义节点(增加复杂性)
- 在工具内部处理(分散逻辑)
中间件提供了一种更优雅的解决方案:
- 关注点分离:将横切关注点(日志、监控、错误处理)与业务逻辑分离
- 可组合性:多个中间件可以按顺序组合
- 可复用性:同一段中间件逻辑可以在多个 Agent 中复用
- 可测试性:中间件可以独立测试
两种钩子风格
LangChain 1.0 的中间件系统提供两种风格的钩子,你可以根据需要选择合适的方式:
Node-style 钩子
Node-style 钩子在特定的执行点按顺序运行,适合用于日志记录、验证和状态更新:
| 钩子 | 执行时机 | 用途 |
|---|---|---|
before_agent | Agent 启动前(每次调用执行一次) | 初始化、验证 |
before_model | 每次模型调用前 | 状态处理、上下文注入 |
after_model | 每次模型响应后 | 护栏、内容过滤 |
after_agent | Agent 完成后(每次调用执行一次) | 清理、日志 |
执行顺序:多个中间件的 before_* 钩子按列表顺序执行,after_* 钩子按逆序执行(类似洋葱模型)。
Wrap-style 钩子
Wrap-style 钩子包装每次模型或工具调用,让你可以控制何时执行处理程序。适合用于重试、缓存和转换:
| 钩子 | 执行时机 | 用途 |
|---|---|---|
wrap_model_call | 包装每次模型调用 | 重试、超时、缓存、动态模型选择 |
wrap_tool_call | 包装每次工具调用 | 错误处理、日志、权限检查 |
核心能力:你可以控制 handler 被调用的次数:
- 零次:短路,不执行实际调用(直接返回结果)
- 一次:正常流程
- 多次:实现重试逻辑
创建中间件的两种方式
LangChain 1.0 支持两种创建中间件的方式:
方式一:装饰器(推荐用于简单场景)
使用装饰器可以快速创建单钩子中间件:
from langchain.agents.middleware import before_model, after_model, wrap_tool_call, dynamic_prompt
from langchain.agents import create_agent
# Node-style 装饰器
@before_model
def log_before_model(state, context):
"""模型调用前记录日志"""
message_count = len(state.get("messages", []))
print(f"[Before Model] 消息数量: {message_count}")
return {} # 返回空字典表示不更新状态
@after_model
def log_after_model(state, context):
"""模型调用后记录日志"""
messages = state.get("messages", [])
if messages:
last_msg = messages[-1]
if hasattr(last_msg, "content"):
print(f"[After Model] 响应: {last_msg.content[:100]}...")
return {}
# Wrap-style 装饰器(使用 yield 语法)
@wrap_tool_call
def handle_tool_errors(request, handler):
"""工具调用错误处理"""
try:
result = yield handler(request)
return result
except Exception as e:
return f"工具执行失败: {str(e)}"
# 动态提示词装饰器
@dynamic_prompt
def create_dynamic_prompt(state, context):
"""动态生成系统提示词"""
from datetime import datetime
today = datetime.now().strftime("%Y年%m月%d日")
return f"你是一个智能助手。今天是{today}。"
# 使用装饰器创建的中间件
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
system_prompt=create_dynamic_prompt, # 动态提示词
middleware=[log_before_model, log_after_model, handle_tool_errors]
)
可用的装饰器:
Node-style:
@before_agent- Agent 启动前执行@before_model- 每次模型调用前执行@after_model- 每次模型响应后执行@after_agent- Agent 完成后执行
Wrap-style:
@wrap_model_call- 包装模型调用@wrap_tool_call- 包装工具调用
便捷装饰器:
@dynamic_prompt- 动态生成系统提示词
装饰器适用场景:
- 只需要单个钩子
- 无需复杂配置
- 快速原型开发
方式二:类(推荐用于复杂场景)
当需要多个钩子或复杂配置时,使用类来定义中间件:
from langchain.agents.middleware import AgentMiddleware
from langchain.agents.middleware.types import ExtendedModelResponse, Command
class MyMiddleware(AgentMiddleware):
"""自定义中间件类"""
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
def before_model(self, state, context):
"""模型调用前执行
参数:
- state: 当前 Agent 状态(字典)
- context: 运行时上下文
返回:状态更新字典,返回空字典表示不更新状态
"""
message_count = len(state.get("messages", []))
print(f"[Before Model] 消息数量: {message_count}")
return {}
def after_model(self, state, context):
"""模型调用后执行"""
messages = state.get("messages", [])
if messages:
last_msg = messages[-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
print(f"[Tool Calls] {last_msg.tool_calls}")
return {}
def wrap_model_call(self, request, handler):
"""包装模型调用(生成器方法)
你可以控制 handler 被调用的次数:
- 零次:短路,不执行实际调用
- 一次:正常流程
- 多次:重试逻辑
"""
# 可以在这里修改请求
# modified_request = request.copy()
# modified_request["model"] = other_model
response = yield handler(request)
# 可以返回 ExtendedModelResponse 来同时更新状态
# return ExtendedModelResponse(
# response=response,
# command=Command(update={"custom_field": "value"})
# )
return response
def wrap_tool_call(self, request, handler):
"""包装工具调用(生成器方法)"""
tool_name = request.get("name", "unknown")
try:
result = yield handler(request)
return result
except Exception as e:
# 返回友好的错误消息
return f"工具 {tool_name} 执行失败: {str(e)}"
# 使用类中间件
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
middleware=[MyMiddleware(max_retries=3)]
)
类中间件适用场景:
- 需要定义同步和异步实现(同时定义
before_model和async_before_model) - 需要多个钩子协同工作
- 需要复杂配置(如可配置的阈值、自定义模型)
- 跨项目复用
基础示例
日志中间件
最简单的中间件是记录日志,我们使用装饰器方式实现:
from langchain.agents import create_agent
from langchain.agents.middleware import before_agent, before_model, after_model, after_agent, wrap_tool_call
from langchain.tools import tool
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("agent")
# 使用装饰器定义各个钩子
@before_agent
def log_agent_start(state, context):
"""Agent 启动前记录"""
logger.info("[Agent Start] 开始处理请求")
return {}
@before_model
def log_before_model(state, context):
"""模型调用前记录"""
messages = state.get("messages", [])
message_count = len(messages)
logger.info(f"[Before Model] 消息数量: {message_count}")
# 可以在这里添加消息长度检查
total_length = sum(
len(m.content) if hasattr(m, 'content') else 0
for m in messages
)
logger.info(f"[Before Model] 总消息长度: {total_length}")
return {} # 返回空字典表示不更新状态
@after_model
def log_after_model(state, context):
"""模型调用后记录"""
messages = state.get("messages", [])
if messages:
last_msg = messages[-1]
# 记录响应内容
if hasattr(last_msg, "content") and last_msg.content:
logger.info(f"[After Model] 响应: {last_msg.content[:100]}...")
# 记录工具调用
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
for tc in last_msg.tool_calls:
logger.info(f"[Tool Call] {tc['name']}({tc['args']})")
return {}
@wrap_tool_call
def log_tool_call(request, handler):
"""包装工具调用,记录执行过程"""
tool_name = request.get("name", "unknown")
tool_args = request.get("args", {})
logger.info(f"[Tool Start] {tool_name} 参数: {tool_args}")
try:
# 执行工具调用
result = yield handler(request)
logger.info(f"[Tool End] {tool_name}: {str(result)[:100]}...")
return result
except Exception as e:
logger.error(f"[Tool Error] {tool_name}: {e}")
# 返回友好的错误消息,而不是抛出异常
return f"工具 {tool_name} 执行失败: {str(e)}"
@after_agent
def log_agent_end(state, context):
"""Agent 完成后记录"""
logger.info("[Agent End] 请求处理完成")
return {}
# 定义工具
@tool
def search(query: str) -> str:
"""搜索互联网获取信息"""
return f"搜索结果: {query}"
@tool
def calculator(expression: str) -> str:
"""计算数学表达式"""
try:
return str(eval(expression))
except Exception as e:
raise ValueError(f"计算错误: {e}")
# 创建 Agent 并使用中间件
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[search, calculator],
middleware=[
log_agent_start,
log_before_model,
log_after_model,
log_tool_call,
log_agent_end
],
system_prompt="你是一个智能助手。"
)
# 测试
result = agent.invoke({
"messages": [{"role": "user", "content": "搜索 Python 教程,然后计算 2 + 2"}]
})
print(result["messages"][-1].content)
输出示例:
INFO:agent:[Agent Start] 开始处理请求
INFO:agent:[Before Model] 消息数量: 1
INFO:agent:[Before Model] 总消息长度: 25
INFO:agent:[Tool Call] search({'query': 'Python 教程'})
INFO:agent:[Tool Start] search 参数: {'query': 'Python 教程'}
INFO:agent:[Tool End] search: 搜索结果: Python 教程...
INFO:agent:[After Model] 响应: 根据搜索结果...
INFO:agent:[Agent End] 请求处理完成
使用类实现完整日志中间件
对于更复杂的场景,可以使用类来组织多个钩子:
from langchain.agents.middleware import AgentMiddleware
from langchain.agents import create_agent
from langchain.tools import tool
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("agent")
class LoggingMiddleware(AgentMiddleware):
"""完整的日志记录中间件"""
def __init__(self, log_level: str = "INFO"):
self.log_level = log_level
def before_agent(self, state, context):
"""Agent 启动前"""
logger.info("[Agent Start] 开始处理请求")
return {}
def before_model(self, state, context):
"""模型调用前"""
messages = state.get("messages", [])
logger.info(f"[Before Model] 消息数量: {len(messages)}")
return {}
def after_model(self, state, context):
"""模型调用后"""
messages = state.get("messages", [])
if messages:
last_msg = messages[-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
for tc in last_msg.tool_calls:
logger.info(f"[Tool Call] {tc['name']}")
return {}
def wrap_tool_call(self, request, handler):
"""包装工具调用"""
tool_name = request.get("name", "unknown")
try:
result = yield handler(request)
logger.info(f"[Tool Success] {tool_name}")
return result
except Exception as e:
logger.error(f"[Tool Error] {tool_name}: {e}")
return f"工具 {tool_name} 执行失败: {str(e)}"
def after_agent(self, state, context):
"""Agent 完成后"""
logger.info("[Agent End] 请求处理完成")
return {}
# 使用类中间件
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
middleware=[LoggingMiddleware(log_level="DEBUG")]
)
消息裁剪中间件
当对话历史过长时,自动裁剪消息:
from langchain.agents.middleware import AgentMiddleware, before_model
from langchain.messages import trim_messages, SystemMessage
from langchain.chat_models import init_chat_model
# 使用装饰器方式
@before_model
def trim_messages_middleware(state, context):
"""裁剪过长的消息历史"""
max_messages = 20
messages = state.get("messages", [])
if len(messages) > max_messages:
# 分离系统消息和普通消息
system_messages = [m for m in messages if isinstance(m, SystemMessage)]
other_messages = [m for m in messages if not isinstance(m, SystemMessage)]
# 保留最近的对话
recent_messages = other_messages[-(max_messages - len(system_messages)):]
# 如果裁剪幅度大,添加历史摘要
if len(other_messages) - len(recent_messages) > 5:
return {
"messages": [
*system_messages,
SystemMessage(content=f"[历史摘要] 之前讨论了 {len(other_messages) - len(recent_messages)} 条消息"),
*recent_messages
]
}
return {"messages": [*system_messages, *recent_messages]}
return {}
# 或使用类方式
class MessageTrimMiddleware(AgentMiddleware):
"""消息裁剪中间件"""
def __init__(self, max_messages: int = 20, max_tokens: int = 4000):
self.max_messages = max_messages
self.max_tokens = max_tokens
self.llm = init_chat_model("openai:gpt-4o-mini")
def before_model(self, state, context):
"""裁剪过长的消息历史"""
messages = state.get("messages", [])
if len(messages) > self.max_messages:
# 分离系统消息和普通消息
system_messages = [m for m in messages if isinstance(m, SystemMessage)]
other_messages = [m for m in messages if not isinstance(m, SystemMessage)]
# 保留最近的对话
recent_messages = other_messages[-(self.max_messages - len(system_messages)):]
# 如果裁剪幅度大,添加历史摘要
if len(other_messages) - len(recent_messages) > 5:
summary = self._summarize_old_messages(other_messages[:-len(recent_messages)])
return {
"messages": [
*system_messages,
SystemMessage(content=f"[历史摘要] {summary}"),
*recent_messages
]
}
return {"messages": [*system_messages, *recent_messages]}
return {}
def _summarize_old_messages(self, messages: list) -> str:
"""总结旧消息"""
# 简化实现,实际可以使用 LLM 生成
return f"之前的对话中讨论了 {len(messages)} 条消息"
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
middleware=[MessageTrimMiddleware(max_messages=10)]
)
高级用法
动态模型选择
根据任务复杂度选择不同模型:
from langchain.agents.middleware import wrap_model_call, AgentMiddleware
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
# 使用装饰器方式
@wrap_model_call
def dynamic_model_selector(request, handler):
"""根据任务复杂度选择模型"""
model_request = request.get("model_request", {})
messages = model_request.get("messages", [])
# 分析最后一条用户消息的复杂度
is_complex = False
if messages:
for msg in reversed(messages):
if hasattr(msg, "type") and msg.type == "human":
content = msg.content.lower()
# 简单启发式:根据消息长度和关键词判断复杂度
is_complex = (
len(content) > 200 or
any(kw in content for kw in ["分析", "比较", "设计", "复杂", "详细"])
)
break
# 可以在这里修改模型(需要配合其他机制)
response = yield handler(request)
return response
# 使用类方式(更完整)
class DynamicModelMiddleware(AgentMiddleware):
"""动态模型选择中间件"""
def __init__(self):
self.default_model = init_chat_model("openai:gpt-4o-mini")
self.powerful_model = init_chat_model("openai:gpt-4o")
def wrap_model_call(self, request, handler):
"""根据任务复杂度选择模型"""
state = request.get("state", {})
messages = state.get("messages", [])
# 分析最后一条用户消息的复杂度
if messages:
last_user_msg = None
for msg in reversed(messages):
if hasattr(msg, "type") and msg.type == "human":
last_user_msg = msg.content
break
# 简单启发式:根据消息长度和关键词判断复杂度
is_complex = (
len(last_user_msg) > 200 or
any(kw in last_user_msg.lower() for kw in ["分析", "比较", "设计", "复杂"])
)
if is_complex:
print(f"[DynamicModel] 使用 GPT-4o 处理复杂任务")
# 修改模型请求,使用更强大的模型
modified_request = request.copy()
modified_request["model"] = self.powerful_model
return (yield handler(modified_request))
return (yield handler(request))
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
middleware=[DynamicModelMiddleware()]
)
错误处理中间件
统一处理工具调用错误:
from langchain.agents.middleware import wrap_tool_call, AgentMiddleware
from langchain.messages import ToolMessage
# 使用装饰器方式
@wrap_tool_call
def error_handler(request, handler):
"""工具调用错误处理"""
tool_name = request.get("name", "unknown")
try:
result = yield handler(request)
return result
except Exception as e:
# 返回友好的错误消息
return f"工具 {tool_name} 执行失败: {str(e)}。请尝试其他方法。"
# 使用类方式(支持重试逻辑)
class ErrorHandlingMiddleware(AgentMiddleware):
"""错误处理中间件,支持重试"""
def __init__(self, max_retries: int = 2):
self.max_retries = max_retries
def wrap_tool_call(self, request, handler):
"""包装工具调用,添加重试逻辑"""
tool_id = request.get("id", "unknown")
tool_name = request.get("name", "unknown")
last_error = None
for attempt in range(self.max_retries + 1):
try:
result = yield handler(request)
return result
except Exception as e:
last_error = e
if attempt < self.max_retries:
print(f"[Retry] {tool_name} 第 {attempt + 1} 次重试...")
continue
# 超过重试次数,返回错误消息
return ToolMessage(
content=f"工具 {tool_name} 多次执行失败: {str(last_error)}。请尝试其他方法。",
tool_call_id=tool_id
)
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
middleware=[ErrorHandlingMiddleware(max_retries=2)]
)
人工确认中间件
对敏感操作进行人工确认:
from langchain.agents.middleware import AgentMiddleware, after_model
from typing import Set
# 使用装饰器方式(配合中断机制)
@after_model
def check_sensitive_tools(state, context):
"""检查是否需要人工确认"""
sensitive_tools = {"send_email", "delete_file", "transfer_money"}
messages = state.get("messages", [])
if not messages:
return {}
last_msg = messages[-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
for tc in last_msg.tool_calls:
if tc["name"] in sensitive_tools:
print(f"[需要确认] 即将执行敏感操作: {tc['name']}")
# 设置中断标志
return {"__requires_confirmation__": True, "__tool__": tc}
return {}
# 使用类方式
class HumanApprovalMiddleware(AgentMiddleware):
"""人工确认中间件"""
def __init__(self, sensitive_tools: Set[str]):
self.sensitive_tools = sensitive_tools
def after_model(self, state, context):
"""检查是否需要人工确认"""
messages = state.get("messages", [])
if not messages:
return {}
last_msg = messages[-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
for tc in last_msg.tool_calls:
if tc["name"] in self.sensitive_tools:
# 返回中断标志
return {
"__interrupt__": True,
"__pending_tool__": tc
}
return {}
# 使用示例
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[send_email, delete_file, search],
middleware=[HumanApprovalMiddleware({"send_email", "delete_file"})],
interrupt_before=["tools"] # 配合中断机制
)
# 执行时需要处理中断
config = {"configurable": {"thread_id": "session-1"}}
result = agent.invoke({"messages": [{"role": "user", "content": "发送邮件"}]}, config)
# 检查是否需要确认
state = agent.get_state(config)
if state.values.get("__interrupt__"):
# 等待用户确认
user_input = input("确认执行此操作?(y/n): ")
if user_input.lower() == 'y':
result = agent.invoke(None, config) # 继续
内容过滤中间件
过滤敏感内容:
import re
from langchain.agents.middleware import AgentMiddleware, wrap_tool_call, after_model
from langchain.messages import ToolMessage
# 使用装饰器方式
@wrap_tool_call
def filter_sensitive_content(request, handler):
"""过滤工具返回中的敏感信息"""
result = yield handler(request)
if isinstance(result, ToolMessage):
filtered_content = _filter_pii(result.content)
if filtered_content != result.content:
result.content = filtered_content
return result
def _filter_pii(text: str) -> str:
"""过滤敏感信息"""
patterns = [
(r'\b\d{16,19}\b', '[卡号已隐藏]'),
(r'\b\d{17}[\dXx]\b', '[身份证已隐藏]'),
(r'\b1[3-9]\d{9}\b', '[手机号已隐藏]'),
(r'\b[\w\.-]+@[\w\.-]+\.\w+\b', '[邮箱已隐藏]'),
]
for pattern, replacement in patterns:
text = re.sub(pattern, replacement, text)
return text
# 使用类方式
class ContentFilterMiddleware(AgentMiddleware):
"""内容过滤中间件"""
def __init__(self, custom_patterns: list = None):
self.patterns = [
(r'\b\d{16,19}\b', '[卡号已隐藏]'),
(r'\b\d{17}[\dXx]\b', '[身份证已隐藏]'),
(r'\b1[3-9]\d{9}\b', '[手机号已隐藏]'),
(r'\b[\w\.-]+@[\w\.-]+\.\w+\b', '[邮箱已隐藏]'),
]
if custom_patterns:
self.patterns.extend(custom_patterns)
def after_model(self, state, context):
"""过滤模型响应中的敏感信息"""
messages = state.get("messages", [])
if not messages:
return {}
last_msg = messages[-1]
if hasattr(last_msg, "content") and last_msg.content:
filtered_content = self._filter(last_msg.content)
if filtered_content != last_msg.content:
last_msg.content = filtered_content
return {"messages": messages}
return {}
def wrap_tool_call(self, request, handler):
"""过滤工具返回中的敏感信息"""
result = yield handler(request)
if isinstance(result, ToolMessage):
filtered_content = self._filter(result.content)
if filtered_content != result.content:
result.content = filtered_content
return result
def _filter(self, text: str) -> str:
"""过滤敏感信息"""
for pattern, replacement in self.patterns:
text = re.sub(pattern, replacement, text)
return text
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
middleware=[ContentFilterMiddleware()]
)
内置中间件
LangChain 提供了多个开箱即用的内置中间件,覆盖了常见的使用场景。
SummarizationMiddleware(对话总结)
自动总结对话历史,防止上下文超出限制:
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
agent = create_agent(
model="openai:gpt-4o",
tools=[...],
middleware=[
SummarizationMiddleware(
model="openai:gpt-4o-mini", # 用于总结的模型
trigger=("tokens", 4000), # token 超过 4000 时触发
keep=("messages", 20), # 保留最近 20 条消息
)
]
)
触发条件(trigger):
("tokens", N)- token 数量超过 N("messages", N)- 消息数量超过 N("fraction", 0.8)- 达到上下文窗口的 80%
保留策略(keep):
("tokens", N)- 保留 N 个 token("messages", N)- 保留最近 N 条消息("fraction", 0.3)- 保留上下文窗口的 30%
ToolCallLimitMiddleware(工具调用限制)
限制工具调用次数,防止无限循环或滥用:
from langchain.agents.middleware import ToolCallLimitMiddleware
agent = create_agent(
model="openai:gpt-4o",
tools=[...],
middleware=[
ToolCallLimitMiddleware(
run_limit=10, # 单次调用最多 10 次工具
thread_limit=50, # 整个会话最多 50 次
on_exceed="continue" # 超限时的行为
)
]
)
on_exceed 选项:
"continue"- 继续执行,返回错误消息"error"- 抛出异常,停止执行"end"- 直接结束执行
PIIMiddleware(敏感信息过滤)
检测和处理 PII(个人身份信息):
from langchain.agents.middleware import PIIMiddleware
agent = create_agent(
model="openai:gpt-4o",
tools=[...],
middleware=[
PIIMiddleware("email", strategy="mask"), # 掩码邮箱
PIIMiddleware("phone", strategy="block"), # 阻止电话
PIIMiddleware("credit_card", strategy="hash"), # 哈希信用卡
]
)
内置 PII 类型:
email- 电子邮件phone- 电话号码credit_card- 信用卡号ssn- 社会安全号
处理策略(strategy):
"mask"- 替换为[邮箱已隐藏]等"block"- 阻止包含 PII 的内容"hash"- 使用哈希值替换
自定义检测器:
import re
def detect_ssn(content: str) -> list[dict]:
"""自定义 SSN 检测器"""
matches = []
pattern = r"\d{3}-\d{2}-\d{4}"
for match in re.finditer(pattern, content):
matches.append({
"text": match.group(),
"start": match.start(),
"end": match.end(),
})
return matches
agent = create_agent(
model="openai:gpt-4o",
tools=[...],
middleware=[
PIIMiddleware("custom_ssn", detector=detect_ssn, strategy="mask")
]
)
LLMToolSelectorMiddleware(智能工具选择)
当工具数量很多时,使用 LLM 智能选择相关工具:
from langchain.agents.middleware import LLMToolSelectorMiddleware
agent = create_agent(
model="openai:gpt-4o",
tools=[tool1, tool2, tool3, ..., tool20], # 20 个工具
middleware=[
LLMToolSelectorMiddleware(
model="openai:gpt-4o-mini", # 用于选择工具的模型
)
]
)
优点:
- 减少 token 消耗
- 提高模型准确性
- 避免工具过载
LLMToolEmulator(工具模拟)
用于测试,使用 LLM 模拟工具执行:
from langchain.agents.middleware import LLMToolEmulator
# 测试时模拟工具,避免调用真实 API
agent = create_agent(
model="openai:gpt-4o",
tools=[expensive_api_tool, production_database_tool],
middleware=[
LLMToolEmulator(tools=["expensive_api_tool"]) # 只模拟特定工具
]
)
# 模拟所有工具
agent = create_agent(
model="openai:gpt-4o",
tools=[tool1, tool2],
middleware=[LLMToolEmulator()] # 模拟全部
)
自定义内置中间件组合
多个中间件可以组合使用:
from langchain.agents import create_agent
from langchain.agents.middleware import (
SummarizationMiddleware,
ToolCallLimitMiddleware,
PIIMiddleware,
)
agent = create_agent(
model="openai:gpt-4o",
tools=[...],
middleware=[
SummarizationMiddleware(
model="openai:gpt-4o-mini",
trigger=("fraction", 0.8),
keep=("messages", 20),
),
ToolCallLimitMiddleware(run_limit=15),
PIIMiddleware("email", strategy="mask"),
PIIMiddleware("phone", strategy="mask"),
]
)
动态系统提示词
使用 @dynamic_prompt 装饰器创建动态系统提示词:
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt
from datetime import datetime
@dynamic_prompt
def create_system_prompt(state, context):
"""动态生成系统提示词"""
today = datetime.now().strftime("%Y年%m月%d日")
weekday = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"][datetime.now().weekday()]
# 获取用户信息
user_id = context.get("configurable", {}).get("user_id", "guest")
return f"""你是一个智能助手。
当前时间:{today} {weekday}
用户 ID:{user_id}
请根据用户的请求提供帮助。"""
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
system_prompt=create_system_prompt
)
基于状态的动态提示词
from langchain.agents.middleware import dynamic_prompt
@dynamic_prompt
def context_aware_prompt(state, context):
"""根据对话状态生成提示词"""
messages = state.get("messages", [])
# 分析对话主题
topics = set()
for msg in messages:
if hasattr(msg, "content"):
content = msg.content.lower()
if "代码" in content or "编程" in content:
topics.add("编程")
if "天气" in content:
topics.add("天气")
if "新闻" in content:
topics.add("新闻")
base_prompt = "你是一个智能助手。"
if topics:
topic_str = "、".join(topics)
base_prompt += f"\n\n当前对话涉及以下主题:{topic_str}。请提供相关的专业建议。"
return base_prompt
中间件组合
多个中间件按顺序执行:
from langchain.agents import create_agent
# 定义多个中间件
logging_middleware = LoggingMiddleware()
trim_middleware = MessageTrimMiddleware(max_messages=20)
error_middleware = ErrorHandlingMiddleware()
filter_middleware = ContentFilterMiddleware()
# 中间件按顺序执行
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
middleware=[
logging_middleware, # 1. 先记录日志
trim_middleware, # 2. 裁剪消息
error_middleware, # 3. 处理错误
filter_middleware, # 4. 过滤内容
]
)
执行顺序:
请求 → 日志 → 裁剪 → 错误处理 → 过滤 → 模型 → 过滤 → 错误处理 → 裁剪 → 日志 → 响应
↓ ↓ ↓ ↓ ↑ ↑ ↑ ↑
before before wrap wrap after after wrap wrap
model model tool model model model tool tool
最佳实践
1. 单一职责
每个中间件只做一件事:
# 好:职责清晰
from langchain.agents.middleware import AgentMiddleware
class LoggingMiddleware(AgentMiddleware):
"""只负责日志"""
pass
class ErrorHandlingMiddleware(AgentMiddleware):
"""只负责错误处理"""
pass
# 不好:职责混杂
class LoggingAndErrorMiddleware(AgentMiddleware):
"""既做日志又做错误处理"""
pass
2. 无状态设计
中间件应该是无状态的,状态应该存储在 Agent 的 state 中:
from langchain.agents.middleware import AgentMiddleware
# 好:使用 state
class CounterMiddleware(AgentMiddleware):
def before_model(self, state, context):
count = state.get("call_count", 0)
return {"call_count": count + 1}
# 不好:使用实例变量(多个请求会共享)
class BadMiddleware(AgentMiddleware):
def __init__(self):
self.count = 0 # 多个请求会共享这个变量
def before_model(self, state, context):
self.count += 1 # 可能导致竞态条件
return {}
3. 返回值规范
from langchain.agents.middleware import AgentMiddleware
class MyMiddleware(AgentMiddleware):
def before_model(self, state, context):
# 返回空字典表示不更新状态
if condition:
return {}
# 返回状态更新
return {"key": "value"}
# 不要返回 None(虽然会被视为空字典,但不够明确)
# return None
4. 错误处理
在中间件中妥善处理异常:
from langchain.agents.middleware import AgentMiddleware, wrap_tool_call
from langchain.messages import ToolMessage
import logging
logger = logging.getLogger(__name__)
@wrap_tool_call
def safe_tool_execution(request, handler):
"""安全的工具执行"""
try:
result = yield handler(request)
return result
except Exception as e:
# 记录错误但不中断执行
logger.error(f"Middleware error: {e}")
# 返回默认结果
return ToolMessage(
content="处理过程中出现错误,请重试",
tool_call_id=request.get("id")
)
5. 执行顺序理解
理解多个中间件的执行顺序很重要:
# 假设有三个中间件 [M1, M2, M3]
# before_* 钩子执行顺序:M1 → M2 → M3
# after_* 钩子执行顺序:M3 → M2 → M1(逆序)
# wrap_* 钩子嵌套执行:M1(M2(M3(handler)))
# 示例:
# M1.before_model() → M2.before_model() → M3.before_model()
# → 模型调用 →
# M3.after_model() → M2.after_model() → M1.after_model()
关键规则:
before_*钩子:按列表顺序执行(第一个到最后一个)after_*钩子:按逆序执行(最后一个到第一个)wrap_*钩子:嵌套执行(第一个中间件包装所有其他)
6. 动态提示词最佳实践
使用 @dynamic_prompt 装饰器创建动态系统提示词:
from langchain.agents.middleware import dynamic_prompt
@dynamic_prompt
def context_aware_prompt(state, context):
"""根据对话状态生成提示词"""
from datetime import datetime
today = datetime.now().strftime("%Y年%m月%d日")
messages = state.get("messages", [])
# 分析对话主题
topics = set()
for msg in messages:
if hasattr(msg, "content"):
content = msg.content.lower()
if "代码" in content or "编程" in content:
topics.add("编程")
if "天气" in content:
topics.add("天气")
base_prompt = f"你是一个智能助手。今天是{today}。"
if topics:
topic_str = "、".join(topics)
base_prompt += f"\n\n当前对话涉及以下主题:{topic_str}。请提供相关的专业建议。"
return base_prompt
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[...],
system_prompt=context_aware_prompt
)
小结
中间件是 LangChain 1.0 的核心扩展机制:
- Middleware 类:继承此类并实现需要的钩子方法
- 钩子方法:
before_model、after_model、wrap_model_call、wrap_tool_call等 - 典型应用:日志记录、消息裁剪、错误处理、人工确认、内容过滤
- 动态提示词:使用
@dynamic_prompt装饰器 - 组合使用:多个中间件按顺序执行
- 最佳实践:单一职责、无状态设计、规范返回值
合理使用中间件可以让 Agent 更加灵活、健壮、可维护。