Flink数据流写入Elasticsearch实战
目录
1. 代码结构
2. 代码解析
(1) 主程序入口
(2) 配置 Elasticsearch 集群
(3) 定义 Elasticsearch Sink Function
(4) 添加 Elasticsearch Sink
(5) 执行任务
3. 验证命令
这段代码是一个使用 Apache Flink 将数据流(Event对象)写入 Elasticsearch 的示例。以下是对代码的详细解析和说明:
1. 代码结构
包声明:
package sink
定义了代码所在的包。导入依赖:
导入了必要的 Java 和 Flink 相关类库,包括:java.util:用于使用ArrayList和HashMap。org.apache.flink:Flink 的核心类库。org.apache.flink.streaming.connectors.elasticsearch:Flink 的 Elasticsearch Sink 相关类。org.elasticsearch.client.Requests:Elasticsearch 的请求工具类。
Event类:
定义了一个简单的Event类,包含三个字段:user:用户名称。url:访问的 URL。timestamp:时间戳。
sinkToEs对象:
主程序入口,包含 Flink 流处理逻辑和 Elasticsearch Sink 的配置。
package sink import java.util import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.http.HttpHost import org.elasticsearch.client.Requests import source.ClickSource case class Event(user:String,url:String,timestamp:Long) /** * * @PROJECT_NAME: flink1.13 * @PACKAGE_NAME: sink * @author: 赵嘉盟-HONOR * @data: 2023-11-20 15:04 * @DESCRIPTION * */ object sinkToEs { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) ) //定义es集群主机列表 val hosts = new util.ArrayList[HttpHost]() hosts.add(new HttpHost("master",9200)) //定义一个esSinkFunction val esFun = new ElasticsearchSinkFunction[Event] { override def process(t: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { val data = new util.HashMap[String, String]() data.put(t.user, t.url) //包装要发送的http请求 val request = Requests.indexRequest() .index("clicks") //表名 .source(data) //数据 .`type`("event") //类型 //发送请求 requestIndexer.add(request) } } data.addSink(new ElasticsearchSink.Builder[Event](hosts,esFun).build()) //验证命令 //curl 'localhost:9200/_cat/indices?v' //curl 'localhost:9200/clicks/_search?pretty' env.execute("sinkRedis") } }基于scala使用flink将读取到的数据写入到ES
发送完毕后可以使用以下命令进行验证:
curl 'localhost:9200/_cat/indices?v' curl 'localhost:9200/clicks/_search?pretty'2. 代码解析
(1) 主程序入口
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val data = env.fromElements( Event("Mary", "./home", 100L), Event("Sum", "./cart", 500L), Event("King", "./prod", 1000L), Event("King", "./root", 200L) )- 创建 Flink 流处理环境
StreamExecutionEnvironment。 - 使用
fromElements方法生成一个包含 4 个Event对象的流。
(2) 配置 Elasticsearch 集群
val hosts = new util.ArrayList[HttpHost]() hosts.add(new HttpHost("master", 9200))- 创建一个
ArrayList,用于存储 Elasticsearch 集群的主机信息。 - 添加一个 Elasticsearch 节点(
master,端口9200)。
(3) 定义 Elasticsearch Sink Function
val esFun = new ElasticsearchSinkFunction[Event] { override def process(t: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { val data = new util.HashMap[String, String]() data.put(t.user, t.url) val request = Requests.indexRequest() .index("clicks") // 索引名称 .source(data) // 数据 .`type`("event") // 类型 requestIndexer.add(request) } }- 实现
ElasticsearchSinkFunction接口,定义如何将Event对象写入 Elasticsearch。 - 在
process方法中:- 将
Event对象的user和url字段存入HashMap。 - 使用
Requests.indexRequest()创建一个索引请求。 - 指定索引名称(
clicks)、数据源(data)和类型(event)。 - 通过
requestIndexer.add(request)发送请求。
- 将
(4) 添加 Elasticsearch Sink
data.addSink(new ElasticsearchSink.Builder[Event](hosts, esFun).build())- 使用
ElasticsearchSink.Builder构建 Elasticsearch Sink。 - 将 Sink 添加到数据流中。
(5) 执行任务
env.execute("sinkRedis")- 启动 Flink 流处理任务,任务名称为
sinkRedis。
3. 验证命令
查看 Elasticsearch 索引:
bash
curl 'localhost:9200/_cat/indices?v'检查
clicks索引是否创建成功。查询索引数据:
bash
curl 'localhost:9200/clicks/_search?pretty'查看
clicks索引中的数据。
