当前位置: 首页 > news >正文

文章十五:ElasticSearch 运用ingest加工索引数据

ingest简单介绍

他是es中的独立的数据处理加工的模块,等同于是轻量级的ETL(数据的抽取,转换,加载),类似于logstash,使用的是popeline的管道处理模型。

应用场景

数据写入,数据更新,构建大宽表,索引重建,共享处理

常规应用实战

创建ingest

PUT _ingest/pipeline/shijian_test { "description": "尝试创建ingest pipeline", "processors": [ { "set": { "field": "shijian", "value": "小黑黑" } }, { "remove": { "field": "age" } } ] }

虚拟执行命令:

POST _ingest/pipeline/shijian_test/_simulate { "docs": [ { "_index":"chen", "_source":{ "name":"lihua" } } ] }

ingest场景应用-数据写入:

真实场景中执行数据写入,执行pipeLine进行数据的处理:

POST ingest_test/_doc?pipeline=shijian_test { "name":"lisi", "age":11, "class":"11" } PUT ingest_test/_doc/1?pipeline=shijian_test { "name":"lihua", "age":10 }

使用这种方式进行数据的插入的话,这个数据在执行的时候就会被ingest进行处理,达到我们数据处理的作用。

ingest场景应用-数据更新:

原始数据一开始就在我们的索引中,但是需要对他进行处理,使用ingest进行数据的更新

POST ingest_test/_update_by_query?pipeline=shijian_test { "query": { "match": { "name": "lihua" } } }

ingest场景应用-索引重建:

在使用reindex重建索引时,指定pipeline这个字段

POST _reindex { "source": { "index":"kibana_sample_data_flights" }, "dest": { "index": "ingest_test", "pipeline": "shijian_test" } }

ingest访问索引meta元数据访问

PUT _ingest/pipeline/shijian_test { "description": "创建ingest", "processors": [ { "set": { "field": "_meta.index", "value": "{{_index}}" } }, { "set": { "field": "_meta.id", "value": "{{_id}}" } }, { "set": { "field": "_meta.timestamp", "value": "{{_ingest.timestamp}}" } } ] }

通过这种方式我们可以在数据的meta数据中拿到我们需要的数据。

PUT index_test/_doc/1?pipeline=shijian_test {"name":"lihua"} GET index_test/_search { "took": 466, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1, "hits": [ { "_index": "index_test", "_id": "1", "_score": 1, "_source": { "name": "lihua", "_meta": { "index": "index_test", "id": "1", "timestamp": "2026-04-29T13:19:55.617899914Z" } } } ] } }

ingest访问索引source源数据:

在创建的时候我们可以从原有的数据中,获取到_source中的数据的语法如下:

PUT _ingest/pipeline/shijian_test { "description": "创建ingest", "processors": [ { "set": { "field": "new_name", "value": "{{name}}" } }, { "set": { "field": "new_source_name", "value": "{{_source.name}}" } } ] }

高级实战应用:

if逻辑条件判断

PUT _ingest/pipeline/shijian_test { "description": "测试使用if语句", "processors": [ { "set": { "if": "ctx.age==10", "field": "class", "value": 3 } } ] }

ignore_failure和on_failure属性

创建ingest之后,如果我们在命令中执行了实际上没有的字段或者是出现了错误的时候,终端请求,抛出异常,下面的例子会展示出来这个问题,在实际的开发中我们可以通过高级的属性来解决这个问题。

ignore_failure:默认是false,是否在执行数据处理的时候,当前处理的字段出现错误时,忽略这个错误,如果忽略,则继续执行

on_failure:

在不忽略问题的时候,如果出现问题的时候,执行什么语句。

PUT _ingest/pipeline/shijian_test { "description": "测试使用if语句", "processors": [ { "set": { "if": "ctx.age==10", "field": "class", "value": 3 } }, { "remove": { "field": "text", "ignore_failure": false, "on_failure": [ { "remove":{ "filed":"name" } } ] } } ] }

但是在实际开发中,出现问题直接报错还是很正常的事情,可以帮助我们发现问题和解决问题。

pipeline多管道执行

我们在执行之前创建多个ingest管道,之后使用多个管道执行任务

