当前位置: 首页 > news > 正文

Flink2.1.1-Kafka写入Elasticsearch7

安装

Flink2.1.1 docker安装

Java代码示例

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Maven build for the flink-kafka-es7-sql job.
    Produces a single shaded jar whose manifest main class is
    com.example.flink.KafkaEs7Sql.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.flink</groupId>
    <artifactId>flink-kafka-es7-sql</artifactId>
    <version>1.0.0</version>
    <name>flink-kafka-es7-sql</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>2.1.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <log4j.version>2.24.3</log4j.version>
        <commons-math3.version>3.6.1</commons-math3.version>
        <lombok.version>1.18.26</lombok.version>
    </properties>

    <!--
        NOTE(review): the Flink artifacts below use the default (compile) scope, so the
        shade plugin bundles them into the job jar. Confirm whether "provided" scope is
        preferable for the core Flink modules when deploying to a cluster.
    -->
    <dependencies>
        <!-- Flink streaming core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink client, used for local execution -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--
            Needed for local startup:
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
                <version>2.1.1</version>
            </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Uses the shared version property (2.1.1) like the other core Flink modules. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connectors are versioned independently of the Flink runtime. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>4.0.1-2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>4.0.0-2.0</version>
        </dependency>

        <!-- Flink JSON format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>${commons-math3.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>

            <!-- Packs the job and its dependencies into one jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <transformers>
                                <!-- Avoid META-INF conflicts -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.flink.KafkaEs7Sql</mainClass>
                                </transformer>
                            </transformers>
                            <!-- Optional: strip signature files so the merged jar is not rejected as invalid -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <!-- These would conflict with the Kryo/Objenesis classes shipped with the Flink cluster. -->
                                        <exclude>com/esotericsoftware/kryo/**</exclude>
                                        <exclude>org/objenesis/**</exclude>
                                        <exclude>META-INF/versions/9/org/objenesis/**</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

代码

package com.example.flink;import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** Flink SQL application to consume from Kafka and count files per userId*/
public class KafkaEs7Sql {public static void main(String[] args) throws Exception {// 1. 创建流执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//每 5 秒触发一次env.enableCheckpointing(5000);// 精确一次env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);// 60 s 超时env.getCheckpointConfig().setCheckpointTimeout(60000);// 禁止并发,降低 IO 峰刺env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 两次 CP 至少间隔 500 msenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 作业取消后保留 CP,便于手工恢复env.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);// 2. 创建表环境final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);/* 2. 源表(Processing Time 窗口无需事件时间字段) */tableEnv.executeSql("CREATE TABLE test_file_source (" +"  userId STRING," +"  type STRING," +"  fileType STRING," +"  fileUrl STRING," +"  rlsFileList ARRAY<ROW<fileUrl STRING, filePath STRING, fileType STRING>>," +"  shootTime BIGINT," +"  uploadTime BIGINT," +"  location STRING," +"  duration BIGINT," +"  pt AS PROCTIME()" +") WITH (" +"  'connector' = 'kafka'," +"  'topic' = 'user_file_topic'," +"  'properties.bootstrap.servers' = 'kk.kk.kk.kk:9092,kk.kk.kk.kk:9092,kk.kk.kk.kk:9092'," +"  'format' = 'json'," +"  'scan.startup.mode' = 'earliest-offset'" +")");tableEnv.executeSql("CREATE TABLE es_sink (" +"  id STRING," +"  userId STRING," +"  total BIGINT," +"  proTime TIMESTAMP(3)," +"  PRIMARY KEY (id) NOT ENFORCED" +") WITH (" +"  'connector' = 'elasticsearch-7'," +"  'hosts' = 'http://es.es.es.es:9200'," +"  'index' = 'test_file_index7'," +"  'document-id.key-delimiter' = '_'," +"  'format' = 'json'," +// 【修改点】使用有效的值 at-least-once"  'sink.delivery-guarantee' = 'at-least-once'" +")");/* 4. 
30 秒窗口统计 + 当前时间作为 timestamp */TableResult tableResult = tableEnv.executeSql("INSERT INTO es_sink " +"SELECT " +"  UUID() AS id, " +"  userId, " +"  COUNT(*) AS total, " +"  CURRENT_TIMESTAMP AS proTime " +"FROM test_file_source " +"GROUP BY " +"  userId, " +"  TUMBLE(pt, INTERVAL '30' SECOND)");tableResult.await();}
}

运行示例

上传 KafkaEs7Sql 的 jar 包到 Flink

upload_kafka_es7_jar

写入ES数据

test_file_index7_search

查看ES数据结构

{"test_file_index7" : {"mappings" : {"properties" : {"id" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}},"proTime" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}},"total" : {"type" : "long"},"userId" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}}}}}
}

备注

查看es数据

#查看数据
http://es.es.es.es:9200/test_file_index7/_search?pretty
#查看结构
http://es.es.es.es:9200/test_file_index7/_mapping?pretty
http://www.jsqmd.com/news/119436/

相关文章:

  • python: 安装使用celery
  • 【Open-AutoGLM任务分配核心机密】:揭秘企业级自动化调度背后的算法逻辑
  • django基于数据挖掘的微博事件分析与可视化系统的设计与实现演示录像2023_u9nmf-vue
  • Open-AutoGLM即将开幕:你不可错过的5大前沿议题与参会价值
  • 读懂HikariCP一百行代码,多线程就是个孙子
  • 为什么顶尖团队都在用Open-AutoGLM做月报?背后的数据逻辑首次公开
  • 别让“小眼镜”挡住清晰世界!儿童近视防控,家长必知的科学指南
  • JavaSE——面向对象思想的应用
  • 好写作AI:你的学位论文理论框架,是“导航图”还是“理论陈列馆”?
  • 单北斗GNSS在大坝形变监测中的应用与性能分析
  • Open-AutoGLM会议调度秘籍(企业级应用案例曝光)
  • 证件照合格率低?Open-AutoGLM智能预检系统上线,审核通过率翻倍
  • 注意:雪花算法并不是ID的唯一选择!
  • 2025年印刷机市场新品排行榜,印刷开槽模切机/全伺服前缘送纸印刷开槽模切联动线/高速全自动水墨印刷开槽模切机印刷机订制厂家口碑推荐榜 - 品牌推荐师
  • 揭秘Open-AutoGLM自动汇总技术:如何3分钟生成高质量团队周报
  • 大厂面试真题解析:java 集合 +spring+ 并发编程 +MyBatis
  • 错过Open-AutoGLM等于落后3年?AI驱动会议管理的终极解决方案
  • 为什么你的Open-AutoGLM项目总延期?深度剖析进度监控缺失的4大痛点
  • BAT 大厂 java 程序员面试必问:JVM+Spring+ 分布式 +tomcat+MyBatis
  • 抓 https 加密数据,偷偷摸摸爽得很!
  • 绝杀峡谷源码 副图 通达信 贴图
  • Open-AutoGLM周报自动化落地全路径(从部署到高阶调优)
  • 使用systemd,把服务装进 Linux 心脏里~
  • Open-AutoGLM周报引擎实战指南(AI驱动办公新革命)
  • SMP语言基础知识-应用系统,开发的痛点,开发者的痛点
  • 高手,云集在于REST、gRPC 和 GraphQL之间!
  • Open-AutoGLM工作流监控实战指南(实时可视化监控体系搭建全解析)
  • 各大互联网公司面经分享:Java 全栈知识 +1500 道大厂面试真题
  • Open-AutoGLM数据统计实战:5步教你精准提取月报核心指标
  • 还在手动管理日程?用Open-AutoGLM实现全自动提醒,效率翻倍!