多 Agent 协作
当任务变得复杂时,单个 Agent 可能难以有效处理。多 Agent 协作系统通过让多个专业化的 Agent 分工合作,能够更好地完成复杂任务。本章将介绍多 Agent 系统的设计模式和实现方法。
为什么需要多 Agent 协作
单个 Agent 面临的挑战:
- 能力边界:一个 Agent 难以精通所有领域
- 上下文限制:复杂任务的上下文可能超出 LLM 的处理能力
- 并行处理:单 Agent 只能串行执行任务
- 专业化需求:不同子任务需要不同的专业知识
多 Agent 协作的优势:
- 分工明确:每个 Agent 专注于自己擅长的领域
- 并行执行:多个 Agent 可以同时工作
- 知识整合:综合多个专业领域的知识
- 容错能力:一个 Agent 失败不影响整体系统
协作模式
顺序协作
Agent 按顺序执行,前一个 Agent 的输出作为后一个 Agent 的输入:
适用场景:流水线式任务,如内容创作、数据分析报告。
层级协作
一个主控 Agent 负责任务分配和结果整合:
适用场景:需要协调多个专业领域的复杂任务。
对等协作
多个 Agent 平等协作,通过消息传递交流:
适用场景:需要频繁交互和协商的任务。
使用 LangGraph 实现多 Agent 系统
顺序协作示例
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import operator
class ResearchState(TypedDict):
messages: Annotated[list, operator.add]
research_result: str
draft: str
final_output: str
llm = ChatOpenAI(model="gpt-4", temperature=0)
def research_agent(state: ResearchState) -> dict:
"""研究 Agent:收集信息"""
messages = state["messages"]
prompt = f"""你是一个研究助手。请根据用户请求收集和整理相关信息。
用户请求:{messages[0].content}
请提供详细的研究结果。"""
response = llm.invoke(prompt)
return {
"research_result": response.content,
"messages": [AIMessage(content=f"[研究结果]\n{response.content}")]
}
def writing_agent(state: ResearchState) -> dict:
"""写作 Agent:撰写初稿"""
research_result = state["research_result"]
prompt = f"""你是一个写作助手。请根据研究结果撰写文章初稿。
研究结果:
{research_result}
请撰写一篇结构清晰、内容丰富的文章。"""
response = llm.invoke(prompt)
return {
"draft": response.content,
"messages": [AIMessage(content=f"[文章初稿]\n{response.content[:200]}...")]
}
def review_agent(state: ResearchState) -> dict:
"""审核 Agent:审核并优化"""
draft = state["draft"]
prompt = f"""你是一个审核助手。请审核文章并提出改进建议,然后输出最终版本。
文章初稿:
{draft}
请审核文章的:
1. 内容准确性
2. 结构合理性
3. 语言流畅性
然后输出优化后的最终版本。"""
response = llm.invoke(prompt)
return {
"final_output": response.content,
"messages": [AIMessage(content=f"[最终版本]\n{response.content}")]
}
workflow = StateGraph(ResearchState)
workflow.add_node("research", research_agent)
workflow.add_node("writing", writing_agent)
workflow.add_node("review", review_agent)
workflow.set_entry_point("research")
workflow.add_edge("research", "writing")
workflow.add_edge("writing", "review")
workflow.add_edge("review", END)
app = workflow.compile()
inputs = {
"messages": [HumanMessage(content="写一篇关于人工智能发展历史的文章")],
"research_result": "",
"draft": "",
"final_output": ""
}
result = app.invoke(inputs)
print("=== 最终输出 ===")
print(result["final_output"])
层级协作示例
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import operator
import json
class HierarchicalState(TypedDict):
messages: Annotated[list, operator.add]
task: str
subtasks: List[dict]
results: List[dict]
final_output: str
llm = ChatOpenAI(model="gpt-4", temperature=0)
def supervisor_agent(state: HierarchicalState) -> dict:
"""主控 Agent:任务分解和结果整合"""
task = state["task"]
if not state.get("subtasks"):
prompt = f"""你是一个项目主管。请将以下任务分解为子任务,并分配给合适的专家。
任务:{task}
可用的专家:
- 研究专家:负责信息收集和分析
- 写作专家:负责内容创作
- 数据专家:负责数据处理和计算
请以 JSON 格式返回子任务列表,格式如下:
{{"subtasks": [{{"expert": "专家类型", "task": "子任务描述"}}]}}"""
response = llm.invoke(prompt)
try:
content = response.content
if "```json" in content:
content = content.split("```json")[1].split("```")[0]
subtasks = json.loads(content.strip())["subtasks"]
except:
subtasks = [{"expert": "研究专家", "task": task}]
return {"subtasks": subtasks}
if state.get("results") and len(state["results"]) == len(state["subtasks"]):
results_text = "\n".join([
f"- {r['expert']}: {r['result'][:200]}..."
for r in state["results"]
])
prompt = f"""你是一个项目主管。请整合以下专家的工作成果,生成最终输出。
原始任务:{task}
专家成果:
{results_text}
请生成完整、连贯的最终输出。"""
response = llm.invoke(prompt)
return {
"final_output": response.content,
"messages": [AIMessage(content=response.content)]
}
return {}
def research_expert(state: HierarchicalState) -> dict:
"""研究专家"""
current_idx = len(state.get("results", []))
if current_idx >= len(state["subtasks"]):
return {}
subtask = state["subtasks"][current_idx]
if subtask["expert"] != "研究专家":
return {}
prompt = f"""你是一个研究专家。请完成以下任务:
任务:{subtask['task']}
请提供详细的研究结果。"""
response = llm.invoke(prompt)
return {
"results": [{"expert": "研究专家", "result": response.content}]
}
def writing_expert(state: HierarchicalState) -> dict:
"""写作专家"""
current_idx = len(state.get("results", []))
if current_idx >= len(state["subtasks"]):
return {}
subtask = state["subtasks"][current_idx]
if subtask["expert"] != "写作专家":
return {}
prompt = f"""你是一个写作专家。请完成以下任务:
任务:{subtask['task']}
请提供高质量的内容。"""
response = llm.invoke(prompt)
return {
"results": [{"expert": "写作专家", "result": response.content}]
}
def data_expert(state: HierarchicalState) -> dict:
"""数据专家"""
current_idx = len(state.get("results", []))
if current_idx >= len(state["subtasks"]):
return {}
subtask = state["subtasks"][current_idx]
if subtask["expert"] != "数据专家":
return {}
prompt = f"""你是一个数据专家。请完成以下任务:
任务:{subtask['task']}
请提供数据分析和处理结果。"""
response = llm.invoke(prompt)
return {
"results": [{"expert": "数据专家", "result": response.content}]
}
def route_to_expert(state: HierarchicalState) -> str:
"""路由到下一个专家"""
if not state.get("subtasks"):
return "supervisor"
results_count = len(state.get("results", []))
if results_count >= len(state["subtasks"]):
return "supervisor"
next_subtask = state["subtasks"][results_count]
expert_type = next_subtask["expert"]
if expert_type == "研究专家":
return "research"
elif expert_type == "写作专家":
return "writing"
elif expert_type == "数据专家":
return "data"
return "supervisor"
workflow = StateGraph(HierarchicalState)
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("research", research_expert)
workflow.add_node("writing", writing_expert)
workflow.add_node("data", data_expert)
workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route_to_expert, {
"research": "research",
"writing": "writing",
"data": "data",
"supervisor": END
})
workflow.add_edge("research", "supervisor")
workflow.add_edge("writing", "supervisor")
workflow.add_edge("data", "supervisor")
app = workflow.compile()
inputs = {
"messages": [HumanMessage(content="帮我完成一个市场分析报告")],
"task": "完成一个市场分析报告",
"subtasks": [],
"results": [],
"final_output": ""
}
result = app.invoke(inputs)
print("=== 最终输出 ===")
print(result["final_output"])
Agent 通信机制
共享状态
最简单的通信方式是通过共享状态:
from typing import TypedDict, List
class SharedState(TypedDict):
"""共享状态"""
task: str
agent_outputs: dict
shared_knowledge: List[str]
def agent_a(state: SharedState) -> dict:
"""Agent A 写入共享状态"""
result = f"Agent A 的处理结果"
return {
"agent_outputs": {**state.get("agent_outputs", {}), "A": result},
"shared_knowledge": state.get("shared_knowledge", []) + ["A 完成了任务"]
}
def agent_b(state: SharedState) -> dict:
"""Agent B 读取共享状态"""
a_result = state["agent_outputs"].get("A", "")
result = f"Agent B 基于 A 的结果:{a_result}"
return {
"agent_outputs": {**state["agent_outputs"], "B": result}
}
消息传递
使用消息队列进行 Agent 间通信:
from typing import TypedDict, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class AgentMessage:
"""Agent 消息"""
sender: str
receiver: str
content: str
timestamp: datetime = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
class MessageBus:
"""消息总线"""
def __init__(self):
self.queues: dict[str, List[AgentMessage]] = {}
def register(self, agent_name: str):
"""注册 Agent"""
self.queues[agent_name] = []
def send(self, message: AgentMessage):
"""发送消息"""
if message.receiver in self.queues:
self.queues[message.receiver].append(message)
def receive(self, agent_name: str) -> List[AgentMessage]:
"""接收消息"""
messages = self.queues.get(agent_name, [])
self.queues[agent_name] = []
return messages
message_bus = MessageBus()
message_bus.register("agent_a")
message_bus.register("agent_b")
message_bus.send(AgentMessage(
sender="agent_a",
receiver="agent_b",
content="请处理这个任务"
))
messages = message_bus.receive("agent_b")
for msg in messages:
print(f"收到来自 {msg.sender} 的消息:{msg.content}")
黑板模式
使用共享的"黑板"进行异步协作:
from typing import TypedDict, Any, List
from datetime import datetime
import json
class Blackboard:
"""黑板系统"""
def __init__(self):
self.data: dict[str, Any] = {}
self.history: List[dict] = []
def write(self, key: str, value: Any, agent: str):
"""写入数据"""
self.data[key] = {
"value": value,
"author": agent,
"timestamp": datetime.now().isoformat()
}
self.history.append({
"action": "write",
"key": key,
"agent": agent,
"timestamp": datetime.now().isoformat()
})
def read(self, key: str) -> Any:
"""读取数据"""
entry = self.data.get(key)
return entry["value"] if entry else None
def get_updates(self, since: str = None) -> List[dict]:
"""获取更新"""
if since is None:
return self.history
return [h for h in self.history if h["timestamp"] > since]
blackboard = Blackboard()
blackboard.write("task", "分析市场数据", "supervisor")
blackboard.write("data", {"sales": 1000, "costs": 500}, "data_agent")
print(blackboard.read("task"))
print(blackboard.read("data"))
冲突解决
当多个 Agent 产生冲突结果时,需要解决机制:
投票机制
from typing import List
from collections import Counter
def resolve_by_voting(results: List[str]) -> str:
"""投票解决冲突"""
vote_counts = Counter(results)
return vote_counts.most_common(1)[0][0]
agent_results = [
"答案是 A",
"答案是 A",
"答案是 B",
"答案是 A",
]
final_result = resolve_by_voting(agent_results)
print(f"投票结果:{final_result}")
仲裁机制
from langchain_openai import ChatOpenAI
def resolve_by_arbitrator(results: List[dict], task: str) -> str:
"""仲裁解决冲突"""
llm = ChatOpenAI(model="gpt-4", temperature=0)
results_text = "\n".join([
f"- {r['agent']}: {r['result']}"
for r in results
])
prompt = f"""多个专家对同一任务给出了不同的结果,请作为仲裁者选择最佳结果或综合给出最终答案。
任务:{task}
专家结果:
{results_text}
请给出最终答案并说明理由。"""
return llm.invoke(prompt).content
conflict_results = [
{"agent": "专家A", "result": "方案一:快速实现,成本低"},
{"agent": "专家B", "result": "方案二:质量高,但周期长"},
]
final_decision = resolve_by_arbitrator(conflict_results, "选择最佳实现方案")
print(final_decision)
完整示例:软件开发团队
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import operator
class DevTeamState(TypedDict):
messages: Annotated[list, operator.add]
requirement: str
design_doc: str
code: str
test_cases: str
review_comments: str
final_output: str
llm = ChatOpenAI(model="gpt-4", temperature=0)
def product_manager(state: DevTeamState) -> dict:
"""产品经理:分析需求"""
requirement = state["requirement"]
prompt = f"""你是一个产品经理。请分析以下需求并输出需求文档。
需求:{requirement}
请输出:
1. 功能列表
2. 用户故事
3. 验收标准"""
response = llm.invoke(prompt)
return {
"messages": [AIMessage(content=f"[需求文档]\n{response.content}")]
}
def architect(state: DevTeamState) -> dict:
"""架构师:设计系统"""
pm_doc = state["messages"][-1].content
prompt = f"""你是一个系统架构师。请根据需求文档设计系统架构。
{pm_doc}
请输出:
1. 系统架构图描述
2. 模块划分
3. 技术选型
4. 接口设计"""
response = llm.invoke(prompt)
return {
"design_doc": response.content,
"messages": [AIMessage(content=f"[设计文档]\n{response.content[:300]}...")]
}
def developer(state: DevTeamState) -> dict:
"""开发工程师:编写代码"""
design_doc = state["design_doc"]
prompt = f"""你是一个开发工程师。请根据设计文档编写代码。
设计文档:
{design_doc}
请输出核心代码实现(伪代码或示例代码)。"""
response = llm.invoke(prompt)
return {
"code": response.content,
"messages": [AIMessage(content=f"[代码]\n{response.content[:300]}...")]
}
def tester(state: DevTeamState) -> dict:
"""测试工程师:编写测试用例"""
code = state["code"]
prompt = f"""你是一个测试工程师。请根据代码编写测试用例。
代码:
{code}
请输出:
1. 单元测试用例
2. 集成测试用例
3. 边界条件测试"""
response = llm.invoke(prompt)
return {
"test_cases": response.content,
"messages": [AIMessage(content=f"[测试用例]\n{response.content[:200]}...")]
}
def code_reviewer(state: DevTeamState) -> dict:
"""代码审查员:审查代码"""
code = state["code"]
test_cases = state["test_cases"]
prompt = f"""你是一个代码审查员。请审查以下代码和测试用例。
代码:
{code}
测试用例:
{test_cases}
请输出:
1. 代码质量评估
2. 潜在问题
3. 改进建议"""
response = llm.invoke(prompt)
return {
"review_comments": response.content,
"messages": [AIMessage(content=f"[审查意见]\n{response.content}")]
}
workflow = StateGraph(DevTeamState)
workflow.add_node("pm", product_manager)
workflow.add_node("architect", architect)
workflow.add_node("developer", developer)
workflow.add_node("tester", tester)
workflow.add_node("reviewer", code_reviewer)
workflow.set_entry_point("pm")
workflow.add_edge("pm", "architect")
workflow.add_edge("architect", "developer")
workflow.add_edge("developer", "tester")
workflow.add_edge("tester", "reviewer")
workflow.add_edge("reviewer", END)
app = workflow.compile()
inputs = {
"messages": [HumanMessage(content="开发一个用户登录功能")],
"requirement": "开发一个用户登录功能,支持用户名密码登录和手机验证码登录",
"design_doc": "",
"code": "",
"test_cases": "",
"review_comments": "",
"final_output": ""
}
result = app.invoke(inputs)
print("=== 开发流程完成 ===")
for msg in result["messages"]:
if isinstance(msg, AIMessage):
print(f"\n{msg.content[:200]}...")
小结
多 Agent 协作是处理复杂任务的有效方式:
- 根据任务特点选择合适的协作模式(顺序、层级、对等)
- 使用 LangGraph 可以方便地实现多 Agent 系统
- 设计合理的通信机制(共享状态、消息传递、黑板)
- 建立冲突解决机制(投票、仲裁)
- 每个 Agent 应该有明确的职责和专业领域
下一章我们将学习 AI Agent 开发的最佳实践。