Agentic RAG
Agentic RAG(智能检索增强生成)是传统 RAG 的升级版。它让 Agent 能够自主决定是否需要检索外部知识、如何优化检索查询、以及如何评估检索结果的质量。本章将详细介绍 Agentic RAG 的概念、实现方法和最佳实践。
什么是 Agentic RAG
传统 RAG(Retrieval-Augmented Generation)的工作流程是固定的:用户提问 → 检索文档 → 生成回答。这种方式存在几个问题:
- 盲目检索:无论问题是否需要,都会进行检索
- 检索质量不可控:检索到的文档可能不相关,但系统无法判断
- 缺乏自我修正:如果检索结果不好,无法改进检索策略
Agentic RAG 通过引入 Agent 的决策能力解决了这些问题:
Agentic RAG vs 传统 RAG
| 特性 | 传统 RAG | Agentic RAG |
|---|---|---|
| 检索决策 | 总是检索 | Agent 自主判断是否需要检索 |
| 查询优化 | 直接使用原问题 | 可重写查询以提高检索效果 |
| 结果评估 | 无评估机制 | 评估文档相关性,不相关则重试 |
| 自我修正 | 无 | 可以多次迭代优化 |
| 灵活性 | 低 | 高 |
核心组件
Agentic RAG 系统包含以下核心组件:
1. 决策节点
决定是否需要检索外部知识:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
def generate_query_or_respond(state: dict) -> dict:
"""
决策节点:判断是否需要检索,或直接回答
工作原理:
1. 分析用户问题
2. 判断是否需要外部知识
3. 需要则生成检索查询,否则直接回答
"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 绑定检索工具,让模型知道可以调用
llm_with_tools = llm.bind_tools([retriever_tool])
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
2. 检索工具
从知识库中检索相关文档:
from langchain_core.tools import tool
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
documents=split_docs, # 预处理后的文档块
embedding=embeddings,
persist_directory="./chroma_db"
)
@tool
def retriever_tool(query: str) -> str:
"""
从知识库中检索相关文档
参数:
- query: 搜索查询
返回与查询最相关的文档内容
"""
docs = vectorstore.similarity_search(query, k=3)
if not docs:
return "未找到相关文档"
result = []
for i, doc in enumerate(docs):
result.append(f"文档 {i+1}:\n{doc.page_content}\n")
return "\n---\n".join(result)
3. 文档评估节点
评估检索到的文档是否与问题相关:
from pydantic import BaseModel, Field
from typing import Literal
class GradeDocuments(BaseModel):
"""文档相关性评估结果"""
score: Literal["relevant", "not_relevant"] = Field(
description="判断文档是否与问题相关"
)
def grade_documents(state: dict) -> str:
"""
文档评估节点:判断检索结果是否相关
工作原理:
1. 获取最后一个工具调用的结果
2. 使用 LLM 评估相关性
3. 返回下一步路由
"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# 获取检索结果
messages = state["messages"]
last_message = messages[-1]
if not hasattr(last_message, "content"):
return "generate_answer"
# 获取原始问题
question = None
for msg in reversed(messages):
if isinstance(msg, HumanMessage):
question = msg.content
break
if not question:
return "generate_answer"
# 使用结构化输出评估相关性
structured_llm = llm.with_structured_output(GradeDocuments)
prompt = f"""请评估以下文档是否与问题相关:
问题:{question}
检索到的文档:
{last_message.content}
如果文档包含可以回答问题的关键信息,返回 'relevant'。
如果文档与问题无关或无法帮助回答问题,返回 'not_relevant'。"""
result = structured_llm.invoke(prompt)
if result.score == "relevant":
return "generate_answer"
else:
return "rewrite_question"
4. 查询重写节点
当检索结果不理想时,重写查询以获得更好的结果:
def rewrite_question(state: dict) -> dict:
"""
查询重写节点:优化检索查询
工作原理:
1. 分析原始问题和失败的检索结果
2. 生成更好的检索查询
3. 用新查询替换原问题
"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
messages = state["messages"]
# 获取原始问题
question = None
for msg in reversed(messages):
if isinstance(msg, HumanMessage):
question = msg.content
break
# 获取失败的检索结果
last_tool_result = None
for msg in reversed(messages):
if hasattr(msg, "name") and msg.name == "retriever_tool":
last_tool_result = msg.content
break
prompt = f"""原始问题:{question}
之前的检索结果不够好:
{last_tool_result}
请重写问题,使其更容易检索到相关信息。
要求:
1. 保持原始意图不变
2. 使用更具体的关键词
3. 消除歧义
只输出重写后的问题:"""
response = llm.invoke(prompt)
# 创建新的用户消息,替换原问题
new_question = HumanMessage(content=response.content)
# 保留原始问题用于后续生成答案
return {
"messages": [new_question],
"original_question": question
}
5. 答案生成节点
基于检索到的上下文生成最终答案:
def generate_answer(state: dict) -> dict:
"""
答案生成节点:基于检索结果生成最终答案
工作原理:
1. 获取相关文档和原始问题
2. 基于上下文生成准确答案
"""
llm = ChatOpenAI(model="gpt-4o", temperature=0)
messages = state["messages"]
# 获取原始问题(可能是重写前的)
question = state.get("original_question")
if not question:
for msg in reversed(messages):
if isinstance(msg, HumanMessage):
question = msg.content
break
# 获取检索到的文档
retrieved_docs = None
for msg in reversed(messages):
if hasattr(msg, "name") and msg.name == "retriever_tool":
retrieved_docs = msg.content
break
prompt = f"""基于以下文档回答用户问题。
问题:{question}
相关文档:
{retrieved_docs}
要求:
1. 答案必须基于文档内容
2. 如果文档信息不足,诚实说明
3. 引用文档中的具体内容
请回答:"""
response = llm.invoke(prompt)
return {"messages": [response]}
完整实现
以下是使用 LangGraph 构建完整 Agentic RAG 系统的代码:
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pydantic import BaseModel, Field
import operator
# ============ 1. 文档预处理 ============
def load_and_split_documents(urls: list[str]) -> list:
"""
加载并分割文档
参数:
- urls: 文档 URL 列表
返回分割后的文档块列表
"""
# 加载文档
loader = WebBaseLoader(urls)
docs = loader.load()
# 分割文档
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
split_docs = text_splitter.split_documents(docs)
return split_docs
# ============ 2. 创建向量存储和检索工具 ============
def create_retriever_tool(split_docs: list) -> tuple:
"""
创建检索工具
参数:
- split_docs: 分割后的文档块
返回:(检索工具函数, 向量存储)
"""
# 创建嵌入模型和向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
documents=split_docs,
embedding=embeddings
)
@tool
def retriever_tool(query: str) -> str:
"""从知识库中检索相关文档
参数:
- query: 搜索查询
"""
docs = vectorstore.similarity_search(query, k=3)
if not docs:
return "未找到相关文档"
result = []
for i, doc in enumerate(docs):
content = doc.page_content
source = doc.metadata.get("source", "未知来源")
result.append(f"[来源: {source}]\n{content}")
return "\n---\n".join(result)
return retriever_tool, vectorstore
# ============ 3. 定义状态 ============
class AgentState(TypedDict):
"""Agent 状态定义"""
messages: Annotated[list, add_messages]
original_question: str # 保存原始问题
# ============ 4. 定义节点函数 ============
def generate_query_or_respond(state: AgentState) -> dict:
"""决策节点:判断是否需要检索"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools([retriever_tool])
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
# 文档相关性评估的结构
class GradeDocuments(BaseModel):
score: str = Field(description="'relevant' 或 'not_relevant'")
def grade_documents(state: AgentState) -> str:
"""文档评估节点:判断检索结果是否相关"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
messages = state["messages"]
# 获取检索结果(最后一个 ToolMessage)
tool_messages = [m for m in messages if isinstance(m, ToolMessage)]
if not tool_messages:
return "generate_answer"
last_tool_result = tool_messages[-1].content
# 获取原始问题
question = None
for msg in messages:
if isinstance(msg, HumanMessage):
question = msg.content
break
if not question:
return "generate_answer"
# 评估相关性
structured_llm = llm.with_structured_output(GradeDocuments)
prompt = f"""评估以下文档是否与问题相关:
问题:{question}
文档内容:
{last_tool_result}
如果文档包含回答问题的关键信息,返回 'relevant'。
否则返回 'not_relevant'。只返回一个词。"""
result = structured_llm.invoke(prompt)
return "generate_answer" if result.score == "relevant" else "rewrite_question"
def rewrite_question(state: AgentState) -> dict:
"""查询重写节点:优化检索查询"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
messages = state["messages"]
# 获取原始问题
question = None
original_question = state.get("original_question")
for msg in messages:
if isinstance(msg, HumanMessage):
question = msg.content
break
# 保存原始问题
if not original_question:
original_question = question
# 获取检索结果
tool_messages = [m for m in messages if isinstance(m, ToolMessage)]
last_result = tool_messages[-1].content if tool_messages else ""
prompt = f"""原始问题:{question}
检索结果不理想:
{last_result}
请重写问题以获得更好的检索结果。
要求:
1. 保持原始意图
2. 使用更具体的关键词
3. 消除歧义
只输出重写后的问题:"""
response = llm.invoke(prompt)
# 清除之前的工具消息,保留对话历史
new_messages = [m for m in messages if not isinstance(m, ToolMessage)]
new_messages = [m for m in new_messages if not isinstance(m, AIMessage) or not m.tool_calls]
# 添加新的问题
new_messages.append(HumanMessage(content=response.content))
return {
"messages": new_messages,
"original_question": original_question
}
def generate_answer(state: AgentState) -> dict:
"""答案生成节点:基于检索结果生成最终答案"""
llm = ChatOpenAI(model="gpt-4o", temperature=0)
messages = state["messages"]
original_question = state.get("original_question")
# 确定问题
question = original_question
if not question:
for msg in messages:
if isinstance(msg, HumanMessage):
question = msg.content
break
# 获取检索结果
tool_messages = [m for m in messages if isinstance(m, ToolMessage)]
retrieved_docs = tool_messages[-1].content if tool_messages else ""
prompt = f"""基于检索到的文档回答用户问题。
问题:{question}
相关文档:
{retrieved_docs}
要求:
1. 答案必须基于文档内容
2. 准确引用文档信息
3. 如果信息不足,诚实说明
请回答:"""
response = llm.invoke(prompt)
return {"messages": [response]}
# ============ 5. 构建工作流图 ============
def build_agentic_rag_graph():
"""构建 Agentic RAG 工作流图"""
# 创建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("generate_query_or_respond", generate_query_or_respond)
workflow.add_node("retrieve", ToolNode([retriever_tool]))
workflow.add_node("grade_documents", lambda state: {"__next__": grade_documents(state)})
workflow.add_node("rewrite_question", rewrite_question)
workflow.add_node("generate_answer", generate_answer)
# 设置入口
workflow.set_entry_point("generate_query_or_respond")
# 添加条件边:是否需要检索
workflow.add_conditional_edges(
"generate_query_or_respond",
tools_condition,
{
"tools": "retrieve",
"__end__": END
}
)
# 检索后评估文档
workflow.add_edge("retrieve", "grade_documents")
# 根据评估结果路由
workflow.add_conditional_edges(
"grade_documents",
lambda state: state.get("__next__", "generate_answer"),
{
"generate_answer": "generate_answer",
"rewrite_question": "rewrite_question"
}
)
# 重写问题后重新检索
workflow.add_edge("rewrite_question", "generate_query_or_respond")
# 生成答案后结束
workflow.add_edge("generate_answer", END)
return workflow.compile()
# ============ 6. 使用示例 ============
# 准备文档
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-defense/",
]
split_docs = load_and_split_documents(urls)
retriever_tool, vectorstore = create_retriever_tool(split_docs)
# 构建图
app = build_agentic_rag_graph()
# 测试
result = app.invoke({
"messages": [HumanMessage(content="什么是 Task Decomposition?")]
})
print(result["messages"][-1].content)
运行示例
让我们看一个完整的运行示例:
# 示例 1:需要检索的问题
result = app.invoke({
"messages": [HumanMessage(content="请解释一下 Chain of Thought 的概念")]
})
print("=== 答案 ===")
print(result["messages"][-1].content)
# 示例 2:不需要检索的问题
result = app.invoke({
"messages": [HumanMessage(content="你好,你是谁?")]
})
print("=== 答案 ===")
print(result["messages"][-1].content)
# 示例 3:需要多次检索的问题
result = app.invoke({
"messages": [HumanMessage(content="如何保护 LLM 应用免受对抗攻击?")]
})
print("=== 答案 ===")
print(result["messages"][-1].content)
执行过程示例:
用户问题: 什么是 Task Decomposition?
步骤 1 [generate_query_or_respond]:
- 分析问题,判断需要检索
- 调用 retriever_tool
步骤 2 [retrieve]:
- 从向量存储检索相关文档
- 返回 3 个相关文档块
步骤 3 [grade_documents]:
- 评估文档相关性
- 结果: relevant
步骤 4 [generate_answer]:
- 基于检索结果生成答案
- 返回最终答案
高级特性
自适应检索
根据问题复杂度动态调整检索参数:
@tool
def adaptive_retriever(query: str, detail_level: str = "normal") -> str:
"""
自适应检索工具
参数:
- query: 搜索查询
- detail_level: 详细程度 (brief/normal/detailed)
"""
# 根据详细程度调整返回文档数量
k_map = {"brief": 2, "normal": 3, "detailed": 5}
k = k_map.get(detail_level, 3)
docs = vectorstore.similarity_search(query, k=k)
if not docs:
return "未找到相关文档"
result = []
for doc in docs:
result.append(doc.page_content)
return "\n---\n".join(result)
混合检索
结合关键词搜索和语义搜索:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
def create_hybrid_retriever(docs: list):
"""创建混合检索器"""
# 语义检索器
vectorstore = Chroma.from_documents(docs, OpenAIEmbeddings())
semantic_retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# 关键词检索器
keyword_retriever = BM25Retriever.from_documents(docs)
keyword_retriever.k = 3
# 混合检索器(加权组合)
ensemble_retriever = EnsembleRetriever(
retrievers=[semantic_retriever, keyword_retriever],
weights=[0.6, 0.4] # 语义搜索权重更高
)
return ensemble_retriever
检索缓存
缓存常见查询的检索结果:
from functools import lru_cache
import hashlib
class CachedRetriever:
"""带缓存的检索器"""
def __init__(self, vectorstore, max_cache_size: int = 100):
self.vectorstore = vectorstore
self.cache = {}
self.max_cache_size = max_cache_size
def _get_cache_key(self, query: str) -> str:
"""生成缓存键"""
return hashlib.md5(query.encode()).hexdigest()
def retrieve(self, query: str, k: int = 3) -> list:
"""检索文档(带缓存)"""
cache_key = self._get_cache_key(query)
# 检查缓存
if cache_key in self.cache:
return self.cache[cache_key]
# 执行检索
docs = self.vectorstore.similarity_search(query, k=k)
# 更新缓存(LRU 策略)
if len(self.cache) >= self.max_cache_size:
# 删除最旧的缓存
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[cache_key] = docs
return docs
最佳实践
1. 合理的系统提示
明确告知 Agent 何时应该检索:
SYSTEM_PROMPT = """你是一个基于知识库的智能问答助手。
工作流程:
1. 分析用户问题
2. 判断是否需要从知识库检索信息
3. 如果需要检索,调用 retriever_tool
4. 基于检索结果生成准确回答
检索决策标准:
- 涉及具体概念、定义、方法 → 需要检索
- 一般性问候、闲聊 → 不需要检索
- 常识性问题 → 不需要检索
请始终基于检索到的文档内容回答,不要编造信息。"""
2. 文档预处理
确保文档质量:
def preprocess_documents(docs: list) -> list:
"""文档预处理"""
processed = []
for doc in docs:
# 清理内容
content = doc.page_content.strip()
# 移除过短的内容
if len(content) < 100:
continue
# 移除重复内容
if content in [d.page_content for d in processed]:
continue
processed.append(doc)
return processed
3. 检索质量监控
记录和监控检索效果:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("agentic-rag")
def log_retrieval_metrics(question: str, docs: list, was_relevant: bool):
"""记录检索指标"""
logger.info(f"问题: {question[:50]}...")
logger.info(f"检索文档数: {len(docs)}")
logger.info(f"相关性: {'相关' if was_relevant else '不相关'}")
4. 迭代次数限制
防止无限循环:
from langgraph.managed import IsLastStep
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
original_question: str
is_last_step: IsLastStep # 跟踪迭代次数
# 在构建图时设置递归限制
app = workflow.compile()
app.recursion_limit = 10 # 最多 10 次迭代
常见问题
问题 1:检索结果不相关
症状:Agent 多次重写查询但仍无法获得相关结果
解决方案:
- 检查文档索引质量
- 优化查询重写策略
- 增加文档评估的严格程度
问题 2:直接回答了需要检索的问题
症状:简单问题 Agent 总是检索,复杂问题反而不检索
解决方案:
- 优化系统提示中的检索决策标准
- 在提示中添加示例
问题 3:上下文过长
症状:检索了太多文档,超出模型上下文限制
解决方案:
- 限制检索文档数量
- 使用文档摘要而非完整内容
- 实现上下文压缩
小结
Agentic RAG 是传统 RAG 的智能升级:
- Agent 自主决定是否需要检索
- 支持自适应检索策略
- 可以动态调整检索参数
- 具备自我修正能力
结合 LangGraph 的状态管理和条件路由能力,可以构建更智能、更灵活的检索增强生成系统。