当前位置: 首页 > news >正文

从零开始学Flink:TopN 榜单

在上一篇 《从零开始学Flink:Flink SQL四大Join解析》结尾提到过,下一篇要把 窗口聚合(Window Aggregation)与 TopN 讲清楚。窗口负责把无界流切成可统计的时间片,TopN 负责把“统计结果”变成榜单输出;两者组合起来,PV/UV、订单量、热销榜、实时大屏基本都能覆盖。

但这块也最容易踩坑:SQL 明明在跑却一直没输出、TopN 结果频繁更新/撤回下游写不进去、滑动窗口一上来状态就撑爆。本文直接用可复现的 Kafka 数据流把这些问题跑出来,并给出对应的处理方式。

本文基于 Flink 1.20+,用 SQL Client 直接在本地 standalone 集群验证。你可以把文中的 SQL 原样复制过去跑通,再按自己的业务把窗口粒度、乱序容忍、下游写入方式替换掉。

0. 环境准备:用 SQL Client 直接跑起来

为了把注意力放在 SQL 本身,本文用 Kafka 做数据源:手动往 Topic 推送点击行为数据,用 print 在 TaskManager Stdout 里观察结果。

使用前请确认 Flink 已加载 Kafka SQL Connector(把 flink-sql-connector-kafka-*.jar 放到 $FLINK_HOME/lib 并重启集群/SQL Client)。

先把下面几个参数设好,后面跑窗口/TopN 时更容易看到输出:

-- 1) 避免 source 空闲导致 watermark 不推进,从而窗口一直不触发
SET 'table.exec.source.idle-timeout' = '5s';
-- 2) 让窗口/TopN 的结果更“及时”(更快看到输出)
SET 'execution.checkpointing.interval' = '10s';
-- 3) 以流模式运行(源是无界),持续刷到 SQL Client
SET 'execution.runtime-mode' = 'streaming';
-- 4) 开启 changelog 模式,使窗口/TopN 的结果更“及时”(更快看到输出)
SET 'sql-client.execution.result-mode' = 'changelog';

接着创建一张点击行为表(事件时间 + Watermark):

CREATE TABLE dwd_click_log (user_id     STRING,item_id     STRING,category_id STRING,ts          BIGINT,event_time  AS TO_TIMESTAMP_LTZ(ts, 3),WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND
) WITH ('connector' = 'kafka','topic' = 'dwd_click_log','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-dwd-click-log','scan.startup.mode' = 'earliest-offset','format' = 'json','json.ignore-parse-errors' = 'true'
);

先准备 Kafka Topic:

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic dwd_click_log --partitions 1 --replication-factor 1

再创建几个 print sink 用来观察输出:

CREATE TABLE ads_window_metrics_print (window_start TIMESTAMP_LTZ(3),window_end   TIMESTAMP_LTZ(3),pv           BIGINT,uv           BIGINT
) WITH ('connector' = 'print','print-identifier' = 'ads_window_metrics_print'
);
CREATE TABLE ads_session_metrics_print (window_start TIMESTAMP_LTZ(3),window_end   TIMESTAMP_LTZ(3),user_id      STRING,click_cnt    BIGINT
) WITH ('connector' = 'print','print-identifier' = 'ads_session_metrics_print'
);
CREATE TABLE ads_topn_print (window_start TIMESTAMP_LTZ(3),window_end   TIMESTAMP_LTZ(3),category_id  STRING,item_id      STRING,cnt          BIGINT,rn           BIGINT
) WITH ('connector' = 'print','print-identifier' = 'ads_topn_print'
);

1. 窗口聚合基础:你到底在对“哪段时间”做统计

在 Flink SQL 里,窗口的本质是:把无界流切成一个个“有限集合”,再在集合上做 GROUP BY 聚合。

窗口统计能否输出,核心取决于两件事:

  • 你选的是 Processing Time 还是 Event Time
  • Event Time 场景下,Watermark 是否在推进(决定窗口是否“关窗”)

本文以 Event Time 为主,因为绝大多数实时数仓指标都需要“按业务发生时间统计”,而不是“按处理到达时间统计”。

Flink 早期有 GROUP BY TUMBLE(...) 这类 Group Window 语法,新版本更推荐 Window TVF(Table Valued Function),它的输出会直接带上 window_start/window_end/window_time 字段,更清晰,也更容易与 TopN/Join 组合。

2.1 滚动窗口(TUMBLE):每条数据只属于一个窗口

典型场景:按分钟/小时统计 PV、UV、GMV。

INSERT INTO ads_window_metrics_print
SELECTwindow_start,window_end,COUNT(*) AS pv,COUNT(DISTINCT user_id) AS uv
FROM TABLE(TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '1' SECOND)
)
GROUP BY window_start, window_end;

推送数据(最简单:控制台直接粘贴 JSON,一行一条)

$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log

