跳到主要内容

Milvus 向量数据库

Milvus 是一个开源的云原生向量数据库,专为大规模向量检索和 AI 应用设计。它支持十亿级向量的存储和检索,提供分布式架构和丰富的索引类型,是生产环境的首选方案之一。

概述

为什么选择 Milvus

特性说明
开源免费Apache 2.0 协议,无商业限制
云原生支持 Kubernetes 部署,水平扩展
高性能GPU 加速,十亿级向量毫秒级查询
丰富索引支持 HNSW、IVF、PQ 等多种 ANN 算法
多语言 SDKPython、Java、Go、Node.js、C++
混合查询向量相似度 + 标量过滤

架构组件

┌─────────────────────────────────────────────────────────┐
│ 应用层 │
│ (Python SDK / Java SDK / REST API) │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 接入层 (Proxy) │
│ 负载均衡、请求路由、鉴权 │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 协调服务层 │
│ Root Coord │ Data Coord │ Query Coord │ Index Coord │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 执行层 │
│ Query Node │ Data Node │ Index Node │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ 存储层 │
│ Object Storage (S3/OSS) │ Meta Store (ETCD) │
└─────────────────────────────────────────────────────────┘

快速开始

1. 部署 Milvus

Docker Compose 部署(开发测试)

# 下载安装脚本
curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.sh

# 启动 Milvus
bash standalone_embed.sh start

# 停止 Milvus
bash standalone_embed.sh stop

Kubernetes 部署(生产环境)

# 添加 Helm 仓库
helm repo add milvus https://milvus-io.github.io/milvus-helm/
helm repo update

# 安装 Milvus
helm install my-milvus milvus/milvus

# 查看状态
kubectl get pods -l app.kubernetes.io/instance=my-milvus

2. 安装 SDK

pip install pymilvus

3. 第一个示例

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

# 连接到 Milvus
connections.connect("default", host="localhost", port="19530")

# 定义字段
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384)
]

# 创建集合(类似数据库表)
schema = CollectionSchema(fields, "示例集合")
collection = Collection("hello_milvus", schema)

# 创建索引
index_params = {
"metric_type": "COSINE",
"index_type": "HNSW",
"params": {"M": 16, "efConstruction": 200}
}
collection.create_index(field_name="embedding", index_params=index_params)

# 准备数据
import random
data = [
["Milvus 是一个向量数据库", "Python 是一种编程语言", "机器学习是 AI 的分支"],
[[random.random() for _ in range(384)] for _ in range(3)]
]

# 插入数据
entities = [
data[0], # text
data[1] # embedding
]
collection.insert(entities)

# 加载集合到内存
collection.load()

# 搜索
search_params = {"metric_type": "COSINE", "params": {"ef": 64}}
results = collection.search(
data=[[random.random() for _ in range(384)]],
anns_field="embedding",
param=search_params,
limit=3,
output_fields=["text"]
)

for result in results:
for item in result:
print(f"ID: {item.id}, Distance: {item.distance}, Text: {item.entity.get('text')}")

# 释放资源
collection.release()

核心概念

Collection(集合)

集合是 Milvus 中最高级别的数据组织单位,类似于关系型数据库的表。

from pymilvus import FieldSchema, CollectionSchema, DataType, Collection

# 定义字段
fields = [
# 主键字段
FieldSchema(
name="id",
dtype=DataType.INT64,
is_primary=True,
auto_id=True # 自动生成 ID
),
# 标量字段(用于过滤)
FieldSchema(
name="category",
dtype=DataType.VARCHAR,
max_length=64
),
FieldSchema(
name="price",
dtype=DataType.FLOAT
),
# 向量字段
FieldSchema(
name="embedding",
dtype=DataType.FLOAT_VECTOR,
dim=1536 # 向量维度
)
]

# 创建集合
schema = CollectionSchema(
fields=fields,
description="产品集合",
enable_dynamic_field=True # 允许动态字段
)

collection = Collection(name="products", schema=schema)

支持的字段类型

类型说明示例
INT8/16/32/64整数计数、ID
FLOAT/DOUBLE浮点数价格、评分
VARCHAR字符串名称、描述
BOOL布尔值是否可用
JSONJSON 对象复杂元数据
ARRAY数组标签列表
FLOAT_VECTOR浮点向量嵌入向量
BINARY_VECTOR二进制向量紧凑存储

