跳到主要内容

多 Agent 协作

当任务变得复杂时,单个 Agent 可能难以有效处理。多 Agent 协作系统通过让多个专业化的 Agent 分工合作,能够更好地完成复杂任务。本章将介绍多 Agent 系统的设计模式和实现方法。

为什么需要多 Agent 协作

单个 Agent 面临的挑战:

  • 能力边界:一个 Agent 难以精通所有领域
  • 上下文限制:复杂任务的上下文可能超出 LLM 的处理能力
  • 并行处理:单 Agent 只能串行执行任务
  • 专业化需求:不同子任务需要不同的专业知识

多 Agent 协作的优势:

  • 分工明确:每个 Agent 专注于自己擅长的领域
  • 并行执行:多个 Agent 可以同时工作
  • 知识整合:综合多个专业领域的知识
  • 容错能力:一个 Agent 失败不影响整体系统

协作模式

顺序协作

Agent 按顺序执行,前一个 Agent 的输出作为后一个 Agent 的输入:

适用场景:流水线式任务,如内容创作、数据分析报告。

层级协作

一个主控 Agent 负责任务分配和结果整合:

适用场景:需要协调多个专业领域的复杂任务。

对等协作

多个 Agent 平等协作,通过消息传递交流:

适用场景:需要频繁交互和协商的任务。

使用 LangGraph 实现多 Agent 系统

顺序协作示例

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import operator

class ResearchState(TypedDict):
messages: Annotated[list, operator.add]
research_result: str
draft: str
final_output: str

llm = ChatOpenAI(model="gpt-4", temperature=0)

def research_agent(state: ResearchState) -> dict:
"""研究 Agent:收集信息"""
messages = state["messages"]

prompt = f"""你是一个研究助手。请根据用户请求收集和整理相关信息。

用户请求:{messages[0].content}

请提供详细的研究结果。"""

response = llm.invoke(prompt)

return {
"research_result": response.content,
"messages": [AIMessage(content=f"[研究结果]\n{response.content}")]
}

def writing_agent(state: ResearchState) -> dict:
"""写作 Agent:撰写初稿"""
research_result = state["research_result"]

prompt = f"""你是一个写作助手。请根据研究结果撰写文章初稿。

研究结果:
{research_result}

请撰写一篇结构清晰、内容丰富的文章。"""

response = llm.invoke(prompt)

return {
"draft": response.content,
"messages": [AIMessage(content=f"[文章初稿]\n{response.content[:200]}...")]
}

def review_agent(state: ResearchState) -> dict:
"""审核 Agent:审核并优化"""
draft = state["draft"]

prompt = f"""你是一个审核助手。请审核文章并提出改进建议,然后输出最终版本。

文章初稿:
{draft}

请审核文章的:
1. 内容准确性
2. 结构合理性
3. 语言流畅性

然后输出优化后的最终版本。"""

response = llm.invoke(prompt)

return {
"final_output": response.content,
"messages": [AIMessage(content=f"[最终版本]\n{response.content}")]
}

workflow = StateGraph(ResearchState)

workflow.add_node("research", research_agent)
workflow.add_node("writing", writing_agent)
workflow.add_node("review", review_agent)

workflow.set_entry_point("research")
workflow.add_edge("research", "writing")
workflow.add_edge("writing", "review")
workflow.add_edge("review", END)

app = workflow.compile()

inputs = {
"messages": [HumanMessage(content="写一篇关于人工智能发展历史的文章")],
"research_result": "",
"draft": "",
"final_output": ""
}

result = app.invoke(inputs)

print("=== 最终输出 ===")
print(result["final_output"])

层级协作示例

from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import operator
import json

class HierarchicalState(TypedDict):
messages: Annotated[list, operator.add]
task: str
subtasks: List[dict]
results: List[dict]
final_output: str

llm = ChatOpenAI(model="gpt-4", temperature=0)

def supervisor_agent(state: HierarchicalState) -> dict:
"""主控 Agent:任务分解和结果整合"""
task = state["task"]

