跳到主要内容

Qdrant 向量数据库

Qdrant 是一个用 Rust 编写的开源向量数据库,专注于高性能、高可靠性和易用性。它提供丰富的过滤功能、分布式部署能力和多种客户端 SDK,是构建 AI 应用的理想选择。

概述

为什么选择 Qdrant

| 特性 | 说明 |
| --- | --- |
| Rust 编写 | 内存安全,性能优异,资源占用低 |
| 高性能 | 单节点每秒处理数万次查询 |
| 丰富过滤 | 支持复杂的 payload 过滤条件 |
| 多种索引 | HNSW、量化索引,灵活选择 |
| 分布式 | 原生支持集群部署 |
| 多语言 SDK | Python、Rust、Go、TypeScript |

适用场景

  • 高性能需求:需要低延迟、高吞吐的向量搜索
  • 复杂过滤:需要结合向量相似度和复杂条件过滤
  • 资源受限:需要轻量级但高性能的解决方案
  • 边缘部署:Rust 的高效性适合边缘设备

快速开始

1. 部署 Qdrant

Docker 部署

# Run Qdrant: 6333 = REST API, 6334 = gRPC; persist data under ./qdrant_storage
docker run -p 6333:6333 -p 6334:6334 \
    -v $(pwd)/qdrant_storage:/qdrant/storage \
    qdrant/qdrant

Docker Compose 部署

# docker-compose.yml
# NOTE: indentation restored — YAML nesting is significant and the original
# snippet had it stripped, making the file invalid.
version: '3.8'

services:
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333" # REST API
      - "6334:6334" # gRPC
    volumes:
      - ./qdrant_storage:/qdrant/storage
    environment:
      - QDRANT__SERVICE__HTTP_PORT=6333

2. 安装 Python SDK

pip install qdrant-client

3. 第一个示例

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# Connect to a local Qdrant instance (REST port).
client = QdrantClient(host="localhost", port=6333)

# Create a 384-dim collection using cosine similarity.
collection_name = "test_collection"

client.create_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
)

# Prepare sample points; 3 meaningful dims, zero-padded to 384.
points = [
    PointStruct(
        id=1,
        vector=[0.1, 0.2, 0.3] + [0.0] * 381,
        payload={"text": "第一篇文档", "category": "tech"},
    ),
    PointStruct(
        id=2,
        vector=[0.2, 0.3, 0.4] + [0.0] * 381,
        payload={"text": "第二篇文档", "category": "life"},
    ),
    PointStruct(
        id=3,
        vector=[0.3, 0.4, 0.5] + [0.0] * 381,
        payload={"text": "第三篇文档", "category": "tech"},
    ),
]

# Insert (upsert) the points.
client.upsert(collection_name=collection_name, points=points)

# Nearest-neighbour search for the top 3 matches.
search_result = client.search(
    collection_name=collection_name,
    query_vector=[0.1, 0.2, 0.3] + [0.0] * 381,
    limit=3,
)

# Fix: the loop body was unindented in the original (SyntaxError).
for result in search_result:
    print(f"ID: {result.id}, Score: {result.score}, Text: {result.payload['text']}")

核心概念

Collection(集合)

集合是 Qdrant 中的数据组织单位,包含向量和 payload。

from qdrant_client.models import VectorParams, Distance

# Create a basic collection.
client.create_collection(
    collection_name="my_collection",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
)

# Create a collection with explicit tuning options.
client.create_collection(
    collection_name="optimized_collection",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE,
        hnsw_config={
            "m": 16,              # max graph links per node
            "ef_construct": 100,  # build-time accuracy/speed trade-off
        },
    ),
    optimizers_config={
        "default_segment_number": 2,
    },
    replication_factor=1,  # number of replicas
)

# Supported distance metrics:
#   Distance.COSINE    - cosine similarity
#   Distance.EUCLID    - Euclidean distance
#   Distance.DOT       - dot product
#   Distance.MANHATTAN - Manhattan distance

