向量数据库实践案例
通过完整的实战项目,掌握向量数据库在实际场景中的应用。
案例一:智能文档问答系统(RAG)
构建一个基于向量数据库的检索增强生成(RAG)系统,实现智能文档问答。
系统架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 用户提问 │────▶│ 嵌入模型 │────▶│ 向量数据库 │
└─────────────┘ └─────────────┘ └──────┬──────┘
│
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 生成回答 │◀────│ LLM │◀────│ 相关文档 │
└─────────────┘ └─────────────┘ └─────────────┘
完整实现
import os
import uuid
from typing import List, Dict, Any

from openai import OpenAI
# 向量数据库客户端(以 Qdrant 为例)
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
class DocumentQASystem:
    """Retrieval-augmented (RAG) document question-answering system.

    Documents are chunked, embedded with OpenAI `text-embedding-3-small`,
    and stored in a local Qdrant collection. Questions are answered by
    retrieving the most similar chunks and handing them to a chat model
    as grounding context.
    """

    # Dimensionality of text-embedding-3-small vectors.
    EMBEDDING_DIM = 1536
    EMBEDDING_MODEL = "text-embedding-3-small"

    def __init__(self, collection_name: str = "documents"):
        self.openai = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        self.qdrant = QdrantClient(host="localhost", port=6333)
        self.collection_name = collection_name
        self._ensure_collection()

    def _ensure_collection(self):
        """Create the Qdrant collection if it does not exist yet."""
        if not self.qdrant.collection_exists(self.collection_name):
            self.qdrant.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.EMBEDDING_DIM,
                    distance=Distance.COSINE,
                ),
            )

    def load_documents(self, file_path: str) -> List[str]:
        """Load a UTF-8 text file and split it into retrieval chunks.

        Splits on blank lines first; paragraphs longer than 1000 chars are
        further split on the Chinese full stop so each chunk stays under
        ~800 chars. Real projects may want a smarter splitting strategy.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]
        chunks: List[str] = []
        for para in paragraphs:
            if len(para) <= 1000:
                chunks.append(para)
                continue
            current = ""
            for sent in para.split('。'):
                if not sent:
                    # split() leaves a trailing "" after a final 。;
                    # the original appended a stray terminator for it.
                    continue
                piece = sent + "。"
                if len(current) + len(sent) < 800:
                    current += piece
                else:
                    if current:
                        chunks.append(current)
                    current = piece
            if current:
                chunks.append(current)
        return chunks

    def add_document(self, file_path: str, metadata: Dict[str, Any] = None):
        """Chunk, embed and upsert one document into the knowledge base.

        `metadata` may carry a 'doc_id' used to derive stable point IDs so
        re-adding the same document overwrites instead of duplicating.
        """
        metadata = metadata or {}  # original crashed on metadata=None below
        chunks = self.load_documents(file_path)
        # One batched embeddings request instead of one call per chunk.
        response = self.openai.embeddings.create(
            input=chunks,
            model=self.EMBEDDING_MODEL,
        )
        doc_id = metadata.get('doc_id', 'doc')
        points = []
        for i, (chunk, item) in enumerate(zip(chunks, response.data)):
            points.append(PointStruct(
                # Qdrant point IDs must be unsigned ints or UUIDs; derive a
                # deterministic UUID so upserts stay idempotent.
                id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"{doc_id}_{i}")),
                vector=item.embedding,
                payload={
                    "content": chunk,
                    "source": file_path,
                    **metadata,
                },
            ))
        self.qdrant.upsert(
            collection_name=self.collection_name,
            points=points,
        )
        print(f"已添加文档:{file_path},共 {len(chunks)} 个片段")

    def _get_embedding(self, text: str) -> List[float]:
        """Embed a single text with the configured embedding model."""
        response = self.openai.embeddings.create(
            input=text,
            model=self.EMBEDDING_MODEL,
        )
        return response.data[0].embedding

    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return the top_k most similar chunks for a query."""
        query_embedding = self._get_embedding(query)
        results = self.qdrant.search(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            limit=top_k,
        )
        return [
            {
                "content": result.payload["content"],
                "source": result.payload.get("source", ""),
                "score": result.score,
            }
            for result in results
        ]

    def answer(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        """Answer a question grounded in the indexed documents.

        Returns a dict with the question, the generated answer and the
        retrieved source chunks (for citation display).
        """
        # 1. Retrieve the most relevant chunks.
        relevant_docs = self.search(question, top_k)
        # 2. Build the grounding context.
        context = "\n\n".join(
            f"[文档 {i+1}] {doc['content']}"
            for i, doc in enumerate(relevant_docs)
        )
        # 3. Build the prompt (instructs the model to refuse when the
        #    context has no answer, reducing hallucination).
        prompt = f"""你是一个专业的文档问答助手。请基于以下提供的文档内容回答问题。
如果文档中没有相关信息,请明确说明无法回答。

相关文档内容:
{context}

用户问题:{question}

请提供准确、简洁的回答:"""
        # 4. Generate the answer.
        response = self.openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "你是一个专业的文档问答助手,基于提供的文档内容回答问题。"},
                {"role": "user", "content": prompt},
            ],
            temperature=0.3,  # low temperature for factual answering
            max_tokens=500,
        )
        return {
            "question": question,
            "answer": response.choices[0].message.content,
            "sources": relevant_docs,
        }
