跳到主要内容

RAG 高级技术

随着 RAG 技术的发展,出现了许多增强和优化方案。本章将介绍混合检索增强、多模态 RAG、Graph RAG、自适应 RAG 等前沿技术,帮助你构建更强大的 RAG 系统。

混合检索增强

稀疏 + 密集混合

结合 BM25(稀疏)和向量检索(密集)的优势:

from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from rank_bm25 import BM25Okapi

class HybridRetriever:
    """Hybrid retriever fusing dense (vector) and sparse (BM25) scores.

    alpha controls the mix: 0 = pure BM25, 1 = pure vector search.
    """

    def __init__(self, vectorstore, documents, alpha=0.5):
        """
        Args:
            vectorstore: vector store exposing similarity_search_with_score().
            documents: documents with .page_content and .metadata["id"].
            alpha: weight of the vector-search score, in [0, 1].
        """
        self.vectorstore = vectorstore
        # Whitespace tokenization mirrors the query tokenization in retrieve().
        self.bm25 = BM25Okapi([doc.page_content.split() for doc in documents])
        self.documents = documents
        self.alpha = alpha
        # id -> document map; fixes the previously missing _get_doc_by_id lookup.
        self._docs_by_id = {doc.metadata["id"]: doc for doc in documents}

    def retrieve(self, query, k=10):
        """Return the top-k documents ranked by the fused, normalized score."""
        # Dense retrieval: over-fetch (k*2) so fusion has candidates to rerank.
        # NOTE(review): assumes the store returns a *distance* (lower = better);
        # 1/(1+score) converts it into a similarity — confirm for your store.
        vector_results = self.vectorstore.similarity_search_with_score(query, k=k*2)
        vector_scores = {doc.metadata["id"]: 1/(1+score) for doc, score in vector_results}

        # Sparse retrieval scores the whole corpus.
        tokenized_query = query.split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        bm25_results = {self.documents[i].metadata["id"]: score
                        for i, score in enumerate(bm25_scores)}

        # Min-max normalize each score set so the two are comparable.
        vector_scores = self._normalize(vector_scores)
        bm25_results = self._normalize(bm25_results)

        # Weighted linear fusion; a missing score defaults to 0.
        all_ids = set(vector_scores.keys()) | set(bm25_results.keys())
        combined = {}
        for doc_id in all_ids:
            v_score = vector_scores.get(doc_id, 0)
            b_score = bm25_results.get(doc_id, 0)
            combined[doc_id] = self.alpha * v_score + (1 - self.alpha) * b_score

        # Sort by fused score, best first, and map ids back to documents.
        sorted_ids = sorted(combined.keys(), key=lambda x: combined[x], reverse=True)
        return [self._get_doc_by_id(doc_id) for doc_id in sorted_ids[:k]]

    def _get_doc_by_id(self, doc_id):
        """Resolve a document id back to its document (was missing originally)."""
        return self._docs_by_id[doc_id]

    def _normalize(self, scores):
        """Min-max normalize a {id: score} dict into [0, 1]."""
        if not scores:
            return scores
        max_score = max(scores.values())
        min_score = min(scores.values())
        if max_score == min_score:
            # All scores equal: treat every candidate as equally relevant.
            return {k: 1.0 for k in scores}
        return {k: (v - min_score) / (max_score - min_score) for k, v in scores.items()}

ColBERT 晚期交互

ColBERT 使用细粒度的词级别交互,比整文档向量更精确:

# ColBERT-style retrieval
# One vector per token; at query time each query token is matched
# against its best-scoring document token (late interaction).

from ragatouille import RAGPretrainedModel

# Load a pretrained ColBERT v2 checkpoint
RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")

# Index the documents (ids taken from each document's metadata)
RAG.index(
    collection=[doc.page_content for doc in documents],
    document_ids=[doc.metadata["id"] for doc in documents],
    index_name="my_index"
)

