当前位置: 首页 > news >正文

Flink数据流写入Elasticsearch实战

目录

1. 代码结构

2. 代码解析

(1) 主程序入口

(2) 配置 Elasticsearch 集群

(3) 定义 Elasticsearch Sink Function

(4) 添加 Elasticsearch Sink

(5) 执行任务

3. 验证命令


这段代码是一个使用 Apache Flink 将数据流(Event对象)写入 Elasticsearch 的示例。以下是对代码的详细解析和说明:

1. 代码结构

  • 包声明package sink
    定义了代码所在的包。

  • 导入依赖
    导入了必要的 Java 和 Flink 相关类库,包括:

    • java.util:用于使用ArrayListHashMap
    • org.apache.flink:Flink 的核心类库。
    • org.apache.flink.streaming.connectors.elasticsearch:Flink 的 Elasticsearch Sink 相关类。
    • org.elasticsearch.client.Requests:Elasticsearch 的请求工具类。
  • Event
    定义了一个简单的Event类,包含三个字段:

    • user:用户名称。
    • url:访问的 URL。
    • timestamp:时间戳。
  • sinkToEs对象
    主程序入口,包含 Flink 流处理逻辑和 Elasticsearch Sink 的配置。

package sink import java.util import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.http.HttpHost import org.elasticsearch.client.Requests import source.ClickSource case class Event(user:String,url:String,timestamp:Long) /** * * @PROJECT_NAME: flink1.13 * @PACKAGE_NAME: sink * @author: 赵嘉盟-HONOR * @data: 2023-11-20 15:04 * @DESCRIPTION * */ object sinkToEs { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) ) //定义es集群主机列表 val hosts = new util.ArrayList[HttpHost]() hosts.add(new HttpHost("master",9200)) //定义一个esSinkFunction val esFun = new ElasticsearchSinkFunction[Event] { override def process(t: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { val data = new util.HashMap[String, String]() data.put(t.user, t.url) //包装要发送的http请求 val request = Requests.indexRequest() .index("clicks") //表名 .source(data) //数据 .`type`("event") //类型 //发送请求 requestIndexer.add(request) } } data.addSink(new ElasticsearchSink.Builder[Event](hosts,esFun).build()) //验证命令 //curl 'localhost:9200/_cat/indices?v' //curl 'localhost:9200/clicks/_search?pretty' env.execute("sinkRedis") } }

基于scala使用flink将读取到的数据写入到ES

发送完毕后可以使用以下命令进行验证:

curl 'localhost:9200/_cat/indices?v' curl 'localhost:9200/clicks/_search?pretty'

2. 代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) )
  • 创建 Flink 流处理环境StreamExecutionEnvironment
  • 使用fromElements方法生成一个包含 4 个Event对象的流。
(2) 配置 Elasticsearch 集群
val hosts = new util.ArrayList[HttpHost]() hosts.add(new HttpHost("master", 9200))
  • 创建一个ArrayList,用于存储 Elasticsearch 集群的主机信息。
  • 添加一个 Elasticsearch 节点(master,端口9200)。
(3) 定义 Elasticsearch Sink Function
val esFun = new ElasticsearchSinkFunction[Event] { override def process(t: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { val data = new util.HashMap[String, String]() data.put(t.user, t.url) val request = Requests.indexRequest() .index("clicks") // 索引名称 .source(data) // 数据 .`type`("event") // 类型 requestIndexer.add(request) } }
  • 实现ElasticsearchSinkFunction接口,定义如何将Event对象写入 Elasticsearch。
  • process方法中:
    • Event对象的userurl字段存入HashMap
    • 使用Requests.indexRequest()创建一个索引请求。
    • 指定索引名称(clicks)、数据源(data)和类型(event)。
    • 通过requestIndexer.add(request)发送请求。
(4) 添加 Elasticsearch Sink
data.addSink(new ElasticsearchSink.Builder[Event](hosts, esFun).build())
  • 使用ElasticsearchSink.Builder构建 Elasticsearch Sink。
  • 将 Sink 添加到数据流中。
(5) 执行任务
env.execute("sinkRedis")
  • 启动 Flink 流处理任务,任务名称为sinkRedis

3. 验证命令

  • 查看 Elasticsearch 索引

    bash

    curl 'localhost:9200/_cat/indices?v'

    检查clicks索引是否创建成功。

  • 查询索引数据

    bash

    curl 'localhost:9200/clicks/_search?pretty'

    查看clicks索引中的数据。

http://www.jsqmd.com/news/888035/

相关文章:

  • 2026年比较好的四川卤味火锅底料/四川美蛙鱼火锅底料/牛油火锅底料优质公司推荐 - 行业平台推荐
  • Edge/Chrome浏览器必备:Tampermonkey油猴插件安装与脚本管理全攻略(含备份技巧)
  • 2026年热门的南充互联网网络推广/南充网络推广/南充网络推广运营优质公司推荐 - 行业平台推荐
  • 构建非侵入式智能帮助系统:三层感知架构与无感集成实践
  • Visual Studio 项目属性页开发完全教程:从基础到高级
  • 2026年比较好的青椒火锅底料/牛油火锅底料/番茄火锅底料主流厂家对比评测 - 品牌宣传支持者
  • 基于U-Net与匹配滤波的高光谱甲烷泄漏AI检测系统实践
  • AI智能体开发与上线
  • Burp Suite本地测试环境从零搭建实战指南
  • 2026年口碑好的定制数码印刷机/彩色数码印刷机/电子油墨数码印刷机/广州布料数码印刷机厂家对比推荐 - 品牌宣传支持者
  • 【ChatGPT】美国泛林集团Sabre® 系列水平镀铜设备深度拆解、爆炸图10张、信息图10张、C++代码框架
  • 避坑指南:树莓派4B编译FFmpeg支持H.264硬编时,我遇到的‘OMX_Core.h not found’等错误全解决
  • 别再乱用USB转串口了!手把手教你用Python直连山特UPS(C3K型号)读取实时数据
  • Visual Studio 项目系统依赖解析机制深度剖析:PackageReference 与 ProjectReference
  • 保姆级教程:在ArcGIS Pro插件中集成你的自定义工具箱(以‘消除重复要素’为例)
  • Flink数据流分布式写入文件实战
  • 数据标注:外包还是自建团队?成本对比与实战分析
  • KouShare-dl终极指南:10个高效下载蔻享学术视频的实用技巧
  • Apache Fesod终极指南:3大策略破解百万级Excel数据内存瓶颈
  • Kandan实时通信技术揭秘:Faye WebSocket与消息广播机制
  • Archon Specs:用约束性规范与实时验证消除AI代码生成中的幻觉问题
  • 全国职业院校技能大赛-心得+环境代码全资源
  • ARMv8缓存维护指令详解与优化实践
  • Nitronic50不锈钢厂商那家好?推荐几家Nitronic50线材国内厂商 - 品牌2025
  • Unity AndroidWebView模块:安卓原生WebView深度接管指南
  • Wireshark 3.6.3 Windows安装全指南:VC++运行库与Npcap驱动避坑详解
  • Qwen3-Coder-30B-A3B-Instruct-FP8部署指南:本地与云端最佳实践
  • 为Chromebook和树莓派打造的VS Code社区构建版本完全指南:终极安装与使用教程
  • CP_AutoSar目录(更新中....)
  • 魔兽地图转换工具:轻松实现地图格式转换与版本兼容