#创建两个管道 PUT _ingest/pipeline/test_001 { "description": "001号", "processors": [ { "set": { "field": "name", "value": "lihua" } } ] } PUT _ingest/pipeline/test_002 { "description": "002号", "processors": [ { "set": { "field": "age", "value": 11 } } ] } #创建高级管道,绑定两个管道 PUT _ingest/pipeline/number_sum_pipeline { "processors": [ { "pipeline": { "name": "test_001" } }, { "pipeline": { "name": "test_002" } } ] } #查询数据 GET index_test/_search #执行命令插入数据 PUT index_test/_doc/1?pipeline=number_sum_pipeline { "class":1 }

script+ingest处理数据问题

假设我们在实际的存储中,存储的一个人的姓名但是是分开存储的,这是我们使用脚本将他合并到一起,使用这个例子展示一下应用。

PUT _ingest/pipeline/name_ingest { "processors": [ { "script": { "source": """ String first_name = ctx.f_name; String last_name = ctx.l_name; ctx.name = [first_name,first_name,last_name] """, "lang": "painless" } } ] } PUT index_test/_doc/1?pipeline=name_ingest { "f_name":"li", "l_name":"hua" }

执行之后,进行查询时他的返回结果是:

{ "took": 68, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1, "hits": [ { "_index": "index_test", "_id": "1", "_score": 1, "_source": { "f_name": "li", "name": [ "li", "li", "hua" ], "l_name": "hua" } } ] } }
http://www.jsqmd.com/news/729275/

相关文章:

  • 手把手教你学Simulink——基于Simulink的扰动观测器(DOB)负载扰动补偿
  • 系统架构设计师论文预测题目2:论云原生架构下的可观测性系统设计
  • 芯片展哪家好?聚焦芯片前沿技术,甄选业内高人气专业芯片展 - 品牌2026
  • 电商导购 Agent:个性化推荐与下单 Harness
  • 关于搭建运维监控系统(Prometheus+Grafana)
  • NVIDIA TAO实战:手写字符检测与识别模型优化
  • 使用Python快速编写第一个调用Taotoken多模型API的脚本
  • 空间计算领域领军企业是哪家?镜像视界
  • VLFM复现!
  • 基于文本控制的PET医学影像降噪技术解析
  • EchoDistill:扩散模型一步个性化新方法解析
  • 大模型微调实战:LoRA 微调 LLaMA 2 踩坑全解+数据集预处理+训练调优+落地部署(8G显存可跑)
  • 如何高效使用跨平台自动化工具:KeymouseGo 鼠标键盘录制实战指南
  • 再战齿槽力!用Anti-Notch抑制齿槽力扰动效果竟然出乎意料的好!
  • 最简单把deepseek接入vscode
  • 【仿真测试】基于FPGA的QPSK软解调+扩频通信链路实现,包含帧同步,定时点,扩频伪码同步,信道,误码统计
  • 国内半导体展哪家好?2026年行业优质国内半导体展资源 - 品牌2026
  • 零基础学AI编程之一 Claude Code安装保姆级教程
  • 如何快速实现音乐地址解析:一站式跨平台音乐解析解决方案
  • 用STM32CubeMX和HAL库快速上手RFID读卡器(附完整工程源码)
  • Windows 11 + CUDA 11.8 环境下,手把手教你用 PaddleOCR 2.6 训练一个识别手写笔记的模型
  • 强化学习在图像质量评估中的应用:EditScore工具解析
  • 从蓝帽杯Misc赛题复盘,聊聊CTF比赛中那些“藏在流量里”的密码与哈希
  • 2026年灵芝酒贴牌定制哪家权威:黄精鹿鞭酒贴牌定制、养生酒代加工、养生酒贴牌定制、灵芝酒贴牌定制、石斛酒贴牌定制选择指南 - 优质品牌商家
  • 自动驾驶决策系统:CoIRL-AD框架的双策略动态平衡
  • 基于Model Context Protocol的Trello AI自动化管理实践
  • Swoole长连接安全水位线告警系统:基于eBPF实时监控FD泄漏、内存驻留超2s请求、非预期LLM token流(含Grafana看板开源)
  • 基于RAG的学术论文智能对话系统:Talk2Arxiv架构与部署实战
  • 第二十一天 基本计算器 II
  • TiDAR架构:融合自回归与扩散模型的语言生成新范式