Qdrant 向量数据库
Qdrant 是一个用 Rust 编写的开源向量数据库,专注于高性能、高可靠性和易用性。它提供丰富的过滤功能、分布式部署能力和多种客户端 SDK,是构建 AI 应用的理想选择。
概述
为什么选择 Qdrant
| 特性 | 说明 |
|---|---|
| Rust 编写 | 内存安全,性能优异,资源占用低 |
| 高性能 | 单节点每秒处理数万次查询 |
| 丰富过滤 | 支持复杂的 payload 过滤条件 |
| 多种索引 | HNSW、量化索引,灵活选择 |
| 分布式 | 原生支持集群部署 |
| 多语言 SDK | Python、Rust、Go、TypeScript |
适用场景
- 高性能需求:需要低延迟、高吞吐的向量搜索
- 复杂过滤:需要结合向量相似度和复杂条件过滤
- 资源受限:需要轻量级但高性能的解决方案
- 边缘部署:Rust 的高效性适合边缘设备
快速开始
1. 部署 Qdrant
Docker 部署
# Run Qdrant: 6333 = REST API, 6334 = gRPC; data persisted under ./qdrant_storage
docker run -p 6333:6333 -p 6334:6334 \
    -v $(pwd)/qdrant_storage:/qdrant/storage \
    qdrant/qdrant
Docker Compose 部署
# docker-compose.yml — single-node Qdrant with persistent storage
version: '3.8'
services:
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"  # REST API
      - "6334:6334"  # gRPC
    volumes:
      - ./qdrant_storage:/qdrant/storage
    environment:
      - QDRANT__SERVICE__HTTP_PORT=6333
2. 安装 Python SDK
pip install qdrant-client
3. 第一个示例
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# Connect to a local Qdrant instance (default REST port)
client = QdrantClient(host="localhost", port=6333)

# Create a collection for 384-dimensional vectors using cosine distance
collection_name = "test_collection"
client.create_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)

# Prepare sample points; each vector is zero-padded to 384 dimensions
points = [
    PointStruct(
        id=1,
        vector=[0.1, 0.2, 0.3] + [0.0] * 381,
        payload={"text": "第一篇文档", "category": "tech"}
    ),
    PointStruct(
        id=2,
        vector=[0.2, 0.3, 0.4] + [0.0] * 381,
        payload={"text": "第二篇文档", "category": "life"}
    ),
    PointStruct(
        id=3,
        vector=[0.3, 0.4, 0.5] + [0.0] * 381,
        payload={"text": "第三篇文档", "category": "tech"}
    )
]

# Insert (or update) the points
client.upsert(collection_name=collection_name, points=points)

# Search for the 3 nearest neighbours of the query vector
search_result = client.search(
    collection_name=collection_name,
    query_vector=[0.1, 0.2, 0.3] + [0.0] * 381,
    limit=3
)

for result in search_result:
    print(f"ID: {result.id}, Score: {result.score}, Text: {result.payload['text']}")
核心概念
Collection(集合)
集合是 Qdrant 中的数据组织单位,包含向量和 payload。
from qdrant_client.models import VectorParams, Distance

# Create a basic collection
client.create_collection(
    collection_name="my_collection",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)

# Create a collection with tuned configuration
client.create_collection(
    collection_name="optimized_collection",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE,
        hnsw_config={
            "m": 16,              # max graph connections per node
            "ef_construct": 100,  # build-time search breadth
        }
    ),
    optimizers_config={
        "default_segment_number": 2,
    },
    replication_factor=1,  # number of replicas (relevant for clusters)
)

# Supported distance metrics
# Distance.COSINE - cosine similarity
# Distance.EUCLID - Euclidean distance
# Distance.DOT - dot product
# Distance.MANHATTAN - Manhattan distance

# Drop a collection
client.delete_collection(collection_name="my_collection")

# Inspect collection metadata
collection_info = client.get_collection(collection_name="my_collection")
print(f"向量数: {collection_info.points_count}")
print(f"维度: {collection_info.config.params.vectors.size}")
Point(点)
Point 是 Qdrant 中的基本数据单元,包含:
- id: 唯一标识符(整数或 UUID)
- vector: 向量数据
- payload: 附加元数据(JSON 格式)
from qdrant_client.models import PointStruct
import uuid

# Point with an integer ID
point1 = PointStruct(
    id=1,
    vector=[0.1, 0.2, ...],  # placeholder: a full-dimension vector in practice
    payload={"text": "文档内容", "category": "tech"}
)