# Drop a collection.
client.delete_collection(collection_name="my_collection")

# Inspect a collection.
collection_info = client.get_collection(collection_name="my_collection")
print(f"向量数: {collection_info.points_count}")
print(f"维度: {collection_info.config.params.vectors.size}")

Point(点)

Point 是 Qdrant 中的基本数据单元,包含:

  • id: 唯一标识符(整数或 UUID)
  • vector: 向量数据
  • payload: 附加元数据(JSON 格式)
from qdrant_client.models import PointStruct
import uuid

# Point keyed by an integer ID.
point1 = PointStruct(
    id=1,
    vector=[0.1, 0.2, ...],  # placeholder — a full-length vector goes here
    payload={"text": "文档内容", "category": "tech"},
)

# Point keyed by a UUID string.
point2 = PointStruct(
    id=str(uuid.uuid4()),
    vector=[0.2, 0.3, ...],
    payload={
        "text": "另一篇文档",
        "tags": ["ai", "ml"],
        "metadata": {"author": "张三", "views": 100},
    },
)

Payload(载荷)

Payload 是与向量关联的 JSON 元数据,支持复杂的数据结构和过滤。

# Example payload: arbitrary JSON-style metadata attached to a vector.
payload = {
    # Scalar fields
    "text": "文档内容",
    "category": "technology",
    "views": 1000,
    "is_published": True,

    # Array fields
    "tags": ["ai", "ml", "python"],
    "scores": [95, 87, 92],

    # Nested object
    "metadata": {
        "author": "张三",
        "created_at": "2024-01-01",
        "source": {
            "url": "https://example.com",
            "domain": "example.com",
        },
    },

    # Geo location (lat/lon pair, usable by geo filters)
    "location": {
        "lat": 39.9042,
        "lon": 116.4074,
    },
}

数据操作

插入数据

from qdrant_client.models import PointStruct

# Single-point upsert.
client.upsert(
    collection_name="my_collection",
    points=[
        PointStruct(
            id=1,
            vector=[0.1, 0.2, ...],
            payload={"text": "文档1"},
        )
    ],
)

# Batched upsert (preferred for throughput).
points = [
    PointStruct(id=i, vector=vec, payload=payload)
    for i, (vec, payload) in enumerate(zip(vectors, payloads))
]

client.upsert(
    collection_name="my_collection",
    points=points,
)

# Optionally block until the points are indexed.
client.upsert(
    collection_name="my_collection",
    points=points,
    wait=True,
)

搜索

基本向量搜索

# Basic top-10 search.
results = client.search(
    collection_name="my_collection",
    query_vector=[0.1, 0.2, ...],
    limit=10,
)

# Paginated search: skip the first 20 hits.
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    limit=10,
    offset=20,
)

# Return only the listed payload fields.
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    limit=10,
    with_payload=["text", "category"],
)

# Skip the payload entirely.
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    limit=10,
    with_payload=False,
)

# Include the stored vectors in the results.
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    limit=10,
    with_vectors=True,
)

带过滤条件的搜索

from qdrant_client.models import Filter, FieldCondition, MatchValue, Range

# Exact-match filter.
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="category",
                match=MatchValue(value="technology"),
            )
        ]
    ),
    limit=10,
)

# Numeric range filter.
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="views",
                range=Range(
                    gte=100,   # inclusive lower bound
                    lte=1000,  # inclusive upper bound
                ),
            )
        ]
    ),
    limit=10,
)

# Array-membership filter: matches points whose "tags" contains "ai".
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="tags",
                match=MatchValue(value="ai"),
            )
        ]
    ),
    limit=10,
)