Partition(分区)

分区用于物理隔离数据,提高查询效率。

# 创建分区
collection.create_partition("electronics")
collection.create_partition("clothing")

# 向指定分区插入数据
data = [
["iPhone", "electronics"],
[[0.1, 0.2, ...]] # 向量
]
collection.insert(data, partition_name="electronics")

# 从指定分区查询
collection.load(partition_names=["electronics"])
results = collection.search(
data=[query_vector],
anns_field="embedding",
param=search_params,
partition_names=["electronics"], # 只在 electronics 分区搜索
limit=10
)

Index(索引)

索引加速向量相似度搜索。

# HNSW 索引(推荐)
hnsw_params = {
"metric_type": "COSINE", # 或 L2, IP
"index_type": "HNSW",
"params": {
"M": 16, # 最大连接数(2-64)
"efConstruction": 200 # 构建搜索范围(建议 > 100)
}
}
collection.create_index(field_name="embedding", index_params=hnsw_params)

# IVF_FLAT 索引
ivf_params = {
"metric_type": "L2",
"index_type": "IVF_FLAT",
"params": {"nlist": 128} # 聚类中心数
}
collection.create_index(field_name="embedding", index_params=ivf_params)

# IVF_SQ8 索引(压缩存储)
ivf_sq8_params = {
"metric_type": "L2",
"index_type": "IVF_SQ8",
"params": {"nlist": 128}
}

# 查看索引信息
print(collection.indexes)

# 删除索引
collection.drop_index()

数据操作

插入数据

# 单条插入
data = {
"text": "Milvus 向量数据库",
"category": "database",
"embedding": [0.1, 0.2, 0.3, ...] # 1536维
}
collection.insert([data])

# 批量插入(推荐)
batch_size = 1000
texts = ["文本1", "文本2", ...]
categories = ["cat1", "cat2", ...]
embeddings = [[...], [...], ...] # 预先生成的向量

for i in range(0, len(texts), batch_size):
batch_data = [
texts[i:i+batch_size],
categories[i:i+batch_size],
embeddings[i:i+batch_size]
]
collection.insert(batch_data)
print(f"已插入 {i+len(batch_data[0])}/{len(texts)}")

# 刷新数据(确保持久化)
collection.flush()

搜索

基本向量搜索

# 加载集合到内存
collection.load()

# 搜索参数
search_params = {
"metric_type": "COSINE",
"params": {"ef": 64} # HNSW 搜索范围
}

# 执行搜索
results = collection.search(
data=[query_embedding], # 查询向量列表
anns_field="embedding", # 向量字段名
param=search_params,
limit=10, # 返回结果数
output_fields=["text", "category"] # 返回的字段
)

# 处理结果
for hits in results:
for hit in hits:
print(f"ID: {hit.id}")
print(f"Distance: {hit.distance}") # 距离/相似度
print(f"Text: {hit.entity.get('text')}")
print(f"Category: {hit.entity.get('category')}")

带过滤条件的搜索

from pymilvus import Filter

# 使用表达式过滤
expr = "category == 'electronics' and price < 1000"

results = collection.search(
data=[query_embedding],
anns_field="embedding",
param=search_params,
limit=10,
expr=expr, # 过滤表达式
output_fields=["text", "price"]
)

# 支持的表达式操作符
# ==, !=, >, >=, <, <=
# in, not in
# &&, ||, !
# like(前缀匹配)

# 复杂表达式示例
expr = "(category in ['electronics', 'computers']) && (price >= 100 && price <= 1000)"
expr = "name like 'iPhone%'" # 前缀匹配

批量搜索

# 一次搜索多个查询向量
query_embeddings = [vec1, vec2, vec3, ...]

results = collection.search(
data=query_embeddings,
anns_field="embedding",
param=search_params,
limit=10
)

# results 的长度等于 query_embeddings 的长度
for i, hits in enumerate(results):
print(f"查询 {i} 的结果:")
for hit in hits:
print(f" ID: {hit.id}, Distance: {hit.distance}")

查询(Query)

Query 用于基于标量字段的精确查询,不涉及向量相似度计算。

# 根据表达式查询
results = collection.query(
expr="category == 'electronics'",
output_fields=["id", "text", "price"],
limit=100
)