# Search: returns the top-k matches for the query
results = RAG.search(query="如何提高团队效率", k=5)

多模态 RAG

处理文本、图像、表格等多种类型的数据。

图像检索

from PIL import Image
from transformers import CLIPProcessor, CLIPModel

class MultiModalRAG:
    """Minimal text-to-image retrieval index built on CLIP embeddings."""

    def __init__(self):
        # NOTE(review): downloads model weights on first use — requires network.
        self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        self.text_embeddings = []
        self.image_embeddings = []

    def add_image(self, image_path, text_description):
        """Embed an image and its text description and store both."""
        image = Image.open(image_path)

        # Encode the image with CLIP's vision tower.
        inputs = self.clip_processor(images=image, return_tensors="pt")
        image_embedding = self.clip_model.get_image_features(**inputs)

        # Encode the description with CLIP's text tower.
        text_inputs = self.clip_processor(text=[text_description], return_tensors="pt")
        text_embedding = self.clip_model.get_text_features(**text_inputs)

        self.image_embeddings.append(image_embedding)
        self.text_embeddings.append({
            "embedding": text_embedding,
            "description": text_description,
            "image_path": image_path
        })

    def search(self, query, k=5):
        """Return the k stored items most similar to the text query.

        Returns:
            List of (item_dict, similarity_float) pairs, best first.
        """
        # Local import keeps the snippet self-contained; this fixes the
        # previously *undefined* cosine_similarity helper (NameError).
        import torch.nn.functional as F

        inputs = self.clip_processor(text=[query], return_tensors="pt")
        query_embedding = self.clip_model.get_text_features(**inputs)

        # Cosine similarity between the query and each stored description.
        similarities = []
        for item in self.text_embeddings:
            sim = F.cosine_similarity(query_embedding, item["embedding"]).item()
            similarities.append((item, sim))

        # Highest similarity first, truncated to k.
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:k]

PDF 表格提取

import pdfplumber

def extract_tables_from_pdf(pdf_path):
    """Extract every table from a PDF as a list of text records.

    Each record is {"content": <markdown text>, "page": <1-based page
    number>, "type": "table"}.
    """
    extracted = []

    with pdfplumber.open(pdf_path) as pdf:
        # Enumerate from 1 so the stored page number is human-facing.
        for page_no, page in enumerate(pdf.pages, start=1):
            extracted.extend(
                {
                    "content": format_table_as_text(raw_table),
                    "page": page_no,
                    "type": "table",
                }
                for raw_table in page.extract_tables()
            )

    return extracted

def format_table_as_text(table):
    """Format a table (list of rows) as a GitHub-style Markdown table.

    The first row is treated as the header. Only ``None`` cells render as
    empty strings — the original ``str(c or "")`` silently erased any falsy
    cell value, so ``0`` or ``False`` vanished from the output.

    Args:
        table: list of rows, each a list of cells; may be empty.

    Returns:
        The Markdown text, or "" for an empty table.
    """
    if not table:
        return ""

    def cell(value):
        # Only None means "empty"; real values like 0 must survive.
        return "" if value is None else str(value)

    # Header row, then the remaining data rows.
    headers = table[0]
    rows = table[1:]

    lines = []
    lines.append("| " + " | ".join(cell(h) for h in headers) + " |")
    lines.append("| " + " | ".join(["---"] * len(headers)) + " |")

    for row in rows:
        lines.append("| " + " | ".join(cell(c) for c in row) + " |")

    return "\n".join(lines)

Graph RAG

利用知识图谱增强 RAG,处理实体关系查询。

基本概念

传统 RAG:
文档 → 分块 → 向量化 → 检索

Graph RAG:
文档 → 实体抽取 → 关系构建 → 图存储 → 图遍历 + 向量检索

实现

from langchain_community.graphs import Neo4jGraph
from langchain_experimental.graph_transformers import LLMGraphTransformer