# Usage example.
# NOTE(review): requires a running Qdrant on localhost:6333 and
# OPENAI_API_KEY in the environment.
if __name__ == "__main__":
    qa_system = DocumentQASystem()
    # Index a document into the knowledge base.
    qa_system.add_document(
        "docs/company_handbook.txt",
        metadata={"doc_id": "handbook", "category": "internal"}
    )
    # Ask a question.
    result = qa_system.answer("公司的年假政策是什么?")
    print(f"问题:{result['question']}")
    print(f"回答:{result['answer']}")
    print("\n参考文档:")
    for i, source in enumerate(result['sources'], 1):
        print(f"{i}. [{source['score']:.4f}] {source['content'][:100]}...")
案例二:语义搜索引擎
构建一个支持语义理解的搜索引擎,比传统关键词搜索更准确。
核心功能
- 语义理解:理解查询意图,而非简单匹配关键词
- 多语言支持:跨语言搜索
- 结果重排序:结合多种因素优化结果
实现代码
from sentence_transformers import SentenceTransformer
import numpy as np
class SemanticSearchEngine:
    """Semantic search engine backed by a multilingual sentence-transformer.

    Embeddings are L2-normalized so the dot product at query time is cosine
    similarity in [-1, 1]; this also makes the hybrid blend with the 0-1
    keyword score meaningful. (The original mixed torch tensors with numpy
    ops and used unnormalized dot products.)
    """

    def __init__(self):
        # Multilingual model: enables cross-language search.
        self.model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
        self.documents: List[Dict[str, str]] = []
        self.embeddings = None  # (num_docs, dim) numpy array after indexing

    def index_documents(self, documents: List[Dict[str, str]]):
        """Embed and index documents (dicts with 'title' and 'content')."""
        self.documents = documents
        texts = [doc['title'] + " " + doc['content'] for doc in documents]
        # numpy output + normalization instead of torch tensors fed to numpy.
        self.embeddings = self.model.encode(
            texts, convert_to_numpy=True, normalize_embeddings=True
        )
        print(f"已索引 {len(documents)} 篇文档")

    def search(self, query: str, top_k: int = 10) -> List[Dict]:
        """Return the top_k documents by cosine similarity to the query."""
        query_embedding = self.model.encode(
            query, convert_to_numpy=True, normalize_embeddings=True
        )
        similarities = self.embeddings @ query_embedding
        top_indices = np.argsort(similarities)[::-1][:top_k]
        return [
            {"document": self.documents[idx], "score": float(similarities[idx])}
            for idx in top_indices
        ]

    def hybrid_search(self, query: str, keywords: List[str] = None, top_k: int = 10) -> List[Dict]:
        """Hybrid search: semantic similarity plus keyword matching.

        Final score = 0.7 * cosine similarity + 0.3 * fraction of the
        keywords found in the title or content.
        """
        # Over-fetch so keyword re-ranking has candidates to promote.
        results = self.search(query, top_k=top_k * 2)
        if keywords:
            lowered = [kw.lower() for kw in keywords]
            for result in results:
                content = result['document']['content'].lower()
                title = result['document']['title'].lower()
                matched = sum(1 for kw in lowered if kw in content or kw in title)
                keyword_score = matched / len(lowered)
                result['score'] = result['score'] * 0.7 + keyword_score * 0.3
            results.sort(key=lambda x: x['score'], reverse=True)
        return results[:top_k]
