跳到主要内容

最佳实践案例

本章汇总了 AI Agent 开发中的最佳实践和实战经验,帮助你构建更可靠、更高效的 Agent 应用。

设计原则

1. 单一职责原则

每个 Agent 应该有明确的职责范围,不要试图让一个 Agent 做所有事情。

不好的设计

# 一个 Agent 做太多事情
agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=[search, email, calendar, database, file_ops, ...],
prompt="你是一个全能助手..." # 职责不清晰
)

好的设计

# 专门的 Agent 处理专门的任务
research_agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=[search, web_scrape],
prompt="你是一个研究助手,专门负责信息收集和分析。"
)

communication_agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=[email, slack],
prompt="你是一个沟通助手,专门负责发送消息和通知。"
)

# 主 Agent 协调子 Agent
coordinator = create_coordinator(
agents={
"research": research_agent,
"communication": communication_agent,
}
)

2. 工具设计原则

工具描述要精确

工具的描述直接决定了 LLM 能否正确使用它。好的描述应该包含:

  • 工具的功能说明
  • 适用场景
  • 参数的详细要求
  • 使用示例
@tool
def query_database(sql: str) -> str:
"""执行 SQL 查询并返回结果。

功能:
- 执行只读的 SELECT 查询
- 支持多表 JOIN 和聚合函数
- 最多返回 1000 行结果

参数:
- sql: SQL 查询语句,必须是合法的 SELECT 语句

使用示例:
- 查询用户: "SELECT * FROM users WHERE active = true"
- 统计订单: "SELECT COUNT(*) FROM orders WHERE date > '2024-01-01'"

注意:
- 不支持 INSERT、UPDATE、DELETE 操作
- 复杂查询可能需要较长执行时间
"""
# 实现逻辑
pass

工具参数要可验证

使用 Pydantic 进行参数验证:

from pydantic import BaseModel, Field, field_validator

class SearchInput(BaseModel):
query: str = Field(
description="搜索关键词",
min_length=2,
max_length=100
)
limit: int = Field(
default=10,
description="返回结果数量",
ge=1,
le=50
)
language: str = Field(
default="zh",
description="语言代码,如 zh、en"
)

@field_validator('query')
@classmethod
def sanitize_query(cls, v):
# 移除潜在的恶意字符
return v.replace('<', '').replace('>', '')

@tool(args_schema=SearchInput)
def safe_search(query: str, limit: int = 10, language: str = "zh") -> str:
"""安全的网络搜索"""
# 实现逻辑
pass

工具要有合理的错误处理

from langgraph.prebuilt import ToolNode

@tool
def call_external_api(endpoint: str, params: dict) -> str:
"""调用外部 API"""
try:
response = requests.get(endpoint, params=params, timeout=10)
response.raise_for_status()
return response.text
except requests.Timeout:
return "API 请求超时,请稍后重试"
except requests.ConnectionError:
return "无法连接到 API 服务器"
except requests.HTTPError as e:
return f"API 返回错误: {e.response.status_code}"
except Exception as e:
return f"未知错误: {str(e)}"

# 使用 ToolNode 的错误处理
tool_node = ToolNode(
[call_external_api],
handle_tool_errors="工具执行失败,请尝试其他方法或稍后重试。"
)

3. 提示词工程

提供清晰的上下文

from datetime import datetime

# 动态生成包含上下文的提示词
def create_system_prompt():
today = datetime.now().strftime("%Y-%m-%d")
return f"""你是一个专业的客服助手。

当前日期:{today}

你的职责:
1. 回答用户关于产品的问题
2. 处理订单相关查询
3. 收集用户反馈

回答要求:
- 语气友好、专业
- 回答简洁,不超过 200 字
- 如果不确定,告知用户并提供替代方案

禁止行为:
- 不要承诺无法保证的事项
- 不要透露内部系统信息
- 不要处理退款(转接人工)
"""

agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=[...],
prompt=create_system_prompt()
)

使用 Few-shot 示例

prompt = """你是一个数据分析师助手。请按照以下格式处理查询请求:

示例 1:
用户:查询上个月的销售额
思考:需要查询 orders 表,筛选上个月的数据,计算总和
行动:query_database("SELECT SUM(amount) FROM orders WHERE date >= DATE_SUB(CURDATE(), INTERVAL 1 MONTH)")
观察:查询结果为 1,234,567 元
回答:上个月的销售额为 1,234,567 元

示例 2:
用户:哪个产品卖得最好?
思考:需要统计各产品的销量并排序
行动:query_database("SELECT product_name, COUNT(*) as sales FROM orders GROUP BY product_name ORDER BY sales DESC LIMIT 1")
观察:查询结果为 "产品A: 1000件"
回答:产品A 卖得最好,共售出 1000 件

现在请处理用户的请求。
"""