class GraphRAG:
    """RAG backed by a Neo4j knowledge graph.

    An LLM extracts entities/relations from documents into the graph;
    queries combine graph traversal with vector retrieval.
    """

    def __init__(self, neo4j_url, neo4j_user, neo4j_password, llm):
        self.graph = Neo4jGraph(
            url=neo4j_url,
            username=neo4j_user,
            password=neo4j_password
        )
        self.llm = llm
        # LLM-driven transformer that turns raw documents into graph documents.
        self.graph_transformer = LLMGraphTransformer(llm=llm)

    def add_document(self, document):
        """Extract entities/relations from one document and store them in Neo4j."""
        # Extract entities and relations via the LLM transformer.
        graph_documents = self.graph_transformer.convert_to_graph_documents([document])

        # Persist into Neo4j.
        self.graph.add_graph_documents(graph_documents)

    def search(self, query, k=5):
        """Hybrid graph + vector retrieval.

        NOTE(review): ``self.vector_search`` and ``self.merge_results`` are
        not defined anywhere in this class — as written this method raises
        AttributeError. They must be supplied by a subclass or added here;
        verify before use.
        """
        # 1. Extract entities mentioned in the query.
        entities = self.extract_entities(query)

        # 2. Graph traversal to find related nodes.
        graph_results = self.traverse_graph(entities)

        # 3. Vector retrieval (undefined here — see NOTE above).
        vector_results = self.vector_search(query, k)

        # 4. Merge both result sets (undefined here — see NOTE above).
        return self.merge_results(graph_results, vector_results)

    def extract_entities(self, text):
        """Ask the LLM for entity names; assumes a comma-separated reply.

        NOTE(review): splits on the ASCII comma only — a reply using the
        full-width '，' would come back as one single entity. Confirm the
        model's output format.
        """
        prompt = f"从以下文本中提取实体名称:{text}"
        response = self.llm.invoke(prompt)
        return response.content.split(",")

    def traverse_graph(self, entities):
        """Fetch up to 10 (entity, relation, neighbor) rows for the entities."""
        query = """
MATCH (e:Entity)-[r]-(related)
WHERE e.name IN $entities
RETURN e.name as entity, type(r) as relation,
related.name as related_entity, related.content as content
LIMIT 10
"""
        return self.graph.query(query, {"entities": entities})

Graph RAG 适用场景

  • 需要理解实体关系的问答
  • 多跳推理问题
  • 知识探索和发现
查询: "马斯克创立的公司有哪些员工跳槽到了 OpenAI?"

传统 RAG: 难以回答(需要多次查询)

Graph RAG:
1. 找到实体 "马斯克"、"OpenAI"
2. 遍历关系:马斯克 - 创立 -> 公司 - 雇佣 -> 员工 - 跳槽 -> OpenAI
3. 返回结果

自适应 RAG

根据问题类型动态选择检索策略。

问题分类

from enum import Enum

class QueryType(Enum):
    """Categories a user query can be classified into."""

    # Fact lookup — wants a specific piece of information.
    FACTUAL = "factual"
    # Analysis/reasoning — needs deeper understanding.
    ANALYTICAL = "analytical"
    # Conversational — depends on dialogue context.
    CONVERSATIONAL = "conversational"
    # Procedural — asks for step-by-step instructions.
    PROCEDURAL = "procedural"
    # Could not be classified.
    UNKNOWN = "unknown"

