RAG 高级技术
随着 RAG 技术的发展,出现了许多增强和优化方案。本章将介绍混合检索增强、多模态 RAG、Graph RAG、自适应 RAG 等前沿技术,帮助你构建更强大的 RAG 系统。
混合检索增强
稀疏 + 密集混合
结合 BM25(稀疏)和向量检索(密集)的优势:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from rank_bm25 import BM25Okapi
class HybridRetriever:
    """Hybrid retriever that fuses dense (vector) and sparse (BM25) scores.

    Documents must carry a unique ``metadata["id"]`` so that scores from
    the two retrieval paths can be joined on document identity.
    """

    def __init__(self, vectorstore, documents, alpha=0.5):
        """
        alpha: weight of the dense (vector) score, in [0, 1].
            0 = pure BM25
            1 = pure vector retrieval
        """
        self.vectorstore = vectorstore
        # BM25 indexes whitespace-tokenized text; for CJK corpora a real
        # tokenizer (e.g. jieba) would be needed -- TODO confirm with callers.
        self.bm25 = BM25Okapi([doc.page_content.split() for doc in documents])
        self.documents = documents
        self.alpha = alpha
        # Index documents by id so ranked ids can be mapped back to documents.
        self._docs_by_id = {doc.metadata["id"]: doc for doc in documents}

    def retrieve(self, query, k=10):
        """Return the top-k documents ranked by the fused, normalized score."""
        # Dense retrieval: over-fetch (2k) so fusion has candidates to rerank.
        vector_results = self.vectorstore.similarity_search_with_score(query, k=k * 2)
        # similarity_search_with_score yields a distance; 1/(1+d) maps it to a
        # similarity in (0, 1] -- assumes lower distance means more similar.
        vector_scores = {doc.metadata["id"]: 1 / (1 + score) for doc, score in vector_results}
        # Sparse retrieval scores every indexed document.
        tokenized_query = query.split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        bm25_results = {
            self.documents[i].metadata["id"]: score
            for i, score in enumerate(bm25_scores)
        }
        # Min-max normalize both score sets so they are comparable.
        vector_scores = self._normalize(vector_scores)
        bm25_results = self._normalize(bm25_results)
        # Weighted linear fusion; a doc missing from one path scores 0 there.
        all_ids = set(vector_scores.keys()) | set(bm25_results.keys())
        combined = {}
        for doc_id in all_ids:
            v_score = vector_scores.get(doc_id, 0)
            b_score = bm25_results.get(doc_id, 0)
            combined[doc_id] = self.alpha * v_score + (1 - self.alpha) * b_score
        # Rank by fused score, best first, and return the top-k documents.
        sorted_ids = sorted(combined.keys(), key=lambda x: combined[x], reverse=True)
        return [self._get_doc_by_id(doc_id) for doc_id in sorted_ids[:k]]

    def _get_doc_by_id(self, doc_id):
        """Look up an indexed document by its metadata id.

        Fix: ``retrieve`` called this method but it was never defined.
        """
        return self._docs_by_id[doc_id]

    def _normalize(self, scores):
        """Min-max normalize a {id: score} mapping into [0, 1]."""
        if not scores:
            return scores
        max_score = max(scores.values())
        min_score = min(scores.values())
        # All-equal scores would divide by zero; treat every doc as top-ranked.
        if max_score == min_score:
            return {k: 1.0 for k in scores}
        return {k: (v - min_score) / (max_score - min_score) for k, v in scores.items()}