# Combined filter: must = AND, should = OR, must_not = NOT.
results = client.search(
    collection_name="my_collection",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="category",
                match=MatchValue(value="technology"),
            ),
            FieldCondition(
                key="is_published",
                match=MatchValue(value=True),
            ),
        ],
        should=[
            FieldCondition(
                key="tags",
                match=MatchValue(value="ai"),
            ),
            FieldCondition(
                key="tags",
                match=MatchValue(value="ml"),
            ),
        ],
        must_not=[
            FieldCondition(
                key="status",
                match=MatchValue(value="deleted"),
            )
        ],
    ),
    limit=10,
)

批量搜索

# Run several query vectors in a single round-trip.
# Fix: SearchRequest was used without being imported in the original snippet.
from qdrant_client.models import SearchRequest

results = client.search_batch(
    collection_name="my_collection",
    requests=[
        SearchRequest(vector=vec1, limit=10),
        SearchRequest(vector=vec2, limit=10),
        SearchRequest(vector=vec3, limit=10),
    ],
)

# results[i] holds the hits for the i-th request.
# Fix: the loop bodies were unindented in the original (SyntaxError).
for i, result_list in enumerate(results):
    print(f"查询 {i} 的结果:")
    for result in result_list:
        print(f" ID: {result.id}, Score: {result.score}")

获取数据

# Fetch specific points by ID.
results = client.retrieve(
    collection_name="my_collection",
    ids=[1, 2, 3],
    with_payload=True,
    with_vectors=False,
)

# Page through the whole collection with the scroll API.
# Fix: the while-loop body was unindented in the original (SyntaxError).
all_points = []
next_offset = None

while True:
    results, next_offset = client.scroll(
        collection_name="my_collection",
        limit=1000,
        offset=next_offset,
        with_payload=True,
    )
    all_points.extend(results)
    # scroll returns None as the next offset once the collection is exhausted
    if next_offset is None:
        break

print(f"总共 {len(all_points)} 条数据")

更新和删除

# Fix: PointIdsList / FilterSelector (and the filter models) were used
# without being imported in the original snippet.
from qdrant_client.models import (
    PointIdsList,
    FilterSelector,
    Filter,
    FieldCondition,
    MatchValue,
)

# Merge new keys into the payload of point 1.
client.set_payload(
    collection_name="my_collection",
    payload={"views": 2000},
    points=[1],  # target IDs
)

# Replace the entire payload of point 1.
client.overwrite_payload(
    collection_name="my_collection",
    payload={"new_field": "new_value"},
    points=[1],
)

# Remove individual payload keys.
client.delete_payload(
    collection_name="my_collection",
    keys=["old_field"],
    points=[1],
)

# Delete points by ID.
client.delete(
    collection_name="my_collection",
    points_selector=PointIdsList(points=[1, 2, 3]),
)

# Delete every point matching a filter.
client.delete(
    collection_name="my_collection",
    points_selector=FilterSelector(
        filter=Filter(
            must=[
                FieldCondition(
                    key="category",
                    match=MatchValue(value="obsolete"),
                )
            ]
        )
    ),
)

完整 RAG 示例

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from openai import OpenAI
import os


