跳到主要内容

生产部署

将 AI Agent 从原型转化为生产级应用需要考虑可靠性、可扩展性、安全性和可维护性。本章将介绍 Agent 生产部署的各种方案和最佳实践。

部署前准备

生产级 Agent 检查清单

在部署前,确保你的 Agent 满足以下条件:

  • 错误处理完善,不会因异常而崩溃
  • 有合理的超时和重试机制
  • 敏感信息已加密或脱敏
  • 有适当的日志记录
  • 通过了评估测试
  • 有监控和告警机制

代码示例:生产级 Agent

from langchain.agents import create_agent, Middleware
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
import logging
import os

# Logging setup: timestamped, named, leveled records for every module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Tool definitions ---
@tool
def search(query: str) -> str:
    """Look up information for *query* (placeholder implementation)."""
    try:
        # The real search backend call would go here.
        return f"搜索结果:{query}"
    except Exception as exc:
        # Degrade gracefully: the agent receives a retryable message
        # instead of an unhandled exception.
        logger.error(f"搜索失败: {exc}")
        return "搜索服务暂时不可用,请稍后重试"

@tool
def calculator(expression: str) -> str:
    """Safely evaluate a basic arithmetic expression.

    Accepts numbers, + - * /, unary signs and parentheses. The input comes
    from the model, so it is untrusted: instead of eval() (which even with a
    character filter allows DoS inputs and is one import away from code
    execution) the expression is parsed with ast and only whitelisted node
    types are interpreted.
    """
    import ast
    import operator

    # Whitelisted AST operators; anything outside this table is rejected.
    # Exponentiation is deliberately excluded (e.g. 9**9**9 is a DoS vector).
    ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos,
    }

    def _eval(node):
        # Recursive interpreter over the restricted expression grammar.
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.operand))
        raise ValueError("不支持的表达式")

    try:
        # Cheap pre-filter kept from the original version.
        allowed = set("0123456789+-*/.() ")
        if not all(c in allowed for c in expression):
            return "表达式包含非法字符"
        result = _eval(ast.parse(expression, mode="eval"))
        return str(result)
    except Exception as e:
        logger.error(f"计算失败: {e}")
        return f"计算错误:{e}"

# Production middleware: request/response logging plus basic guardrails.
class ProductionMiddleware(Middleware):
    """Hooks applied around every model call in production."""

    def before_model_call(self, request):
        """Log the outgoing request and flag oversized conversations."""
        logger.info(f"调用模型,消息数量:{len(request.messages)}")

        # Oversized histories inflate cost and latency — warn, don't block.
        combined = sum(
            len(msg.content) for msg in request.messages if hasattr(msg, 'content')
        )
        if combined > 50000:
            logger.warning(f"消息过长:{combined}")

        return request

    def after_model_call(self, response):
        """Log completion of the model call."""
        logger.info("模型响应完成")
        return response

    def on_tool_error(self, error, tool_call):
        """Record a failed tool call and hand the agent a recovery hint."""
        logger.error(f"工具 {tool_call.name} 执行失败: {error}")
        return "工具执行出错,已记录日志。请尝试其他方式完成任务。"

# --- Agent assembly ---
# Bounded timeout/retries keep one hung request from stalling the service.
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
    timeout=30,
    max_retries=3,
)

toolbox = [search, calculator]

agent = create_agent(
    model=llm,
    tools=toolbox,
    system_prompt="你是一个智能助手,请用专业、礼貌的语气回答用户问题。",
    middleware=[ProductionMiddleware()],
)

LangSmith Deployment

LangSmith Deployment 是 LangChain 官方的 Agent 部署平台,专为长时间运行的 Agent 工作负载设计。

核心优势

  • 持久化执行:支持长时间运行的任务,即使网络中断也能恢复
  • 自动扩缩容:根据负载自动调整资源
  • 内置监控:与 LangSmith 可观测性无缝集成
  • 状态管理:自动保存和恢复 Agent 状态

部署步骤

# Install the LangGraph CLI
pip install langgraph-cli

# Scaffold a new project
langgraph new my-agent-project

# Enter the project directory
cd my-agent-project