if not state.get("subtasks"):
prompt = f"""你是一个项目主管。请将以下任务分解为子任务,并分配给合适的专家。

任务:{task}

可用的专家:
- 研究专家:负责信息收集和分析
- 写作专家:负责内容创作
- 数据专家:负责数据处理和计算

请以 JSON 格式返回子任务列表,格式如下:
{{"subtasks": [{{"expert": "专家类型", "task": "子任务描述"}}]}}"""

response = llm.invoke(prompt)

try:
content = response.content
if "```json" in content:
content = content.split("```json")[1].split("```")[0]
subtasks = json.loads(content.strip())["subtasks"]
except:
subtasks = [{"expert": "研究专家", "task": task}]

return {"subtasks": subtasks}

if state.get("results") and len(state["results"]) == len(state["subtasks"]):
results_text = "\n".join([
f"- {r['expert']}: {r['result'][:200]}..."
for r in state["results"]
])

prompt = f"""你是一个项目主管。请整合以下专家的工作成果,生成最终输出。

原始任务:{task}

专家成果:
{results_text}

请生成完整、连贯的最终输出。"""

response = llm.invoke(prompt)

return {
"final_output": response.content,
"messages": [AIMessage(content=response.content)]
}

return {}

def research_expert(state: HierarchicalState) -> dict:
"""研究专家"""
current_idx = len(state.get("results", []))
if current_idx >= len(state["subtasks"]):
return {}

subtask = state["subtasks"][current_idx]
if subtask["expert"] != "研究专家":
return {}

prompt = f"""你是一个研究专家。请完成以下任务:

任务:{subtask['task']}

请提供详细的研究结果。"""

response = llm.invoke(prompt)

return {
"results": [{"expert": "研究专家", "result": response.content}]
}

def writing_expert(state: HierarchicalState) -> dict:
"""写作专家"""
current_idx = len(state.get("results", []))
if current_idx >= len(state["subtasks"]):
return {}

subtask = state["subtasks"][current_idx]
if subtask["expert"] != "写作专家":
return {}

prompt = f"""你是一个写作专家。请完成以下任务:

任务:{subtask['task']}

请提供高质量的内容。"""

response = llm.invoke(prompt)

return {
"results": [{"expert": "写作专家", "result": response.content}]
}

def data_expert(state: HierarchicalState) -> dict:
"""数据专家"""
current_idx = len(state.get("results", []))
if current_idx >= len(state["subtasks"]):
return {}

subtask = state["subtasks"][current_idx]
if subtask["expert"] != "数据专家":
return {}

prompt = f"""你是一个数据专家。请完成以下任务:

任务:{subtask['task']}

请提供数据分析和处理结果。"""

response = llm.invoke(prompt)

return {
"results": [{"expert": "数据专家", "result": response.content}]
}

def route_to_expert(state: HierarchicalState) -> str:
"""路由到下一个专家"""
if not state.get("subtasks"):
return "supervisor"

results_count = len(state.get("results", []))

if results_count >= len(state["subtasks"]):
return "supervisor"

next_subtask = state["subtasks"][results_count]
expert_type = next_subtask["expert"]

if expert_type == "研究专家":
return "research"
elif expert_type == "写作专家":
return "writing"
elif expert_type == "数据专家":
return "data"

return "supervisor"

workflow = StateGraph(HierarchicalState)

workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("research", research_expert)
workflow.add_node("writing", writing_expert)
workflow.add_node("data", data_expert)

workflow.set_entry_point("supervisor")

workflow.add_conditional_edges("supervisor", route_to_expert, {
"research": "research",
"writing": "writing",
"data": "data",
"supervisor": END
})

workflow.add_edge("research", "supervisor")
workflow.add_edge("writing", "supervisor")
workflow.add_edge("data", "supervisor")

app = workflow.compile()

inputs = {
"messages": [HumanMessage(content="帮我完成一个市场分析报告")],
"task": "完成一个市场分析报告",
"subtasks": [],
"results": [],
"final_output": ""
}

result = app.invoke(inputs)

print("=== 最终输出 ===")
print(result["final_output"])

Agent 通信机制

共享状态

最简单的通信方式是通过共享状态:

from typing import TypedDict, List

class SharedState(TypedDict):
"""共享状态"""
task: str
agent_outputs: dict
shared_knowledge: List[str]