class QdrantRAG:
    """Minimal RAG pipeline: Qdrant for retrieval, OpenAI for embeddings and answers.

    Fix: the original snippet had all class/method bodies unindented,
    which is a SyntaxError in Python.
    """

    def __init__(self, collection_name="knowledge_base"):
        # Qdrant connection (local instance, REST port).
        self.client = QdrantClient(host="localhost", port=6333)

        # OpenAI client; reads the API key from the environment.
        self.openai = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

        self.collection_name = collection_name

        # Make sure the collection exists before first use.
        self._setup_collection()

    def _setup_collection(self):
        """Create the collection if missing (1536 dims for text-embedding-3-small)."""
        if not self.client.collection_exists(self.collection_name):
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
            )

    def add_documents(self, documents):
        """Embed and upsert a list of dicts with "content" and optional
        "source"/"title"/"category" keys."""
        points = []

        for i, doc in enumerate(documents):
            # Embed the document body.
            embedding = self._get_embedding(doc["content"])

            points.append(
                PointStruct(
                    id=i,
                    vector=embedding,
                    payload={
                        "content": doc["content"],
                        "source": doc.get("source", ""),
                        "title": doc.get("title", ""),
                        "category": doc.get("category", "general"),
                    },
                )
            )

        # Single batched upsert.
        self.client.upsert(
            collection_name=self.collection_name,
            points=points,
        )

        print(f"已添加 {len(documents)} 篇文档")

    def _get_embedding(self, text):
        """Return the embedding vector for *text*."""
        response = self.openai.embeddings.create(
            input=text,
            model="text-embedding-3-small",
        )
        return response.data[0].embedding

    def search(self, query, top_k=5, category=None):
        """Vector search for *query*; optionally restricted to one category."""
        query_vector = self._get_embedding(query)

        # Optional category filter.
        query_filter = None
        if category:
            query_filter = Filter(
                must=[
                    FieldCondition(
                        key="category",
                        match=MatchValue(value=category),
                    )
                ]
            )

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=query_filter,
            limit=top_k,
        )

        return [
            {
                "id": result.id,
                "content": result.payload["content"],
                "source": result.payload["source"],
                "score": result.score,
            }
            for result in results
        ]

    def answer(self, question, top_k=3):
        """Retrieve the top_k documents and ask the LLM to answer from them."""
        docs = self.search(question, top_k)

        # Concatenate the retrieved documents into a numbered context block.
        context = "\n\n".join([
            f"[文档 {i+1}] {doc['content']}"
            for i, doc in enumerate(docs)
        ])

        prompt = f"""基于以下文档回答问题。如果文档中没有相关信息,请说明。

文档:
{context}

问题:{question}

回答:"""

        response = self.openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "你是一个基于文档回答问题的助手。"},
                {"role": "user", "content": prompt},
            ],
        )

        return {
            "answer": response.choices[0].message.content,
            "sources": docs,
        }


# Usage example
rag = QdrantRAG()

docs = [
    {"content": "Qdrant 是一个用 Rust 编写的向量数据库...", "source": "qdrant.tech"},
    {"content": "向量数据库用于 AI 应用的语义搜索...", "source": "blog"},
]
rag.add_documents(docs)

result = rag.answer("什么是 Qdrant?")
print(f"回答:{result['answer']}")

高级功能

量化索引

# Scalar quantization: store vector components as int8 to cut memory use.
client.create_collection(
    collection_name="quantized_collection",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
    quantization_config={
        "scalar": {
            "type": "int8",  # 8-bit quantization
            "quantile": 0.99,
            "always_ram": True,
        }
    },
)

# Product quantization: higher compression at some accuracy cost.
client.create_collection(
    collection_name="pq_collection",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
    quantization_config={
        "product": {
            "compression": "x16",  # compression ratio
            "always_ram": True,
        }
    },
)

地理位置过滤

from qdrant_client.models import GeoBoundingBox, GeoPoint

# Bounding-box geo filter on the "location" payload field.
results = client.search(
    collection_name="places",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="location",
                geo_bounding_box=GeoBoundingBox(
                    bottom_right=GeoPoint(lat=39.8, lon=116.5),
                    top_left=GeoPoint(lat=40.0, lon=116.3),
                ),
            )
        ]
    ),
    limit=10,
)

# Radius geo filter around a centre point.
from qdrant_client.models import GeoRadius

results = client.search(
    collection_name="places",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="location",
                geo_radius=GeoRadius(
                    center=GeoPoint(lat=39.9, lon=116.4),
                    radius=5000,  # metres (5 km)
                ),
            )
        ]
    ),
    limit=10,
)

快照和恢复

# Create a snapshot of the collection.
snapshot_info = client.create_snapshot(collection_name="my_collection")
print(f"快照路径: {snapshot_info.name}")

# Restore a collection from a snapshot file.
client.recover_snapshot(
    collection_name="my_collection",
    location="snapshots/my_collection/my_snapshot.snapshot",
)