# Usage example (module-level demo; downloads the model on first run).
search_engine = SemanticSearchEngine()
docs = [
    {"title": "Python 编程入门", "content": "Python 是一种简单易学的编程语言..."},
    {"title": "机器学习基础", "content": "机器学习是人工智能的重要分支..."},
    {"title": "深度学习详解", "content": "深度学习使用神经网络进行学习..."}
]
search_engine.index_documents(docs)
# Semantic search: matches by meaning, not by exact keywords.
results = search_engine.search("如何学习人工智能", top_k=3)
for r in results:
    print(f"{r['document']['title']}: {r['score']:.4f}")
# Hybrid search: semantic score blended with keyword matching.
results = search_engine.hybrid_search("AI 学习", keywords=["python", "machine"], top_k=3)
案例三:推荐系统
基于向量相似度的个性化推荐系统。
协同过滤 + 内容推荐
class VectorRecommendationSystem:
    """Vector-based recommender: user vectors aggregated from item vectors."""

    def __init__(self):
        self.user_vectors = {}   # user_id -> L2-normalized preference vector
        self.item_vectors = {}   # item_id -> item embedding (caller-supplied)
        self.interactions = []   # raw interaction records (currently unused)

    def build_user_vectors(self, interactions: List[Dict]):
        """Build user vectors as rating-weighted averages of item vectors.

        Each interaction dict needs 'user_id' and 'item_id'; 'rating'
        defaults to 1.0. Items without a known vector are skipped.
        """
        from collections import defaultdict
        user_items = defaultdict(list)
        for inter in interactions:
            user_items[inter['user_id']].append(
                (inter['item_id'], inter.get('rating', 1.0))
            )
        for user_id, items in user_items.items():
            vectors = []
            weights = []
            for item_id, rating in items:
                vector = self.item_vectors.get(item_id)
                if vector is not None:
                    vectors.append(vector)
                    weights.append(rating)
            if not vectors:
                continue
            user_vector = np.average(vectors, axis=0, weights=weights)
            norm = np.linalg.norm(user_vector)
            # Guard the zero vector: the original divided by zero -> NaN.
            if norm > 0:
                self.user_vectors[user_id] = user_vector / norm

    def recommend(self, user_id: str, top_k: int = 10) -> List[Dict]:
        """Score every item against the user vector and return the top_k.

        Scores are raw dot products; they equal cosine similarity only if
        the caller supplies normalized item vectors.
        """
        if user_id not in self.user_vectors:
            return []
        user_vector = self.user_vectors[user_id]
        scores = [
            (item_id, float(np.dot(user_vector, item_vector)))
            for item_id, item_vector in self.item_vectors.items()
        ]
        scores.sort(key=lambda x: x[1], reverse=True)
        return [
            {"item_id": item_id, "score": score}
            for item_id, score in scores[:top_k]
        ]

    def similar_items(self, item_id: str, top_k: int = 5) -> List[Dict]:
        """Find the top_k items most similar to item_id (itself excluded)."""
        if item_id not in self.item_vectors:
            return []
        target_vector = self.item_vectors[item_id]
        scores = [
            (other_id, float(np.dot(target_vector, other_vector)))
            for other_id, other_vector in self.item_vectors.items()
            if other_id != item_id
        ]
        scores.sort(key=lambda x: x[1], reverse=True)
        # BUG FIX: the original returned the *query* item_id for every
        # result instead of the matched item's id.
        return [
            {"item_id": other_id, "score": score}
            for other_id, score in scores[:top_k]
        ]
