当前位置: 首页 > news >正文

实时数据处理实战:使用 Apache Flink 消费 Kafka 数据并进行窗口聚合

在大数据时代,实时处理流式数据已经成为企业级应用的标配。无论是用户行为分析、实时监控告警,还是金融风控系统,都离不开低延迟、高吞吐的流处理引擎。本文将带你从零开始,使用Apache FlinkKafka构建一个完整的实时数据处理流水线,涵盖数据模拟、流式消费、窗口聚合及结果输出,所有代码均可直接运行。

为什么选择 Flink + Kafka?

  • Kafka:分布式消息队列,擅长缓冲和分发海量实时数据,保证数据可靠性和顺序性。

  • Flink:领先的流处理框架,支持事件时间处理、精确一次语义(Exactly-Once)、毫秒级延迟和复杂状态管理。

  • 两者结合,可以轻松搭建生产级的实时数仓或监控系统。

实战场景

我们模拟一个电商网站的用户点击流:每个用户点击商品时产生一条 JSON 日志,包含userIdproductIdeventType(click/buy)、timestamp。我们需要每隔 1 分钟统计一次所有商品的点击量(PV),并将结果实时写入 Elasticsearch 便于可视化。

环境准备

为了专注代码逻辑,我们使用 Docker 快速启动 Kafka 和 Elasticsearch。Flink 程序可在本地 IDE 运行(或提交到集群)。

1. 启动 Kafka 和 ES

创建docker-compose.yml

yaml

version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 ports: - "9092:9092" elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0 environment: - discovery.type=single-node - xpack.security.enabled=false ports: - "9200:9200"

运行:docker-compose up -d

2. 创建 Maven 项目

新建一个 Java 项目,pom.xml关键依赖:

xml

<properties> <flink.version>1.17.1</flink.version> <kafka.version>3.4.0</kafka.version> <jackson.version>2.15.2</jackson.version> </properties> <dependencies> <!-- Flink 核心依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <!-- Flink Kafka 连接器 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency> <!-- JSON 序列化 --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>${jackson.version}</version> </dependency> <!-- Elasticsearch 8.x 连接器 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch8</artifactId> <version>${flink.version}</version> </dependency> <!-- 日志 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.9</version> </dependency> </dependencies>

第一步:模拟数据生产者

我们编写一个独立的 Kafka 生产者,每秒随机生成 100~500 条点击事件,发送到click-events主题。

java

