Deployment
Deploying a LangChain application to production involves API exposure, performance optimization, observability, and more. This chapter walks through the mainstream deployment options.
Deployment Options at a Glance
| Option | Best for | Complexity | Recommendation |
|---|---|---|---|
| LangGraph Platform | Agent apps that need state management | Medium | ⭐⭐⭐⭐⭐ |
| LangServe | Simple chains, quick prototypes | Low | ⭐⭐⭐ |
| Direct FastAPI integration | Full control over the API | Medium | ⭐⭐⭐⭐ |
| Cloud platform deployment | No ops overhead | Low | ⭐⭐⭐⭐ |
For new projects, the official recommendation is LangGraph Platform rather than LangServe. LangServe mainly targets deploying simple Runnables, while LangGraph Platform offers more complete, enterprise-grade functionality.
LangGraph Platform Deployment
LangGraph Platform is the officially recommended way to deploy LangGraph agent applications. It provides a complete API service with state persistence and streaming output.
Architecture Overview
LangGraph Platform requires the following infrastructure:
- PostgreSQL: stores conversation threads, run state, and long-term memory
- Redis: the message broker behind real-time streaming output
- LangGraph Server: the API server process
Prerequisites
Install the LangGraph CLI:
pip install -U langgraph-cli
Application Structure
A LangGraph application expects a specific directory layout:
my-agent-app/
├── langgraph.json       # configuration file
├── requirements.txt     # Python dependencies
├── .env                 # environment variables
└── src/
    └── agent/
        ├── __init__.py
        └── graph.py     # agent definition
Example langgraph.json:
{
"python_version": "3.11",
"dependencies": ["./requirements.txt"],
"graphs": {
"agent": "./src/agent/graph.py:app"
},
"env": ".env"
}
src/agent/graph.py:
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.tools import tool
from langgraph.checkpoint.memory import MemorySaver

@tool
def get_weather(city: str) -> str:
    """Get the weather for a city."""
    return f"Sunny in {city} today, 25°C"

@tool
def search_web(query: str) -> str:
    """Search the web."""
    return f"Search results for: {query}..."

# define the agent
model = init_chat_model("openai:gpt-4o-mini")
tools = [get_weather, search_web]
app = create_agent(
    model=model,
    tools=tools,
    # MemorySaver is for local runs; when deployed on LangGraph Platform,
    # persistence is provided by the platform's Postgres checkpointer
    checkpointer=MemorySaver(),
    system_prompt="You are a helpful assistant"
)
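Before deploying, the agent can be exercised with a quick local script; a minimal sketch (the thread_id in the config is required because a checkpointer is set):
# quick local smoke test of the agent defined above
result = app.invoke(
    {"messages": [{"role": "user", "content": "What's the weather in Beijing?"}]},
    config={"configurable": {"thread_id": "local-test"}},
)
print(result["messages"][-1].content)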
Local Development
Start the local development server:
# start the dev server (listens on port 2024 by default)
langgraph dev
# or pick a port explicitly
langgraph dev --port 8123
With the port above, open http://localhost:8123/docs to browse the API documentation.
Building a Docker Image
Use Docker for production deployments:
# build the image
langgraph build -t my-agent-app
# run the container (requires Redis and Postgres; the hostnames below
# assume all containers share a Docker network)
docker run \
  --env-file .env \
  -p 8123:8000 \
  -e REDIS_URI="redis://redis:6379" \
  -e DATABASE_URI="postgres://user:pass@postgres:5432/langgraph" \
  -e LANGSMITH_API_KEY="ls-xxx" \
  my-agent-app
Docker Compose Deployment
A complete Docker Compose configuration:
# docker-compose.yml
version: "3.8"
volumes:
langgraph-data:
driver: local
services:
langgraph-redis:
image: redis:6
healthcheck:
test: redis-cli ping
interval: 5s
timeout: 1s
retries: 5
langgraph-postgres:
image: postgres:16
environment:
POSTGRES_DB: langgraph
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
volumes:
- langgraph-data:/var/lib/postgresql/data
healthcheck:
test: pg_isready -U postgres
interval: 5s
timeout: 1s
retries: 5
langgraph-api:
image: my-agent-app
ports:
- "8123:8000"
depends_on:
langgraph-redis:
condition: service_healthy
langgraph-postgres:
condition: service_healthy
env_file:
- .env
environment:
REDIS_URI: redis://langgraph-redis:6379
DATABASE_URI: postgres://postgres:postgres@langgraph-postgres:5432/langgraph?sslmode=disable
  # optional: Nginx reverse proxy (provide your own nginx.conf alongside the compose file)
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- langgraph-api
Start the stack:
docker compose up -d
Environment Variables
LangGraph Platform relies on these key environment variables:
# .env file
# LLM API keys
OPENAI_API_KEY=sk-xxx
ANTHROPIC_API_KEY=sk-ant-xxx
# LangSmith (optional, for tracing)
LANGSMITH_TRACING=true
LANGSMITH_API_KEY=ls-xxx
# database connections
REDIS_URI=redis://localhost:6379
DATABASE_URI=postgres://user:pass@localhost:5432/langgraph
# LangGraph Cloud license key (enterprise only)
# LANGGRAPH_CLOUD_LICENSE_KEY=xxx
API Endpoints
LangGraph Platform automatically exposes the following APIs:
| Endpoint | Method | Description |
|---|---|---|
| /threads | POST | Create a conversation thread |
| /threads/{thread_id}/runs | POST | Run the agent |
| /threads/{thread_id}/runs/stream | POST | Run with streaming output |
| /threads/{thread_id}/state | GET | Fetch thread state |
| /assistants/search | POST | List/search assistants |
Create a thread and run the agent:
import requests

BASE_URL = "http://localhost:8123"

# create a thread
thread = requests.post(f"{BASE_URL}/threads", json={}).json()
thread_id = thread["thread_id"]

# run the agent
response = requests.post(
    f"{BASE_URL}/threads/{thread_id}/runs/stream",
    json={
        "assistant_id": "agent",
        "input": {
            "messages": [{"role": "user", "content": "What's the weather in Beijing?"}]
        }
    },
    stream=True
)

# read the streamed response
for line in response.iter_lines():
    if line:
        print(line.decode())
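The official langgraph-sdk client wraps these endpoints and handles SSE parsing for you; a sketch of the same call, assuming the package is installed (pip install langgraph-sdk):
import asyncio
from langgraph_sdk import get_client

async def main():
    client = get_client(url="http://localhost:8123")
    # create a thread, then stream a run on it
    thread = await client.threads.create()
    async for chunk in client.runs.stream(
        thread["thread_id"],
        "agent",  # graph name from langgraph.json
        input={"messages": [{"role": "user", "content": "What's the weather in Beijing?"}]},
        stream_mode="updates",
    ):
        print(chunk.event, chunk.data)

asyncio.run(main())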
LangServe Deployment
LangServe quickly turns a LangChain Runnable into a REST API; it suits simple chains and rapid prototyping.
LangServe is no longer under active feature development; new projects should prefer LangGraph Platform.
Installation
# full installation
pip install "langserve[all]"
# or install the pieces separately
pip install "langserve[server]"  # server side
pip install "langserve[client]"  # client side
Creating the Service
# server.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langserve import add_routes

# create the FastAPI app
app = FastAPI(
    title="LangChain API Server",
    version="1.0",
    description="LangChain application API"
)

# configure CORS (allow browser access)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# initialize the model
model = init_chat_model("openai:gpt-4o-mini")

# build the chain
prompt = ChatPromptTemplate.from_template("You are a {role}. Please answer: {question}")
chain = prompt | model | StrOutputParser()

# add routes
add_routes(
    app,
    chain,
    path="/chain",
    enable_feedback_endpoint=True,  # enable the feedback endpoint
)

# expose the model directly
add_routes(
    app,
    model,
    path="/chat",
)

# run the server
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Start the service:
python server.py
# or with uvicorn (--reload is for development only)
uvicorn server:app --host 0.0.0.0 --port 8000 --reload
Auto-generated API Endpoints
LangServe creates the following endpoints for every route:
| Endpoint | Method | Description |
|---|---|---|
| /path/invoke | POST | Single invocation |
| /path/batch | POST | Batch invocation |
| /path/stream | POST | Streaming output |
| /path/stream_log | POST | Streaming output with intermediate steps |
| /path/input_schema | GET | Input schema |
| /path/output_schema | GET | Output schema |
| /path/playground/ | GET | Interactive test page |
Calling the Service
Python SDK:
from langserve import RemoteRunnable

# connect to the remote service
chain = RemoteRunnable("http://localhost:8000/chain")

# single invocation
result = chain.invoke({"role": "Python expert", "question": "What is a decorator?"})
print(result)

# streaming
for chunk in chain.stream({"role": "AI assistant", "question": "Tell me a story"}):
    print(chunk, end="", flush=True)

# batch
results = chain.batch([
    {"role": "translator", "question": "Hello"},
    {"role": "translator", "question": "World"}
])
Plain HTTP:
# with curl
curl -X POST "http://localhost:8000/chain/invoke" \
  -H "Content-Type: application/json" \
  -d '{"input": {"role": "expert", "question": "What is Python?"}}'

# with Python requests
import requests
response = requests.post(
    "http://localhost:8000/chain/invoke",
    json={"input": {"role": "expert", "question": "What is Python?"}}
)
print(response.json())
JavaScript/TypeScript:
import { RemoteRunnable } from "@langchain/core/runnables/remote";
const chain = new RemoteRunnable({
url: "http://localhost:8000/chain",
});
const result = await chain.invoke({
  role: "expert",
  question: "What is Python?",
});
Adding Authentication
Use FastAPI dependency injection to implement authentication:
from fastapi import FastAPI, Depends, HTTPException, Header
from fastapi.responses import JSONResponse
from langserve import add_routes

app = FastAPI()

def verify_api_key(x_api_key: str = Header(...)):
    """Validate the API key."""
    valid_keys = ["key1", "key2"]
    if x_api_key not in valid_keys:
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return x_api_key

# option 1: global authentication middleware
@app.middleware("http")
async def auth_middleware(request, call_next):
    # skip the docs pages
    if request.url.path in ["/docs", "/openapi.json"]:
        return await call_next(request)
    api_key = request.headers.get("X-API-Key")
    if not api_key or api_key not in ["key1", "key2"]:
        return JSONResponse(
            status_code=401,
            content={"detail": "Invalid API Key"}
        )
    return await call_next(request)

# option 2: per-route authentication (chain as defined earlier)
add_routes(
    app,
    chain,
    path="/chain",
    dependencies=[Depends(verify_api_key)]
)
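On the client side, RemoteRunnable accepts custom headers, so the key can be supplied per connection; a sketch matching the header name and keys above:
from langserve import RemoteRunnable

# send the API key the middleware/dependency expects
chain = RemoteRunnable(
    "http://localhost:8000/chain",
    headers={"X-API-Key": "key1"},
)
print(chain.invoke({"role": "expert", "question": "What is Python?"}))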
Direct FastAPI Integration
When you need full control over the API surface, build directly on FastAPI:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = FastAPI(title="My LangChain API")

# initialization
model = init_chat_model("openai:gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("Answer the question: {question}")
chain = prompt | model | StrOutputParser()

# request/response models
class QueryRequest(BaseModel):
    question: str
    temperature: float = 0.7

class QueryResponse(BaseModel):
    answer: str

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """Single-shot Q&A."""
    try:
        # use the async API so the event loop is not blocked
        result = await chain.ainvoke({"question": request.question})
        return QueryResponse(answer=result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/query/stream")
async def query_stream(request: QueryRequest):
    """Streaming Q&A over Server-Sent Events."""
    async def generate():
        try:
            async for chunk in chain.astream({"question": request.question}):
                yield f"data: {chunk}\n\n"
        except Exception as e:
            yield f"data: ERROR: {str(e)}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

@app.get("/health")
async def health():
    """Health check."""
    return {"status": "healthy"}
Production-grade Configuration
# config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    openai_api_key: str
    model_name: str = "gpt-4o-mini"
    max_tokens: int = 1000
    rate_limit: int = 100  # requests per minute

    class Config:
        env_file = ".env"

settings = Settings()
# main.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import time
from collections import defaultdict

# rate limiting (in-memory, valid within a single process only)
rate_limit_store = defaultdict(list)

def check_rate_limit(client_id: str, limit: int = 100) -> bool:
    """Naive sliding-window rate limiter."""
    now = time.time()
    window = 60  # one-minute window
    # drop expired timestamps
    rate_limit_store[client_id] = [
        t for t in rate_limit_store[client_id]
        if now - t < window
    ]
    if len(rate_limit_store[client_id]) >= limit:
        return False
    rate_limit_store[client_id].append(now)
    return True

@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup initialization
    print("Starting up...")
    yield
    # shutdown cleanup
    print("Shutting down...")

app = FastAPI(lifespan=lifespan)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://your-domain.com"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_id = request.client.host
    if not check_rate_limit(client_id):
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded"}
        )
    return await call_next(request)
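Note that check_rate_limit above keeps its state in process memory, so it breaks down with multiple uvicorn workers or replicas. A shared store fixes that; a fixed-window sketch backed by Redis (assumes a reachable instance and pip install redis):
import time
import redis

r = redis.Redis.from_url("redis://localhost:6379")

def check_rate_limit_shared(client_id: str, limit: int = 100, window: int = 60) -> bool:
    """Fixed-window counter shared across all processes."""
    key = f"ratelimit:{client_id}:{int(time.time() // window)}"
    count = r.incr(key)
    if count == 1:
        # first hit in this window: start the expiry clock
        r.expire(key, window)
    return count <= limit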
Cloud Platform Deployment
AWS
With AWS Copilot:
# install the Copilot CLI (a standalone binary, not a pip package)
brew install aws/tap/copilot-cli
# initialize and deploy the app
copilot init --app my-langchain-app --name api --type 'Load Balanced Web Service' --dockerfile './Dockerfile' --deploy
Azure
With Azure Container Apps:
# deploy to Azure Container Apps
az containerapp up \
--name my-langchain-api \
--source . \
--resource-group my-rg \
--environment my-env \
--ingress external \
--target-port 8000 \
--env-vars OPENAI_API_KEY=your_key
Google Cloud
With Cloud Run:
# deploy to Cloud Run
gcloud run deploy my-langchain-api \
--source . \
--port 8000 \
--allow-unauthenticated \
--region us-central1 \
--set-env-vars OPENAI_API_KEY=your_key
Production Best Practices
1. Environment configuration
# manage secrets through environment variables
OPENAI_API_KEY=sk-xxx
LANGSMITH_TRACING=true
LANGSMITH_API_KEY=ls-xxx
# production setting: send traces synchronously so they are flushed before
# the process exits (important in serverless environments)
LANGCHAIN_CALLBACKS_BACKGROUND=false
2. Liveness and readiness checks
from fastapi import FastAPI
from datetime import datetime

app = FastAPI()
start_time = datetime.now()

@app.get("/health")
async def health():
    """Liveness check."""
    return {"status": "alive"}

@app.get("/ready")
async def ready():
    """Readiness check."""
    # verify the required service connections (helpers sketched below)
    checks = {
        "database": check_database_connection(),
        "redis": check_redis_connection(),
        "llm": check_llm_availability()
    }
    all_ready = all(checks.values())
    return {
        "ready": all_ready,
        "checks": checks,
        "uptime": str(datetime.now() - start_time)
    }
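The three check helpers are placeholders; one possible implementation, assuming the REDIS_URI/DATABASE_URI variables from earlier and pip install redis psycopg2-binary:
import os
import redis
import psycopg2

def check_redis_connection() -> bool:
    try:
        return bool(redis.Redis.from_url(os.environ["REDIS_URI"]).ping())
    except Exception:
        return False

def check_database_connection() -> bool:
    try:
        psycopg2.connect(os.environ["DATABASE_URI"]).close()
        return True
    except Exception:
        return False

def check_llm_availability() -> bool:
    # cheap proxy: the key is present; a stricter probe could make a tiny API call
    return bool(os.environ.get("OPENAI_API_KEY"))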
3. Logging and monitoring
import logging
from langsmith import Client

# configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# LangSmith monitoring
client = Client()

def log_request(request_id: str, input_data: dict, output: str):
    """Log a request."""
    logger.info(f"Request {request_id}: {input_data}")
    # forward to the monitoring system
    client.create_run(
        name="api_request",
        run_type="llm",
        inputs=input_data,
        outputs={"result": output}
    )
4. Error handling
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from langchain_core.exceptions import OutputParserException
import logging

logger = logging.getLogger(__name__)
app = FastAPI()

@app.exception_handler(OutputParserException)
async def output_parser_handler(request: Request, exc: OutputParserException):
    return JSONResponse(
        status_code=422,
        content={
            "detail": "Output parsing failed",
            "error": str(exc)
        }
    )

@app.exception_handler(Exception)
async def general_handler(request: Request, exc: Exception):
    logger.error(f"Unhandled error: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"}
    )
5. Resource limits
from fastapi import FastAPI
import resource  # Unix-only

app = FastAPI()

# cap the process's address space
def set_memory_limit(mb: int):
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    resource.setrlimit(resource.RLIMIT_AS, (mb * 1024 * 1024, hard))

# call at startup
# set_memory_limit(1024)  # cap at 1 GB
6. Timeout control
from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.post("/query")
async def query(request: QueryRequest):
    try:
        # enforce a per-request timeout
        result = await asyncio.wait_for(
            chain.ainvoke({"question": request.question}),
            timeout=30.0  # 30-second timeout
        )
        return {"answer": result}
    except asyncio.TimeoutError:
        return {"error": "Request timeout"}
Deployment Checklist
Before going to production, confirm the following:
- Environment variables are configured correctly and no secrets live in the code
- API authentication is implemented
- Rate limiting is configured
- Logging and monitoring are set up
- Health check endpoints are reachable
- The database and cache services are running
- Error handling is robust and exceptions cannot crash the service
- Timeouts are enforced so requests cannot run indefinitely
- The Docker image is optimized and reasonably sized
- Load testing has passed (a minimal smoke test is sketched below)
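For the load-testing item, a minimal concurrency smoke test; a sketch using httpx (assumed installed) against the /query endpoint from earlier:
import asyncio
import time
import httpx

async def load_test(url: str, payload: dict, total: int = 100, concurrency: int = 10):
    """Fire `total` requests with bounded concurrency and report latencies."""
    sem = asyncio.Semaphore(concurrency)
    latencies: list[float] = []

    async with httpx.AsyncClient(timeout=60) as client:
        async def one():
            async with sem:
                start = time.perf_counter()
                resp = await client.post(url, json=payload)
                resp.raise_for_status()
                latencies.append(time.perf_counter() - start)
        await asyncio.gather(*(one() for _ in range(total)))

    latencies.sort()
    print(f"p50={latencies[len(latencies) // 2]:.2f}s "
          f"p95={latencies[int(len(latencies) * 0.95)]:.2f}s")

asyncio.run(load_test("http://localhost:8000/query", {"question": "What is Python?"}))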