for result in results:
print(f"ID: {result['id']}, Text: {result['text']}")

# 根据 ID 查询
results = collection.query(
expr="id in [1, 2, 3]",
output_fields=["*"] # 返回所有字段
)

# 分页查询
results = collection.query(
expr="category == 'electronics'",
output_fields=["id", "text"],
limit=10,
offset=20 # 跳过前 20 条
)

删除数据

# 根据表达式删除
collection.delete(expr="category == 'obsolete'")

# 根据 ID 删除
collection.delete(expr="id in [1, 2, 3]")

# 删除所有数据
collection.delete(expr="id >= 0")

更新数据

Milvus 不支持直接更新,需要先删除再插入:

# 1. 查询现有数据
results = collection.query(
expr=f"id == {target_id}",
output_fields=["*"]
)

if results:
old_data = results[0]

# 2. 删除旧数据
collection.delete(expr=f"id == {target_id}")

# 3. 插入新数据(保持相同 ID)
new_data = {
"id": target_id,
"text": "更新后的文本",
"category": old_data["category"],
"embedding": new_embedding
}
collection.insert([new_data])

collection.flush()

与嵌入模型集成

OpenAI Embedding

from openai import OpenAI
import os

openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

def get_embeddings(texts, model="text-embedding-3-small"):
"""批量获取文本嵌入"""
response = openai_client.embeddings.create(
input=texts,
model=model
)
return [item.embedding for item in response.data]

# 示例:构建语义搜索
documents = [
"Milvus 是一个开源向量数据库",
"Python 是流行的编程语言",
"深度学习是机器学习的子集"
]

# 生成嵌入
embeddings = get_embeddings(documents)

# 插入 Milvus
data = [
documents,
["tech"] * len(documents),
embeddings
]
collection.insert(data)

# 搜索
query = "什么是向量数据库?"
query_embedding = get_embeddings([query])[0]

results = collection.search(
data=[query_embedding],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"ef": 64}},
limit=3,
output_fields=["text"]
)

for hit in results[0]:
print(f"Score: {hit.distance:.4f}, Text: {hit.entity.get('text')}")

本地模型(Sentence-Transformers)

from sentence_transformers import SentenceTransformer

# 加载本地模型
model = SentenceTransformer('BAAI/bge-large-zh') # 中文模型

def get_embeddings(texts):
"""批量编码文本"""
return model.encode(texts, normalize_embeddings=True).tolist()

# 使用方式与 OpenAI 相同

RAG 应用完整示例

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection
from openai import OpenAI
import os

class MilvusRAG:
def __init__(self, collection_name="rag_docs"):
# 连接 Milvus
connections.connect("default", host="localhost", port="19530")

# OpenAI 客户端
self.openai = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# 集合名称
self.collection_name = collection_name

# 初始化或获取集合
if utility.has_collection(collection_name):
self.collection = Collection(collection_name)
else:
self._create_collection()

def _create_collection(self):
"""创建集合"""
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=4096),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536)
]

schema = CollectionSchema(fields, "RAG 知识库")
self.collection = Collection(self.collection_name, schema)

# 创建索引
index_params = {
"metric_type": "COSINE",
"index_type": "HNSW",
"params": {"M": 16, "efConstruction": 200}
}
self.collection.create_index("embedding", index_params)

def add_documents(self, documents):
"""
添加文档到知识库
documents: [{"content": "...", "source": "..."}, ...]
"""
# 提取内容
contents = [doc["content"] for doc in documents]
sources = [doc.get("source", "") for doc in documents]

# 生成嵌入
embeddings = self._get_embeddings(contents)

# 插入数据
data = [contents, sources, embeddings]
self.collection.insert(data)
self.collection.flush()

print(f"已添加 {len(documents)} 篇文档")

def _get_embeddings(self, texts):
"""获取文本嵌入"""
response = self.openai.embeddings.create(
input=texts,
model="text-embedding-3-small"
)
return [item.embedding for item in response.data]

def search(self, query, top_k=5):
"""搜索相关文档"""
# 加载集合
self.collection.load()

# 生成查询向量
query_embedding = self._get_embeddings([query])[0]

# 搜索
results = self.collection.search(
data=[query_embedding],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"ef": 64}},
limit=top_k,
output_fields=["content", "source"]
)