import org.apache.kafka.clients.producer.*; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit; public class ClickEventProducer { private static final String TOPIC = "click-events"; private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final Random random = new Random(); private static final ObjectMapper mapper = new ObjectMapper(); public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) { long userIdCounter = 0; while (true) { int eventsThisSecond = 100 + random.nextInt(401); // 100-500 条/秒 for (int i = 0; i < eventsThisSecond; i++) { String event = generateEvent(userIdCounter++); producer.send(new ProducerRecord<>(TOPIC, event)); } TimeUnit.SECONDS.sleep(1); } } } private static String generateEvent(long userId) { long now = System.currentTimeMillis(); ObjectNode json = mapper.createObjectNode(); json.put("userId", userId % 100000); // 模拟 10 万用户 json.put("productId", random.nextInt(500)); json.put("eventType", random.nextBoolean() ? "click" : "buy"); json.put("timestamp", now); return json.toString(); } }

运行这个类之前,先在 Kafka 中创建主题(生产环境一般自动创建,也可手动):

bash

docker exec -it <kafka-container-id> kafka-topics --create --topic click-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

第二步:定义数据实体和反序列化器

Flink 从 Kafka 消费的是二进制数据,我们需要将其解析为 POJO。这里定义一个简单的ClickEvent类:

java

import java.sql.Timestamp; public class ClickEvent { public long userId; public int productId; public String eventType; public long timestamp; // 毫秒 public ClickEvent() {} // 必须有无参构造 public ClickEvent(long userId, int productId, String eventType, long timestamp) { this.userId = userId; this.productId = productId; this.eventType = eventType; this.timestamp = timestamp; } @Override public String toString() { return String.format("ClickEvent{user=%d, product=%d, type=%s, ts=%s}", userId, productId, eventType, new Timestamp(timestamp)); } }

反序列化器,使用 Jackson 将 JSON 字符串转成ClickEvent

java

import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; public class ClickEventDeserializer implements DeserializationSchema<ClickEvent> { private static final ObjectMapper mapper = new ObjectMapper(); @Override public ClickEvent deserialize(byte[] message) throws IOException { return mapper.readValue(message, ClickEvent.class); } @Override public boolean isEndOfStream(ClickEvent nextElement) { return false; } @Override public TypeInformation<ClickEvent> getProducedType() { return TypeInformation.of(ClickEvent.class); } }

第三步:编写 Flink 核心作业

这是文章的核心——实时统计每分钟每个商品的点击次数(PV),并写入 Elasticsearch。

3.1 构建执行环境并设置时间特性

java

import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import java.time.Duration; import java.util.Properties; public class RealTimePVJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 为了方便演示,设置并行度为1 // 设置 Kafka 消费属性 Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); kafkaProps.setProperty("group.id", "flink-pv-group"); // 创建 Kafka 消费者 FlinkKafkaConsumer<ClickEvent> consumer = new FlinkKafkaConsumer<>( "click-events", new ClickEventDeserializer(), kafkaProps ); // 为了处理乱序数据,设置最大允许延迟 5 秒 WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, timestamp) -> event.timestamp); DataStream<ClickEvent> clickStream = env .addSource(consumer) .assignTimestampsAndWatermarks(watermarkStrategy); // 后续操作... env.execute("Real-time PV Statistics"); } }

3.2 计算每分钟每个商品的点击量

我们需要按productId分组,然后在 1 分钟的滚动窗口内统计click事件的数量。

java

import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; // 接上面代码 DataStream<Tuple2<Integer, Long>> productClickCounts = clickStream .filter(event -> "click".equals(event.eventType)) // 只统计点击 .map(new MapFunction<ClickEvent, Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> map(ClickEvent event) { return Tuple2.of(event.productId, 1L); } }) .keyBy(tuple -> tuple.f0) // 按 productId 分组 .window(TumblingEventTimeWindows.of(Time.minutes(1))) .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

或者使用更清晰的sum聚合:

java

import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; DataStream<Tuple2<Integer, Long>> counts = clickStream .filter(e -> "click".equals(e.eventType)) .map(e -> Tuple2.of(e.productId, 1L)) .returns(Types.TUPLE(Types.INT, Types.LONG)) .keyBy(t -> t.f0) .window(TumblingEventTimeWindows.of(Time.minutes(1))) .sum(1);

3.3 将结果输出到控制台(调试用)

java

counts.print();

3.4 写入 Elasticsearch(生产常用)

Flink 提供了 Elasticsearch 8 的 Sink。我们定义一个ElasticsearchSink,将每个窗口的统计结果以 JSON 形式存入索引product_pv

java

import org.apache.flink.streaming.connectors.elasticsearch8.ElasticsearchSink; import org.apache.flink.streaming.connectors.elasticsearch8.RequestIndexer; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.util.HashMap; import java.util.Map; public static ElasticsearchSink<Tuple2<Integer, Long>> createESSink() { // ES 服务器地址 HttpHost httpHost = new HttpHost("localhost", 9200, "http"); ElasticsearchSink.Builder<Tuple2<Integer, Long>> esBuilder = new ElasticsearchSink.Builder<>(java.util.Collections.singletonList(httpHost), (Tuple2<Integer, Long> element, RuntimeContext ctx, RequestIndexer indexer) -> { Map<String, Object> json = new HashMap<>(); json.put("productId", element.f0); json.put("clickCount", element.f1); json.put("windowEnd", System.currentTimeMillis()); // 可改用窗口结束时间 IndexRequest indexRequest = Requests.indexRequest() .index("product_pv") .source(json); indexer.add(indexRequest); }); // 可选:配置批量写入参数 esBuilder.setBulkFlushMaxActions(10); return esBuilder.build(); } // 在 main 方法中添加: counts.sinkTo(createESSink());

注意:Elasticsearch 索引product_pv需要提前创建(动态创建也可),建议在 Kibana 或通过 REST API 创建映射以优化性能。

第四步:完整可运行代码整合

为了方便复制,下面给出完整的RealTimePVJob.java(省略 package 和 imports,但核心逻辑已全):

java

public class RealTimePVJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); // 可以根据 CPU 调整 Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); kafkaProps.setProperty("group.id", "flink-pv-group"); FlinkKafkaConsumer<ClickEvent> consumer = new FlinkKafkaConsumer<>( "click-events", new ClickEventDeserializer(), kafkaProps ); WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, recordTimestamp) -> event.timestamp); DataStream<ClickEvent> source = env .addSource(consumer) .assignTimestampsAndWatermarks(watermarkStrategy); DataStream<Tuple2<Integer, Long>> pvStream = source .filter(event -> "click".equals(event.eventType)) .map(e -> Tuple2.of(e.productId, 1L)) .returns(Types.TUPLE(Types.INT, Types.LONG)) .keyBy(t -> t.f0) .window(TumblingEventTimeWindows.of(Time.minutes(1))) .sum(1); pvStream.print("PV per product every minute"); pvStream.sinkTo(createESSink()); env.execute("Flink Kafka to ES PV Job"); } private static ElasticsearchSink<Tuple2<Integer, Long>> createESSink() { HttpHost httpHost = new HttpHost("localhost", 9200, "http"); ElasticsearchSink.Builder<Tuple2<Integer, Long>> builder = new ElasticsearchSink.Builder<>(Collections.singletonList(httpHost), (element, ctx, indexer) -> { Map<String, Object> doc = new HashMap<>(); doc.put("productId", element.f0); doc.put("pv", element.f1); doc.put("timestamp", System.currentTimeMillis()); IndexRequest request = Requests.indexRequest() .index("product_pv") .source(doc); indexer.add(request); }); builder.setBulkFlushMaxActions(10); return builder.build(); } }

运行与验证

  1. 启动 Docker 容器(Kafka + ES)

  2. 运行ClickEventProducer,开始发送模拟数据

  3. 运行RealTimePVJob,观察控制台输出(每分钟打印一次统计结果)

  4. 查询 Elasticsearch验证数据写入:

    bash

    curl -X GET "localhost:9200/product_pv/_search?pretty"

    你会看到类似下面的文档:

    json

    { "productId": 123, "pv": 342, "timestamp": 1743249876000 }

进阶优化与常见问题

1. 使用事件时间 vs 处理时间

本例使用了事件时间(基于数据中的timestamp字段),能正确处理乱序和延迟数据。如果使用处理时间,只需将TumblingEventTimeWindows换成TumblingProcessingTimeWindows,但这样会丢失事件时间的语义。

2. 检查点(Checkpoint)配置

为了达到 Exactly-Once 语义,建议启用 checkpoint:

java

env.enableCheckpointing(60000); // 每分钟一次 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);

3. Kafka 消费者的 offset 提交

Flink 默认在 checkpoint 完成时提交 offset,保证一致性。如果未开启 checkpoint,则依赖setCommitOffsetsOnCheckpoints参数。

4. Elasticsearch 连接优化

  • 增大setBulkFlushMaxActionssetBulkFlushInterval提高吞吐。

  • 如果 ES 开启安全认证,需要添加HttpHost的凭证。

5. 处理乱序和迟到数据

除了 Watermark 的延迟容忍,可以使用allowedLateness让窗口等待一段时间:

java

.window(TumblingEventTimeWindows.of(Time.minutes(1))) .allowedLateness(Time.seconds(10)) .sideOutputLateData(lateTag) // 将迟到数据输出到侧输出流

6. 状态后端选择

如果窗口状态很大(例如按用户、商品多维聚合),建议使用 RocksDB 作为状态后端,避免 OOM。

扩展:从 PV 到 UV

只需修改 keyBy 和聚合逻辑即可计算每分钟独立访客数(UV)。例如:

java

DataStream<Tuple2<Integer, Long>> uvStream = source .filter(e -> "click".equals(e.eventType)) .map(e -> Tuple2.of(e.productId, e.userId)) .keyBy(t -> t.f0) .window(TumblingEventTimeWindows.of(Time.minutes(1))) .aggregate(new AggregateFunction<Tuple2<Integer,Long>, Set<Long>, Long>() { @Override public Set<Long> createAccumulator() { return new HashSet<>(); } @Override public Set<Long> add(Tuple2<Integer,Long> value, Set<Long> acc) { acc.add(value.f1); return acc; } @Override public Long getResult(Set<Long> acc) { return (long) acc.size(); } @Override public Set<Long> merge(Set<Long> a, Set<Long> b) { a.addAll(b); return a; } });

总结

本文带你完整实现了一条基于 Kafka + Flink + Elasticsearch 的实时数据处理管道。从模拟数据生成、事件时间窗口聚合,到结果持久化,每一步都提供了可直接运行的代码。你可以基于此模板轻松适配其他业务场景,如实时推荐、监控报警、大屏展示等。

Flink 的强大远不止于此——状态存储、CEP 模式匹配、SQL 动态分析等,都能与 Kafka 无缝集成。希望这篇文章能成为你踏入实时流处理世界的扎实一步。动手试试,你会爱上数据流过的每一毫秒。

http://www.jsqmd.com/news/556426/

相关文章:

  • 如何为Neutralinojs应用添加专业级窗口动画效果:终极实现指南
  • 智能体为什么这么火?
  • 影墨·今颜快速上手:英文Prompt写法+小红书审美风格控制技巧
  • 不止于‘看’:用Python玩转双光融合相机的数据采集与可视化分析
  • boxing裁剪功能深度优化:UCrop集成与自定义裁剪方案
  • 7天效率挑战:OpenClaw+Qwen3-32B镜像优化个人工作流
  • dry插件系统解析:如何扩展自定义Docker管理功能
  • 3个核心维度解析iOS数据取证:iLEAPP从入门到精通
  • 终极跨平台开发指南:ReScript Compiler在Windows/macOS/Linux的完整适配方案
  • 免费音频转换终极指南:用fre:ac轻松搞定音乐格式转换
  • STM32中断驱动下的EV1527无线解码实现与优化策略
  • PokemonRedExperiments强化学习训练中断恢复终极指南:checkpoint系统设计详解
  • Unblock-Youku测试与部署指南:从开发到上架Chrome商店
  • 【独家首发】Mojo 1.2 + Python 3.12混合编程标准架构图(工业级认证,仅限前500位开发者获取)
  • Netty从入门到精通:Java程序员必备!
  • Windows热键冲突终结者:Hotkey Detective技术解析与实战指南
  • 深入解析FOC控制中的Clark/Park变换及其Matplotlib动态仿真实现
  • 告别远程调试!手把手教你用DevEco Studio本地模拟器开发鸿蒙TV应用
  • 【图文教程】6大方法教你彻底禁止win11自动更新
  • ONNX-TensorRT 核心解析器深度解析:NvOnnxParser 架构与实现原理
  • 终极指南:如何用Chanlun-Pro实现智能缠论量化交易
  • NSwag安全访问控制配置指南:保护敏感API操作的终极方案
  • 摄影小白必看:你的手机拍照忽明忽暗?5分钟搞懂AE自动曝光与‘白加黑减’原理
  • 容器生命周期
  • 猫抓Cat-Catch:如何用浏览器扩展精准捕获网页媒体资源?
  • Python与Abaqus联合作战:高效自动化仿真实战指南
  • EasyExcel实战:如何用CellWriteHandler给特定单元格加红色背景(附依赖冲突解决方案)
  • OpenInTerminal:重新定义macOS终端操作效率的必备工具
  • [具身智能-158]:三个最适合入门的具身智能落地场景,并规划了一条从“单一功能”到“通用智能”的演进路径。
  • CAJ转PDF高效解决方案:让学术文献跨平台阅读不再困难