# Usage example.
rec_sys = VectorRecommendationSystem()
# Item vectors would normally come from a content/embedding model;
# random vectors are used here for illustration only.
rec_sys.item_vectors = {
    "item_1": np.random.randn(128),
    "item_2": np.random.randn(128),
    "item_3": np.random.randn(128),
}
# Build user vectors from interaction history.
interactions = [
    {"user_id": "user_1", "item_id": "item_1", "rating": 5.0},
    {"user_id": "user_1", "item_id": "item_2", "rating": 3.0},
]
rec_sys.build_user_vectors(interactions)
# Generate recommendations for a user.
recommendations = rec_sys.recommend("user_1", top_k=3)
print(recommendations)
案例四:图像相似度搜索
使用向量数据库实现以图搜图功能。
from PIL import Image
import torch
from transformers import CLIPProcessor, CLIPModel
import base64
import io
class ImageSearchEngine:
    """Image similarity search engine (image-to-image and text-to-image).

    Uses CLIP to embed images and text into a shared 512-d space and a
    Milvus collection with an HNSW/COSINE index for retrieval.
    NOTE(review): requires a Milvus server on localhost:19530.
    """
    def __init__(self):
        self.model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        # Initialize the vector database (Milvus in this example).
        from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection
        connections.connect("default", host="localhost", port="19530")
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="filename", dtype=DataType.VARCHAR, max_length=256),
            FieldSchema(name="description", dtype=DataType.VARCHAR, max_length=512),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=512)
        ]
        schema = CollectionSchema(fields, "图像集合")
        self.collection = Collection("image_search", schema)
        # Build an HNSW index using cosine distance.
        index_params = {
            "metric_type": "COSINE",
            "index_type": "HNSW",
            "params": {"M": 16, "efConstruction": 200}
        }
        self.collection.create_index("embedding", index_params)
    def get_image_embedding(self, image_path: str) -> List[float]:
        """Return the L2-normalized CLIP embedding of an image file."""
        image = Image.open(image_path)
        inputs = self.processor(images=image, return_tensors="pt")
        with torch.no_grad():
            image_features = self.model.get_image_features(**inputs)
        # Normalize so cosine similarity reduces to a dot product.
        embedding = image_features / image_features.norm(dim=-1, keepdim=True)
        return embedding.squeeze().tolist()
    def add_image(self, image_path: str, description: str = ""):
        """Embed an image and insert it into the Milvus collection."""
        embedding = self.get_image_embedding(image_path)
        # Column-oriented insert; the auto_id primary key is omitted.
        data = [
            [image_path],
            [description],
            [embedding]
        ]
        self.collection.insert(data)
    def search_by_image(self, image_path: str, top_k: int = 5) -> List[Dict]:
        """Search for the images most similar to a query image."""
        query_embedding = self.get_image_embedding(image_path)
        self.collection.load()  # collection must be loaded before searching
        results = self.collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param={"metric_type": "COSINE", "params": {"ef": 64}},
            limit=top_k,
            output_fields=["filename", "description"]
        )
        return [
            {
                "filename": hit.entity.get("filename"),
                "description": hit.entity.get("description"),
                "score": hit.distance
            }
            for hit in results[0]
        ]
    def search_by_text(self, text: str, top_k: int = 5) -> List[Dict]:
        """Search for images matching a natural-language description.

        Works because CLIP embeds text and images into the same space.
        """
        inputs = self.processor(text=text, return_tensors="pt")
        with torch.no_grad():
            text_features = self.model.get_text_features(**inputs)
        query_embedding = (text_features / text_features.norm(dim=-1, keepdim=True))
        query_embedding = query_embedding.squeeze().tolist()
        self.collection.load()
        results = self.collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param={"metric_type": "COSINE", "params": {"ef": 64}},
            limit=top_k,
            output_fields=["filename", "description"]
        )
        return [
            {
                "filename": hit.entity.get("filename"),
                "description": hit.entity.get("description"),
                "score": hit.distance
            }
            for hit in results[0]
        ]