ColBERT 晚期交互(Late Interaction)
ColBERT 使用细粒度的词级别交互,比整文档向量更精确:
# ColBERT-style retrieval:
# one vector per token; at query time each query token is matched against
# its best-scoring document token ("late interaction"), which is finer-grained
# than a single whole-document embedding.
from ragatouille import RAGPretrainedModel
# Load a pretrained ColBERT checkpoint
RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
# Index the documents
# NOTE(review): assumes `documents` (objects with page_content and
# metadata["id"]) is defined earlier in the chapter -- confirm in context.
RAG.index(
    collection=[doc.page_content for doc in documents],
    document_ids=[doc.metadata["id"] for doc in documents],
    index_name="my_index"
)
# Search
results = RAG.search(query="如何提高团队效率", k=5)
多模态 RAG
处理文本、图像、表格等多种类型的数据。
图像检索
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
class MultiModalRAG:
    """Text-to-image retrieval built on CLIP's joint embedding space."""

    def __init__(self):
        self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        # Parallel stores: image_embeddings[i] corresponds to text_embeddings[i].
        self.text_embeddings = []
        self.image_embeddings = []

    def add_image(self, image_path, text_description):
        """Embed an image plus its text description and store both."""
        image = Image.open(image_path)
        # Encode the image.
        inputs = self.clip_processor(images=image, return_tensors="pt")
        image_embedding = self.clip_model.get_image_features(**inputs)
        # Encode the description text.
        text_inputs = self.clip_processor(text=[text_description], return_tensors="pt")
        text_embedding = self.clip_model.get_text_features(**text_inputs)
        self.image_embeddings.append(image_embedding)
        self.text_embeddings.append({
            "embedding": text_embedding,
            "description": text_description,
            "image_path": image_path
        })

    def search(self, query, k=5):
        """Return the k stored items most similar to the text query.

        Returns a list of (item, similarity) tuples, best match first.
        NOTE(review): the query is scored against the *description* text
        embeddings; scoring against self.image_embeddings may be intended
        for true cross-modal search -- confirm.
        """
        inputs = self.clip_processor(text=[query], return_tensors="pt")
        query_embedding = self.clip_model.get_text_features(**inputs)
        # Score the query against each stored description embedding.
        similarities = []
        for item in self.text_embeddings:
            sim = self._cosine_similarity(query_embedding, item["embedding"])
            similarities.append((item, sim))
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:k]

    @staticmethod
    def _cosine_similarity(a, b):
        """Cosine similarity of two (1, dim) or (dim,) tensors, as a float.

        Fix: the original called an undefined ``cosine_similarity`` name,
        which raised NameError at search time.
        """
        a = a.flatten().float()
        b = b.flatten().float()
        denom = a.norm() * b.norm()
        # Guard against zero vectors to avoid a NaN.
        return float(a @ b / denom) if float(denom) else 0.0
PDF 表格提取
import pdfplumber
def extract_tables_from_pdf(pdf_path):
    """Extract every table in a PDF as a list of text-formatted records.

    Each record carries the rendered table text, its 1-based page number,
    and a fixed "table" type tag.
    """
    extracted = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_no, page in enumerate(pdf.pages, start=1):
            extracted.extend(
                {
                    "content": format_table_as_text(raw_table),
                    "page": page_no,
                    "type": "table",
                }
                for raw_table in page.extract_tables()
            )
    return extracted
def format_table_as_text(table):
    """Render an extracted table (list of rows) as a Markdown-style table.

    The first row is treated as the header; None cells become empty strings.
    Returns "" for an empty/missing table.
    """
    if not table:
        return ""
    header, *body = table

    def render_row(cells):
        # Stringify each cell, mapping falsy cells (None, "") to "".
        return "| " + " | ".join(str(cell or "") for cell in cells) + " |"

    lines = [
        render_row(header),
        "| " + " | ".join(["---"] * len(header)) + " |",
    ]
    lines.extend(render_row(row) for row in body)
    return "\n".join(lines)
Graph RAG
利用知识图谱增强 RAG,处理实体关系查询。
基本概念
传统 RAG:
文档 → 分块 → 向量化 → 检索
Graph RAG:
文档 → 实体抽取 → 关系构建 → 图存储 → 图遍历 + 向量检索
实现
from langchain_community.graphs import Neo4jGraph
from langchain_experimental.graph_transformers import LLMGraphTransformer
class GraphRAG:
    """RAG over a Neo4j knowledge graph combined with vector retrieval."""

    def __init__(self, neo4j_url, neo4j_user, neo4j_password, llm):
        self.graph = Neo4jGraph(
            url=neo4j_url,
            username=neo4j_user,
            password=neo4j_password
        )
        self.llm = llm
        # LLM-driven extraction of entities/relations from raw documents.
        self.graph_transformer = LLMGraphTransformer(llm=llm)

    def add_document(self, document):
        """Extract entities/relations from one document and persist to Neo4j."""
        # Extract entities and relations via the LLM graph transformer.
        graph_documents = self.graph_transformer.convert_to_graph_documents([document])
        # Store the resulting graph documents in Neo4j.
        self.graph.add_graph_documents(graph_documents)

    def search(self, query, k=5):
        """Hybrid graph-traversal + vector retrieval.

        NOTE(review): ``vector_search`` and ``merge_results`` are not defined
        anywhere in this class -- they must come from a subclass or a later
        revision; calling search() as-is raises AttributeError. Confirm.
        """
        # 1. Pull entity names out of the query.
        entities = self.extract_entities(query)
        # 2. Traverse the graph around those entities.
        graph_results = self.traverse_graph(entities)
        # 3. Vector retrieval.
        vector_results = self.vector_search(query, k)
        # 4. Merge both result sets.
        return self.merge_results(graph_results, vector_results)

    def extract_entities(self, text):
        """Ask the LLM for entity names in *text* and split on commas.

        NOTE(review): splits on the ASCII comma only; an LLM answering in
        Chinese may separate entities with "," or "、" -- verify the
        prompt/output format against the model in use.
        """
        prompt = f"从以下文本中提取实体名称:{text}"
        response = self.llm.invoke(prompt)
        return response.content.split(",")

    def traverse_graph(self, entities):
        """One-hop Cypher traversal from the named entities, capped at 10 rows."""
        query = """
        MATCH (e:Entity)-[r]-(related)
        WHERE e.name IN $entities
        RETURN e.name as entity, type(r) as relation,
        related.name as related_entity, related.content as content
        LIMIT 10
        """
        return self.graph.query(query, {"entities": entities})
