当前位置: 首页 > news >正文

使用Logstash实现PostgreSQL到Elasticsearch的数据摄取

使用Logstash实现PostgreSQL到Elasticsearch的数据摄取

什么是Logstash?

Logstash是Elastic提供的开源数据处理管道工具,用于摄取、转换和将数据发送到不同源,包括Elasticsearch、Kafka、平面文件等。

Logstash管道包含三个不同的处理过程:

  • 输入:从中收集数据以进行摄取的数据源
  • 过滤器:使用Grok、Mutate、Date等插件转换(清理、聚合等)数据
  • 输出:摄取的目标(Elasticsearch、平面文件、数据库等)

以下是使用Logstash将数据发送到Elasticsearch的先决条件:

  • 系统上安装了Logstash和Postgres的JDBC驱动程序
  • 具有要同步的表或函数的Postgres数据库
  • 正在运行的Elasticsearch实例

Logstash设置(Windows版)

以下是本地安装和运行Logstash的简要步骤。

1. 安装Java

从官方Oracle网站下载JDK包(Java 8或更高版本)。下载完成后,将文件解压缩到首选位置。

解压缩文件后,需要添加环境变量以便系统识别Java命令。

转到环境变量,添加一个名为JAVA_HOME的新变量,并将其指向Java文件所在的目录。将%JAVA_HOME%\bin附加到路径中。

要验证安装是否成功,请转到命令提示符并运行以下命令:

java -version

如果一切设置正确,它将显示Java版本。

2. 安装Logstash

要安装Logstash,请从官方Elastic网站下载包,并将其解压缩到首选位置。

要在本地测试,请打开命令提示符,导航到Logstash文件夹中的bin文件夹,并运行以下命令:

logstash -e "input { stdin {} } output { stdout {} }"

Logstash摄取管道

1. 安装所需的JDBC驱动程序

从官方PostgreSQL网站下载Postgres驱动程序。将jar文件放在可访问的位置。

2. 创建Logstash管道

以下是示例管道:

input {jdbc {jdbc_driver_library => "c:/logstash/jdbc/postgresql.jar"jdbc_driver_class => "org.postgresql.Driver"jdbc_connection_string => "${JDBC_HOST}"jdbc_user => "${DB_USER}"jdbc_password => "${DB_PWD}"jdbc_paging_enabled => truejdbc_page_size => 1000schedule => "* * * * *"  # 计划每分钟运行一次statement => "SELECT * FROM employee WHERE updated_at > :sql_last_value"use_column_value => truetracking_column => "updated_at"tracking_column_type => "timestamp"last_run_metadata_path => "c:/logstash/employee.tracker"}
}filter {mutate {remove_field => ["date", "@timestamp", "host"]}# 如果需要解析JSON字段的示例json {source => "first_name"target => "name"}
}output {stdout { codec => json_lines }elasticsearch {hosts => ["http://localhost:9200"]index => "my_table_index"custom_headers => {"Authorization" => "${AUTH_KEY}"}document_id => "%{table_id}" # 表中的唯一标识符timeout => 120}
}

上述管道用于增量摄取。这意味着它会跟踪最后一次运行,并从最后一次运行开始获取记录,按照计划摄取数据。

以下是使用的关键概念:

输入

  • jdbc_driver_library - JDBC驱动程序文件(.jar)的存储位置
  • jdbc_driver_class - 正在使用的驱动程序类
  • jdbc_connection_string - postgres数据库连接字符串
  • jdbc_user - 数据库用户名
  • jdbc_password - 用户的数据库密码
  • paging - 数据将以多页形式发送,每页大小为1000。这将提高管道的性能,并有助于跟踪发送到Elasticsearch的记录数
  • schedule - 上述管道计划每分钟运行一次。以下是计划的格式:
  • statement - 管道将执行的SQL语句。要执行复杂的语句,可以将其保存在单独的.sql文件中,并将文件路径提及到statement_filepath而不是statement。最好使用视图或物化视图而不是具有复杂连接的查询。

最后一部分用于增量摄取:

use_column_value => true
tracking_column => "updated_dt"
tracking_column_type => "timestamp"
last_run_metadata_path => "c:/project/logstash/date.tracker"
  • use_column_value设置为true。它让Logstash知道跟踪在tracking_column中使用的列updated_at的实际值,而不是使用上次运行查询的时间。在这种情况下,:sql_last_value将使用updated_dt值。
  • 如果设置为false,Logstash将使用上次查询执行时间作为:sql_last_value
  • 最后一次运行时间将保存在last_run_metadata_path中提到的文件中。它将用于跟踪管道最后一次运行的时间。

过滤器
这是一个可选部分,用于在将数据发送到目标之前操作数据。

在上述管道中,日期字段正在从摄取中删除。此外,它还将数据中的first_name发送到目标中的name字段。

输出
此部分定义数据的目标。在这种情况下,它是Elasticsearch端点、授权密钥(如果有)、elastic索引、document_id。document_id是索引中elastic文档的唯一标识符。如果未提及此字段,Elasticsearch将自动为文档分配唯一标识符。

在增量摄取的情况下,建议定义此字段。在摄取期间,Elasticsearch将在索引中查找此字段;如果匹配,它将更新同一文档。

如果未定义该字段,它将在索引中创建一个新文档,从而导致重复记录。

运行管道

要运行此管道,请打开命令提示符,转到Logstash文件夹,并运行以下命令:

bin/logstash -f c:/logstash/sample_pipeline.conf

以下是管道的输出。

来自Elasticsearch索引的输出。

{"took": 1,"timed_out": false,"_shards": {"total": 1,"successful": 1,"skipped": 0,"failed": 0},"hits": {"total": {"value": 3,"relation": "eq"},"max_score": 1.0,"hits": [{"_index": "testing","_id": "1","_score": 1.0,"_source": {"name": "James","id": 1,"last_name": "Smith","updated_dt": "2024-12-12T16:10:57.349Z","@version": "1","@timestamp": "2025-06-25T20:41:02.167442600Z"}},{"_index": "testing","_id": "2","_score": 1.0,"_source": {"name": "John","id": 2,"last_name": "Doe","updated_dt": "2024-12-12T16:10:57.349Z","@version": "1","@timestamp": "2025-06-25T20:41:02.169021400Z"}},{"_index": "testing","_id": "3","_score": 1.0,"_source": {"name": "Kate","id": 3,"last_name": "Williams","updated_dt": "2024-12-12T16:10:57.349Z","@version": "1","@timestamp": "2025-06-25T20:41:02.170098800Z"}}]}
}

这种方法有几个优点:

  • Logstash是一个开源工具,易于实现。
  • 有200多个可用于数据转换的插件。使用这些插件,可以使用过滤器解析和转换数据。
  • 它是数据源和Elasticsearch之间的解耦架构。
  • 与Elasticsearch无缝集成。

尽管这是一种开源的简单实现方法,但它也有一些缺点:

  • 延迟问题:对于需要极低延迟或实时数据的应用程序来说,它并不理想。随着管道的增长,加载、转换/过滤和发送数据需要时间。
  • 错误处理:除非明确监控,否则很难跟踪错误,这可能导致数据丢失。
  • 如果管道定义不当,可能会产生重复项。
  • 与其他工具相比,启动时间更长。
  • 它使用YAML风格的配置文件,这使其变得复杂且难以维护。
  • 资源利用:在重负载和复杂管道的情况下,它可能利用更多资源。

如果某人正在寻找更强大和集中化的数据流管道,可以使用上述管道。它不适用于实时数据传送。
更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
对网络安全、黑客技术感兴趣的朋友可以关注我的安全公众号(网络安全技术点滴分享)

公众号二维码

公众号二维码

http://www.jsqmd.com/news/50415/

相关文章:

  • 不丢帧、低延迟!图像采集卡的 5 步工作原理,看懂就是专家
  • 2025年封闭母线槽优质厂家权威推荐榜单:耐火母线槽/防水母线槽/空气型母线槽源头厂家精选
  • 2025年服装整烫专用设备定做厂家权威推荐榜单:服装小型整烫设备/服装隧道整烫设备/仙桃服装整烫设备源头厂家精选
  • 2025年数据分类分级产品选型排名与深度解析:可视化、自适应、一键部署成关键能力
  • 2025年国内一键部署、持久稳定的AI赋能的API数据接口安全厂商排名
  • Spring Data JPA 最佳实践【1/2】:实体设计指南
  • 轻量化、全链路、可溯源的医疗行业API安全最佳实践与案例
  • 数据库风险监测系统建设理论研究:从规范落地到智能化防御的全周期体系
  • 官网发布|智感未来-聚链共生-2026中国激光雷达大会暨展览会/火热招展中!!!
  • 2025年11月呼叫中心系统品牌推荐评测报告:从稳定性到AI能力的解决方案剖析
  • 2025广州最大的留学中介是哪家
  • Python入门:从零开始你的编程之旅
  • 2025北京申请留学机构哪家好
  • 2025年11月智能客服机器人服务商推荐热度榜:基于性能指标的结果承诺保障方案
  • 2025 年 11 月换热器厂家权威推荐榜:烟气气气换热器/烟气气水换热器,高效节能与耐用性深度解析及选购指南
  • QQueue队列
  • 2025年三集一体除湿热泵机组选购指南及厂家推荐,目前三集一体除湿热泵机组直销厂家联系电话精选实力品牌
  • 2025年11月取暖器品牌推荐评测报告:从稳定性到AI能力的解决方案剖析
  • 2025年11月数据标注平台推荐评测报告:从安全部署到智能辅助解决方案剖析
  • 2025年哈尔滨自闭症康复机构权威推荐榜单:孤独症/发育迟缓/发育落后源头机构精选
  • 2025年11月小程序定制开发公司推荐评测报告:从稳定性到AI能力的解决方案剖析
  • HarmonyOS自动化测试与持续集成实战指南 - 教程
  • 2025年远程高能点火器实力厂家权威推荐榜单:遥控高能点火器/防爆高能点火器/便携式高能点火器源头厂家精选
  • 如何在谷歌Chrome浏览器加载 Activex控件?
  • 2025年11月审计报告事务所推荐榜单:综合实力与专业服务对比分析
  • 2025年11月审计报告事务所推荐榜单:知名机构综合对比与选择指南
  • 2025年11月审计报告事务所推荐:权威榜单与选择指南
  • 腾讯云TBDS与CDH迁移常见问题有哪些?建议由CDH迁移到CMP 7.13 平台(类Cloudera CDP,如华为鲲鹏 ARM 版)
  • 2025年矿棉隔音棉源头厂家权威推荐榜单:玻璃纤维隔音棉/防火隔音棉/高密度隔音棉源头厂家精选
  • php Http请求 GET方式