Agent 架构设计
构建一个高效、可靠的 AI Agent 需要深入理解其架构设计。本章将介绍 Agent 的核心架构模式、设计原则以及常见的设计决策。
核心架构模式
单 Agent 架构
最简单的 Agent 架构由一个 LLM 和一组工具组成:
这种架构适合处理单一领域的任务,实现简单,易于调试。但当任务变得复杂时,单个 Agent 可能难以有效管理。
适用场景:
- 功能明确的专业助手(如天气查询、翻译)
- 流程相对固定的自动化任务
- 原型开发和概念验证
多 Agent 协作架构
对于复杂任务,可以使用多个 Agent 协作完成:
每个 Agent 专注于特定领域,通过共享记忆或消息传递进行协作。
适用场景:
- 需要多领域知识的复杂任务
- 大规模自动化流程
- 需要分工合作的场景
层级 Agent 架构
层级架构模拟人类组织结构,由管理者 Agent 分配任务给执行者 Agent:
监督者负责任务分配和结果整合,规划者负责制定执行计划,执行者负责具体实施。
ReAct 模式
ReAct(Reasoning and Acting)是目前最流行的 Agent 执行模式,它将推理和行动紧密结合。
ReAct 的工作原理
ReAct 模式的核心思想是:Agent 在每一步都会先"思考"(Reason),然后选择一个"行动"(Act),观察结果后再进行下一轮思考。
一个典型的 ReAct 执行流程:
思考:用户想知道北京今天的天气,我需要查询天气信息。
行动:调用天气查询工具,参数:city="北京"
观察:北京今天晴,气温 15-25°C,空气质量良好
思考:已获取天气信息,可以回答用户问题了。
回答:北京今天天气晴朗,气温在 15 到 25 摄氏度之间,空气质量良好。
ReAct 的优势
- 可解释性强:每一步推理过程都清晰可见
- 调试方便:容易定位问题出在哪个环节
- 自我纠错:能够根据观察结果调整策略
- 灵活适应:可以处理各种意外情况
ReAct 的实现
使用 LangChain 实现 ReAct Agent:
from langchain_openai import ChatOpenAI
from langchain.agents import create_react_agent, AgentExecutor
from langchain import hub
from langchain_core.tools import tool
@tool
def search(query: str) -> str:
"""搜索互联网获取信息"""
return f"搜索结果:{query} 的相关信息..."
@tool
def calculator(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return f"计算结果:{result}"
except Exception as e:
return f"计算错误:{e}"
tools = [search, calculator]
llm = ChatOpenAI(model="gpt-4", temperature=0)
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
handle_parsing_errors=True
)
result = agent_executor.invoke({
"input": "北京和上海的人口总和是多少?"
})
print(result["output"])
Function Calling 模式
Function Calling 是 OpenAI 推出的一种让 LLM 调用外部函数的机制,它比 ReAct 更加高效和可靠。
工作原理
函数定义格式
Function Calling 使用 JSON Schema 定义函数:
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取指定城市的天气信息",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称,如:北京、上海"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "温度单位"
}
},
"required": ["city"]
}
}
}
]
完整示例
import json
from openai import OpenAI
client = OpenAI()
def get_weather(city: str, unit: str = "celsius") -> dict:
"""模拟天气查询函数"""
weather_data = {
"北京": {"temp": 22, "condition": "晴"},
"上海": {"temp": 26, "condition": "多云"},
"广州": {"temp": 30, "condition": "雨"}
}
if city in weather_data:
data = weather_data[city]
if unit == "fahrenheit":
data["temp"] = data["temp"] * 9/5 + 32
return data
return {"error": "未找到该城市"}
def run_conversation(user_message: str) -> str:
messages = [{"role": "user", "content": user_message}]
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取指定城市的天气信息",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"]
}
},
"required": ["city"]
}
}
}
]
response = client.chat.completions.create(
model="gpt-4",
messages=messages,
tools=tools,
tool_choice="auto"
)
response_message = response.choices[0].message
tool_calls = response_message.tool_calls
if tool_calls:
messages.append(response_message)
for tool_call in tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
if function_name == "get_weather":
function_response = get_weather(
city=function_args.get("city"),
unit=function_args.get("unit", "celsius")
)
messages.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": json.dumps(function_response, ensure_ascii=False)
})
second_response = client.chat.completions.create(
model="gpt-4",
messages=messages
)
return second_response.choices[0].message.content
return response_message.content
result = run_conversation("北京今天天气怎么样?")
print(result)
规划模块设计
规划模块负责将复杂任务分解为可执行的步骤。
任务分解策略
单步分解:一次性将任务分解为所有子任务
def plan_task(task: str) -> list[str]:
prompt = f"""
请将以下任务分解为具体的执行步骤:
任务:{task}
请以 JSON 数组格式返回步骤列表。
"""
steps = llm.invoke(prompt)
return json.loads(steps)
动态规划:根据执行结果动态调整计划
def dynamic_planning(task: str, context: dict) -> str:
prompt = f"""
当前任务:{task}
已完成的步骤:{context['completed_steps']}
当前状态:{context['current_state']}
请确定下一步应该做什么,或者判断任务是否已完成。
"""
return llm.invoke(prompt)
自我反思机制
让 Agent 能够评估自己的执行结果:
def reflect(task: str, action: str, result: str) -> dict:
prompt = f"""
任务:{task}
执行的动作:{action}
得到的结果:{result}
请评估:
1. 这个结果是否完成了任务?
2. 如果没有,问题出在哪里?
3. 下一步应该怎么做?
以 JSON 格式返回评估结果。
"""
return json.loads(llm.invoke(prompt))
错误处理策略
Agent 在执行过程中可能遇到各种错误,需要设计健壮的错误处理机制。
重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def execute_tool_with_retry(tool, *args, **kwargs):
try:
return tool(*args, **kwargs)
except Exception as e:
print(f"工具执行失败:{e},正在重试...")
raise
优雅降级
def execute_with_fallback(primary_tool, fallback_tool, *args, **kwargs):
try:
return primary_tool(*args, **kwargs)
except Exception as e:
print(f"主工具失败,使用备用工具:{e}")
return fallback_tool(*args, **kwargs)
错误反馈
将错误信息反馈给 LLM,让它尝试其他方法:
def handle_tool_error(error: Exception, agent: Agent, task: str) -> str:
error_message = f"""
执行工具时发生错误:{str(error)}
请尝试:
1. 检查参数是否正确
2. 使用其他工具完成任务
3. 如果无法完成,请告知用户原因
"""
return agent.invoke(error_message)
性能优化
并行工具调用
当多个工具调用相互独立时,可以并行执行:
import asyncio
async def execute_tools_parallel(tool_calls: list) -> list:
tasks = [execute_tool(call) for call in tool_calls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
缓存机制
缓存工具调用结果,避免重复计算:
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_tool_call(tool_name: str, params_hash: str) -> str:
return execute_tool(tool_name, params_hash)
流式输出
使用流式输出提升用户体验:
async def stream_agent_response(agent: Agent, task: str):
async for chunk in agent.astream(task):
yield chunk
小结
设计一个优秀的 Agent 架构需要考虑多个方面:
- 根据任务复杂度选择合适的架构模式
- 理解 ReAct 和 Function Calling 两种核心执行模式
- 设计有效的任务规划和自我反思机制
- 建立健壮的错误处理和重试策略
- 通过并行执行、缓存等手段优化性能
下一章我们将深入学习工具系统的设计与实现。