LangGraph Agent 实战
LangGraph 是 LangChain 生态中用于构建有状态 Agent 的核心框架。LangChain 提供了两种创建 Agent 的方式:langchain.agents.create_agent 和 langgraph.prebuilt.create_react_agent。本章将通过实际案例,带你掌握 LangGraph Agent 的开发技巧。
两种 API 对比
create_agent(langchain.agents):LangChain 1.0 新 API,支持 middleware,参数名是system_promptcreate_react_agent(langgraph.prebuilt):LangGraph 经典 API,简洁易用,参数名是prompt
两种 API 都可以正常使用,根据需求选择:
- 需要 middleware 扩展时选择
create_agent - 简单场景选择
create_react_agent
环境准备
首先安装必要的依赖:
pip install langchain langchain-openai langgraph langsmith
配置 API Key:
import os
os.environ["OPENAI_API_KEY"] = "your-api-key"
os.environ["LANGSMITH_API_KEY"] = "your-langsmith-key" # 可选,用于追踪
os.environ["LANGSMITH_TRACING"] = "true" # 启用追踪
os.environ["LANGSMITH_PROJECT"] = "my-agent-project"
第一个 Agent
让我们使用 LangChain 1.0 的 create_agent 创建一个简单的数学计算助手:
from langchain.agents import create_agent
from langchain.tools import tool
# 定义工具
@tool
def add(a: int, b: int) -> int:
"""将两个数字相加"""
return a + b
@tool
def subtract(a: int, b: int) -> int:
"""将两个数字相减"""
return a - b
@tool
def multiply(a: int, b: int) -> int:
"""将两个数字相乘"""
return a * b
@tool
def divide(a: float, b: float) -> float:
"""将两个数字相除"""
if b == 0:
return "错误:除数不能为零"
return a / b
# 创建 Agent(使用模型字符串,最简单的方式)
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[add, subtract, multiply, divide],
system_prompt="你是一个数学助手,帮助用户进行数学计算。"
)
# 调用 Agent
result = agent.invoke({
"messages": [{"role": "user", "content": "计算 (23 + 17) * 5 - 10 的结果"}]
})
print(f"最终答案:{result['messages'][-1].content}")
代码解释:
@tool装饰器将普通函数转换为 LangChain 工具,自动从函数签名和文档字符串提取参数信息model="openai:gpt-4o-mini"使用简洁的字符串格式指定模型system_prompt定义了 Agent 的角色和行为方式
查看执行过程
使用流式输出可以看到 Agent 的完整执行过程:
# 流式输出(可以看到 Agent 的思考过程)
for chunk in agent.stream({
"messages": [{"role": "user", "content": "计算 100 / 4 + 20"}]
}):
# 注意:在 create_agent 中,模型节点的名称取决于具体实现
if "agent" in chunk:
for msg in chunk["agent"]["messages"]:
if hasattr(msg, "tool_calls") and msg.tool_calls:
print(f"[调用工具] {msg.tool_calls[0]['name']}")
print(f"[参数] {msg.tool_calls[0]['args']}")
elif msg.content:
print(f"[回答] {msg.content}")
elif "tools" in chunk:
for msg in chunk["tools"]["messages"]:
print(f"[工具结果] {msg.content}")
输出示例:
[调用工具] divide
[参数] {'a': 100, 'b': 4}
[工具结果] 25.0
[调用工具] add
[参数] {'a': 25, 'b': 20}
[工具结果] 45
[回答] 计算结果是 45。
构建多功能助手
让我们构建一个更实用的 Agent,集成多种工具:
from langchain.agents import create_agent
from langchain.tools import tool
from datetime import datetime
@tool
def web_search(query: str) -> str:
"""
在互联网上搜索信息
参数:
query: 搜索关键词
"""
# 实际项目中替换为真实的搜索 API
return f"搜索 '{query}' 的结果:找到相关信息..."
@tool
def get_weather(city: str) -> str:
"""
获取城市天气信息
参数:
city: 城市名称
"""
weather_data = {
"北京": {"temp": 22, "condition": "晴"},
"上海": {"temp": 26, "condition": "多云"},
"广州": {"temp": 30, "condition": "雨"},
}
data = weather_data.get(city, {"temp": 20, "condition": "未知"})
return f"{city}:{data['condition']},温度 {data['temp']}°C"
@tool
def calculate(expression: str) -> str:
"""
计算数学表达式
参数:
expression: 数学表达式,如 "2 + 3 * 4"
"""
try:
# 安全检查
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
return "错误:表达式包含非法字符"
result = eval(expression)
return f"计算结果:{result}"
except Exception as e:
return f"计算错误:{str(e)}"
@tool
def get_current_time() -> str:
"""获取当前时间"""
now = datetime.now()
return f"当前时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
@tool
def translate_text(text: str, target_lang: str = "英语") -> str:
"""
翻译文本
参数:
text: 要翻译的文本
target_lang: 目标语言
"""
# 实际项目中替换为真实的翻译 API
return f"[翻译结果-{target_lang}]:{text}"
# 创建 Agent
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[web_search, get_weather, calculate, get_current_time, translate_text],
system_prompt="""你是一个智能助手,可以帮助用户完成多种任务:
1. 搜索互联网信息
2. 查询天气
3. 数学计算
4. 查看时间
5. 翻译文本
请根据用户的需求选择合适的工具完成任务。
如果用户的问题不需要使用工具,直接回答即可。"""
)
# 测试各种功能
def chat(user_input: str) -> str:
result = agent.invoke({
"messages": [{"role": "user", "content": user_input}]
})
return result["messages"][-1].content
print(chat("北京今天天气怎么样?"))
print(chat("帮我计算 123 * 456"))
print(chat("现在几点了?"))
print(chat("把'你好,世界'翻译成英语"))
添加记忆功能
LangGraph 的 Agent 通过 checkpointer 自动维护对话历史,可以通过线程 ID 区分不同会话:
短期记忆(对话历史)
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.checkpoint.memory import MemorySaver
@tool
def save_note(content: str) -> str:
"""保存笔记"""
return f"已保存笔记:{content[:50]}..."
@tool
def search_notes(query: str) -> str:
"""搜索笔记"""
return f"找到与 '{query}' 相关的笔记..."
# 创建带持久化的 Agent
# 使用内存存储(生产环境可替换为数据库)
checkpointer = MemorySaver()
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[save_note, search_notes],
system_prompt="你是一个笔记助手,可以帮助用户管理笔记。",
checkpointer=checkpointer
)
# 配置线程(会话)
config = {"configurable": {"thread_id": "user-session-001"}}
# 第一次对话
result1 = agent.invoke(
{"messages": [{"role": "user", "content": "帮我保存一条笔记:明天下午3点开会"}]},
config=config
)
print(result1["messages"][-1].content)
# 第二次对话(Agent 记得之前的内容)
result2 = agent.invoke(
{"messages": [{"role": "user", "content": "我刚才保存了什么笔记?"}]},
config=config
)
print(result2["messages"][-1].content)
# 新的会话(不记得之前的内容)
new_config = {"configurable": {"thread_id": "user-session-002"}}
result3 = agent.invoke(
{"messages": [{"role": "user", "content": "我刚才保存了什么笔记?"}]},
config=new_config
)
print(result3["messages"][-1].content)
长期记忆(跨会话)
from langchain.agents import create_agent
from langgraph.store.memory import InMemoryStore
from langchain.tools import tool
from langchain_core.runnables import RunnableConfig
import uuid
# 创建存储
store = InMemoryStore()
@tool
def save_memory(content: str, config: RunnableConfig) -> str:
"""保存重要信息到长期记忆"""
from langgraph.config import get_store
memory_store = get_store()
user_id = config.get("configurable", {}).get("user_id", "default")
memory_store.put(
("memories", user_id),
str(uuid.uuid4()),
{"content": content}
)
return f"已保存:{content}"
@tool
def recall_memories(config: RunnableConfig) -> str:
"""检索长期记忆"""
from langgraph.config import get_store
memory_store = get_store()
user_id = config.get("configurable", {}).get("user_id", "default")
memories = memory_store.search(("memories", user_id))
if not memories:
return "暂无记忆"
return "\n".join([f"- {m.value['content']}" for m in memories])
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[save_memory, recall_memories],
store=store,
system_prompt="你是一个智能助手,可以记住用户的信息。"
)
# 用户 A 的会话
config_a = {"configurable": {"user_id": "user-a", "thread_id": "session-1"}}
agent.invoke(
{"messages": [{"role": "user", "content": "请记住:我喜欢喝咖啡"}]},
config=config_a
)
# 用户 A 的新会话(仍能回忆起记忆)
config_a2 = {"configurable": {"user_id": "user-a", "thread_id": "session-2"}}
result = agent.invoke(
{"messages": [{"role": "user", "content": "我记得什么?"}]},
config=config_a2
)
print(result["messages"][-1].content) # 输出包含"喜欢喝咖啡"
使用 LangGraph 构建自定义工作流
对于复杂场景,可以直接使用 LangGraph 的 StateGraph 构建自定义工作流:
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph.message import add_messages
# 定义状态
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
# 定义工具
@tool
def search(query: str) -> str:
"""搜索信息"""
return f"搜索结果:{query}"
@tool
def calculator(expression: str) -> str:
"""计算表达式"""
try:
return str(eval(expression))
except Exception:
return "计算错误"
tools = [search, calculator]
# 创建模型
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)
# 定义节点函数
def agent_node(state: AgentState) -> dict:
"""Agent 推理节点"""
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
# 构建工作流
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools))
# 设置入口
workflow.set_entry_point("agent")
# 添加边
workflow.add_conditional_edges(
"agent",
tools_condition, # 自动判断是否需要调用工具
{"tools": "tools", "__end__": END}
)
workflow.add_edge("tools", "agent")
# 编译
app = workflow.compile()
# 执行
result = app.invoke({
"messages": [{"role": "user", "content": "搜索 Python 教程,然后计算 2 的 10 次方"}]
})
print(result["messages"][-1].content)
结构化输出
让 Agent 返回结构化的结果,适用于需要明确输出格式的场景:
from langchain.agents import create_agent
from pydantic import BaseModel, Field
from typing import List
class SearchResult(BaseModel):
"""搜索结果结构"""
query: str = Field(description="搜索查询")
results: List[str] = Field(description="搜索结果列表")
total: int = Field(description="结果总数")
class AnalysisResult(BaseModel):
"""分析结果结构"""
summary: str = Field(description="摘要")
key_points: List[str] = Field(description="关键点")
sentiment: str = Field(description="情感倾向")
@tool
def analyze_text(text: str) -> str:
"""分析文本内容"""
return f"分析结果:{text[:100]}..."
# 创建带结构化输出的 Agent
agent = create_agent(
model="openai:gpt-4o-mini",
tools=[analyze_text],
system_prompt="你是一个文本分析助手。",
response_format=AnalysisResult
)
result = agent.invoke({
"messages": [{"role": "user", "content": "分析这段文本:今天天气真好,阳光明媚,心情很愉快。"}]
})
# 访问结构化输出
print(result["structured_response"])
错误处理
LangGraph 的 ToolNode 提供了内置的错误处理机制:
from langgraph.prebuilt import create_react_agent, ToolNode
from langchain_core.tools import tool
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@tool
def unreliable_api(query: str) -> str:
"""可能失败的 API 调用"""
import random
if random.random() < 0.3:
raise Exception("API 调用失败")
return f"API 结果:{query}"
# 方式一:在工具内部处理错误
@tool
def safe_api_call(query: str) -> str:
"""安全的 API 调用,内置错误处理"""
try:
import random
if random.random() < 0.3:
raise Exception("API 连接超时")
return f"API 结果:{query}"
except Exception as e:
logger.error(f"API 调用失败: {e}")
return f"API 暂时不可用,请稍后重试或尝试其他方法。"
# 方式二:使用 ToolNode 的错误处理
tools = [unreliable_api]
tool_node = ToolNode(tools, handle_tool_errors=True)
agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=tools,
prompt="你是一个助手。如果工具调用失败,请尝试其他方法或告知用户。"
)
result = agent.invoke({
"messages": [{"role": "user", "content": "调用 API 查询信息"}]
})
print(result["messages"][-1].content)
自定义错误消息
# 自定义错误消息
tool_node = ToolNode(
tools,
handle_tool_errors="工具执行出错,请尝试其他方法完成任务。"
)
# 或使用函数生成动态错误消息
def custom_error_handler(error, tool_call):
return f"工具 {tool_call['name']} 执行失败:{str(error)}。请尝试其他方法。"
tool_node = ToolNode(tools, handle_tool_errors=custom_error_handler)
人工确认(中断)
在某些场景下,我们需要在执行关键操作前让用户确认:
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.tools import tool
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""发送电子邮件"""
return f"邮件已发送至 {to}"
@tool
def delete_file(file_path: str) -> str:
"""删除文件"""
return f"已删除文件:{file_path}"
checkpointer = MemorySaver()
# 在执行工具前中断,等待用户确认
agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=[send_email, delete_file],
checkpointer=checkpointer,
interrupt_before=["tools"] # 在工具执行前中断
)
config = {"configurable": {"thread_id": "session-001"}}
# 第一次调用,Agent 会规划要执行的工具,但不会真正执行
result = agent.invoke(
{"messages": [{"role": "user", "content": "发送邮件给 [email protected],主题是测试"}]},
config=config
)
# 检查下一步要执行的操作
state = agent.get_state(config)
print(f"待执行的操作: {state.next}")
# 如果用户确认,继续执行
user_input = input("确认发送邮件?(y/n): ")
if user_input.lower() == 'y':
result = agent.invoke(None, config=config) # 传入 None 继续执行
print(result["messages"][-1].content)
else:
# 用户取消,可以更新状态或执行其他操作
print("操作已取消")
完整示例:智能研究助手
让我们构建一个完整的研究助手 Agent,综合运用前面学到的知识:
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore
from langchain_core.tools import tool
from langchain_core.runnables import RunnableConfig
from typing import Dict
import json
import uuid
# 模拟数据库
notes_db: Dict[str, str] = {}
# 定义工具
@tool
def search_web(query: str) -> str:
"""
在互联网上搜索信息
参数:
query: 搜索关键词
"""
mock_results = [
{"title": f"{query} - 概述", "snippet": f"关于 {query} 的基本概念和定义..."},
{"title": f"{query} - 深入研究", "snippet": f"{query} 的最新研究进展..."},
{"title": f"{query} - 应用案例", "snippet": f"{query} 在实际中的应用..."},
]
return json.dumps(mock_results, ensure_ascii=False)
@tool
def save_note(title: str, content: str) -> str:
"""
保存研究笔记
参数:
title: 笔记标题
content: 笔记内容
"""
notes_db[title] = content
return f"已保存笔记《{title}》"
@tool
def list_notes() -> str:
"""列出所有保存的笔记"""
if not notes_db:
return "暂无保存的笔记"
return json.dumps(
[{"title": title, "preview": content[:50] + "..."} for title, content in notes_db.items()],
ensure_ascii=False
)
@tool
def read_note(title: str) -> str:
"""
读取指定笔记
参数:
title: 笔记标题
"""
if title in notes_db:
return notes_db[title]
return f"未找到笔记《{title}》"
@tool
def summarize(text: str) -> str:
"""
总结文本内容
参数:
text: 要总结的文本
"""
return f"【摘要】{text[:100]}..."
@tool
def save_memory(content: str, config: RunnableConfig) -> str:
"""保存重要信息到长期记忆"""
from langgraph.config import get_store
store = get_store()
user_id = config.get("configurable", {}).get("user_id", "default")
store.put(
("memories", user_id),
str(uuid.uuid4()),
{"content": content}
)
return f"已保存到记忆:{content}"
# 创建 Agent
checkpointer = MemorySaver()
store = InMemoryStore()
agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=[search_web, save_note, list_notes, read_note, summarize, save_memory],
prompt="""你是一个智能研究助手,可以帮助用户:
1. 搜索和收集信息
2. 整理和保存研究笔记
3. 总结和分析内容
请根据用户需求灵活使用工具,提供专业的研究支持。""",
checkpointer=checkpointer,
store=store
)
# 使用助手
def research_assistant(user_input: str, user_id: str = "default", session_id: str = "default") -> str:
config = {
"configurable": {
"user_id": user_id,
"thread_id": session_id
}
}
result = agent.invoke(
{"messages": [{"role": "user", "content": user_input}]},
config=config
)
return result["messages"][-1].content
# 测试
print("=== 搜索信息 ===")
print(research_assistant("帮我搜索关于人工智能的最新进展"))
print("\n=== 保存笔记 ===")
print(research_assistant("把这些信息保存为笔记,标题是'AI进展'"))
print("\n=== 查看笔记 ===")
print(research_assistant("我有哪些笔记?"))
print("\n=== 总结笔记 ===")
print(research_assistant("读取'AI进展'这篇笔记并总结"))
使用 LangSmith 追踪
LangSmith 是 LangChain 官方的可观测性平台,可以帮助追踪和调试 Agent:
import os
# 启用 LangSmith 追踪
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = "your-api-key"
os.environ["LANGSMITH_PROJECT"] = "my-agent-project"
# Agent 的所有执行都会自动记录到 LangSmith
# 可以在 https://smith.langchain.com 查看详细的执行追踪
小结
本章介绍了使用 LangGraph 构建 AI Agent 的核心技能:
create_react_agent是推荐的 Agent 创建方式,简洁易用- 工具定义使用
@tool装饰器,LangChain 自动提取描述和参数 - 短期记忆通过 checkpointer 和 thread_id 管理
- 长期记忆通过 store 实现,支持跨会话数据存储
- ToolNode 提供了灵活的错误处理机制
- 结构化输出让结果更易于程序处理
interrupt_before可以实现人工确认- 对于复杂场景,可以直接使用 StateGraph 构建自定义工作流
- LangSmith 提供强大的追踪和调试能力
下一章我们将学习多 Agent 协作的高级主题。