记忆管理最佳实践

短期记忆策略

from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import trim_messages, SystemMessage

# 生产环境使用数据库持久化
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

class MemoryManager:
"""记忆管理器"""

def __init__(self, max_messages: int = 20, max_tokens: int = 4000):
self.max_messages = max_messages
self.max_tokens = max_tokens

def manage_messages(self, messages: list) -> list:
"""管理消息历史"""
# 如果消息数量超过限制,进行裁剪
if len(messages) > self.max_messages:
# 保留系统消息和最近的对话
system_messages = [m for m in messages if isinstance(m, SystemMessage)]
recent_messages = messages[-(self.max_messages - len(system_messages)):]

# 如果裁剪幅度大,生成总结
if len(messages) - len(recent_messages) > 10:
summary = self._summarize_old_messages(messages[:-len(recent_messages)])
return system_messages + [SystemMessage(content=f"[历史摘要] {summary}")] + recent_messages

return system_messages + recent_messages

return messages

def _summarize_old_messages(self, messages: list) -> str:
"""总结旧消息"""
# 使用 LLM 生成总结
return "之前的对话中讨论了..."

长期记忆策略

from langgraph.store.memory import InMemoryStore
from langchain_openai import OpenAIEmbeddings
import uuid
from datetime import datetime

class LongTermMemory:
"""长期记忆管理"""

def __init__(self):
self.store = InMemoryStore(
index={
"embed": OpenAIEmbeddings().embed_documents,
"dims": 1536,
}
)

def save_memory(self, user_id: str, content: str, metadata: dict = None):
"""保存记忆"""
namespace = ("memories", user_id)
memory_id = str(uuid.uuid4())

self.store.put(
namespace,
memory_id,
{
"content": content,
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
}
)

return memory_id

def recall_memories(self, user_id: str, query: str, limit: int = 5) -> list:
"""检索记忆"""
namespace = ("memories", user_id)

# 使用语义搜索
results = self.store.search(namespace, query=query, limit=limit)

return [
{
"content": r.value["content"],
"timestamp": r.value["timestamp"],
"score": r.score
}
for r in results
]

def cleanup_old_memories(self, user_id: str, days: int = 90):
"""清理过期记忆"""
namespace = ("memories", user_id)
all_memories = self.store.search(namespace, limit=1000)

cutoff = datetime.now() - timedelta(days=days)

for memory in all_memories:
timestamp = datetime.fromisoformat(memory.value["timestamp"])
if timestamp < cutoff:
self.store.delete(namespace, memory.key)

错误处理最佳实践

分层错误处理

from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, create_react_agent
import logging

logger = logging.getLogger(__name__)

# 第一层:工具内部错误处理
@tool
def robust_api_call(endpoint: str) -> str:
"""健壮的 API 调用"""
try:
response = requests.get(endpoint, timeout=10)
response.raise_for_status()
return response.text
except requests.Timeout:
logger.warning(f"API 超时: {endpoint}")
return "请求超时,请稍后重试"
except requests.HTTPError as e:
logger.error(f"HTTP 错误: {e}")
return f"服务暂时不可用 (HTTP {e.response.status_code})"
except Exception as e:
logger.exception(f"未知错误: {e}")
return "发生未知错误,请联系管理员"

# 第二层:ToolNode 错误处理
tool_node = ToolNode(
[robust_api_call],
handle_tool_errors=lambda e, tc: f"工具执行失败: {str(e)[:100]}"
)

# 第三层:Agent 级别的错误处理
def safe_invoke(agent, user_input: str, config: dict = None) -> dict:
"""安全的 Agent 调用"""
try:
return agent.invoke(
{"messages": [{"role": "user", "content": user_input}]},
config=config
)
except Exception as e:
logger.exception(f"Agent 执行失败: {e}")
return {
"messages": [{
"role": "assistant",
"content": "抱歉,处理您的请求时出现问题。请稍后重试或联系客服。"
}]
}

重试和降级策略

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class ResilientAgent:
"""弹性 Agent"""