Graph RAG 适用场景
- 需要理解实体关系的问答
- 多跳推理问题
- 知识探索和发现
查询: "马斯克创立的公司有哪些员工跳槽到了 OpenAI?"
传统 RAG: 难以回答(需要多次查询)
Graph RAG:
1. 找到实体 "马斯克"、"OpenAI"
2. 遍历关系:马斯克 -创立-> 公司 <-曾就职- 员工 -跳槽-> OpenAI
3. 返回结果
自适应 RAG
根据问题类型动态选择检索策略。
问题分类
from enum import Enum
class QueryType(Enum):
    """Coarse query categories used to select a retrieval strategy."""
    FACTUAL = "factual"                  # fact lookup, needs specific information
    ANALYTICAL = "analytical"            # analysis/reasoning, needs deep understanding
    CONVERSATIONAL = "conversational"    # conversational, context-dependent
    PROCEDURAL = "procedural"            # step-by-step / how-to instructions
    UNKNOWN = "unknown"                  # fallback when classification fails
class AdaptiveRAG:
    """Router that classifies a query and dispatches a retrieval strategy."""

    def __init__(self, llm, vectorstore):
        self.llm = llm
        self.vectorstore = vectorstore
        # Map each query type to its retrieval strategy.
        self.strategies = {
            QueryType.FACTUAL: self.factual_retrieval,
            QueryType.ANALYTICAL: self.analytical_retrieval,
            QueryType.CONVERSATIONAL: self.conversational_retrieval,
            QueryType.PROCEDURAL: self.procedural_retrieval,
        }

    def classify_query(self, query):
        """Classify the query via the LLM; fall back to UNKNOWN on bad output."""
        prompt = f"""
        分析以下问题的类型,返回最匹配的类型:
        - factual: 事实查询,需要具体信息
        - analytical: 分析推理,需要深度理解
        - conversational: 对话式,上下文相关
        - procedural: 步骤流程,需要操作指南
        问题: {query}
        类型:
        """
        response = self.llm.invoke(prompt)
        try:
            return QueryType(response.content.strip().lower())
        except ValueError:
            # Fix: an off-list LLM answer used to raise ValueError here;
            # route it to UNKNOWN, which retrieve() maps to the default.
            return QueryType.UNKNOWN

    def retrieve(self, query):
        """Classify the query, then dispatch to the matching strategy."""
        query_type = self.classify_query(query)
        strategy = self.strategies.get(query_type, self.default_retrieval)
        return strategy(query)

    def default_retrieval(self, query):
        """Fallback strategy (fix: referenced by retrieve() but never defined)."""
        return self.vectorstore.similarity_search(query, k=5)

    def factual_retrieval(self, query):
        """Factual query: small, precise top-k."""
        return self.vectorstore.similarity_search(query, k=3)

    def analytical_retrieval(self, query):
        """Analytical query: retrieve over several query variants, then dedupe."""
        variants = self.generate_query_variants(query)
        results = []
        for variant in variants:
            results.extend(self.vectorstore.similarity_search(variant, k=3))
        return self.deduplicate(results)[:10]

    def generate_query_variants(self, query, n=3):
        """Ask the LLM for up to n rephrasings of the query.

        Fix: referenced by analytical_retrieval() but never defined.
        The original query is always kept as the first variant.
        """
        prompt = f"请为以下问题生成 {n} 个不同角度的改写,每行一个:\n{query}"
        response = self.llm.invoke(prompt)
        variants = [line.strip() for line in response.content.splitlines() if line.strip()]
        return [query] + variants[:n]

    def conversational_retrieval(self, query):
        """Conversational query: should be history-aware.

        TODO: incorporate chat history. Falls back to the default strategy
        for now -- the original body was an empty placeholder returning None.
        """
        return self.default_retrieval(query)

    def procedural_retrieval(self, query):
        """Procedural query: wider top-k to capture a complete procedure."""
        return self.vectorstore.similarity_search(query, k=5)

    @staticmethod
    def deduplicate(docs):
        """Drop duplicate documents by page_content, keeping first-seen order.

        Fix: called by analytical_retrieval() but never defined.
        """
        seen = set()
        unique = []
        for doc in docs:
            if doc.page_content not in seen:
                seen.add(doc.page_content)
                unique.append(doc)
        return unique