# Configure langgraph.json

配置文件

创建 langgraph.json 配置文件:

{
  "python_version": "3.11",
  "dependencies": [
    "langchain",
    "langchain-openai",
    "langgraph"
  ],
  "graphs": {
    "agent": "./agent.py:graph"
  },
  "env": ".env"
}

Agent 定义

创建 agent.py

from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver

@tool
def search(query: str) -> str:
    """Search the internet for *query* (stub implementation)."""
    return f"搜索结果:{query}"

@tool
def calculator(expression: str) -> str:
    """Safely evaluate a basic arithmetic expression (+ - * / and parens).

    The original used eval() guarded only by a bare except; the expression
    comes from the model, so it is untrusted input. This version parses with
    ast and interprets only whitelisted node types, and catches only the
    exceptions arithmetic evaluation can raise.
    """
    import ast
    import operator

    # Whitelisted operators; exponentiation is excluded on purpose
    # (e.g. 9**9**9 would lock up the process).
    ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos,
    }

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.operand))
        raise ValueError("不支持的表达式")

    try:
        return str(_eval(ast.parse(expression, mode="eval")))
    except (SyntaxError, ValueError, ZeroDivisionError, OverflowError):
        return "计算错误"

# --- Model and agent wiring ---
llm = ChatOpenAI(model="gpt-4", temperature=0)

# A checkpointer lets the agent persist and resume conversation state.
# NOTE(review): MemorySaver appears to hold state in process memory only —
# confirm the deployment platform substitutes a durable checkpointer.
checkpointer = MemorySaver()

graph = create_agent(
    model=llm,
    tools=[search, calculator],
    system_prompt="你是一个智能助手。",
    checkpointer=checkpointer,
)

部署命令

# Run locally for testing
langgraph dev

# Deploy to LangSmith
langgraph deploy

# Check deployment status
langgraph status

API 调用

部署后可以通过 API 调用 Agent:

import requests

API_URL = "https://your-agent.langchain.run"
API_KEY = "your-api-key"

# Fix: the original calls had no timeout — requests waits forever by default,
# so one stalled server call would hang the client indefinitely.
TIMEOUT = 30

# Create a thread (one thread = one conversation session).
thread_response = requests.post(
    f"{API_URL}/threads",
    headers={"Authorization": f"Bearer {API_KEY}"},
    timeout=TIMEOUT,
)
thread_response.raise_for_status()  # fail fast on auth/availability errors
thread_id = thread_response.json()["thread_id"]

# Send a message and stream the run's output.
response = requests.post(
    f"{API_URL}/threads/{thread_id}/runs/stream",
    headers={
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    },
    json={
        "assistant_id": "agent",
        "input": {
            "messages": [
                {"role": "user", "content": "你好,请介绍一下自己"}
            ]
        }
    },
    stream=True,
    timeout=TIMEOUT,
)
response.raise_for_status()

# Consume the streamed response line by line.
for line in response.iter_lines():
    if line:
        print(line.decode())

LangServe 部署

LangServe 可以将任何 LangChain 应用部署为 REST API。

安装

pip install langserve

基本部署

from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langserve import add_routes
from fastapi import FastAPI
import uvicorn

@tool
def search(query: str) -> str:
    """Return (stubbed) search results for *query*."""
    return f"搜索结果:{query}"

@tool
def calculator(expression: str) -> str:
    """Safely evaluate a basic arithmetic expression (+ - * / and parens).

    Replaces eval() + bare except: the expression is model-supplied and
    therefore untrusted, so it is parsed with ast and only whitelisted
    arithmetic node types are interpreted.
    """
    import ast
    import operator

    # Whitelisted operators; exponentiation excluded (DoS via huge powers).
    ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos,
    }

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.operand))
        raise ValueError("不支持的表达式")

    try:
        return str(_eval(ast.parse(expression, mode="eval")))
    except (SyntaxError, ValueError, ZeroDivisionError, OverflowError):
        return "计算错误"

# --- Agent construction ---
llm = ChatOpenAI(model="gpt-4", temperature=0)
agent = create_agent(
    model=llm,
    tools=[search, calculator],
    system_prompt="你是一个智能助手。",
)