def __init__(self, primary_model, fallback_model, tools):
self.primary_agent = create_react_agent(
model=primary_model,
tools=tools
)
self.fallback_agent = create_react_agent(
model=fallback_model,
tools=tools
)

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type(ConnectionError)
)
def invoke(self, user_input: str, config: dict = None):
"""带重试的调用"""
try:
return self.primary_agent.invoke(
{"messages": [{"role": "user", "content": user_input}]},
config=config
)
except Exception as e:
logger.warning(f"主模型失败,切换到备用模型: {e}")
return self.fallback_agent.invoke(
{"messages": [{"role": "user", "content": user_input}]},
config=config
)

性能优化最佳实践

1. 并行工具调用

LangGraph 默认支持并行执行工具调用:

from langgraph.prebuilt import create_react_agent

@tool
def get_weather(city: str) -> str:
"""获取天气"""
return f"{city}: 晴"

@tool
def get_news(topic: str) -> str:
"""获取新闻"""
return f"{topic} 新闻..."

# Agent 会自动并行执行多个工具调用
agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_weather, get_news]
)

# 用户请求多个信息时,工具会并行执行
result = agent.invoke({
"messages": [{"role": "user", "content": "北京天气怎么样?最近有什么科技新闻?"}]
})

2. 流式输出

使用流式输出提升用户体验:

async def stream_agent_response(agent, user_input: str):
"""流式输出响应"""
async for chunk in agent.astream({
"messages": [{"role": "user", "content": user_input}]
}):
if "agent" in chunk:
for msg in chunk["agent"]["messages"]:
if msg.content:
yield msg.content

elif "tools" in chunk:
for msg in chunk["tools"]["messages"]:
yield f"\n[工具结果] {msg.content[:100]}...\n"

3. 缓存策略

from functools import lru_cache
from datetime import datetime, timedelta
import hashlib

class ToolCache:
"""工具结果缓存"""

def __init__(self, ttl_seconds: int = 300):
self.cache = {}
self.ttl = ttl_seconds

def get_key(self, tool_name: str, args: dict) -> str:
"""生成缓存键"""
args_str = str(sorted(args.items()))
return hashlib.md5(f"{tool_name}:{args_str}".encode()).hexdigest()

def get(self, tool_name: str, args: dict) -> str | None:
"""获取缓存"""
key = self.get_key(tool_name, args)
if key in self.cache:
value, timestamp = self.cache[key]
if datetime.now() - timestamp < timedelta(seconds=self.ttl):
return value
else:
del self.cache[key]
return None

def set(self, tool_name: str, args: dict, result: str):
"""设置缓存"""
key = self.get_key(tool_name, args)
self.cache[key] = (result, datetime.now())

# 使用缓存的工具
cache = ToolCache()

@tool
def cached_search(query: str) -> str:
"""带缓存的搜索"""
# 检查缓存
cached = cache.get("search", {"query": query})
if cached:
return cached

# 执行搜索
result = expensive_search(query)

# 存入缓存
cache.set("search", {"query": query}, result)

return result

安全最佳实践

1. 输入验证

from pydantic import BaseModel, Field, field_validator
import re

class SecureInput(BaseModel):
"""安全输入验证"""
message: str = Field(..., max_length=5000)

@field_validator('message')
@classmethod
def sanitize(cls, v):
# 检测并阻止潜在的注入攻击
dangerous_patterns = [
r'<script.*?>.*?</script>', # XSS
r'UNION.*?SELECT', # SQL 注入
r'\$\{.*?\}', # 模板注入
]

for pattern in dangerous_patterns:
if re.search(pattern, v, re.IGNORECASE):
raise ValueError("输入包含不允许的内容")

return v

2. 敏感信息过滤

import re

def sanitize_output(text: str) -> str:
"""过滤敏感信息"""
patterns = [
# 银行卡号
(r'\b\d{16,19}\b', '[卡号已隐藏]'),
# 身份证号
(r'\b\d{17}[\dXx]\b', '[身份证已隐藏]'),
# 手机号
(r'\b1[3-9]\d{9}\b', '[手机号已隐藏]'),
# 邮箱
(r'\b[\w\.-]+@[\w\.-]+\.\w+\b', '[邮箱已隐藏]'),
# API 密钥
(r'(api[_-]?key|token|secret)["\']?\s*[:=]\s*["\']?\w+', '[密钥已隐藏]'),
]

for pattern, replacement in patterns:
text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)

return text

@tool
def safe_output_tool(input: str) -> str:
"""带敏感信息过滤的工具"""
result = process_input(input)
return sanitize_output(result)

3. 访问控制

from typing import Annotated
from langchain_core.tools import tool
from langchain_core.runnables import RunnableConfig

