跳到主要内容

Agent 架构设计

构建一个高效、可靠的 AI Agent 需要深入理解其架构设计。本章将介绍 Agent 的核心架构模式、设计原则以及常见的设计决策。

核心架构模式

单 Agent 架构

最简单的 Agent 架构由一个 LLM 和一组工具组成:

这种架构适合处理单一领域的任务,实现简单,易于调试。但当任务变得复杂时,单个 Agent 可能难以有效管理。

适用场景

  • 功能明确的专业助手(如天气查询、翻译)
  • 流程相对固定的自动化任务
  • 原型开发和概念验证

多 Agent 协作架构

对于复杂任务,可以使用多个 Agent 协作完成:

每个 Agent 专注于特定领域,通过共享记忆或消息传递进行协作。

适用场景

  • 需要多领域知识的复杂任务
  • 大规模自动化流程
  • 需要分工合作的场景

层级 Agent 架构

层级架构模拟人类组织结构,由管理者 Agent 分配任务给执行者 Agent:

监督者负责任务分配和结果整合,规划者负责制定执行计划,执行者负责具体实施。

ReAct 模式

ReAct(Reasoning and Acting)是目前最流行的 Agent 执行模式,它将推理和行动紧密结合。

ReAct 的工作原理

ReAct 模式的核心思想是:Agent 在每一步都会先"思考"(Reason),然后选择一个"行动"(Act),观察结果后再进行下一轮思考。

一个典型的 ReAct 执行流程:

思考:用户想知道北京今天的天气,我需要查询天气信息。
行动:调用天气查询工具,参数:city="北京"
观察:北京今天晴,气温 15-25°C,空气质量良好
思考:已获取天气信息,可以回答用户问题了。
回答:北京今天天气晴朗,气温在 15 到 25 摄氏度之间,空气质量良好。

ReAct 的优势

  • 可解释性强:每一步推理过程都清晰可见
  • 调试方便:容易定位问题出在哪个环节
  • 自我纠错:能够根据观察结果调整策略
  • 灵活适应:可以处理各种意外情况

ReAct 的实现

使用 LangChain 实现 ReAct Agent:

from langchain_openai import ChatOpenAI
from langchain.agents import create_react_agent, AgentExecutor
from langchain import hub
from langchain_core.tools import tool

@tool
def search(query: str) -> str:
"""搜索互联网获取信息"""
return f"搜索结果:{query} 的相关信息..."

@tool
def calculator(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return f"计算结果:{result}"
except Exception as e:
return f"计算错误:{e}"

tools = [search, calculator]

llm = ChatOpenAI(model="gpt-4", temperature=0)

prompt = hub.pull("hwchase17/react")

agent = create_react_agent(llm, tools, prompt)

agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
handle_parsing_errors=True
)

result = agent_executor.invoke({
"input": "北京和上海的人口总和是多少?"
})

print(result["output"])

Function Calling 模式

Function Calling 是 OpenAI 推出的一种让 LLM 调用外部函数的机制,它比 ReAct 更加高效和可靠。

工作原理

函数定义格式

Function Calling 使用 JSON Schema 定义函数:

tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取指定城市的天气信息",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称,如:北京、上海"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "温度单位"
}
},
"required": ["city"]
}
}
}
]

完整示例

import json
from openai import OpenAI

client = OpenAI()

def get_weather(city: str, unit: str = "celsius") -> dict:
"""模拟天气查询函数"""
weather_data = {
"北京": {"temp": 22, "condition": "晴"},
"上海": {"temp": 26, "condition": "多云"},
"广州": {"temp": 30, "condition": "雨"}
}

if city in weather_data:
data = weather_data[city]
if unit == "fahrenheit":
data["temp"] = data["temp"] * 9/5 + 32
return data
return {"error": "未找到该城市"}

def run_conversation(user_message: str) -> str:
messages = [{"role": "user", "content": user_message}]

tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取指定城市的天气信息",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"]
}
},
"required": ["city"]
}
}
}
]

response = client.chat.completions.create(
model="gpt-4",
messages=messages,
tools=tools,
tool_choice="auto"
)

response_message = response.choices[0].message
tool_calls = response_message.tool_calls

if tool_calls:
messages.append(response_message)

for tool_call in tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)

if function_name == "get_weather":
function_response = get_weather(
city=function_args.get("city"),
unit=function_args.get("unit", "celsius")
)

messages.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": json.dumps(function_response, ensure_ascii=False)
})

second_response = client.chat.completions.create(
model="gpt-4",
messages=messages
)

return second_response.choices[0].message.content

return response_message.content

result = run_conversation("北京今天天气怎么样?")
print(result)

规划模块设计

规划模块负责将复杂任务分解为可执行的步骤。

任务分解策略

单步分解:一次性将任务分解为所有子任务

def plan_task(task: str) -> list[str]:
prompt = f"""
请将以下任务分解为具体的执行步骤:
任务:{task}

请以 JSON 数组格式返回步骤列表。
"""

steps = llm.invoke(prompt)
return json.loads(steps)

动态规划:根据执行结果动态调整计划

def dynamic_planning(task: str, context: dict) -> str:
prompt = f"""
当前任务:{task}
已完成的步骤:{context['completed_steps']}
当前状态:{context['current_state']}

请确定下一步应该做什么,或者判断任务是否已完成。
"""

return llm.invoke(prompt)

自我反思机制

让 Agent 能够评估自己的执行结果:

def reflect(task: str, action: str, result: str) -> dict:
prompt = f"""
任务:{task}
执行的动作:{action}
得到的结果:{result}

请评估:
1. 这个结果是否完成了任务?
2. 如果没有,问题出在哪里?
3. 下一步应该怎么做?

以 JSON 格式返回评估结果。
"""

return json.loads(llm.invoke(prompt))

错误处理策略

Agent 在执行过程中可能遇到各种错误,需要设计健壮的错误处理机制。

重试机制

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def execute_tool_with_retry(tool, *args, **kwargs):
try:
return tool(*args, **kwargs)
except Exception as e:
print(f"工具执行失败:{e},正在重试...")
raise

优雅降级

def execute_with_fallback(primary_tool, fallback_tool, *args, **kwargs):
try:
return primary_tool(*args, **kwargs)
except Exception as e:
print(f"主工具失败,使用备用工具:{e}")
return fallback_tool(*args, **kwargs)

错误反馈

将错误信息反馈给 LLM,让它尝试其他方法:

def handle_tool_error(error: Exception, agent: Agent, task: str) -> str:
error_message = f"""
执行工具时发生错误:{str(error)}

请尝试:
1. 检查参数是否正确
2. 使用其他工具完成任务
3. 如果无法完成,请告知用户原因
"""

return agent.invoke(error_message)

性能优化

并行工具调用

当多个工具调用相互独立时,可以并行执行:

import asyncio

async def execute_tools_parallel(tool_calls: list) -> list:
tasks = [execute_tool(call) for call in tool_calls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results

缓存机制

缓存工具调用结果,避免重复计算:

from functools import lru_cache

@lru_cache(maxsize=100)
def cached_tool_call(tool_name: str, params_hash: str) -> str:
return execute_tool(tool_name, params_hash)

流式输出

使用流式输出提升用户体验:

async def stream_agent_response(agent: Agent, task: str):
async for chunk in agent.astream(task):
yield chunk

小结

设计一个优秀的 Agent 架构需要考虑多个方面:

  • 根据任务复杂度选择合适的架构模式
  • 理解 ReAct 和 Function Calling 两种核心执行模式
  • 设计有效的任务规划和自我反思机制
  • 建立健壮的错误处理和重试策略
  • 通过并行执行、缓存等手段优化性能

下一章我们将深入学习工具系统的设计与实现。

参考资料