跳到主要内容

文档操作

文档(Document)是 Elasticsearch 中最小的数据单元,以 JSON 格式存储。本章将详细介绍文档的增删改查操作,帮助你掌握 Elasticsearch 的核心数据操作技能。

理解文档结构

在开始操作之前,先理解文档的基本结构:

{
"_index": "articles",
"_id": "1",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"title": "Elasticsearch 入门教程",
"content": "本文介绍 Elasticsearch 的基本概念",
"author": "张三",
"views": 1000,
"created_at": "2024-01-15T10:30:00"
}
}

元数据字段说明:

  • _index:文档所属的索引
  • _id:文档的唯一标识符
  • _version:文档版本号,每次更新自动递增
  • _seq_no:序列号,用于乐观锁控制
  • _primary_term:主分片任期,用于乐观锁控制
  • _source:文档的原始 JSON 内容

创建文档

指定 ID 创建

使用 PUT 方法创建文档时需要指定文档 ID:

PUT /articles/_doc/1
{
"title": "Elasticsearch 入门教程",
"content": "本文介绍 Elasticsearch 的基本概念和使用方法",
"author": "张三",
"category": "技术",
"views": 1000,
"tags": ["elasticsearch", "搜索", "教程"],
"status": "published",
"created_at": "2024-01-15 10:30:00"
}

响应示例:

{
"_index": "articles",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}

响应字段解读:

  • result:操作结果,created 表示新建,updated 表示更新
  • _shards.total:涉及的分片总数(主分片 + 副本分片)
  • _shards.successful:成功操作的分片数

自动生成 ID

使用 POST 方法可以省略 ID,Elasticsearch 会自动生成一个唯一 ID:

POST /articles/_doc
{
"title": "Elasticsearch 进阶教程",
"content": "深入讲解 Elasticsearch 的高级特性",
"author": "李四"
}

响应示例:

{
"_index": "articles",
"_id": "abc123def456",
"result": "created"
}

自动生成的 ID 是一个 22 个字符的 Base64 编码字符串,保证了全局唯一性。

创建时检查是否存在

有时我们需要确保文档是新建的,而不是覆盖已有文档:

# 方法一:使用 _create 端点
PUT /articles/_create/1
{
"title": "新文章"
}

# 方法二:使用 op_type 参数
PUT /articles/_doc/1?op_type=create
{
"title": "新文章"
}

如果文档已存在,会返回 409 错误:

{
"error": {
"type": "version_conflict_engine_exception",
"reason": "[1]: version conflict, document already exists"
},
"status": 409
}

理解 PUT vs POST

  • PUT:幂等操作,需要指定 ID。如果文档已存在,会完全替换;如果不存在,则创建。
  • POST:非幂等操作,可以不指定 ID。不指定 ID 时总是创建新文档。

在实际应用中,如果你有自然的业务 ID(如用户 ID、商品 ID),建议使用 PUT 指定 ID;如果没有合适的 ID,使用 POST 让 Elasticsearch 自动生成。

获取文档

根据 ID 获取

GET /articles/_doc/1

响应示例:

{
"_index": "articles",
"_id": "1",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"title": "Elasticsearch 入门教程",
"content": "本文介绍 Elasticsearch 的基本概念和使用方法",
"author": "张三"
}
}

found 字段为 true 表示文档存在,为 false 表示文档不存在。

只获取源数据

如果你只需要文档内容而不需要元数据:

GET /articles/_source/1

响应示例:

{
"title": "Elasticsearch 入门教程",
"content": "本文介绍 Elasticsearch 的基本概念和使用方法",
"author": "张三"
}

获取特定字段

可以通过参数指定返回的字段,减少网络传输:

# 只获取 title 和 author 字段
GET /articles/_doc/1?_source=title,author

# 排除某些字段
GET /articles/_doc/1?_source_excludes=content,tags

# 包含某些字段
GET /articles/_doc/1?_source_includes=title,author,views

检查文档是否存在

使用 HEAD 方法可以只检查文档是否存在,不返回文档内容:

HEAD /articles/_doc/1

存在返回 200 OK,不存在返回 404 Not Found。这在需要验证文档存在性但不需要内容的场景下非常有用。

批量获取