class AdaptiveRAG:
    """Pick a retrieval strategy dynamically based on the query type."""

    def __init__(self, llm, vectorstore):
        self.llm = llm
        self.vectorstore = vectorstore

        # Each query type maps to its own retrieval strategy.
        self.strategies = {
            QueryType.FACTUAL: self.factual_retrieval,
            QueryType.ANALYTICAL: self.analytical_retrieval,
            QueryType.CONVERSATIONAL: self.conversational_retrieval,
            QueryType.PROCEDURAL: self.procedural_retrieval,
        }

    def classify_query(self, query):
        """Classify the query with the LLM; fall back to UNKNOWN on bad output."""
        prompt = f"""
分析以下问题的类型,返回最匹配的类型:
- factual: 事实查询,需要具体信息
- analytical: 分析推理,需要深度理解
- conversational: 对话式,上下文相关
- procedural: 步骤流程,需要操作指南

问题: {query}

类型:
"""
        response = self.llm.invoke(prompt)
        try:
            return QueryType(response.content.strip().lower())
        except ValueError:
            # Fix: the LLM may answer something outside the enum; originally
            # this raised an unhandled ValueError even though UNKNOWN exists.
            return QueryType.UNKNOWN

    def retrieve(self, query):
        """Adaptive retrieval: classify, then dispatch to a strategy."""
        query_type = self.classify_query(query)
        strategy = self.strategies.get(query_type, self.default_retrieval)
        return strategy(query)

    def default_retrieval(self, query):
        """Fallback strategy (was referenced but missing originally)."""
        return self.vectorstore.similarity_search(query, k=5)

    def factual_retrieval(self, query):
        """Fact lookup: small, precise top-k."""
        return self.vectorstore.similarity_search(query, k=3)

    def analytical_retrieval(self, query):
        """Analytical query: retrieve from several query variants, then dedupe."""
        variants = self.generate_query_variants(query)
        results = []
        for variant in variants:
            results.extend(self.vectorstore.similarity_search(variant, k=3))
        return self.deduplicate(results)[:10]

    def conversational_retrieval(self, query):
        """Conversational query.

        TODO: use history-aware retrieval; until then fall back to the
        default strategy (originally this was a bare ``pass`` returning None).
        """
        return self.default_retrieval(query)

    def procedural_retrieval(self, query):
        """Procedural query: wider top-k to cover a full how-to document."""
        return self.vectorstore.similarity_search(query, k=5)

    def generate_query_variants(self, query, n=3):
        """Ask the LLM for n rephrasings; the original query is kept first.

        (Was referenced by analytical_retrieval but missing originally.)
        """
        prompt = f"请为以下问题生成{n}个不同表述的查询,每行一个:\n{query}"
        response = self.llm.invoke(prompt)
        variants = [line.strip() for line in response.content.splitlines() if line.strip()]
        return [query] + variants[:n]

    def deduplicate(self, docs):
        """Drop duplicate documents by page_content, preserving order.

        (Was referenced by analytical_retrieval but missing originally.)
        """
        seen = set()
        unique = []
        for doc in docs:
            if doc.page_content not in seen:
                seen.add(doc.page_content)
                unique.append(doc)
        return unique

假设性文档嵌入(HyDE)

先生成假设性答案文档,再检索相似文档。

class HyDERetriever:
    """HyDE: retrieve with a hypothetical answer instead of the raw query."""

    def __init__(self, llm, vectorstore):
        self.llm = llm
        self.vectorstore = vectorstore

    def retrieve(self, query, k=5):
        """Run HyDE retrieval.

        Step 1: draft an answer as if we already knew everything.
        Step 2: search with that draft — it tends to sit closer to real
        answer documents in embedding space than the bare question does.

        Args:
            query: the user question.
            k: number of documents to return.
        """
        pseudo_answer = self.generate_hypothetical_document(query)
        return self.vectorstore.similarity_search(pseudo_answer, k=k)

    def generate_hypothetical_document(self, query):
        """Ask the LLM for a confident, detailed answer to *query*."""
        prompt = f"""
请为以下问题写一个详细的回答,假设你有完整的知识:

问题: {query}

回答:
"""
        return self.llm.invoke(prompt).content

迭代检索

多轮检索,逐步细化结果。