# --- HTTP application ---
app = FastAPI(title="AI Agent API")

# Mount the agent as a LangServe route; feedback and trace endpoints enabled.
add_routes(
    app,
    agent,
    path="/agent",
    enable_feedback_endpoint=True,
    enable_trace_endpoint=True,
)

# Liveness endpoint for load balancers and container health probes.
@app.get("/health")
def health_check():
    payload = {"status": "healthy"}
    return payload

# Serve with uvicorn when the module is executed directly.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

客户端调用

from langserve import RemoteRunnable

# Connect to the deployed agent over HTTP.
remote_agent = RemoteRunnable("http://localhost:8000/agent")

# Single-shot invocation: blocks until the full reply is ready.
reply = remote_agent.invoke({
    "messages": [{"role": "user", "content": "计算 123 * 456"}]
})

print(reply["messages"][-1].content)

# Streaming invocation: chunks are printed as they arrive.
stream_input = {
    "messages": [{"role": "user", "content": "介绍一下 Python"}]
}
for chunk in remote_agent.stream(stream_input):
    print(chunk)

高级配置

import os

from fastapi import Depends, FastAPI, Header, HTTPException
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="AI Agent API")

# Configure CORS.
# NOTE(review): browsers reject allow_origins=["*"] combined with
# allow_credentials=True — list explicit origins if credentials are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Authentication dependency.
async def verify_api_key(api_key: str = Header(...)):
    """Reject requests whose api-key header does not match the configured key.

    Fix: the original snippet used os.environ without importing os.
    """
    if api_key != os.environ.get("API_KEY"):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return api_key

# Route guarded by the API-key dependency.
@app.post("/secure-agent")
async def secure_agent_endpoint(request: dict, api_key: str = Depends(verify_api_key)):
    return agent.invoke(request)

容器化部署

Docker 部署

创建 Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer is cached across code-only changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the API port
EXPOSE 8000

# Unbuffered stdout/stderr so container logs appear immediately
ENV PYTHONUNBUFFERED=1

# Launch the server
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

创建 requirements.txt

langchain>=0.3.0
langchain-openai>=0.2.0
langgraph>=0.2.0
langserve>=0.3.0
fastapi>=0.109.0
uvicorn>=0.27.0

创建 docker-compose.yml

# docker-compose for the agent API plus a Redis instance.
# Fix: the snippet had lost all indentation, which makes it invalid YAML;
# structure reconstructed from the key order.
version: '3.8'

services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LANGSMITH_API_KEY=${LANGSMITH_API_KEY}
      - LANGSMITH_TRACING=true
      - LANGSMITH_PROJECT=production
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Redis for state persistence
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  redis_data:

Kubernetes 部署

创建 k8s/deployment.yaml

# Kubernetes manifests: Deployment + Service + HorizontalPodAutoscaler.
# Fix: the snippet had lost all indentation (invalid YAML); structure
# reconstructed following standard manifest layout.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-api
  labels:
    app: agent-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-api
  template:
    metadata:
      labels:
        app: agent-api
    spec:
      containers:
        - name: agent-api
          image: your-registry/agent-api:latest
          ports:
            - containerPort: 8000
          env:
            # API keys come from the agent-secrets Secret, never the image.
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
            - name: LANGSMITH_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: langsmith-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: agent-api-service
spec:
  selector:
    app: agent-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

监控与可观测性

LangSmith 监控

import os

# Enable LangSmith tracing; every agent run is then recorded automatically
# under the given project.
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "production"

Prometheus 指标

from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
from fastapi import Response

# --- Metric definitions ---
REQUEST_COUNT = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['method', 'endpoint'],
)

REQUEST_LATENCY = Histogram(
    'agent_request_latency_seconds',
    'Request latency',
    ['endpoint'],
)

TOOL_CALLS = Counter(
    'agent_tool_calls_total',
    'Total tool calls',
    ['tool_name', 'status'],
)

# Record count and latency for every HTTP request passing through the app.
@app.middleware("http")
async def add_metrics(request, call_next):
    REQUEST_COUNT.labels(method=request.method, endpoint=request.url.path).inc()

    with REQUEST_LATENCY.labels(endpoint=request.url.path).time():
        response = await call_next(request)

    return response

