当前位置: 首页 > news >正文

【Elasticsearch从入门到精通】第16篇:Elasticsearch批量操作API——Bulk、Reindex与跨集群索引

上一篇【第15篇】 Elasticsearch删除与更新API——精确操作与脚本更新
下一篇【第17篇】Elasticsearch并发控制——refresh参数与乐观并发控制


摘要

在实际生产环境中,单条文档操作往往无法满足性能需求,批量操作API是Elasticsearch高性能数据处理的基石。本文全面介绍了Elasticsearch的批量操作能力,涵盖Bulk API的NDJSON格式规范(元数据行+数据行的两行结构)、四种操作类型(index/create/update/delete)的混合使用与区别、最优批次大小(通常5-15MB)的配置策略与错误处理机制。深入解析了Reindex API的数据迁移能力,包括源索引到目标索引的完整流程、查询过滤、版本控制(internal/external/create三种模式)、脚本转换与管道预处理,以及跨集群Reindex的远程连接配置和白名单策略。最后介绍了Term向量API(_termvectors)的词元信息获取、统计信息分析与过滤功能。掌握这些内容将使你能够高效地完成Elasticsearch中的大规模数据操作与迁移任务。

一、Bulk API基本格式

1.1 NDJSON格式规范

Bulk API(_bulk)允许在单个API调用中执行多个索引和删除操作,可以显著提高操作效率。它使用新行分隔的JSON(NDJSON)格式:

action_and_meta_data\n optional_source\n action_and_meta_data\n optional_source\n ...

注意:最后一行数据必须以换行符(\n)结尾。发送请求时,Content-Type应设置为application/x-ndjson

1.2 四种操作类型

Bulk API支持以下四种操作:

操作类型说明需要数据行
index索引文档(存在则覆盖)
create创建文档(存在则报错)
delete删除文档
update更新文档是(doc/upsert/script)

1.3 完整示例

POST_bulk{"index":{"_index":"test","_id":"1"}}{"field1":"value1"}{"delete":{"_index":"test","_id":"2"}}{"create":{"_index":"test","_id":"3"}}{"field1":"value3"}{"update":{"_id":"1","_index":"test"}}{"doc":{"field2":"value2"}}

注意:请求的 Content-Type 必须设置为application/x-ndjson。如果使用Curl提供文本文件输入,必须使用--data-binary标志而非-d

二、Bulk API响应与错误处理

2.1 响应结构

Bulk API的响应是一个大型JSON结构,每个操作的结果按请求中的顺序对应返回:

{"took":30,"errors":false,"items":[{"index":{"_index":"test","_type":"_doc","_id":"1","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"delete":{"_index":"test","_type":"_doc","_id":"2","_version":1,"result":"not_found","_shards":{"total":2,"successful":1,"failed":0},"status":404}}]}

2.2 部分失败处理

关键特性:Bulk API中单个操作的失败不会影响其余操作的执行。即使某个操作报错,其他操作仍会正常完成。errors字段仅表示是否存在任何失败操作。

2.3 在索引级别指定默认值

可以使用/{index}/_bulk端点,为所有未显式指定索引的操作设置默认索引:

POSTtwitter/_bulk{"index":{"_id":"1"}}{"user":"kimchy","message":"hello"}{"delete":{"_id":"2"}}{"create":{"_id":"3"}}{"user":"other","message":"world"}

三、Bulk API的Update操作

3.1 Update操作格式

Bulk中的Update操作需要两行:元数据行和操作内容行。支持以下选项:doc、upsert、doc_as_upsert、script、params、lang和_source。

POST_bulk{"update":{"_id":"1","_index":"index1","retry_on_conflict":3}}{"doc":{"field":"value"}}{"update":{"_id":"0","_index":"index1","retry_on_conflict":3}}{"script":{"source":"ctx._source.counter += params.param1","lang":"painless","params":{"param1":1}},"upsert":{"counter":1}}{"update":{"_id":"2","_index":"index1","retry_on_conflict":3}}{"doc":{"field":"value"},"doc_as_upsert":true}

3.2 retry_on_conflict参数

retry_on_conflict指定在发生版本冲突时重试更新的次数,直接写在操作元数据行中:

{"update":{"_id":"1","_index":"test","retry_on_conflict":3}}

3.3 _source控制

可以控制Update操作后返回的_source内容:

{"update":{"_id":"3","_index":"index1","_source":true}}{"doc":{"field":"value"}}

也可以放在操作内容行中:

{"update":{"_id":"4","_index":"index1"}}{"doc":{"field":"value"},"_source":true}

四、Bulk API最优批次大小

4.1 批次大小选择原则

Bulk API中没有一个绝对适合所有场景的批次大小,应根据具体工作负载进行测试。以下是一些通用建议:

指标推荐值说明
批次大小5-15MB兼顾吞吐量和内存使用
单批文档数1000-5000条根据文档大小调整
线程数根据客户端CPU核数通常不超CPU核数的2倍

4.2 批次大小对比

批次大小吞吐量内存占用响应延迟适用场景
1-5MB实时写入、低延迟场景
5-15MB通用批量导入
15-100MB很高离线数据迁移

注意:如果使用HTTP API,确保客户端不发送HTTP块传输(chunked encoding),因为这会降低速度。

五、Reindex API数据迁移

5.1 基本用法

Reindex API(_reindex)将文档从一个索引复制到另一个索引。最基本的形式如下:

POST_reindex{"source":{"index":"twitter"},"dest":{"index":"new_twitter"}}

5.2 版本控制

Reindex API支持多种版本控制模式:

internal(默认):盲目地将文档转储到目标,覆盖同ID文档:

POST_reindex{"source":{"index":"twitter"},"dest":{"index":"new_twitter","version_type":"internal"}}

external:保留源索引的版本号,创建丢失的文档,更新旧版本文档:

POST_reindex{"source":{"index":"twitter"},"dest":{"index":"new_twitter","version_type":"external"}}

create:仅在目标索引中创建缺少的文档,已有文档会版本冲突:

POST_reindex{"source":{"index":"twitter"},"dest":{"index":"new_twitter","op_type":"create"}}

5.3 版本控制策略对比

策略行为适用场景
internal(默认)覆盖同ID文档全量数据迁移
external保留版本号,增量更新跨集群增量同步
create仅创建新文档补充缺失数据

5.4 版本冲突处理

默认情况下,版本冲突会中止Reindex进程。设置conflicts=proceed可以在冲突时继续:

POST_reindex{"conflicts":"proceed","source":{"index":"twitter"},"dest":{"index":"new_twitter"}}

5.5 查询过滤

可以通过向source添加查询条件来限制迁移的文档范围:

POST_reindex{"source":{"index":"twitter","query":{"term":{"user":"kimchy"}}},"dest":{"index":"new_twitter"}}

5.6 多源索引

source.index可以是一个列表,允许从多个源索引复制:

POST_reindex{"source":{"index":["twitter","blog"]},"dest":{"index":"all_together"}}

5.7 字段过滤

通过_source过滤只迁移需要的字段:

POST_reindex{"source":{"index":"twitter","_source":["user","message"]},"dest":{"index":"new_twitter"}}

5.8 脚本转换

Reindex支持通过脚本修改文档内容和元数据:

POST_reindex{"source":{"index":"twitter"},"dest":{"index":"new_twitter"},"script":{"source":"ctx._source.tags = ctx._source.tags ?: []; ctx._source.timestamp = ctx._source.remove('post_date');","lang":"painless"}}

脚本中可以修改的元数据字段包括:_id_index_version_routing。可以设置ctx.opnoopdelete来控制操作行为。

5.9 路由控制

dest上设置路由参数:

POST_reindex{"source":{"index":"source","query":{"match":{"company":"cat"}}},"dest":{"index":"dest","routing":"cat"}}

路由参数支持三种值:

参数值行为
keep(默认)使用源索引的路由值
discard目标索引路由值设为空
<自定义值>所有文档使用指定路由

5.10 批次大小与管道

可以设置批次大小和使用索引预处理管道:

POST_reindex{"source":{"index":"source","size":100},"dest":{"index":"dest","pipeline":"my_pipeline"}}

六、跨集群Reindex

6.1 基本配置

Reindex API支持从远程Elasticsearch集群重新索引数据:

POST_reindex{"source":{"remote":{"host":"https://otherhost:9200","username":"user","password":"pass"},"index":"twitter"},"dest":{"index":"new_twitter"}}

6.2 白名单配置

远程主机必须在elasticsearch.yml中显式配置白名单:

reindex.remote.whitelist:"otherhost:9200,another:9200,127.0.10.*:9200,localhost:*"

白名单规则:

  • 使用逗号分隔的主机和端口组合
  • 支持通配符(如127.0.10.*:9200
  • 忽略通信协议,只匹配主机和端口
  • 必须在所有协调节点上配置

注意:使用Basic Auth时务必使用HTTPS,否则密码将以明文传输。跨集群Reindex功能可以与任何版本的Elasticsearch配合使用,是集群升级的有效方式。

6.3 远程连接参数

POST_reindex{"source":{"remote":{"host":"https://otherhost:9200","username":"user","password":"pass","socket_timeout":"1m","connect_timeout":"10s"},"index":"twitter","size":10},"dest":{"index":"new_twitter"}}
参数说明默认值
socket_timeout读超时时间30s
connect_timeout连接超时时间30s
size批次大小1000

注意:从远程服务器重新索引使用堆内缓冲区,默认最大为100MB。如果远程索引包含大文档,需要使用较小的批次大小。

七、Term向量API

7.1 基本概念

Term向量(Term Vectors)用来存储文档字段的Term信息(字段文本分词得到的词条)和统计信息。Term向量在默认情况下是实时的。

7.2 获取Term向量

GETtwitter/_termvectors/1

或者指定字段:

GETtwitter/_termvectors/1?fields=message

也可以通过请求体指定字段:

POSTtwitter/_termvectors/1{"fields":["message"],"term_statistics":true,"field_statistics":true}

7.3 返回值类型

类型参数说明默认
Term信息term_statistics总词频、文档频率false
Term统计positionsTerm位置信息不返回
Term统计offsetsTerm起始/结束偏移不返回
字段统计field_statistics文档计数、词频总和true

7.4 Term过滤

使用filter参数可以根据tf-idf分数过滤返回的Term,帮助找出文档的特征向量:

POSTtwitter/_termvectors/1{"fields":["plot"],"term_statistics":true,"filter":{"max_num_terms":3,"min_term_freq":1,"min_doc_freq":1}}

过滤参数说明:

参数说明默认值
max_num_terms每个字段返回的最大Term数25
min_term_freq源文档中最低词频1
max_term_freq源文档中最高词频无限
min_doc_freq最低文档频率1
max_doc_freq最高文档频率无限
min_word_length最小词长0
max_word_length最大词长无限

7.5 多文档Term向量

_mtermvectorsAPI允许一次获取多个文档的Term向量:

POSTtwitter/_mtermvectors{"ids":["1","2"],"fields":["text"],"term_statistics":true}

也可以在请求中提供人工文档来生成Term向量:

POSTtwitter/_mtermvectors{"docs":[{"_id":"1","fields":["text"]},{"doc":{"text":"some text"},"fields":["text"]}]}

八、总结与最佳实践

8.1 核心要点回顾

  1. Bulk API是Elasticsearch批量操作的核心,NDJSON格式简洁高效,支持四种操作混合使用
  2. 部分失败隔离是Bulk API的重要特性,单个操作失败不影响其他操作
  3. 批次大小建议在5-15MB之间,需要根据实际场景测试调优
  4. Reindex API是数据迁移的首选方案,支持版本控制、查询过滤、脚本转换等丰富功能
  5. 跨集群Reindex需要配置白名单和HTTPS认证,是集群升级和数据同步的有效手段
  6. Term向量API提供了词元级别的分析能力,适用于文本分析和相关性调试

8.2 生产环境最佳实践

  • Bulk批次优化:通过测试找到最佳批次大小,监控批量操作的响应时间和错误率
  • Reindex限速:大数据量迁移使用requests_per_second限速,避免影响在线业务
  • 跨集群安全:远程连接务必使用HTTPS,严格配置白名单
  • 增量迁移:使用version_type: external实现增量同步,避免全量覆盖
  • 脚本转换:在Reindex时使用脚本完成字段重命名、类型转换等数据清洗工作

上一篇【第15篇】 Elasticsearch删除与更新API——精确操作与脚本更新
下一篇【第17篇】Elasticsearch并发控制——refresh参数与乐观并发控制


http://www.jsqmd.com/news/870116/

相关文章:

  • 在无锡卖金饰,我只找福正美——上门回收的真实体验分享 - 上门黄金回收
  • docker、harbor、jenkins概念
  • Tiger vs Dagger:Java依赖注入框架的终极对比指南 [特殊字符]
  • [特殊字符] CNSH · 数据主权与AI伦理治理总纲 v2.0
  • React上下文菜单常见问题解答:解决10个典型使用难题
  • 抖音下载器完整指南:三步实现高效批量下载
  • B站视频下载解决方案:实现高清内容本地化存储的技术实践
  • 28 岁大专逆袭转行网络安全 资深前辈避坑忠告
  • 2026 初夏黔地包车测评:十家旅行社对比,贵阳美途说口碑出圈 - 美途说
  • HarmonyOS鸿蒙三方库移植:选 vcpkg 还是 lycium_plusplus?两种“框架化”方案对比
  • KanBots:开源看板工具,每张卡片跑一个并行 AI Agent,Hacker News 147 星炸裂
  • D2DX技术深度解析:如何为经典暗黑破坏神2注入现代图形渲染能力
  • 7. 线程编程(线程概念和创建)
  • 内存分析工具WinDbg及GFlags安装、使用详解
  • Windows和Office激活终极指南:5分钟搞定智能KMS激活
  • d2dx终极指南:三步让你的暗黑破坏神2在现代PC上焕然一新
  • 武商一卡通怎么回收?优质回收平台推荐! - 团团收购物卡回收
  • Unity开发笔记系列(协程)—— Coroutine continue failure报错
  • CTF 竞赛干货|50 个实战解题思路,收藏一篇就够用
  • EdgeFlow:Blender边缘流优化技术解析与拓扑革命
  • 如何在5分钟内掌握Translumo:Windows平台最强实时屏幕翻译工具
  • 2026南溪县黄金回收避坑指南;闲置黄金变现;认准铭润金银回收,诚信靠谱 - 亦辰小黄鸭
  • Python基础语法(二)
  • 工控行业IO信号Web监控平台原理及技术实现方案
  • 湖北国泓环境工程:江汉正规的开荒保洁公司怎么联系 - LYL仔仔
  • 3分钟彻底清理Windows右键菜单:ContextMenuManager让你的操作效率翻倍
  • 如何突破数字枷锁:QMCDecode终极解决方案实现音频格式自由
  • 跨平台串口调试终极指南:SSCom让硬件开发更简单
  • 工作中经常修改的安卓系统配置
  • 本centOS 10 机器所安装的数据库