def agent_a(state: SharedState) -> dict:
"""Agent A 写入共享状态"""
result = f"Agent A 的处理结果"
return {
"agent_outputs": {**state.get("agent_outputs", {}), "A": result},
"shared_knowledge": state.get("shared_knowledge", []) + ["A 完成了任务"]
}

def agent_b(state: SharedState) -> dict:
"""Agent B 读取共享状态"""
a_result = state["agent_outputs"].get("A", "")
result = f"Agent B 基于 A 的结果:{a_result}"
return {
"agent_outputs": {**state["agent_outputs"], "B": result}
}

消息传递

使用消息队列进行 Agent 间通信:

from typing import TypedDict, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AgentMessage:
"""Agent 消息"""
sender: str
receiver: str
content: str
timestamp: datetime = None

def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()

class MessageBus:
"""消息总线"""

def __init__(self):
self.queues: dict[str, List[AgentMessage]] = {}

def register(self, agent_name: str):
"""注册 Agent"""
self.queues[agent_name] = []

def send(self, message: AgentMessage):
"""发送消息"""
if message.receiver in self.queues:
self.queues[message.receiver].append(message)

def receive(self, agent_name: str) -> List[AgentMessage]:
"""接收消息"""
messages = self.queues.get(agent_name, [])
self.queues[agent_name] = []
return messages

message_bus = MessageBus()
message_bus.register("agent_a")
message_bus.register("agent_b")

message_bus.send(AgentMessage(
sender="agent_a",
receiver="agent_b",
content="请处理这个任务"
))

messages = message_bus.receive("agent_b")
for msg in messages:
print(f"收到来自 {msg.sender} 的消息:{msg.content}")

黑板模式

使用共享的"黑板"进行异步协作:

from typing import TypedDict, Any, List
from datetime import datetime
import json

class Blackboard:
"""黑板系统"""

def __init__(self):
self.data: dict[str, Any] = {}
self.history: List[dict] = []

def write(self, key: str, value: Any, agent: str):
"""写入数据"""
self.data[key] = {
"value": value,
"author": agent,
"timestamp": datetime.now().isoformat()
}
self.history.append({
"action": "write",
"key": key,
"agent": agent,
"timestamp": datetime.now().isoformat()
})

def read(self, key: str) -> Any:
"""读取数据"""
entry = self.data.get(key)
return entry["value"] if entry else None

def get_updates(self, since: str = None) -> List[dict]:
"""获取更新"""
if since is None:
return self.history
return [h for h in self.history if h["timestamp"] > since]

blackboard = Blackboard()

blackboard.write("task", "分析市场数据", "supervisor")
blackboard.write("data", {"sales": 1000, "costs": 500}, "data_agent")

print(blackboard.read("task"))
print(blackboard.read("data"))

冲突解决

当多个 Agent 产生冲突结果时,需要解决机制:

投票机制

from typing import List
from collections import Counter

def resolve_by_voting(results: List[str]) -> str:
"""投票解决冲突"""
vote_counts = Counter(results)
return vote_counts.most_common(1)[0][0]

agent_results = [
"答案是 A",
"答案是 A",
"答案是 B",
"答案是 A",
]

final_result = resolve_by_voting(agent_results)
print(f"投票结果:{final_result}")

仲裁机制

from langchain_openai import ChatOpenAI

def resolve_by_arbitrator(results: List[dict], task: str) -> str:
"""仲裁解决冲突"""
llm = ChatOpenAI(model="gpt-4", temperature=0)

results_text = "\n".join([
f"- {r['agent']}: {r['result']}"
for r in results
])

prompt = f"""多个专家对同一任务给出了不同的结果,请作为仲裁者选择最佳结果或综合给出最终答案。

任务:{task}

专家结果:
{results_text}

请给出最终答案并说明理由。"""

return llm.invoke(prompt).content

conflict_results = [
{"agent": "专家A", "result": "方案一:快速实现,成本低"},
{"agent": "专家B", "result": "方案二:质量高,但周期长"},
]

final_decision = resolve_by_arbitrator(conflict_results, "选择最佳实现方案")
print(final_decision)