# Prometheus scrape endpoint.
@app.get("/metrics")
async def metrics():
    # Fix: advertise the exact exposition content type (version + charset)
    # via CONTENT_TYPE_LATEST instead of bare "text/plain".
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST,
    )

日志记录

import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """Render log records as single-line JSON for machine ingestion."""

    def format(self, record):
        from datetime import timezone  # local: snippet only imports datetime

        log_data = {
            # Fix: use the record's own creation time as an offset-aware UTC
            # timestamp instead of the deprecated naive datetime.utcnow().
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }

        # Merge structured context supplied via extra={"extra": {...}}.
        if hasattr(record, 'extra'):
            log_data.update(record.extra)

        return json.dumps(log_data)

# Wire the JSON formatter onto a stream handler for the "agent" logger.
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())

agent_logger = logging.getLogger("agent")
agent_logger.addHandler(handler)
agent_logger.setLevel(logging.INFO)
logger = agent_logger

安全考虑

API 密钥管理

import os
from cryptography.fernet import Fernet

class SecretManager:
    """Encrypt/decrypt secrets with Fernet.

    When no ENCRYPTION_KEY is configured, both operations pass values
    through unchanged (cipher stays None).
    """

    def __init__(self):
        # ENCRYPTION_KEY is expected to hold a Fernet key string.
        env_key = os.environ.get("ENCRYPTION_KEY")
        self.cipher = Fernet(env_key.encode()) if env_key else None

    def encrypt(self, value: str) -> str:
        """Return the encrypted form of *value* (or *value* when keyless)."""
        if self.cipher is None:
            return value
        return self.cipher.encrypt(value.encode()).decode()

    def decrypt(self, value: str) -> str:
        """Inverse of encrypt()."""
        if self.cipher is None:
            return value
        return self.cipher.decrypt(value.encode()).decode()

输入验证

from pydantic import BaseModel, Field, validator
import re

from pydantic import field_validator

class AgentRequest(BaseModel):
    """Validated chat request: bounded message, sanitized user id."""

    message: str = Field(..., max_length=10000)
    user_id: str = Field(..., pattern=r'^[a-zA-Z0-9-]+$')

    # Fix: Field(pattern=...) is Pydantic v2 syntax, but @validator is the
    # removed v1 API — use the v2 @field_validator instead.
    @field_validator('message')
    @classmethod
    def validate_message(cls, v):
        # Reject messages that appear to contain sensitive personal data.
        sensitive_patterns = [
            r'\b\d{16,19}\b',  # bank card number
            r'\b\d{17}[\dXx]\b',  # national ID number
            r'\b[\w\.-]+@[\w\.-]+\.\w+\b',  # email address
        ]

        for pattern in sensitive_patterns:
            if re.search(pattern, v):
                raise ValueError('消息包含敏感信息,请删除后重试')

        return v

@app.post("/chat")
async def chat(request: AgentRequest):
    """Run the agent on a validated request and return the final reply."""
    outcome = agent.invoke({
        "messages": [{"role": "user", "content": request.message}]
    })
    return {"response": outcome["messages"][-1].content}

速率限制

import redis.asyncio as redis
# Fix: Depends is used in the route decorator below but was never imported.
from fastapi import Depends, FastAPI, Request
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter

app = FastAPI()

@app.on_event("startup")
async def startup():
    """Initialise the Redis-backed rate limiter when the app starts."""
    redis_client = redis.from_url("redis://localhost:6379")
    await FastAPILimiter.init(redis_client)

# Allow at most 10 requests per minute per client.
@app.post("/chat", dependencies=[Depends(RateLimiter(times=10, seconds=60))])
async def chat(request: dict):
    return agent.invoke(request)

小结

生产部署是将 Agent 从原型转化为实际应用的关键步骤:

  • 使用 LangSmith Deployment 获得托管服务体验
  • 使用 LangServe 快速构建 REST API
  • 容器化部署确保环境一致性
  • 完善监控和日志系统
  • 重视安全性,保护敏感信息

参考资料