当需要获取多个文档时,使用 _mget API 比多次单独请求更高效:

# 方式一:指定索引和 ID
GET /_mget
{
"docs": [
{ "_index": "articles", "_id": "1" },
{ "_index": "articles", "_id": "2" },
{ "_index": "products", "_id": "1" }
]
}

# 方式二:同一索引的简化写法
GET /articles/_mget
{
"ids": ["1", "2", "3"]
}

# 方式三:指定返回字段
GET /articles/_mget
{
"docs": [
{ "_id": "1", "_source": ["title", "author"] },
{ "_id": "2", "_source": ["title", "views"] }
]
}

批量获取的优势在于:

  • 减少网络往返次数
  • 服务端可以并行处理多个请求
  • 即使部分文档不存在,其他文档仍能正常返回

更新文档

完整替换

使用 PUT 方法会完全替换文档内容:

PUT /articles/_doc/1
{
"title": "Elasticsearch 完全指南",
"content": "更新后的内容",
"author": "张三"
}

重要提示: 这种方式会完全替换原文档。如果原文档有 tagsviews 字段,但新文档没有包含,这些字段会被删除。这就像重新写入整个文档。

部分更新

使用 _update API 可以只更新指定字段,保留其他字段不变:

POST /articles/_update/1
{
"doc": {
"views": 1500,
"title": "Elasticsearch 完全指南"
}
}

部分更新的工作原理:

  1. 读取原文档
  2. 合并更新内容
  3. 索引新文档
  4. 删除旧文档(标记为删除)

虽然部分更新看起来只是"修改"几个字段,但底层仍然是删除旧文档、创建新文档的过程。不过 Elasticsearch 帮你封装了这个过程,使用起来更方便。

使用脚本更新

脚本更新提供了更灵活的更新能力:

# 数值增减
POST /articles/_update/1
{
"script": {
"source": "ctx._source.views += params.increment",
"params": {
"increment": 100
}
}
}

# 条件更新
POST /articles/_update/1
{
"script": {
"source": "if (ctx._source.views > 1000) { ctx._source.status = 'popular' }"
}
}

# 添加数组元素
POST /articles/_update/1
{
"script": {
"source": "ctx._source.tags.add(params.tag)",
"params": {
"tag": "热门"
}
}
}

# 删除字段
POST /articles/_update/1
{
"script": {
"source": "ctx._source.remove('temp_field')"
}
}

脚本参数说明:

  • source:脚本代码,使用 Painless 语言
  • params:传递给脚本的参数,避免硬编码

Upsert 操作

Upsert(Update or Insert)操作:文档存在则更新,不存在则创建:

POST /articles/_update/2
{
"doc": {
"title": "新文章",
"views": 0
},
"doc_as_upsert": true
}

# 或使用 upsert 字段
POST /articles/_update/2
{
"script": {
"source": "ctx._source.views += 1"
},
"upsert": {
"title": "新文章",
"views": 1
}
}

两种方式的区别:

  • doc_as_upsert:文档不存在时,使用 doc 的内容创建新文档
  • upsert:文档不存在时,使用 upsert 的内容创建新文档;存在时执行 script

乐观锁控制

在高并发场景下,多个客户端可能同时更新同一文档。使用乐观锁可以防止并发冲突:

# 第一步:获取当前版本信息
GET /articles/_doc/1
# 返回 _seq_no: 5, _primary_term: 1

# 第二步:更新时指定版本
POST /articles/_update/1?if_seq_no=5&if_primary_term=1
{
"doc": {
"views": 1600
}
}

如果在此期间有其他客户端修改了文档,版本号会变化,你的更新请求会失败,返回 409 错误。这样可以避免"丢失更新"问题。

删除文档

根据 ID 删除

DELETE /articles/_doc/1

响应示例:

{
"_index": "articles",
"_id": "1",
"_version": 2,
"result": "deleted"
}

resultdeleted 表示删除成功,为 not_found 表示文档不存在。

根据查询删除

批量删除符合条件的文档:

POST /articles/_delete_by_query
{
"query": {
"match": {
"status": "draft"
}
}
}

删除所有文档

谨慎使用,会删除索引中的所有文档:

POST /articles/_delete_by_query
{
"query": {
"match_all": {}
}
}