# List existing snapshots.
# Fix: the loop body was unindented in the original (SyntaxError).
snapshots = client.list_snapshots(collection_name="my_collection")
for snapshot in snapshots:
    print(f"快照: {snapshot.name}, 大小: {snapshot.size}")

性能优化

批量操作

# Insert documents in fixed-size batches to bound request size and memory.
# Fix: the function and loop bodies were unindented in the original (SyntaxError).
def batch_upsert(client, collection_name, documents, embeddings, batch_size=1000):
    """Upsert (document, embedding) pairs into *collection_name* in batches.

    Uses wait=False so indexing proceeds asynchronously after each request.
    """
    for i in range(0, len(documents), batch_size):
        batch_docs = documents[i:i + batch_size]
        batch_embeddings = embeddings[i:i + batch_size]

        points = [
            # Global ID = batch offset + position within the batch.
            PointStruct(id=i + j, vector=vec, payload={"text": doc})
            for j, (doc, vec) in enumerate(zip(batch_docs, batch_embeddings))
        ]

        client.upsert(
            collection_name=collection_name,
            points=points,
            wait=False,  # async insert
        )

        print(f"已插入 {i+len(batch_docs)}/{len(documents)}")

索引调优

# High-accuracy HNSW configuration.
client.create_collection(
    collection_name="high_perf",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE,
        hnsw_config={
            "m": 32,              # more links per node -> better recall
            "ef_construct": 200,  # larger build beam -> better recall
            "full_scan_threshold": 10000,
            "max_indexing_threads": 4,
            "on_disk": False,     # keep the graph in RAM
        },
    )
)

# Tune the ef parameter at query time.
results = client.search(
    collection_name="high_perf",
    query_vector=query_vector,
    search_params={
        "hnsw_ef": 128  # higher -> better recall, slower queries
    },
    limit=10,
)

内存优化

# Store raw vectors on disk instead of RAM.
client.create_collection(
    collection_name="disk_collection",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE,
        on_disk=True,  # vectors live on disk
    ),
)

# Keep a large payload field's index on disk as well.
client.create_payload_index(
    collection_name="my_collection",
    field_name="large_text",
    field_schema="text",
    on_disk=True,
)

常见问题

Q: Qdrant 与 Milvus 如何选择?

| 维度 | Qdrant | Milvus |
| --- | --- | --- |
| 性能 | 单节点性能优异 | 分布式性能优异 |
| 资源占用 | 较低 | 较高 |
| 功能丰富度 | 核心功能完善 | 功能更全面 |
| 部署复杂度 | 简单 | 较复杂 |
| 适用规模 | 千万级以下 | 亿级以上 |

Q: 如何处理大量数据?

# 1. Use batched upserts.
# 2. Raise the segment count so indexing runs in parallel.
client.create_collection(
    collection_name="large_collection",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
    optimizers_config={
        "default_segment_number": 4,
        "indexing_threshold": 20000,
    },
)

# 3. Enable quantization to reduce memory usage.

Q: 如何备份数据?

# Option 1: server-side snapshot.
snapshot = client.create_snapshot("my_collection")

# Option 2: export every point via the scroll API.
# Fix: the while-loop body was unindented in the original (SyntaxError).
all_points = []
next_offset = None

while True:
    results, next_offset = client.scroll(
        collection_name="my_collection",
        limit=10000,
        offset=next_offset,
        with_vectors=True,
    )
    all_points.extend(results)
    if next_offset is None:
        break

# Dump to a JSON file. encoding/ensure_ascii keep the non-ASCII
# payload text readable instead of \uXXXX-escaped.
import json
with open("backup.json", "w", encoding="utf-8") as f:
    json.dump([{
        "id": p.id,
        "vector": p.vector,
        "payload": p.payload,
    } for p in all_points], f, ensure_ascii=False)

下一步