案例五:多租户 SaaS 应用
在 SaaS 应用中实现多租户数据隔离。
from qdrant_client.models import Filter, FieldCondition, MatchValue
class MultiTenantVectorStore:
    """Multi-tenant vector store with payload-based tenant isolation.

    All tenants share one Qdrant collection; every point carries a
    'tenant_id' payload field, and every search/delete is filtered on it.
    """

    EMBEDDING_DIM = 1536  # text-embedding-3-small

    def __init__(self):
        self.client = QdrantClient(host="localhost", port=6333)
        # The original called self._get_embedding without ever defining it;
        # an OpenAI client is needed to implement it.
        self.openai = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        self.collection_name = "saas_documents"
        # Create the shared collection on first use.
        if not self.client.collection_exists(self.collection_name):
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(size=self.EMBEDDING_DIM, distance=Distance.COSINE)
            )

    def _get_embedding(self, text: str) -> List[float]:
        """Embed text with text-embedding-3-small (missing in original)."""
        response = self.openai.embeddings.create(
            input=text,
            model="text-embedding-3-small",
        )
        return response.data[0].embedding

    def add_document(self, tenant_id: str, document_id: str, content: str, metadata: Dict = None):
        """Add one document for the given tenant."""
        embedding = self._get_embedding(content)
        payload = {
            "content": content,
            "tenant_id": tenant_id,  # key used for tenant isolation
            **(metadata or {})
        }
        self.client.upsert(
            collection_name=self.collection_name,
            points=[
                PointStruct(
                    # Qdrant point IDs must be unsigned ints or UUIDs; derive
                    # a deterministic UUID so re-adding the same (tenant,
                    # document) pair overwrites in place.
                    id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"{tenant_id}_{document_id}")),
                    vector=embedding,
                    payload=payload
                )
            ]
        )

    def search(self, tenant_id: str, query: str, top_k: int = 5) -> List[Dict]:
        """Search only within the given tenant's documents."""
        query_embedding = self._get_embedding(query)
        # Hard filter guarantees cross-tenant data never leaks into results.
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            query_filter=Filter(
                must=[
                    FieldCondition(
                        key="tenant_id",
                        match=MatchValue(value=tenant_id)
                    )
                ]
            ),
            limit=top_k
        )
        return [
            {
                "id": result.id,
                "content": result.payload["content"],
                "score": result.score
            }
            for result in results
        ]

    def delete_tenant_data(self, tenant_id: str):
        """Delete every point belonging to a tenant (e.g. on offboarding)."""
        # Local import: the original used FilterSelector without importing it.
        from qdrant_client.models import FilterSelector
        self.client.delete(
            collection_name=self.collection_name,
            points_selector=FilterSelector(
                filter=Filter(
                    must=[
                        FieldCondition(
                            key="tenant_id",
                            match=MatchValue(value=tenant_id)
                        )
                    ]
                )
            )
        )
# Usage example: requires a running Qdrant and an embedding backend.
store = MultiTenantVectorStore()
# Tenant A adds a document.
store.add_document("tenant_a", "doc_1", "这是租户A的文档内容")
# Tenant B adds a document.
store.add_document("tenant_b", "doc_1", "这是租户B的文档内容")
# Tenant A can only retrieve its own documents.
results = store.search("tenant_a", "文档")
print(results)  # returns only tenant A's documents
最佳实践总结
1. 文档切分策略
def smart_chunk(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
"""智能文档切分"""
chunks = []
start = 0
while start < len(text):
# 找到 chunk_size 附近的句子边界
end = start + chunk_size
if end < len(text):
# 向后查找句子结束符
while end < len(text) and text[end] not in '。!?\n':
end += 1
end += 1
chunks.append(text[start:end].strip())
start = end - overlap # 重叠部分
return chunks
2. 嵌入模型选择
| 场景 | 推荐模型 | 维度 |
|---|---|---|
| 英文通用 | text-embedding-3-small | 1536 |
| 英文高精度 | text-embedding-3-large | 3072 |
| 中文 | BAAI/bge-large-zh | 1024 |
| 多语言 | paraphrase-multilingual-MiniLM | 384 |
| 轻量级 | all-MiniLM-L6-v2 | 384 |
3. 性能优化建议
- 批量操作:使用批量插入而非单条插入
- 索引选择:根据数据规模选择合适的索引类型
- 预过滤:先过滤再向量搜索,减少计算量
- 缓存:缓存热门查询结果
- 异步处理:嵌入生成使用异步处理
4. 数据安全
- 访问控制:实施严格的 API 访问控制
- 数据加密:敏感数据加密存储
- 审计日志:记录所有数据访问操作
- 租户隔离:多租户场景确保数据隔离