# Point with a UUID string ID
point2 = PointStruct(
    id=str(uuid.uuid4()),
    vector=[0.2, 0.3, ...],
    payload={
        "text": "另一篇文档",
        "tags": ["ai", "ml"],
        "metadata": {"author": "张三", "views": 100}
    }
)
Payload(载荷)
Payload 是与向量关联的 JSON 元数据,支持复杂的数据结构和过滤。
payload = {
    # Scalar types
    "text": "文档内容",
    "category": "technology",
    "views": 1000,
    "is_published": True,
    # Arrays
    "tags": ["ai", "ml", "python"],
    "scores": [95, 87, 92],
    # Nested objects
    "metadata": {
        "author": "张三",
        "created_at": "2024-01-01",
        "source": {
            "url": "https://example.com",
            "domain": "example.com"
        }
    },
    # Geo location (lat/lon keys enable Qdrant geo filters)
    "location": {
        "lat": 39.9042,
        "lon": 116.4074
    }
}
数据操作
插入数据
from qdrant_client.models import PointStruct

# Single insert
client.upsert(
    collection_name="my_collection",
    points=[
        PointStruct(
            id=1,
            vector=[0.1, 0.2, ...],  # placeholder: full-dimension vector
            payload={"text": "文档1"}
        )
    ]
)

# Batch insert (recommended for throughput)
points = [
    PointStruct(id=i, vector=vec, payload=payload)
    for i, (vec, payload) in enumerate(zip(vectors, payloads))
]
client.upsert(
    collection_name="my_collection",
    points=points
)

# Block until indexing is complete (optional)
client.upsert(
    collection_name="my_collection",
    points=points,
    wait=True
)
搜索
基本向量搜索
# Basic search
results = client.search(
    collection_name="my_collection",
    query_vector=[0.1, 0.2, ...],  # placeholder: full-dimension vector
    limit=10
)

# Paginated search with an offset
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    limit=10,
    offset=20  # skip the first 20 hits
)

# Return only selected payload fields
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    limit=10,
    with_payload=["text", "category"]  # restrict returned fields
)

# Skip the payload entirely
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    limit=10,
    with_payload=False
)

# Include the stored vectors in the results
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    limit=10,
    with_vectors=True
)
带过滤条件的搜索
from qdrant_client.models import Filter, FieldCondition, MatchValue, Range

# Exact-match filter
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="category",
                match=MatchValue(value="technology")
            )
        ]
    ),
    limit=10
)

# Numeric range filter
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="views",
                range=Range(
                    gte=100,   # greater than or equal
                    lte=1000   # less than or equal
                )
            )
        ]
    ),
    limit=10
)

# Array-contains filter
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="tags",
                match=MatchValue(value="ai")  # payload array contains "ai"
            )
        ]
    ),
    limit=10
)

# Combined filter: must = AND, should = OR, must_not = NOT
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    query_filter=Filter(
        must=[  # AND conditions
            FieldCondition(
                key="category",
                match=MatchValue(value="technology")
            ),
            FieldCondition(
                key="is_published",
                match=MatchValue(value=True)
            )
        ],
        should=[  # OR conditions
            FieldCondition(
                key="tags",
                match=MatchValue(value="ai")
            ),
            FieldCondition(
                key="tags",
                match=MatchValue(value="ml")
            )
        ],
        must_not=[  # NOT conditions
            FieldCondition(
                key="status",
                match=MatchValue(value="deleted")
            )
        ]
    ),
    limit=10
)
批量搜索
# SearchRequest was used below without being imported — required import added.
from qdrant_client.models import SearchRequest

# Run several query vectors in a single round trip
results = client.search_batch(
    collection_name="my_collection",
    requests=[
        SearchRequest(vector=vec1, limit=10),
        SearchRequest(vector=vec2, limit=10),
        SearchRequest(vector=vec3, limit=10),
    ]
)

# results[i] is the hit list for the i-th request
for i, result_list in enumerate(results):
    print(f"查询 {i} 的结果:")
    for result in result_list:
        print(f" ID: {result.id}, Score: {result.score}")
获取数据
# Fetch specific points by ID
results = client.retrieve(
    collection_name="my_collection",
    ids=[1, 2, 3],
    with_payload=True,
    with_vectors=False
)

# Scroll through all points page by page
all_points = []
next_offset = None
while True:
    results, next_offset = client.scroll(
        collection_name="my_collection",
        limit=1000,
        offset=next_offset,
        with_payload=True
    )
    all_points.extend(results)
    # scroll() returns None as the next offset on the last page
    if next_offset is None:
        break
print(f"总共 {len(all_points)} 条数据")
更新和删除
# PointIdsList and FilterSelector were used below without being imported —
# required import added (Filter/FieldCondition/MatchValue come from the
# same module and are included so the snippet stands alone).
from qdrant_client.models import (
    PointIdsList, FilterSelector, Filter, FieldCondition, MatchValue
)

# Merge new values into the payload of the given points
client.set_payload(
    collection_name="my_collection",
    payload={"views": 2000},
    points=[1]  # target point IDs
)

