当前位置: 首页 > news >正文

Flink2.1.1-传感器温度计算示例

安装

Flink2.1.1 docker安装

Java代码示例

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example.flink</groupId><artifactId>flink-sensor</artifactId><version>1.0.0</version><name>flink-sensor</name><packaging>jar</packaging><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><flink.version>2.1.1</flink.version><scala.binary.version>2.12</scala.binary.version><log4j.version>2.24.3</log4j.version><commons-math3.version>3.6.1</commons-math3.version><lombok.version>1.18.26</lombok.version></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency><!-- Flink Core and Streaming Dependencies --><!-- 作用域设为 provided, 因为这些库在 Flink 集群上已经存在 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><!-- Flink Client Dependency (用于本地执行和提交作业) --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Log4j Dependencies for local execution --><!-- 作用域设为 compile, 这样在本地 IDE 运行时日志才能正常工作 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>${commons-math3.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target></configuration></plugin><!-- 统一打包插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.4.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><transformers><!-- 避免 META-INF 冲突 --><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.example.flink.AverageSensorReadings</mainClass></transformer></transformers><!-- 可选:把签名去掉,防止非法包 --><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude>//会和Flink集群自带的 Kryo/Objenesis冲突。<exclude>com/esotericsoftware/kryo/**</exclude><exclude>org/objenesis/**</exclude><exclude>META-INF/versions/9/org/objenesis/**</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
</project>

代码

SensorReading模型类

package com.example.flink.model;import lombok.Data;import java.io.Serializable;/*** 传感器读数数据模型 (POJO)* Flink 可以识别并高效处理 POJO。*/
@Data
public class SensorReading implements Serializable {public String sensorId;public long timestamp; // 事件时间戳 (毫秒)public double temperature;// Flink 要求一个无参构造函数public SensorReading() {}public SensorReading(String sensorId, long timestamp, double temperature) {this.sensorId = sensorId;this.timestamp = timestamp;this.temperature = temperature;}@Overridepublic String toString() {return "SensorReading{" +"sensorId='" + sensorId + '\'' +", timestamp=" + timestamp +", temperature=" + temperature +'}';}
}

AverageSensorReadings类

package com.example.flink;import com.example.flink.model.SensorReading;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import java.time.Duration;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** Flink 作业主类:计算每个传感器在5秒滚动窗口内的平均温度。* 这个版本使用自定义的数据源,不需要外部输入。*/
public class AverageSensorReadings {public static void main(String[] args) throws Exception {// 1. 设置执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建自定义数据源:模拟生成传感器数据DataStream<SensorReading> sensorDataStream = env.addSource(new SensorSourceGenerator()).setParallelism(1); // 为了让时间戳和水位线有序,先将并行度设为1// 3. 分配时间戳和生成水位线DataStream<SensorReading> withTimestampsAndWatermarks = sensorDataStream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((event, timestamp) -> event.timestamp));// 4. 执行计算:按键分组、开窗、聚合DataStream<Tuple3<String, Long, Double>> averageTemperatures = withTimestampsAndWatermarks.keyBy(reading -> reading.sensorId) // 按传感器ID分组.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) // 定义5秒的滚动事件时间窗口.aggregate(new TemperatureAggregator()); // 使用自定义聚合函数// 5. 输出结果到控制台averageTemperatures.print();// 6. 执行作业env.execute("Average Sensor Readings Job (Self-Contained)");}/*** 自定义数据源,模拟生成传感器数据*/public static class SensorSourceGenerator implements SourceFunction<SensorReading> {private volatile boolean isRunning = true;private final Random random = new Random();@Overridepublic void run(SourceFunction.SourceContext<SensorReading> ctx) throws Exception {// 模拟的传感器ID列表String[] sensorIds = {"sensor_1", "sensor_2", "sensor_3"};while (isRunning) {for (String sensorId : sensorIds) {// 生成一个传感器读数SensorReading reading = new SensorReading(sensorId,System.currentTimeMillis(), // 使用当前系统时间作为事件时间60 + random.nextGaussian() * 20 // 生成一个围绕60度的随机温度);// 将数据发送到下游ctx.collect(reading);}// 每秒生成一批数据TimeUnit.MILLISECONDS.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}}/*** 自定义聚合函数,用于计算平均温度* IN:  SensorReading (输入类型)* ACC: Tuple3<Double, Long, String> (累加器类型: <温度总和, 计数, 传感器ID>)* OUT: Tuple3<String, Long, Double> (输出类型: <传感器ID, 窗口结束时间, 平均温度>)*/public static class TemperatureAggregator implements AggregateFunction<SensorReading, Tuple3<Double, Long, String>, Tuple3<String, Long, Double>> {@Overridepublic Tuple3<Double, Long, String> createAccumulator() {return Tuple3.of(0.0, 0L, "");}@Overridepublic Tuple3<Double, Long, String> add(SensorReading reading, Tuple3<Double, Long, String> accumulator) {if (accumulator.f2.isEmpty()) {accumulator.f2 = reading.sensorId;}return Tuple3.of(accumulator.f0 + reading.temperature, accumulator.f1 + 1, accumulator.f2);}@Overridepublic Tuple3<String, Long, Double> getResult(Tuple3<Double, Long, String> accumulator) {// 为了简化,这里窗口时间戳输出为0return Tuple3.of(accumulator.f2, 0L, accumulator.f0 / accumulator.f1);}@Overridepublic Tuple3<Double, Long, String> merge(Tuple3<Double, Long, String> a, Tuple3<Double, Long, String> b) {return Tuple3.of(a.f0 + b.f0, a.f1 + b.f1, a.f2);}}
}

运行示例

上传sensor的jar包到flink

upload_sensor_jar

sensor任务

sensor_job

sensor任务日志

sensor_task_log

http://www.jsqmd.com/news/119382/

相关文章:

  • 【AI摄影革命】:Open-AutoGLM如何重新定义标准证件照生产流程
  • 告别反复重拍,Open-AutoGLM让你一次过审:国家级证件照生成实战解析
  • 心电信号ECG去噪:Matlab实现低通滤波与小波分解结合
  • 【医疗AI新突破】:Open-AutoGLM如何实现个性化用药提醒?
  • 【AI+医疗新突破】:Open-AutoGLM实现秒级挂号预约的5个关键步骤
  • YOLOv11 改进 - C2PSA | C2PSA融合DiffAttention差分注意力:轻量级差分计算实现高效特征降噪,提升模型抗干扰能力
  • 2025年度微动开关实力厂家推荐榜单,电动推杆微动开关/小型微动开关/微动开关/汽车微动开关/微动开关订制厂家推荐榜单 - 品牌推荐师
  • 【企业级保险监控方案】:基于Open-AutoGLM的7×24小时到期预警系统搭建
  • JavaSE——方法注意事项
  • JavaSE——方法注意事项
  • 体检报告查询进入AI时代:Open-AutoGLM究竟带来了哪些颠覆性变革?
  • Open-AutoGLM收益查询避坑指南(资深工程师亲授6大核心要点)
  • 12.21 模拟赛
  • 从语音到纪要全自动,Open-AutoGLM让会议效率提升8倍,你用了吗?
  • Open-AutoGLM实战指南:7步搭建企业级智能会议纪要系统
  • Flink2.1.1-WordCount示例
  • 【Open-AutoGLM收益监控终极方案】:5分钟搭建实时收益提醒系统
  • Flink2.1.1-docker安装
  • Open-AutoGLM会议纪要黑科技(90%团队还不知道的AI提效神器)
  • 校园IT负责人必看:Open-AutoGLM如何解决传统预约系统的4大痛点?
  • Open-AutoGLM用药提醒实战指南:5步搭建专属健康守护系统
  • JavaSE——带返回值的方法
  • 当 LinkedList 不是列表时,速度快的兔子都追不上!
  • 【Open-AutoGLM会议纪要生成全攻略】:3大核心技术揭秘与落地实践
  • 揭秘2025:国内全自动粘钉一体机一线厂家实力推荐榜,技术好的全自动粘钉一体机解决方案与实力解析 - 品牌推荐师
  • 【行业首曝】Open-AutoGLM高并发场景压测报告:支撑万级并发预约的底层逻辑
  • Java 岗面试 99 题 (含答案):JVM+Spring+MySQL+ 线程池 + 锁
  • 2025年大连值得信赖的BIP企业排行,人力云/好业财/协同云/税务云/好会计/财务云/易代账/供应链云/好生意BIP服务商选哪家 - 品牌推荐师
  • hot100 238.除自身以外的数组的乘积
  • 网络调试助手链接服务器