假设性文档嵌入(HyDE)
先生成假设性答案文档,再检索相似文档。
class HyDERetriever:
    """HyDE: retrieve with an LLM-written hypothetical answer, not the query.

    The generated pseudo-answer usually lies closer to real answer documents
    in embedding space than the raw question does.
    """

    def __init__(self, llm, vectorstore):
        self.llm = llm
        self.vectorstore = vectorstore

    def retrieve(self, query, k=5):
        """Generate a hypothetical answer for *query*, then search with it."""
        pseudo_answer = self.generate_hypothetical_document(query)
        return self.vectorstore.similarity_search(pseudo_answer, k=k)

    def generate_hypothetical_document(self, query):
        """Ask the LLM to answer the query as if it had complete knowledge."""
        prompt = f"""
        请为以下问题写一个详细的回答,假设你有完整的知识:
        问题: {query}
        回答:
        """
        return self.llm.invoke(prompt).content
迭代检索
多轮检索,逐步细化结果。
class IterativeRetriever:
    """Multi-round retriever that refines the query until results suffice."""

    def __init__(self, llm, vectorstore, max_iterations=3):
        self.llm = llm
        self.vectorstore = vectorstore
        self.max_iterations = max_iterations

    def retrieve(self, query):
        """Iteratively retrieve, stopping early once results are sufficient."""
        all_results = []
        current_query = query
        for i in range(self.max_iterations):
            results = self.vectorstore.similarity_search(current_query, k=5)
            all_results.extend(results)
            # Stop once the LLM judges this round's evidence sufficient.
            if self.is_sufficient(results, query):
                break
            # Otherwise ask for a sharper follow-up query for the next round.
            current_query = self.refine_query(query, results)
        return self.deduplicate(all_results)

    def is_sufficient(self, results, query):
        """LLM judgment: do these results answer the question? (yes/no)"""
        prompt = f"""
        问题: {query}
        检索结果摘要: {[r.page_content[:100] for r in results]}
        这些结果是否足以回答问题?回答 yes 或 no。
        """
        response = self.llm.invoke(prompt)
        # Substring match tolerates answers like "Yes, they do."
        return "yes" in response.content.lower()

    def refine_query(self, original_query, results):
        """Ask the LLM for a more specific query.

        Fix: the original prompt never showed the retrieved snippets, so the
        LLM could not know what information was missing; include them now.
        """
        snippets = [r.page_content[:100] for r in results]
        prompt = f"""
        原问题: {original_query}
        已检索到的信息: {snippets}
        以上信息不足以回答问题。请生成一个更具体的查询,
        以获取缺失的信息。
        新查询:
        """
        response = self.llm.invoke(prompt)
        return response.content

    @staticmethod
    def deduplicate(docs):
        """Drop duplicates by page_content, first occurrence wins.

        Fix: called by retrieve() but never defined in the original.
        """
        seen = set()
        unique = []
        for doc in docs:
            if doc.page_content not in seen:
                seen.add(doc.page_content)
                unique.append(doc)
        return unique
RAG 与 Agent 结合
让 RAG 系统能够执行工具调用和多步推理。
from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain.tools import Tool
def create_rag_agent(llm, vectorstore):
    """Build an AgentExecutor whose single tool searches the vector store."""

    def search_documents(query: str) -> str:
        """Search the internal knowledge base."""
        hits = vectorstore.similarity_search(query, k=3)
        return "\n\n".join(doc.page_content for doc in hits)

    kb_tool = Tool(
        name="search_knowledge_base",
        func=search_documents,
        description="搜索内部知识库,输入查询内容,返回相关文档"
    )
    # NOTE(review): current LangChain versions expect a prompt argument in
    # create_openai_tools_agent(llm, tools, prompt) -- confirm the API
    # version this chapter targets.
    agent = create_openai_tools_agent(llm, [kb_tool])
    return AgentExecutor(agent=agent, tools=[kb_tool])
# Usage: build the agent and ask a multi-part HR question.
# NOTE(review): assumes `llm` and `vectorstore` are defined earlier in the chapter.
agent = create_rag_agent(llm, vectorstore)
response = agent.invoke({"input": "公司的年假制度是怎样的?如何申请?"})
小结
高级 RAG 技术从多个维度增强检索能力:
| 技术 | 解决的问题 | 适用场景 |
|---|---|---|
| 混合检索增强 | 提高召回率 | 通用场景 |
| 多模态 RAG | 处理非文本数据 | 文档含图表 |
| Graph RAG | 实体关系查询 | 知识探索 |
| 自适应 RAG | 问题类型多样 | 通用场景 |
| HyDE | 查询与文档语言差异大 | 语义鸿沟 |
| 迭代检索 | 复杂问题 | 需要多步推理 |