class IterativeRetriever:
    """Multi-round retrieval that refines the query until results suffice."""

    def __init__(self, llm, vectorstore, max_iterations=3):
        self.llm = llm
        self.vectorstore = vectorstore
        self.max_iterations = max_iterations

    def retrieve(self, query):
        """Iteratively retrieve, stopping early once results look sufficient."""
        all_results = []
        current_query = query

        for _ in range(self.max_iterations):
            # Retrieve with the current (possibly refined) query.
            results = self.vectorstore.similarity_search(current_query, k=5)
            all_results.extend(results)

            # Stop as soon as the LLM judges the evidence sufficient.
            if self.is_sufficient(results, query):
                break

            # Otherwise ask the LLM for a more specific follow-up query.
            current_query = self.refine_query(query, results)

        return self.deduplicate(all_results)

    def is_sufficient(self, results, query):
        """LLM yes/no check: do these results answer the question?"""
        prompt = f"""
问题: {query}

检索结果摘要: {[r.page_content[:100] for r in results]}

这些结果是否足以回答问题?回答 yes 或 no。
"""
        response = self.llm.invoke(prompt)
        # Substring check is deliberately lenient about LLM phrasing.
        return "yes" in response.content.lower()

    def refine_query(self, original_query, results):
        """Generate a more specific query for the missing information.

        Fix: the retrieved snippets are now included in the prompt — the
        ``results`` parameter was originally accepted but never used, so the
        LLM had no way to know what was already covered.
        """
        summaries = [r.page_content[:100] for r in results]
        prompt = f"""
原问题: {original_query}

已检索到的信息: {summaries}

已检索到的信息不足。请生成一个更具体的查询,
以获取缺失的信息。

新查询:
"""
        response = self.llm.invoke(prompt)
        return response.content

    def deduplicate(self, docs):
        """Drop duplicates by page_content, preserving order (was missing)."""
        seen = set()
        unique = []
        for doc in docs:
            if doc.page_content not in seen:
                seen.add(doc.page_content)
                unique.append(doc)
        return unique

RAG 与 Agent 结合

让 RAG 系统能够执行工具调用和多步推理。

from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain.tools import Tool

def create_rag_agent(llm, vectorstore, prompt=None):
    """Create a tool-calling RAG agent over the given vector store.

    Args:
        llm: chat model supporting OpenAI-style tool calling.
        vectorstore: store exposing similarity_search().
        prompt: optional ChatPromptTemplate; when omitted, a minimal default
            containing the required ``agent_scratchpad`` placeholder is built.

    Returns:
        An AgentExecutor ready for .invoke({"input": ...}).
    """

    # Retrieval tool exposed to the agent.
    def search_documents(query: str) -> str:
        """Search the internal knowledge base."""
        results = vectorstore.similarity_search(query, k=3)
        return "\n\n".join([r.page_content for r in results])

    tools = [
        Tool(
            name="search_knowledge_base",
            func=search_documents,
            description="搜索内部知识库,输入查询内容,返回相关文档"
        )
    ]

    if prompt is None:
        # Fix: create_openai_tools_agent requires a prompt argument; calling
        # it with only (llm, tools) raises TypeError. The prompt must include
        # an `agent_scratchpad` placeholder for intermediate tool calls.
        from langchain_core.prompts import ChatPromptTemplate
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant. Use the tools when needed."),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}"),
        ])

    agent = create_openai_tools_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(agent=agent, tools=tools)

    return agent_executor

# Usage: build the agent, then ask a question via .invoke
agent = create_rag_agent(llm, vectorstore)
response = agent.invoke({"input": "公司的年假制度是怎样的?如何申请?"})

小结

高级 RAG 技术从多个维度增强检索能力:

| 技术 | 解决的问题 | 适用场景 |
| --- | --- | --- |
| 混合检索增强 | 提高召回率 | 通用场景 |
| 多模态 RAG | 处理非文本数据 | 文档含图表 |
| Graph RAG | 实体关系查询 | 知识探索 |
| 自适应 RAG | 问题类型多样 | 通用场景 |
| HyDE | 查询与文档语言差异大 | 语义鸿沟 |
| 迭代检索 | 复杂问题 | 需要多步推理 |

下一步

参考资料