粘贴下面数据(ts 用毫秒时间戳;为了让事件时间窗口及时“关窗”,建议 ts 单调递增,或者最后补一条明显更大的 ts 用来推进 Watermark):

{"user_id":"u01","item_id":"i01","category_id":"c01","ts":1774454400000}
{"user_id":"u02","item_id":"i02","category_id":"c01","ts":1774454402000}
{"user_id":"u01","item_id":"i03","category_id":"c02","ts":1774454403000}
{"user_id":"u03","item_id":"i04","category_id":"c02","ts":1774454404000}
{"user_id":"u04","item_id":"i01","category_id":"c01","ts":1774454405000}

到 Flink Web UI → TaskManagers → Stdout 查看输出:
滚动窗口示例

要点:

  • TUMBLE 适合“报表型”指标,窗口不重叠,状态相对可控
  • COUNT(DISTINCT ...) 会引入去重状态,用户数大时要关注状态体积(生产中可考虑用近似去重或分层聚合)

2.2 滑动窗口(HOP):一条数据会被“复制”到多个窗口

典型场景:最近 5 分钟滚动 UV、最近 1 小时成交额每 5 分钟刷新一次。

示例:窗口长度 30s,每 10s 滑动一次。

INSERT INTO ads_window_metrics_print
SELECTwindow_start,window_end,COUNT(*) AS pv,COUNT(DISTINCT user_id) AS uv
FROM TABLE(HOP(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND, INTERVAL '30' SECOND)
)
GROUP BY window_start, window_end;

到 Flink Web UI → TaskManagers → Stdout 查看输出:
滑动窗口示例

要点:

  • HOP 的状态压力通常显著高于 TUMBLE,因为数据会进入多个窗口
  • 业务上能用 TUMBLE 不用 HOP;必须用 HOP 时,尽量降低窗口长度或放大 slide(减少并行窗口数)

2.3 会话窗口(SESSION):按“事件间隔”自动切窗

典型场景:统计用户一次访问会话内的点击数/停留时长、按会话做转化漏斗。

示例:同一用户 10s 内没有新事件就认为会话结束。

INSERT INTO ads_session_metrics_print
SELECTwindow_start,window_end,user_id,COUNT(*) AS click_cnt
FROM TABLE(SESSION(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND)
)
GROUP BY window_start, window_end, user_id;

推送数据(最简单:控制台直接粘贴 JSON,一行一条)

$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dwd_click_log

往 topic dwd_click_log 推送数据(控制台直接粘贴 JSON,一行一条)

{"user_id":"u01","item_id":"i01","category_id":"c01","ts":1775143687285}
{"user_id":"u02","item_id":"i02","category_id":"c01","ts":1775143688285}
{"user_id":"u01","item_id":"i03","category_id":"c02","ts":1775143689285}

到 Flink Web UI → TaskManagers → Stdout 查看输出:
会话窗口示例

要点:

  • SESSION 窗口边界不固定,会因为迟到数据发生“合并”,下游会看到更新/撤回更频繁
  • 如果你的下游只接受 Append(只插入不更新),SESSION 往往不合适,除非你引入可更新的 sink(Upsert)

4. TopN:把窗口聚合变成实时榜单

TopN 的正确打开方式是“两段式”:

  1. 先做窗口聚合得到每个候选项的指标(比如每个商品的点击数)
  2. 再在聚合结果上做排序,取前 N

4.1 窗口内 TopN:每个窗口的热榜 Top3

需求:每 10 秒统计一次“各品类内点击 Top3 商品”。

INSERT INTO ads_topn_print
WITH item_cnt AS (SELECTwindow_start,window_end,category_id,item_id,COUNT(*) AS cntFROM TABLE(TUMBLE(TABLE dwd_click_log, DESCRIPTOR(event_time), INTERVAL '10' SECOND))GROUP BY window_start, window_end, category_id, item_id
),
ranked AS (SELECTwindow_start,window_end,category_id,item_id,cnt,ROW_NUMBER() OVER (PARTITION BY window_start, window_end, category_idORDER BY cnt DESC, item_id) AS rnFROM item_cnt
)
SELECTwindow_start,window_end,category_id,item_id,cnt,rn
FROM ranked
WHERE rn <= 3;

到 Flink Web UI → TaskManagers → Stdout 查看输出:
窗口内 TopN 示例

几个关键点:

  • PARTITION BY window_start, window_end, category_id 表示“每个窗口、每个品类各自一张榜单”
  • ORDER BY cnt DESC 决定榜单规则;追加 item_id 是为了稳定排序,避免并列时结果抖动
  • TopN 本质是对一张动态表做排序截断,窗口聚合(尤其是 SESSION/HOP)会产生更新,因此 TopN 输出常常不是纯 Append

5. 生产落地:TopN 为什么“写不进”下游

TopN 落地最常见的问题是:下游只接受追加流(Append-only),但 TopN 的结果在运行过程中会不断更新。