# Replace the whole payload
client.overwrite_payload(
    collection_name="my_collection",
    payload={"new_field": "new_value"},
    points=[1]
)

# Remove payload fields
client.delete_payload(
    collection_name="my_collection",
    keys=["old_field"],
    points=[1]
)

# Delete points by ID
client.delete(
    collection_name="my_collection",
    points_selector=PointIdsList(points=[1, 2, 3])
)

# Delete points matching a filter
client.delete(
    collection_name="my_collection",
    points_selector=FilterSelector(
        filter=Filter(
            must=[
                FieldCondition(
                    key="category",
                    match=MatchValue(value="obsolete")
                )
            ]
        )
    )
)
完整 RAG 示例
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from openai import OpenAI
import os
class QdrantRAG:
    """Minimal RAG pipeline backed by Qdrant and the OpenAI API."""

    def __init__(self, collection_name="knowledge_base"):
        # Qdrant connection (local instance, default REST port)
        self.client = QdrantClient(host="localhost", port=6333)
        # OpenAI client for embeddings and chat completions
        self.openai = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        self.collection_name = collection_name
        # Make sure the collection exists before first use
        self._setup_collection()

    def _setup_collection(self):
        """Create the collection if it does not exist yet."""
        if not self.client.collection_exists(self.collection_name):
            self.client.create_collection(
                collection_name=self.collection_name,
                # 1536 = output dimension of text-embedding-3-small
                vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
            )

    def add_documents(self, documents):
        """Embed and upsert documents into the knowledge base.

        Each document is a dict with a required "content" key and
        optional "source", "title" and "category" keys.
        """
        import uuid  # local import: only needed for ID generation here

        points = []
        for doc in documents:
            # Embed the document text
            embedding = self._get_embedding(doc["content"])
            points.append(
                PointStruct(
                    # Random UUIDs instead of enumerate() indices: sequential
                    # ids restarting at 0 would silently overwrite earlier
                    # points every time add_documents() is called again.
                    id=str(uuid.uuid4()),
                    vector=embedding,
                    payload={
                        "content": doc["content"],
                        "source": doc.get("source", ""),
                        "title": doc.get("title", ""),
                        "category": doc.get("category", "general")
                    }
                )
            )
        # Batch insert
        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )
        print(f"已添加 {len(documents)} 篇文档")

    def _get_embedding(self, text):
        """Return the OpenAI embedding vector for *text*."""
        response = self.openai.embeddings.create(
            input=text,
            model="text-embedding-3-small"
        )
        return response.data[0].embedding

    def search(self, query, top_k=5, category=None):
        """Return the *top_k* most similar documents, optionally filtered by category."""
        # Embed the query
        query_vector = self._get_embedding(query)
        # Optional category filter
        query_filter = None
        if category:
            query_filter = Filter(
                must=[
                    FieldCondition(
                        key="category",
                        match=MatchValue(value=category)
                    )
                ]
            )
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=query_filter,
            limit=top_k
        )
        return [
            {
                "id": result.id,
                "content": result.payload["content"],
                # .get() keeps results robust for points inserted without
                # a "source" field (e.g. by other writers)
                "source": result.payload.get("source", ""),
                "score": result.score
            }
            for result in results
        ]

    def answer(self, question, top_k=3):
        """Answer *question* using the top retrieved documents as context."""
        # Retrieve supporting documents
        docs = self.search(question, top_k)
        # Build the context section of the prompt
        context = "\n\n".join([
            f"[文档 {i+1}] {doc['content']}"
            for i, doc in enumerate(docs)
        ])
        # Instruct the model to answer only from the provided documents
        prompt = f"""基于以下文档回答问题。如果文档中没有相关信息,请说明。
文档:
{context}
问题:{question}
回答:"""
        response = self.openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "你是一个基于文档回答问题的助手。"},
                {"role": "user", "content": prompt}
            ]
        )
        return {
            "answer": response.choices[0].message.content,
            "sources": docs
        }
# Usage example: index two documents, then ask a question
rag = QdrantRAG()
docs = [
    {"content": "Qdrant 是一个用 Rust 编写的向量数据库...", "source": "qdrant.tech"},
    {"content": "向量数据库用于 AI 应用的语义搜索...", "source": "blog"}
]
rag.add_documents(docs)
result = rag.answer("什么是 Qdrant?")
print(f"回答:{result['answer']}")
高级功能
量化索引
# Collection with scalar quantization
client.create_collection(
    collection_name="quantized_collection",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
    quantization_config={
        "scalar": {
            "type": "int8",    # 8-bit quantization
            "quantile": 0.99,
            "always_ram": True
        }
    }
)

# Collection with product quantization
client.create_collection(
    collection_name="pq_collection",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
    quantization_config={
        "product": {
            "compression": "x16",  # compression ratio
            "always_ram": True
        }
    }
)
地理位置过滤
from qdrant_client.models import GeoBoundingBox, GeoPoint