完整示例:软件开发团队

from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import operator

class DevTeamState(TypedDict):
messages: Annotated[list, operator.add]
requirement: str
design_doc: str
code: str
test_cases: str
review_comments: str
final_output: str

llm = ChatOpenAI(model="gpt-4", temperature=0)

def product_manager(state: DevTeamState) -> dict:
"""产品经理:分析需求"""
requirement = state["requirement"]

prompt = f"""你是一个产品经理。请分析以下需求并输出需求文档。

需求:{requirement}

请输出:
1. 功能列表
2. 用户故事
3. 验收标准"""

response = llm.invoke(prompt)

return {
"messages": [AIMessage(content=f"[需求文档]\n{response.content}")]
}

def architect(state: DevTeamState) -> dict:
"""架构师:设计系统"""
pm_doc = state["messages"][-1].content

prompt = f"""你是一个系统架构师。请根据需求文档设计系统架构。

{pm_doc}

请输出:
1. 系统架构图描述
2. 模块划分
3. 技术选型
4. 接口设计"""

response = llm.invoke(prompt)

return {
"design_doc": response.content,
"messages": [AIMessage(content=f"[设计文档]\n{response.content[:300]}...")]
}

def developer(state: DevTeamState) -> dict:
"""开发工程师:编写代码"""
design_doc = state["design_doc"]

prompt = f"""你是一个开发工程师。请根据设计文档编写代码。

设计文档:
{design_doc}

请输出核心代码实现(伪代码或示例代码)。"""

response = llm.invoke(prompt)

return {
"code": response.content,
"messages": [AIMessage(content=f"[代码]\n{response.content[:300]}...")]
}

def tester(state: DevTeamState) -> dict:
"""测试工程师:编写测试用例"""
code = state["code"]

prompt = f"""你是一个测试工程师。请根据代码编写测试用例。

代码:
{code}

请输出:
1. 单元测试用例
2. 集成测试用例
3. 边界条件测试"""

response = llm.invoke(prompt)

return {
"test_cases": response.content,
"messages": [AIMessage(content=f"[测试用例]\n{response.content[:200]}...")]
}

def code_reviewer(state: DevTeamState) -> dict:
"""代码审查员:审查代码"""
code = state["code"]
test_cases = state["test_cases"]

prompt = f"""你是一个代码审查员。请审查以下代码和测试用例。

代码:
{code}

测试用例:
{test_cases}

请输出:
1. 代码质量评估
2. 潜在问题
3. 改进建议"""

response = llm.invoke(prompt)

return {
"review_comments": response.content,
"messages": [AIMessage(content=f"[审查意见]\n{response.content}")]
}

workflow = StateGraph(DevTeamState)

workflow.add_node("pm", product_manager)
workflow.add_node("architect", architect)
workflow.add_node("developer", developer)
workflow.add_node("tester", tester)
workflow.add_node("reviewer", code_reviewer)

workflow.set_entry_point("pm")
workflow.add_edge("pm", "architect")
workflow.add_edge("architect", "developer")
workflow.add_edge("developer", "tester")
workflow.add_edge("tester", "reviewer")
workflow.add_edge("reviewer", END)

app = workflow.compile()

inputs = {
"messages": [HumanMessage(content="开发一个用户登录功能")],
"requirement": "开发一个用户登录功能,支持用户名密码登录和手机验证码登录",
"design_doc": "",
"code": "",
"test_cases": "",
"review_comments": "",
"final_output": ""
}

result = app.invoke(inputs)

print("=== 开发流程完成 ===")
for msg in result["messages"]:
if isinstance(msg, AIMessage):
print(f"\n{msg.content[:200]}...")

小结

多 Agent 协作是处理复杂任务的有效方式:

  • 根据任务特点选择合适的协作模式(顺序、层级、对等)
  • 使用 LangGraph 可以方便地实现多 Agent 系统
  • 设计合理的通信机制(共享状态、消息传递、黑板)
  • 建立冲突解决机制(投票、仲裁)
  • 每个 Agent 应该有明确的职责和专业领域

下一章我们将学习 AI Agent 开发的最佳实践。

参考资料