你会在 print 里看到类似 +I/-U/+U 的变更日志输出(不同版本格式略有差异),这意味着:

  • 早期输出的第 3 名,后续可能被挤掉,需要撤回
  • 早期输出的第 1 名,后续计数增长,会以更新的形式重发

落地时通常有两种策略:

  • 写 Upsert Sink:Kafka Upsert、JDBC(主键表)、HBase、Redis 等,要求结果表定义 PRIMARY KEY (...) NOT ENFORCED
  • 把 TopN 变成“窗口结束一次性输出”:只在窗口最终关闭后输出最终榜单,减少更新(更偏离线思路)

对于实时大屏、实时榜单,通常选 Upsert Sink。

6. 性能与稳定性:窗口与 TopN 的几个关键参数

6.1 状态 TTL:给状态设“上限”

即使是窗口聚合,状态也不是“完全自动可控”的:滑动窗口、去重、TopN 的中间表都会占用状态。

生产建议给作业设置统一 TTL,例如保留 7 天(按业务调整):

SET 'table.exec.state.ttl' = '7 d';

6.2 MiniBatch:降低聚合与排序的更新频率

当源数据更新非常频繁(尤其是有去重、TopN)时,MiniBatch 能显著减少算子更新次数,提升吞吐:

SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '2 s';
SET 'table.exec.mini-batch.size' = '2000';

6.3 Watermark 设计:迟到与实时性的平衡

WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND 的 3 秒不是越大越好:

  • 太小:乱序稍大就会被判定为迟到数据而丢弃
  • 太大:窗口输出延迟变大,榜单刷新变慢

通常做法是先用历史数据评估乱序分布(P95/P99),再给一个能接受的延迟阈值。

7. 小结

  • Window TVF(TUMBLE/HOP/SESSION)是 Flink SQL 窗口聚合的主流写法,关键在于 Event Time + Watermark 是否能把窗口按预期触发出来
  • TopN 建议两段式:先聚合再排名,并根据业务选择 ROW_NUMBERRANK
  • TopN 多数情况下会产生更新/撤回,生产下游优先考虑 Upsert Sink(或把输出改为“窗口结束一次性输出”)
  • 性能与稳定性重点关注:State TTL、MiniBatch、Watermark 延迟与业务时效的权衡

关注【代码匠心】,回复关键字 flink,获取 Flink 学习资料!
代码匠心

原文链接: http://blog.daimajiangxin.com.cn

http://www.jsqmd.com/news/578715/

相关文章:

  • 从LVGL菜单组件反推:手搓一个轻量级C语言菜单框架(适合RTOS/单片机)
  • 本科毕业论文“通关秘籍”:好写作AI的神奇助力
  • 主流CRM系统盘点与选型:电商零售企业2026实战指南
  • STM32开发中的可执行文件格式解析:AXF、HEX与BIN
  • 8位单片机高效处理16位整数的4种方法
  • 我的编程之路
  • 确保在STA线程中运行
  • 】【】
  • COMSOL锂电池仿真入门教学:主要为电极单元的电化学-热耦合,也可以是电池包热仿真
  • 雕塑的安装方式(以玻璃钢为例)
  • rotary_encoder库深度解析:正交编码器状态机与嵌入式抗抖动实践
  • npm 发布报错 403 Forbidden(2FA)解决方案
  • Spring with AI (): 搜索扩展——向量数据库与RAG(下)
  • 收藏备用!大模型面试高频题:为什么有KV-Cache却没有Q-Cache?小白也能秒懂
  • ESP32S3+SPIFFS实战:5分钟搭建个人网盘(含前端完整代码)
  • 硕士毕业论文“攻坚利器”:好写作AI的全方位赋能
  • OpenClaw学习曲线:Kimi-VL-A3B-Thinking从入门到精通的30天记录
  • STV Group和Post-Quantum成功测试全球首款抗量子无人机
  • 数据处理与统计分析----沙箱
  • P2569 [SCOI2010] 股票交易s 题解
  • 第 4 章 列表数据类型 知识点精讲
  • [特殊字符] 镜像视界|视频不再记录世界,而是计算世界:空间智能的崛起——基于Pixel-to-Space与动态三维重构的空间智能感知体系
  • 基于MATLAB的简单带有GUI界面的交通路标识别项目
  • 新手电工必看!3个致命接线错误,90%的人都踩过坑
  • 前端代码可读性优化:让你的代码不再像天书
  • Ostrakon-VL-8B对比评测:主流开源多模态模型在餐饮场景的较量
  • 大厂vs.垂直玩家:电商AI视频工具怎么选?易元AI的“专注”才是护城河
  • 人工智能+督导闭环,奥尔特云街道网格治理闭环系统
  • 全网独家!加入风机模块的IEEE9模型!
  • 树莓派5B - 零基础应用开发系列(第二期):从环境配置到首个物联网应用