# Bounding-box geo filter
results = client.search(
    collection_name="places",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="location",
                geo_bounding_box=GeoBoundingBox(
                    bottom_right=GeoPoint(lat=39.8, lon=116.5),
                    top_left=GeoPoint(lat=40.0, lon=116.3)
                )
            )
        ]
    ),
    limit=10
)

# Radius geo filter
from qdrant_client.models import GeoRadius

results = client.search(
    collection_name="places",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="location",
                geo_radius=GeoRadius(
                    center=GeoPoint(lat=39.9, lon=116.4),
                    radius=5000  # meters (5 km)
                )
            )
        ]
    ),
    limit=10
)
快照和恢复
# Create a snapshot
snapshot_info = client.create_snapshot(collection_name="my_collection")
print(f"快照路径: {snapshot_info.name}")

# Restore from a snapshot file
client.recover_snapshot(
    collection_name="my_collection",
    location="snapshots/my_collection/my_snapshot.snapshot"
)

# List existing snapshots
snapshots = client.list_snapshots(collection_name="my_collection")
for snapshot in snapshots:
    print(f"快照: {snapshot.name}, 大小: {snapshot.size}")
性能优化
批量操作
# 批量插入优化
def batch_upsert(client, collection_name, documents, embeddings, batch_size=1000):
    """Insert documents and their embeddings into Qdrant in fixed-size batches.

    IDs are the global index of each document, so a document's ID is stable
    regardless of batch size. Upserts are asynchronous (wait=False).
    """
    total = len(documents)
    for start in range(0, total, batch_size):
        stop = start + batch_size
        batch = [
            PointStruct(id=idx, vector=vector, payload={"text": text})
            for idx, (text, vector) in enumerate(
                zip(documents[start:stop], embeddings[start:stop]), start=start
            )
        ]
        client.upsert(
            collection_name=collection_name,
            points=batch,
            wait=False  # asynchronous insert for throughput
        )
        print(f"已插入 {start + len(batch)}/{total}")
索引调优
# High-accuracy HNSW index configuration
client.create_collection(
    collection_name="high_perf",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE,
        hnsw_config={
            "m": 32,               # more links per node = better recall
            "ef_construct": 200,   # larger build beam = better recall
            "full_scan_threshold": 10000,
            "max_indexing_threads": 4,
            "on_disk": False       # keep the index in RAM
        }
    )
)

# Tune the ef parameter at search time
results = client.search(
    collection_name="high_perf",
    query_vector=query_vector,
    search_params={
        "hnsw_ef": 128  # higher = better recall, slower search
    },
    limit=10
)
内存优化
# Store vectors on disk instead of RAM
client.create_collection(
    collection_name="disk_collection",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE,
        on_disk=True  # vectors live on disk
    )
)

# Keep a payload index on disk
client.create_payload_index(
    collection_name="my_collection",
    field_name="large_text",
    field_schema="text",
    on_disk=True
)
常见问题
Q: Qdrant 与 Milvus 如何选择?
| 维度 | Qdrant | Milvus |
|---|---|---|
| 性能 | 单节点性能优异 | 分布式性能优异 |
| 资源占用 | 较低 | 较高 |
| 功能丰富度 | 核心功能完善 | 功能更全面 |
| 部署复杂度 | 简单 | 较复杂 |
| 适用规模 | 千万级以下 | 亿级以上 |
Q: 如何处理大量数据?
# 1. Use batched upserts
# 2. Tune the number of segments and the indexing threshold
client.create_collection(
    collection_name="large_collection",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
    optimizers_config={
        "default_segment_number": 4,
        "indexing_threshold": 20000
    }
)
# 3. Use quantization to reduce memory usage
Q: 如何备份数据?
# Option 1: server-side snapshot
snapshot = client.create_snapshot("my_collection")

# Option 2: export every point via scroll()
all_points = []
next_offset = None
while True:
    results, next_offset = client.scroll(
        collection_name="my_collection",
        limit=10000,
        offset=next_offset,
        with_vectors=True  # include vectors so the backup is complete
    )
    all_points.extend(results)
    if next_offset is None:  # last page reached
        break

# Persist the export to a JSON file.
# encoding="utf-8" + ensure_ascii=False keep non-ASCII payload text
# (e.g. Chinese) readable instead of \uXXXX escapes.
import json
with open("backup.json", "w", encoding="utf-8") as f:
    json.dump([{
        "id": p.id,
        "vector": p.vector,
        "payload": p.payload
    } for p in all_points], f, ensure_ascii=False)
下一步
- pgvector 教程 - PostgreSQL 的向量扩展
- Redis Vector - Redis 向量功能(即将推出)
- 实践案例 - 完整项目示例