注意: 删除操作不会立即释放磁盘空间。被删除的文档只是标记为删除,在段合并时才会真正清理。如果需要立即释放空间,可以执行 force merge:

POST /articles/_forcemerge?only_expunge_deletes=true

批量操作

Bulk API 是 Elasticsearch 最重要的批量操作接口,可以在单次请求中执行多个创建、更新、删除操作。

Bulk API 基本语法

POST /_bulk
{"index": {"_index": "articles", "_id": "1"}}
{"title": "文章1", "author": "张三"}
{"index": {"_index": "articles", "_id": "2"}}
{"title": "文章2", "author": "李四"}
{"update": {"_index": "articles", "_id": "1"}}
{"doc": {"views": 100}}
{"delete": {"_index": "articles", "_id": "3"}}

格式规则:

  • 每行必须是有效的 JSON,行与行之间用换行符分隔
  • 操作行和数据行必须成对出现(delete 除外)
  • 不能有格式化缩进、注释或多余逗号
  • 最后一行必须有换行符

操作类型:

操作类型说明是否需要数据行
index创建或替换文档
create创建文档(已存在则失败)
update部分更新文档
delete删除文档

指定索引的批量操作

如果所有操作都针对同一个索引,可以在 URL 中指定索引:

POST /articles/_bulk
{"index": {"_id": "1"}}
{"title": "文章1"}
{"index": {"_id": "2"}}
{"title": "文章2"}
{"update": {"_id": "1"}}
{"doc": {"views": 100}}

批量操作的 Python 示例

实际应用中,通常使用客户端库来简化批量操作:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")

# 方式一:手动构建批量数据
actions = [
{
"_index": "articles",
"_id": i,
"_source": {
"title": f"文章{i}",
"author": "张三",
"views": i * 100,
"created_at": "2024-01-15"
}
}
for i in range(1000)
]

# 执行批量操作
helpers.bulk(es, actions)

# 方式二:使用生成器处理大数据量
def generate_actions():
for i in range(100000):
yield {
"_index": "articles",
"_source": {
"title": f"文章{i}",
"author": "张三"
}
}

helpers.bulk(es, generate_actions())

批量操作的最佳实践

  • 批量大小:每个批次 5-15MB 或 1000-5000 个文档较为合适
  • 并行执行:可以使用多线程并行提交多个批量请求
  • 错误处理:检查响应中的 errors 字段,逐条处理失败的操作
  • 重试机制:对于暂时性错误,实现指数退避重试

文档路由

默认情况下,Elasticsearch 使用文档 ID 的哈希值将文档分配到特定分片。了解路由机制对于优化查询性能很重要。

路由算法

文档到分片的分配遵循以下公式:

shard_num = hash(_routing) % num_primary_shards

默认情况下,_routing 就是文档 _id。这意味着同一个 ID 的文档总是会被分配到同一个分片。

自定义路由

在某些场景下,使用自定义路由可以提高查询效率。例如,将同一用户的所有文档存储在同一个分片:

# 写入时指定路由
PUT /articles/_doc/1?routing=user_123
{
"title": "用户文章",
"user_id": "user_123"
}

# 查询时指定路由(只搜索特定分片)
GET /articles/_search?routing=user_123
{
"query": {
"match_all": {}
}
}

# 获取文档时也需要指定路由
GET /articles/_doc/1?routing=user_123

优势: 查询时只需搜索一个分片而不是所有分片,大大减少查询范围。

风险: 如果路由键分布不均匀,可能导致某些分片数据量过大(数据倾斜)。

强制路由

可以要求索引必须使用自定义路由:

PUT /articles
{
"mappings": {
"_routing": {
"required": true
}
}
}

这样,不指定路由的文档操作会返回错误,防止数据错误分布。

文档刷新机制

理解刷新机制对于优化写入性能和保证数据可见性非常重要。

近实时搜索

Elasticsearch 是近实时(Near Real-Time, NRT)搜索引擎。文档写入后,大约 1 秒才能被搜索到。这是因为:

  1. 文档首先写入内存缓冲区
  2. 每隔一段时间(默认 1 秒),内存缓冲区的内容被刷新到新的段
  3. 新段对搜索可见

刷新间隔控制

