当前位置: 首页 > news >正文

logstash定时同步elasticsearch数据 - Leonardo

1.logstash简介

  Logstash 是 Elastic Stack 的数据处理管道,负责采集、转换和输出数据。

2.使用场景

  由于业务需要,我需要将elastic集群中的数据定时同步到另一个独立的elastic单节点中,对比了一些工具,最后还是选择了logstash。

3.elastic环境准备

  • es集群和单节点部署这里就不作介绍了,我使用的是docker部署的
环境 IP地址 端口  elastic版本
elastic集群一主两从环境(生产环境)

192.168.1.2、192.168.1.3、192.168.1.4

使用nginx代理9201 7.1.1
elastic单节点(数据统计环境) 172.16.4.53 9200 7.1.1

4.logstash部署

4.1 创建容器映射目录

mkdir -p /data/logstash/{config,conf.d,logs,data}

4.2 创建logstash主配置文件(/data/logstash/config)

[root@localhost config]# cat /data/logstash/config/logstash.yml 
http.host: "0.0.0.0"
path.config: /usr/share/logstash/conf.d/*.conf
path.logs: /var/log/logstash
path.data: /usr/share/logstash/data

4.3 创建logstash日志文件(/data/logstash/config)

[root@localhost config]# cat /data/logstash/config/log4j2.properties 
# 日志级别:你可以根据需要调整,例如改成 DEBUG 可以看到更详细的同步过程
logger.elasticsearchoutput.name = logstash.outputs.elasticsearch
logger.elasticsearchoutput.level = DEBUGlogger.elasticsearchinput.name = logstash.inputs.elasticsearch
logger.elasticsearchinput.level = DEBUGappender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%nappender.rolling.type = RollingFile
appender.rolling.name = plain_rolling
appender.rolling.fileName = /var/log/logstash/logstash.log
appender.rolling.filePattern = /var/log/logstash/logstash-%d{yyyy-MM-dd}.log.gz
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 30rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = plain_console
rootLogger.appenderRef.rolling.ref = plain_rolling

4.4 创建数据同步配置(/data/logstash/conf.d)

  • input 是源 ES 集群(http://192.168.1.2:9201),负责从生产环境读取数据;
    output 是目标单节点(http://192.168.4.53:9200),负责将数据写入你独立的 ES 实例。

    input 导出数据绝不会影响源集群的原始数据。
    Logstash 使用 Elasticsearch 的 只读 Scroll API 拉取数据,整个过程仅执行搜索请求,不会对源索引的文档、设置或映射做任何修改、删除或写入操作。你可以把它理解为在 Kibana 里执行了一次查询,只是结果被转发到了别处。

[root@localhost conf.d]# cat /data/logstash/conf.d/sync_to_single.conf 
input {elasticsearch {hosts => ["http://172.16.4.60:9200"]   # 替换为源集群IPuser => "elastic"password => "123456"index => "lipc_test88,cm_aidata2"           # 替换为实际索引名size => 1000scroll => "5m"docinfo => trueschedule => "55 17 * * *"                # 需要定时同步时放开#query => '{#  "range": {#    "timeStamp": {#      "gte": "now-1d/d",#      "lt": "now/d",#      "time_zone": "+08:00"#    }#  }#}'}
}filter {mutate {remove_field => ["@version", "@timestamp"]}
}output {elasticsearch {hosts => ["http://172.16.4.53:9200"]  #替换为目标地址user => "elastic"password => "123456"index => "%{[@metadata][_index]}"document_id => "%{[@metadata][_id]}"action => "index"}stdout { codec => rubydebug }
}

4.5 设置容器映射目录权限

chown -R 1000:1000 /data/logstash

4.6 启动logstash容器

  • logstash版本需要与elastic版本一致,否则同步数据可能会有问题
[root@localhost logstash ]# cat /data/logstash/docker-compose.yaml 
version: '3.8'services:logstash:image: docker.elastic.co/logstash/logstash:7.1.1container_name: logstash-syncrestart: unless-stopped                # 容器退出后自动重启,除非手动停止environment:- TZ=Asia/Shanghaivolumes:- /data/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml- /data/logstash/conf.d/:/usr/share/logstash/conf.d/- /data/logstash/logs:/var/log/logstash- /data/logstash/data:/usr/share/logstash/data- /data/logstash/config/log4j2.properties:/usr/share/logstash/config/log4j2.propertiesnetwork_mode: "host"

5.同步测试

5.1 创建测试数据并插入elastic

 

5.2 执行一次性数据同步

 

5.3 定时执行同步

 

5.4 定时增量同步

 

6.编写elastic命令执行小工具

  • 可以使用小工具执行es常用命令
[root@localhost elastic]# cat es_command.sh 
#!/bin/bash
# ES 快捷命令工具 - 支持数字/中文/英文
# 使用方法: ./es_quick.sh [命令] 或直接运行交互式ES_HOST="192.168.1.2:9200"
USER="elastic"
PASS="123456"# 基础curl命令
curl_es() {curl -s -u "$USER:$PASS" "$@"
}# 显示帮助
show_help() {echo "========================================"echo "ES 快捷命令工具"echo "========================================"echo "数字 | 中文   | 英文       | 功能"echo "----------------------------------------"echo "1    | 索引   | index      | 查看所有索引"echo "2    | 容量   | disk       | 查看磁盘使用"echo "3    | 健康   | health     | 查看集群健康"echo "4    | 节点   | node       | 查看节点信息"echo "5    | 分片   | shard      | 查看分片分配"echo "6    | 文档   | count      | 查看文档数量"echo "7    | 搜索   | search     | 搜索索引数据"echo "8    | 映射   | mapping    | 查看索引映射"echo "9    | 设置   | setting    | 查看索引设置"echo "10   | 统计   | stats      | 查看集群统计"echo "11   | 任务   | task       | 查看任务列表"echo "12   | 模板   | template   | 查看索引模板"echo "13   | 删除   | delete     | 删除索引"echo "14   | 创建   | create     | 创建索引"echo "15   | 别名   | alias      | 查看别名"echo "16   | 快照   | snapshot   | 查看快照仓库"echo "17   | ILM    | ilm        | 查看ILM策略"echo "18   | SQL    | sql        | 查看SQL状态"echo "19   | 状态   | status     | 查看集群状态"echo "20   | 帮助   | help       | 显示此帮助"echo "0    | 退出   | exit       | 退出程序"echo "========================================"
}# 执行命令
execute_command() {case $1 in# 数字命令1|索引|index)curl_es "http://$ES_HOST/_cat/indices?v";;2|容量|disk)curl_es "http://$ES_HOST/_cat/allocation?v";;3|健康|health)curl_es "http://$ES_HOST/_cluster/health?pretty";;4|节点|node)curl_es "http://$ES_HOST/_cat/nodes?v";;5|分片|shard)curl_es "http://$ES_HOST/_cat/shards?v";;6|文档|count)echo -n "索引名(回车查看所有): "read idxif [ -z "$idx" ]; thencurl_es "http://$ES_HOST/_cat/count?v"elsecurl_es "http://$ES_HOST/$idx/_count"fi;;7|搜索|search)echo -n "索引名: "read idxecho -n "搜索关键字: "read keywordif [ -n "$keyword" ]; thencurl_es "http://$ES_HOST/$idx/_search?q=$keyword&size=5&pretty"elsecurl_es "http://$ES_HOST/$idx/_search?size=5&pretty"fi;;8|映射|mapping)echo -n "索引名(回车查看所有): "read idxif [ -z "$idx" ]; thencurl_es "http://$ES_HOST/_all/_mapping?pretty"elsecurl_es "http://$ES_HOST/$idx/_mapping?pretty"fi;;9|设置|setting)echo -n "索引名(回车查看所有): "read idxif [ -z "$idx" ]; thencurl_es "http://$ES_HOST/_all/_settings?pretty"elsecurl_es "http://$ES_HOST/$idx/_settings?pretty"fi;;10|统计|stats)curl_es "http://$ES_HOST/_cluster/stats?human&pretty";;11|任务|task)curl_es "http://$ES_HOST/_tasks?detailed&pretty";;12|模板|template)curl_es "http://$ES_HOST/_cat/templates?v";;13|删除|delete)echo -n "要删除的索引名: "read idxcurl_es -X DELETE "http://$ES_HOST/$idx?pretty";;14|创建|create)echo -n "要创建的索引名: "read idxcurl_es -X PUT "http://$ES_HOST/$idx";;15|别名|alias)curl_es "http://$ES_HOST/_cat/aliases?v";;16|快照|snapshot)curl_es "http://$ES_HOST/_snapshot?pretty";;17|ILM|ilm)curl_es "http://$ES_HOST/_ilm/policy?pretty";;18|SQL|sql)curl_es -X POST "http://$ES_HOST/_sql?format=txt" \-H "Content-Type: application/json" \-d '{"query": "SHOW TABLES"}';;19|状态|status)curl_es "http://$ES_HOST/";;20|帮助|help)show_help;;0|退出|exit|quit)echo "再见!"exit 0;;*)echo "未知命令: $1"echo "输入 'help' 查看可用命令";;esac
}# 主程序
main() {# 如果有参数,直接执行命令if [ $# -gt 0 ]; thenexecute_command "$1"exit 0fi# 交互式模式while true; doecho -e "\n========================================"echo -e "输入命令: \033[1;33m[数字/中文/英文]\033[0m"echo -e "示例: 1 或 索引 或 index"echo -e "输入 help 查看所有命令"echo -e "========================================"echo -n "命令: "read cmdexecute_command "$cmd"done
}# 检查连接
check_connection() {echo "测试ES连接..."if curl_es "http://$ES_HOST" > /dev/null 2>&1; thenecho -e "\033[32m✓ 连接成功\033[0m"elseecho -e "\033[31m✗ 连接失败\033[0m"exit 1fi
}# 启动
check_connection
main "$@"

 

http://www.jsqmd.com/news/444614/

相关文章:

  • 基于微信小程序与SenseVoice-Small的实时语音笔记应用开发
  • 基于CH224的Type-C PD受电端电路设计实战:从协议解析到PCB布局
  • 【技术突破】ncmdump:解决音频格式兼容难题的全栈方案
  • 浪浪山老前端的2025
  • 车载测试CAPL编程实战:高效写入文本文件的技巧与最佳实践
  • 预处理技术揭秘:如何加速病态线性方程组的迭代求解
  • MuJoCo新手必看:从XML配置到PD控制器的完整机器人仿真指南
  • Kubernetes如何自动识别资源瓶颈?
  • Qwen-Image-2512-Pixel-Art-LoRA商业应用:独立设计师接单用像素插画快速交付流程
  • Nunchaku-flux-1-dev企业应用:为内部知识库生成技术架构图解
  • PostgreSQL存储空间优化指南:如何精准分析表和索引占用情况
  • 美胸-年美-造相Z-Turbo效果实测:看看AI能画出多美的人像
  • AI Coder Agent 技术方案研究报告
  • 对ai的想象,是否能完成物理上的任务?
  • Kubernetes如何优化资源使用效率?
  • GNSS-INS松组合导航:从KF-GINS源码看卡尔曼滤波实现
  • 2026年分子筛转轮选购指南:深度解析TOP服务商与选型策略 - 2026年企业推荐榜
  • 2026年贵阳一站式建材公司推荐与选择指南 - 2026年企业推荐榜
  • 梦幻动漫魔法工坊保姆级教程:从安装到生成第一张动漫图
  • gte-base-zh嵌入模型入门实战:信息检索、语义相似度计算场景应用
  • K8s核心原理及注意事项
  • 空论视野下的全球智能治理
  • 【硬件片内测试】基于FPGA的完整QPSK链路测试,含频偏锁定,帧同步,定时点,Viterbi译码,信道,误码统计
  • 2026年最新:不锈钢精密铸造厂家联系电话推荐(附河北光德详细资料) - 品牌推荐
  • 3D 互动实验室:10 款极简小游戏 Prompt 教学
  • 郑州律师电话更新(2026年最新版):刘艳伟律师联系方式公布 - 品牌推荐
  • 【仿真测试】基于FPGA的完整QPSK通信链路实现,含频偏锁定,帧同步,定时点,Viterbi译码,信道,误码统计
  • Obsidian+OpenClaw:9分钟重构AI知识管理,再也不用当“信息搬运工”啦!
  • 尚巨网络18载深耕AI搜索+GEO精准赋能,全链路营销靠谱之选 - 品牌企业推荐师(官方)
  • C++的数组指针的类型