# 格式化结果
documents = []
for hit in results[0]:
documents.append({
"content": hit.entity.get("content"),
"source": hit.entity.get("source"),
"score": hit.distance
})

return documents

def answer(self, question, top_k=3):
"""生成回答"""
# 检索相关文档
docs = self.search(question, top_k)

# 构建上下文
context = "\n\n".join([
f"[文档 {i+1}] {doc['content']}"
for i, doc in enumerate(docs)
])

# 调用 LLM
prompt = f"""基于以下文档回答问题。如果文档中没有相关信息,请说明。

文档:
{context}

问题:{question}

回答:"""

response = self.openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "你是一个基于文档回答问题的助手。"},
{"role": "user", "content": prompt}
]
)

return {
"answer": response.choices[0].message.content,
"sources": docs
}

# 使用示例
rag = MilvusRAG()

# 添加文档
docs = [
{"content": "Milvus 是一个开源的云原生向量数据库...", "source": "milvus.io"},
{"content": "向量数据库用于存储和检索高维向量数据...", "source": "wikipedia.org"}
]
rag.add_documents(docs)

# 提问
result = rag.answer("什么是 Milvus?")
print(f"回答:{result['answer']}")
print(f"来源:{result['sources']}")

性能优化

批量操作

import time

# 批量插入优化
def batch_insert(collection, texts, embeddings, batch_size=1000):
total = len(texts)
for i in range(0, total, batch_size):
batch_texts = texts[i:i+batch_size]
batch_embeddings = embeddings[i:i+batch_size]

data = [batch_texts, batch_embeddings]
collection.insert(data)

if i % 10000 == 0:
print(f"已插入 {i}/{total}")

collection.flush()

# 批量搜索优化
def batch_search(collection, query_embeddings, batch_size=100):
results = []
for i in range(0, len(query_embeddings), batch_size):
batch = query_embeddings[i:i+batch_size]
batch_results = collection.search(
data=batch,
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"ef": 64}},
limit=10
)
results.extend(batch_results)
return results

索引选择

场景推荐索引理由
追求速度HNSW查询最快
内存受限IVF_SQ8压缩存储
小规模 (<10万)FLAT100% 精确
GPU 加速GPU_IVF_FLAT利用 GPU 计算

分区策略

# 按时间分区(适合时序数据)
collection.create_partition("2024_01")
collection.create_partition("2024_02")

# 按类别分区(适合类别明确的数据)
collection.create_partition("electronics")
collection.create_partition("clothing")

# 查询时指定分区,减少搜索范围
collection.search(
data=[query],
anns_field="embedding",
param=search_params,
partition_names=["2024_02"], # 只搜索最新数据
limit=10
)

运维管理

监控指标

# 查看集合统计
stats = collection.get_stats()
print(f"行数: {stats['row_count']}")

# 查看索引进度
index_progress = collection.index().progress
print(f"索引进度: {index_progress}%")

# 查看加载状态
load_state = collection.load_state()
print(f"加载状态: {load_state}")

数据备份

# 导出数据(使用 query)
batch_size = 1000
offset = 0
all_data = []

while True:
results = collection.query(
expr="id >= 0",
output_fields=["*"],
limit=batch_size,
offset=offset
)

if not results:
break

all_data.extend(results)
offset += batch_size

if len(results) < batch_size:
break

# 保存到文件
import json
with open("backup.json", "w") as f:
json.dump(all_data, f)

集群管理

# 查看集群节点
from pymilvus import utility

nodes = utility.get_query_segment_info("collection_name")
for node in nodes:
print(f"节点: {node.nodeID}, 段: {node.segmentID}")

常见问题

Q: 如何升级 Milvus?

# 备份数据
# 使用 Helm 升级
helm upgrade my-milvus milvus/milvus

# 或使用 Docker
bash standalone_embed.sh stop
# 拉取新版本镜像
bash standalone_embed.sh start

Q: 查询返回结果为空?

检查清单:

  1. 集合是否已加载 collection.load()
  2. 索引是否已创建
  3. 查询向量维度是否匹配
  4. 过滤表达式是否正确

Q: 内存不足怎么办?

  1. 使用压缩索引(IVF_SQ8、PQ)
  2. 增加分区,按需加载
  3. 扩容集群节点
  4. 调整 queryNode.cacheSize

下一步