
Practical Vector Database Case Studies

Master real-world applications of vector databases through complete, hands-on projects.

Case 1: Intelligent Document Q&A System (RAG)

Build a retrieval-augmented generation (RAG) system on top of a vector database to answer questions over your documents.

System Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ User query  │────▶│  Embedding  │────▶│   Vector    │
│             │     │    model    │     │  database   │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                               │
┌─────────────┐     ┌─────────────┐     ┌──────▼──────┐
│   Answer    │◀────│     LLM     │◀────│  Relevant   │
│             │     │             │     │  documents  │
└─────────────┘     └─────────────┘     └─────────────┘

Complete Implementation

import os
import uuid
from typing import List, Dict, Any
from openai import OpenAI

# Vector database client (using Qdrant as an example)
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

class DocumentQASystem:
    """Intelligent document Q&A system."""

    def __init__(self, collection_name: str = "documents"):
        self.openai = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        self.qdrant = QdrantClient(host="localhost", port=6333)
        self.collection_name = collection_name
        self._ensure_collection()

    def _ensure_collection(self):
        """Create the collection if it does not exist yet."""
        if not self.qdrant.collection_exists(self.collection_name):
            self.qdrant.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=1536,  # dimensionality of OpenAI text-embedding-3-small
                    distance=Distance.COSINE
                )
            )

    def load_documents(self, file_path: str) -> List[str]:
        """Load a document and split it into chunks."""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Split on paragraphs (real projects may use smarter chunking strategies)
        paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]

        # Split overly long paragraphs further
        chunks = []
        for para in paragraphs:
            if len(para) > 1000:
                # Split on sentences (here: Chinese full stops)
                sentences = para.split('。')
                current_chunk = ""
                for sent in sentences:
                    if len(current_chunk) + len(sent) < 800:
                        current_chunk += sent + "。"
                    else:
                        if current_chunk:
                            chunks.append(current_chunk)
                        current_chunk = sent + "。"
                if current_chunk:
                    chunks.append(current_chunk)
            else:
                chunks.append(para)

        return chunks

    def add_document(self, file_path: str, metadata: Dict[str, Any] = None):
        """Add a document to the knowledge base."""
        metadata = metadata or {}
        chunks = self.load_documents(file_path)

        points = []
        for i, chunk in enumerate(chunks):
            # Generate the embedding
            embedding = self._get_embedding(chunk)

            # Qdrant point IDs must be unsigned integers or UUIDs, so derive
            # a deterministic UUID from the doc ID and chunk index
            point_id = str(uuid.uuid5(
                uuid.NAMESPACE_URL,
                f"{metadata.get('doc_id', 'doc')}_{i}"
            ))

            point = PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "content": chunk,
                    "source": file_path,
                    **metadata
                }
            )
            points.append(point)

        # Bulk insert
        self.qdrant.upsert(
            collection_name=self.collection_name,
            points=points
        )

        print(f"Added document {file_path} as {len(chunks)} chunks")

    def _get_embedding(self, text: str) -> List[float]:
        """Embed a piece of text."""
        response = self.openai.embeddings.create(
            input=text,
            model="text-embedding-3-small"
        )
        return response.data[0].embedding

    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """Retrieve the most relevant chunks."""
        query_embedding = self._get_embedding(query)

        results = self.qdrant.search(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            limit=top_k
        )

        return [
            {
                "content": result.payload["content"],
                "source": result.payload.get("source", ""),
                "score": result.score
            }
            for result in results
        ]

    def answer(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        """Answer a question over the indexed documents."""
        # 1. Retrieve relevant chunks
        relevant_docs = self.search(question, top_k)

        # 2. Build the context
        context = "\n\n".join([
            f"[Document {i+1}] {doc['content']}"
            for i, doc in enumerate(relevant_docs)
        ])

        # 3. Build the prompt
        prompt = f"""You are a professional document Q&A assistant. Answer the question based on the documents provided below.
If the documents contain no relevant information, say clearly that you cannot answer.

Relevant documents:
{context}

User question: {question}

Provide an accurate, concise answer:"""

        # 4. Call the LLM to generate the answer
        response = self.openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a professional document Q&A assistant. Answer questions based on the provided documents."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=500
        )

        return {
            "question": question,
            "answer": response.choices[0].message.content,
            "sources": relevant_docs
        }

# Usage example
if __name__ == "__main__":
    qa_system = DocumentQASystem()

    # Add a document
    qa_system.add_document(
        "docs/company_handbook.txt",
        metadata={"doc_id": "handbook", "category": "internal"}
    )

    # Ask a question
    result = qa_system.answer("What is the company's annual leave policy?")
    print(f"Question: {result['question']}")
    print(f"Answer: {result['answer']}")
    print("\nSources:")
    for i, source in enumerate(result['sources'], 1):
        print(f"{i}. [{source['score']:.4f}] {source['content'][:100]}...")

Case 2: Semantic Search Engine

Build a search engine that understands meaning, delivering more accurate results than traditional keyword search.

Core Features

  • Semantic understanding: captures query intent rather than merely matching keywords
  • Multilingual support: search across languages
  • Result re-ranking: combines multiple signals to refine the ranking

Implementation

from typing import List, Dict

import numpy as np
from sentence_transformers import SentenceTransformer

class SemanticSearchEngine:
    """Semantic search engine."""

    def __init__(self):
        # Use a multilingual embedding model
        self.model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
        self.documents = []
        self.embeddings = None

    def index_documents(self, documents: List[Dict[str, str]]):
        """Index a list of documents."""
        self.documents = documents

        # Generate normalized embeddings so a dot product equals cosine similarity
        texts = [doc['title'] + " " + doc['content'] for doc in documents]
        self.embeddings = self.model.encode(texts, normalize_embeddings=True)

        print(f"Indexed {len(documents)} documents")

    def search(self, query: str, top_k: int = 10) -> List[Dict]:
        """Semantic search."""
        # Embed the query
        query_embedding = self.model.encode(query, normalize_embeddings=True)

        # Cosine similarity via dot product of normalized vectors
        similarities = np.dot(self.embeddings, query_embedding)

        # Take the top-k results
        top_indices = np.argsort(similarities)[::-1][:top_k]

        results = []
        for idx in top_indices:
            results.append({
                "document": self.documents[idx],
                "score": float(similarities[idx])
            })

        return results

    def hybrid_search(self, query: str, keywords: List[str] = None, top_k: int = 10) -> List[Dict]:
        """Hybrid search: semantic + keyword."""
        # Semantic search first, over-fetching to leave room for re-ranking
        semantic_results = self.search(query, top_k=top_k * 2)

        # If keywords were given, filter and re-weight
        if keywords:
            for result in semantic_results:
                content = result['document']['content'].lower()
                title = result['document']['title'].lower()

                # Fraction of keywords that appear in the title or content
                keyword_score = sum(1 for kw in keywords if kw.lower() in content or kw.lower() in title)
                keyword_score /= len(keywords)

                # Combined score: 0.7 * semantic + 0.3 * keyword
                result['score'] = result['score'] * 0.7 + keyword_score * 0.3

            # Re-rank
            semantic_results.sort(key=lambda x: x['score'], reverse=True)

        return semantic_results[:top_k]

# Usage example
search_engine = SemanticSearchEngine()

docs = [
    {"title": "Getting Started with Python", "content": "Python is an easy-to-learn programming language..."},
    {"title": "Machine Learning Basics", "content": "Machine learning is a key branch of artificial intelligence..."},
    {"title": "Deep Learning Explained", "content": "Deep learning uses neural networks to learn..."}
]

search_engine.index_documents(docs)

# Semantic search
results = search_engine.search("how to learn artificial intelligence", top_k=3)
for r in results:
    print(f"{r['document']['title']}: {r['score']:.4f}")

# Hybrid search
results = search_engine.hybrid_search("learning AI", keywords=["python", "machine"], top_k=3)

Case 3: Recommendation System

A personalized recommendation system based on vector similarity.

Collaborative Filtering + Content-Based Recommendation

from collections import defaultdict
from typing import List, Dict

import numpy as np

class VectorRecommendationSystem:
    """Vector-based recommendation system."""

    def __init__(self):
        self.user_vectors = {}   # user vectors
        self.item_vectors = {}   # item vectors
        self.interactions = []   # interaction records

    def build_user_vectors(self, interactions: List[Dict]):
        """
        Build user vectors by aggregating the vectors of the items
        in each user's interaction history.
        """
        user_items = defaultdict(list)
        for inter in interactions:
            user_items[inter['user_id']].append({
                'item_id': inter['item_id'],
                'rating': inter.get('rating', 1.0)
            })

        for user_id, items in user_items.items():
            # Weighted average of item vectors
            vectors = []
            weights = []

            for item in items:
                if item['item_id'] in self.item_vectors:
                    vectors.append(self.item_vectors[item['item_id']])
                    weights.append(item['rating'])

            if vectors:
                # Weighted average, normalized to unit length
                user_vector = np.average(vectors, axis=0, weights=weights)
                self.user_vectors[user_id] = user_vector / np.linalg.norm(user_vector)

    def recommend(self, user_id: str, top_k: int = 10) -> List[Dict]:
        """Generate recommendations for a user."""
        if user_id not in self.user_vectors:
            return []

        user_vector = self.user_vectors[user_id]

        # Score every item against the user vector
        scores = []
        for item_id, item_vector in self.item_vectors.items():
            # Cosine similarity (assuming unit-length vectors)
            similarity = np.dot(user_vector, item_vector)
            scores.append((item_id, similarity))

        # Sort and return the top-k
        scores.sort(key=lambda x: x[1], reverse=True)

        return [
            {"item_id": item_id, "score": float(score)}
            for item_id, score in scores[:top_k]
        ]

    def similar_items(self, item_id: str, top_k: int = 5) -> List[Dict]:
        """Find items similar to a given item."""
        if item_id not in self.item_vectors:
            return []

        target_vector = self.item_vectors[item_id]

        scores = []
        for other_id, other_vector in self.item_vectors.items():
            if other_id != item_id:
                similarity = np.dot(target_vector, other_vector)
                scores.append((other_id, similarity))

        scores.sort(key=lambda x: x[1], reverse=True)

        return [
            {"item_id": other_id, "score": float(score)}
            for other_id, score in scores[:top_k]
        ]

# Usage example
rec_sys = VectorRecommendationSystem()

def random_unit_vector(dim: int = 128) -> np.ndarray:
    """Random vector normalized to unit length, so dot products are cosine similarities."""
    v = np.random.randn(dim)
    return v / np.linalg.norm(v)

# Assume item vectors already exist
rec_sys.item_vectors = {
    "item_1": random_unit_vector(),
    "item_2": random_unit_vector(),
    "item_3": random_unit_vector(),
}

# Build user vectors
interactions = [
    {"user_id": "user_1", "item_id": "item_1", "rating": 5.0},
    {"user_id": "user_1", "item_id": "item_2", "rating": 3.0},
]
rec_sys.build_user_vectors(interactions)

# Generate recommendations
recommendations = rec_sys.recommend("user_1", top_k=3)
print(recommendations)
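
The example above fakes item vectors with random noise. In the content-based half of a hybrid recommender they would typically come from embedding item descriptions instead; a minimal sketch of that idea, with made-up catalog entries and an illustrative model choice:

from sentence_transformers import SentenceTransformer

# Derive item vectors from item descriptions (content-based side of the hybrid)
encoder = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

item_descriptions = {  # made-up catalog entries
    "item_1": "Sci-fi movie about space exploration",
    "item_2": "Romantic comedy set in Paris",
    "item_3": "Documentary about deep-sea creatures",
}

# normalize_embeddings=True keeps the dot products in the class meaningful as cosine similarity
rec_sys.item_vectors = {
    item_id: encoder.encode(desc, normalize_embeddings=True)
    for item_id, desc in item_descriptions.items()
}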

Case 4: Image Similarity Search

Use a vector database to implement reverse image search.

from typing import List, Dict

import torch
from PIL import Image
from transformers import CLIPProcessor, CLIPModel

class ImageSearchEngine:
    """Image similarity search engine."""

    def __init__(self):
        self.model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

        # Initialize the vector database (using Milvus as an example)
        from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection

        connections.connect("default", host="localhost", port="19530")

        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="filename", dtype=DataType.VARCHAR, max_length=256),
            FieldSchema(name="description", dtype=DataType.VARCHAR, max_length=512),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=512)
        ]

        schema = CollectionSchema(fields, "image collection")
        self.collection = Collection("image_search", schema)

        # Create the index
        index_params = {
            "metric_type": "COSINE",
            "index_type": "HNSW",
            "params": {"M": 16, "efConstruction": 200}
        }
        self.collection.create_index("embedding", index_params)

    def get_image_embedding(self, image_path: str) -> List[float]:
        """Embed an image."""
        image = Image.open(image_path).convert("RGB")
        inputs = self.processor(images=image, return_tensors="pt")

        with torch.no_grad():
            image_features = self.model.get_image_features(**inputs)

        # Normalize
        embedding = image_features / image_features.norm(dim=-1, keepdim=True)
        return embedding.squeeze().tolist()

    def add_image(self, image_path: str, description: str = ""):
        """Add an image to the index."""
        embedding = self.get_image_embedding(image_path)

        # Column order matches the schema (the auto_id primary key is omitted)
        data = [
            [image_path],
            [description],
            [embedding]
        ]

        self.collection.insert(data)

    def search_by_image(self, image_path: str, top_k: int = 5) -> List[Dict]:
        """Search by image (reverse image search)."""
        query_embedding = self.get_image_embedding(image_path)

        self.collection.load()

        results = self.collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param={"metric_type": "COSINE", "params": {"ef": 64}},
            limit=top_k,
            output_fields=["filename", "description"]
        )

        return [
            {
                "filename": hit.entity.get("filename"),
                "description": hit.entity.get("description"),
                "score": hit.distance
            }
            for hit in results[0]
        ]

    def search_by_text(self, text: str, top_k: int = 5) -> List[Dict]:
        """Search images by text."""
        inputs = self.processor(text=text, return_tensors="pt")

        with torch.no_grad():
            text_features = self.model.get_text_features(**inputs)

        query_embedding = text_features / text_features.norm(dim=-1, keepdim=True)
        query_embedding = query_embedding.squeeze().tolist()

        self.collection.load()

        results = self.collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param={"metric_type": "COSINE", "params": {"ef": 64}},
            limit=top_k,
            output_fields=["filename", "description"]
        )

        return [
            {
                "filename": hit.entity.get("filename"),
                "description": hit.entity.get("description"),
                "score": hit.distance
            }
            for hit in results[0]
        ]
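
For completeness, a short usage sketch mirroring the other cases (the image paths are made-up placeholders, and a Milvus instance is assumed to be running on localhost:19530):

# Usage sketch: image paths below are hypothetical placeholders
engine = ImageSearchEngine()

engine.add_image("images/cat_01.jpg", description="a cat on a sofa")
engine.add_image("images/dog_01.jpg", description="a dog in the park")

# Reverse image search
for hit in engine.search_by_image("images/query.jpg", top_k=3):
    print(f"[{hit['score']:.4f}] {hit['filename']} - {hit['description']}")

# Text-to-image search works because CLIP embeds text and images into the same space
for hit in engine.search_by_text("a photo of a cat", top_k=3):
    print(f"[{hit['score']:.4f}] {hit['filename']} - {hit['description']}")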

Case 5: Multi-Tenant SaaS Application

Implement per-tenant data isolation in a SaaS application.

import os
import uuid
from typing import Dict, List

from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue, FilterSelector
)

class MultiTenantVectorStore:
    """Multi-tenant vector store."""

    def __init__(self):
        self.client = QdrantClient(host="localhost", port=6333)
        self.collection_name = "saas_documents"
        self.openai = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

        # Ensure the collection exists
        if not self.client.collection_exists(self.collection_name):
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
            )

    def _get_embedding(self, text: str) -> List[float]:
        """Embed a piece of text (same approach as in Case 1)."""
        response = self.openai.embeddings.create(
            input=text,
            model="text-embedding-3-small"
        )
        return response.data[0].embedding

    def add_document(self, tenant_id: str, document_id: str, content: str, metadata: Dict = None):
        """Add a document for a specific tenant."""
        # Generate the embedding
        embedding = self._get_embedding(content)

        # Add the tenant ID to the payload
        payload = {
            "content": content,
            "tenant_id": tenant_id,  # key point: store the tenant ID
            **(metadata or {})
        }

        # Qdrant point IDs must be integers or UUIDs, so derive one deterministically
        point_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{tenant_id}_{document_id}"))

        self.client.upsert(
            collection_name=self.collection_name,
            points=[
                PointStruct(
                    id=point_id,
                    vector=embedding,
                    payload=payload
                )
            ]
        )

    def search(self, tenant_id: str, query: str, top_k: int = 5) -> List[Dict]:
        """Search within a single tenant's data."""
        query_embedding = self._get_embedding(query)

        # Key point: the filter restricts the search to this tenant's data only
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            query_filter=Filter(
                must=[
                    FieldCondition(
                        key="tenant_id",
                        match=MatchValue(value=tenant_id)
                    )
                ]
            ),
            limit=top_k
        )

        return [
            {
                "id": result.id,
                "content": result.payload["content"],
                "score": result.score
            }
            for result in results
        ]

    def delete_tenant_data(self, tenant_id: str):
        """Delete all of a tenant's data."""
        self.client.delete(
            collection_name=self.collection_name,
            points_selector=FilterSelector(
                filter=Filter(
                    must=[
                        FieldCondition(
                            key="tenant_id",
                            match=MatchValue(value=tenant_id)
                        )
                    ]
                )
            )
        )

# Usage example
store = MultiTenantVectorStore()

# Tenant A adds a document
store.add_document("tenant_a", "doc_1", "This is tenant A's document content")

# Tenant B adds a document
store.add_document("tenant_b", "doc_1", "This is tenant B's document content")

# Tenant A can only find its own documents
results = store.search("tenant_a", "document")
print(results)  # returns tenant A's documents only

Best Practices Summary

1. Document Chunking Strategy

from typing import List

def smart_chunk(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """Chunking that respects sentence boundaries."""
    chunks = []
    start = 0

    while start < len(text):
        # Look for a sentence boundary near chunk_size
        end = start + chunk_size
        if end < len(text):
            # Scan forward to the next sentence terminator
            # ('。!?' target Chinese text; extend the set for other languages)
            while end < len(text) and text[end] not in '。!?\n':
                end += 1
            end += 1  # include the terminator

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        start = end - overlap  # overlapping window

    return chunks
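
A quick usage sketch on made-up sample text (newlines act as the sentence terminators here):

# Made-up sample: 60 short lines; '\n' is in the terminator set above
sample = "Vector databases store embeddings and support similarity search.\n" * 60
chunks = smart_chunk(sample, chunk_size=200, overlap=20)
print(f"Produced {len(chunks)} chunks")
print(chunks[0][:80])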

2. Choosing an Embedding Model

Scenario                Recommended Model                Dimensions
General English         text-embedding-3-small           1536
High-accuracy English   text-embedding-3-large           3072
Chinese                 BAAI/bge-large-zh                1024
Multilingual            paraphrase-multilingual-MiniLM   384
Lightweight             all-MiniLM-L6-v2                 384
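
Since each model produces a different dimensionality, it pays to hide the choice behind a thin wrapper so the collection size and the encoder stay in sync. A minimal sketch (the Embedder wrapper is our own illustration, not a library API):

from typing import List
from sentence_transformers import SentenceTransformer

class Embedder:
    """Thin wrapper so the rest of the pipeline never hard-codes a model."""

    def __init__(self, model_name: str = "BAAI/bge-large-zh"):
        self.model = SentenceTransformer(model_name)
        # Collections must be created with this exact dimensionality
        self.dim = self.model.get_sentence_embedding_dimension()

    def encode(self, texts: List[str]) -> List[List[float]]:
        return self.model.encode(texts, normalize_embeddings=True).tolist()

# Swapping models is a one-line change, but remember to re-embed and rebuild
# the index: vectors from different models are not comparable
embedder = Embedder("sentence-transformers/all-MiniLM-L6-v2")  # 384 dimensions
print(embedder.dim)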

3. Performance Optimization Tips

  1. Batch operations: insert in batches rather than one point at a time (see the sketch after this list)
  2. Index selection: pick an index type suited to your data scale
  3. Pre-filtering: filter before the vector search to reduce computation
  4. Caching: cache results for popular queries
  5. Async processing: generate embeddings asynchronously
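
A minimal sketch of tips 1 and 4 combined, batched upserts plus a memoized embedding call (the sequential point IDs, batch size, and OpenAI wiring are illustrative assumptions):

import os
from functools import lru_cache
from typing import List

from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
client = QdrantClient(host="localhost", port=6333)

@lru_cache(maxsize=10_000)
def cached_embedding(text: str) -> tuple:
    """Tip 4: memoize embeddings for repeated text (tuple, since lru_cache needs hashable returns)."""
    resp = openai_client.embeddings.create(input=text, model="text-embedding-3-small")
    return tuple(resp.data[0].embedding)

def bulk_upsert(collection: str, chunks: List[str], batch_size: int = 128):
    """Tip 1: one upsert per batch instead of one per chunk."""
    for start in range(0, len(chunks), batch_size):
        batch = chunks[start:start + batch_size]
        points = [
            PointStruct(
                id=start + offset,  # simple sequential IDs for this sketch
                vector=list(cached_embedding(chunk)),
                payload={"content": chunk},
            )
            for offset, chunk in enumerate(batch)
        ]
        client.upsert(collection_name=collection, points=points)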

4. Data Security

  1. Access control: enforce strict API access control
  2. Data encryption: encrypt sensitive data at rest
  3. Audit logging: record every data access operation
  4. Tenant isolation: guarantee data isolation in multi-tenant scenarios