@tool
def admin_only_action(action: str, config: RunnableConfig) -> str:
"""仅管理员可执行的操作"""
user_role = config.get("configurable", {}).get("user_role", "user")

if user_role != "admin":
return "权限不足:此操作需要管理员权限"

# 执行管理员操作
return f"已执行: {action}"

# 在调用时传递用户角色
result = agent.invoke(
{"messages": [{"role": "user", "content": "删除所有用户"}]},
config={"configurable": {"user_role": "admin"}}
)

监控与可观测性

关键指标监控

from dataclasses import dataclass
from datetime import datetime
import statistics

@dataclass
class AgentMetrics:
"""Agent 性能指标"""
request_count: int = 0
success_count: int = 0
error_count: int = 0
latencies: list = None
token_counts: list = None

def __post_init__(self):
self.latencies = []
self.token_counts = []

def record(self, success: bool, latency: float, tokens: int):
"""记录一次执行"""
self.request_count += 1
if success:
self.success_count += 1
else:
self.error_count += 1
self.latencies.append(latency)
self.token_counts.append(tokens)

def get_stats(self) -> dict:
"""获取统计信息"""
return {
"total_requests": self.request_count,
"success_rate": self.success_count / self.request_count if self.request_count > 0 else 0,
"avg_latency": statistics.mean(self.latencies) if self.latencies else 0,
"p95_latency": statistics.quantiles(self.latencies, n=20)[-1] if len(self.latencies) > 20 else 0,
"avg_tokens": statistics.mean(self.token_counts) if self.token_counts else 0,
}

# 使用示例
metrics = AgentMetrics()

def monitored_invoke(agent, user_input: str, config: dict = None):
"""带监控的调用"""
start_time = datetime.now()

try:
result = agent.invoke(
{"messages": [{"role": "user", "content": user_input}]},
config=config
)

latency = (datetime.now() - start_time).total_seconds()
tokens = sum(
msg.response_metadata.get("token_usage", {}).get("total_tokens", 0)
for msg in result.get("messages", [])
if hasattr(msg, "response_metadata")
)

metrics.record(success=True, latency=latency, tokens=tokens)
return result

except Exception as e:
latency = (datetime.now() - start_time).total_seconds()
metrics.record(success=False, latency=latency, tokens=0)
raise

完整示例:生产级 Agent

from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore
from langchain_core.tools import tool
from langchain_core.runnables import RunnableConfig
import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("production-agent")

# 工具定义
@tool
def search(query: str) -> str:
"""搜索信息"""
try:
# 实际的搜索逻辑
return f"搜索结果:{query}"
except Exception as e:
logger.error(f"搜索失败: {e}")
return "搜索服务暂时不可用"

@tool
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
allowed = set("0123456789+-*/.() ")
if not all(c in allowed for c in expression):
return "错误:表达式包含非法字符"
return str(eval(expression))
except Exception as e:
return f"计算错误: {e}"

# 创建生产级 Agent
def create_production_agent():
"""创建生产级 Agent"""

# 配置持久化
checkpointer = MemorySaver() # 生产环境使用数据库
store = InMemoryStore() # 生产环境使用数据库

# 动态提示词
def get_system_prompt():
today = datetime.now().strftime("%Y年%m月%d日")
return f"""你是一个专业的智能助手。

当前日期:{today}

你的职责:
1. 准确回答用户问题
2. 在需要时使用工具获取信息
3. 给出清晰、有帮助的回答

回答要求:
- 语气友好、专业
- 回答简洁明了
- 如果不确定,诚实告知

如果工具调用失败,请告知用户并尝试其他方法。"""

agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=[search, calculate],
prompt=get_system_prompt(),
checkpointer=checkpointer,
store=store,
)

return agent

# 使用示例
agent = create_production_agent()

config = {"configurable": {"thread_id": "user-123-session-1"}}

result = agent.invoke(
{"messages": [{"role": "user", "content": "你好,请介绍一下自己"}]},
config=config
)

print(result["messages"][-1].content)

小结

构建生产级 Agent 需要关注以下方面:

  • 设计原则:单一职责、清晰的工具描述、有效的提示词
  • 记忆管理:合理管理短期和长期记忆
  • 错误处理:分层处理、重试降级
  • 性能优化:并行执行、流式输出、缓存
  • 安全性:输入验证、敏感信息过滤、访问控制
  • 可观测性:监控关键指标、完善的日志

遵循这些最佳实践,可以构建出可靠、高效、安全的 Agent 应用。

参考资料