# 修改刷新间隔
PUT /articles/_settings
{
"index": {
"refresh_interval": "5s"
}
}

# 禁用自动刷新(批量导入时)
PUT /articles/_settings
{
"index": {
"refresh_interval": "-1"
}
}

# 手动刷新
POST /articles/_refresh

写入时刷新选项

# 写入后立即刷新
PUT /articles/_doc/1?refresh=true
{
"title": "新文章"
}

# 写入后等待刷新完成
PUT /articles/_doc/1?refresh=wait_for
{
"title": "新文章"
}

# 不刷新(默认行为)
PUT /articles/_doc/1?refresh=false
{
"title": "新文章"
}

三种选项的区别:

  • refresh=true:立即刷新,文档马上可见,但会影响性能
  • refresh=wait_for:等待下次刷新完成后再返回,延迟 1 秒左右
  • refresh=false:不等待刷新,最快返回,但文档可能 1 秒内不可见

实战示例

博客文章管理系统

# 创建文章索引
PUT /articles
{
"mappings": {
"properties": {
"title": { "type": "text", "analyzer": "ik_max_word" },
"content": { "type": "text", "analyzer": "ik_max_word" },
"author": { "type": "keyword" },
"category": { "type": "keyword" },
"tags": { "type": "keyword" },
"status": { "type": "keyword" },
"views": { "type": "integer" },
"likes": { "type": "integer" },
"created_at": { "type": "date" },
"updated_at": { "type": "date" }
}
}
}

# 发布新文章
POST /articles/_doc
{
"title": "Python 异步编程指南",
"content": "详细介绍 asyncio 的使用方法...",
"author": "张三",
"category": "Python",
"tags": ["python", "asyncio", "教程"],
"status": "published",
"views": 0,
"likes": 0,
"created_at": "2024-01-15T10:30:00",
"updated_at": "2024-01-15T10:30:00"
}

# 增加浏览量
POST /articles/_doc/abc123/_update
{
"script": {
"source": "ctx._source.views += 1"
}
}

# 点赞
POST /articles/_doc/abc123/_update
{
"script": {
"source": "ctx._source.likes += 1"
}
}

# 修改状态为精华
POST /articles/_doc/abc123/_update
{
"doc": {
"status": "featured",
"updated_at": "2024-01-16T15:00:00"
}
}

# 添加标签
POST /articles/_doc/abc123/_update
{
"script": {
"source": "ctx._source.tags.add(params.tag)",
"params": {
"tag": "精华"
}
}
}

批量导入商品数据

# 使用 bulk 批量导入
POST /_bulk
{"index":{"_index":"products"}}
{"name":"iPhone 15","price":6999,"category":"手机","stock":100}
{"index":{"_index":"products"}}
{"name":"MacBook Pro","price":14999,"category":"电脑","stock":50}
{"index":{"_index":"products"}}
{"name":"iPad Pro","price":7999,"category":"平板","stock":80}
{"index":{"_index":"products"}}
{"name":"AirPods Pro","price":1999,"category":"耳机","stock":200}

# 批量更新库存
POST /products/_update_by_query
{
"script": {
"source": "ctx._source.stock -= params.sold",
"params": {
"sold": 1
}
},
"query": {
"term": {
"category": "手机"
}
}
}

小结

本章我们深入学习了 Elasticsearch 文档操作的核心知识:

  1. 创建文档:指定 ID 创建、自动生成 ID、创建时检查存在性
  2. 获取文档:单个获取、批量获取、字段过滤、存在性检查
  3. 更新文档:完整替换、部分更新、脚本更新、upsert 操作
  4. 删除文档:按 ID 删除、按查询删除
  5. 批量操作:Bulk API 的语法和最佳实践
  6. 文档路由:自定义路由的使用场景和注意事项
  7. 刷新机制:近实时搜索的原理和控制方法

练习

  1. 创建一个商品索引 products,添加 10 个商品文档,包含名称、价格、分类、库存等字段
  2. 使用批量操作导入 100 条测试数据
  3. 编写脚本实现:每次查看商品详情时自动增加浏览量
  4. 实现商品上架/下架功能:修改状态字段并记录更新时间
  5. 使用乐观锁实现:防止并发修改库存时出现超卖

参考资料