当前位置: 首页 > news >正文

Flink知识点(五)|Window(窗口)

上一期我们聊到了Flink知识点(四)|Watermark(水位线),本期我们继续一起聊下Flink的窗口。

一、为什么需要窗口

Flink 处理的是无界流,数据永远不会结束。窗口把无界流切成有界的数据块,才能做聚合统计。Window是无限数据流处理的核心,Window 将一个无限的stream拆分成有限大小的“buckets”桶,我们可以在这些桶上做计算操作。

还是外卖的例子:统计"每10分钟下了多少单",就需要把数据按时间切成一段一段来计算。


二、窗口的分类

2.1 按键分

KeyedStream → keyBy() 之后 → .window() 每个 key 独立维护自己的窗口 Non-Keyed → 不 keyBy() → .windowAll() 所有数据进同一个窗口(并行度强制为1,慎用)

2.2 按照窗口分配数据的规则分类

2.2.1 滚动窗口(Tumbling)

固定大小,不重叠,每条数据只属于一个窗口。

注:图片来源于网上
stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).sum("amount");

外卖场景:统计每10分钟的订单量,11:00-11:10 一个窗口,11:10-11:20 一个窗口,互不干扰。

2.2.2 滑动窗口(Sliding Window)

固定大小,有重叠,一条数据可能属于多个窗口。由窗口大小和滑动步长共同决定。

注:图片来源于网上
stream.keyBy(Order::getUserId)// 窗口大小10分钟,每5分钟滑动一次.window(SlidingEventTimeWindows.of(Time.minutes(10),Time.minutes(5))).sum("amount");

外卖场景:每5分钟统计一次"过去10分钟的订单量",用来做实时趋势监控。

注意:窗口大小 / 滑动步长 = 每条数据被计算的次数,步长越小,计算开销越大。

2.2.3 会话窗口(Session Window)

没有固定大小,按照数据间的间隔来切分。超过指定时间没有数据,窗口关闭。

注:图片来源于网上
stream.keyBy(Order::getUserId)// 超过30分钟没有数据,关闭窗口.window(EventTimeSessionWindows.withGap(Time.minutes(30))).sum("amount");

外卖场景:统计用户一次"点餐会话"的总消费,用户开始浏览到下单完成算一次会话,中间超过30分钟没操作就结束。

扩展:动态间隔

// 不同用户可以有不同的 gap.window(EventTimeSessionWindows.withDynamicGap(order->order.getUserLevel().equals("VIP")?60_000L:30_000L))

2.2.4 全局窗口(Global Window)

所有数据进同一个窗口,永远不自动触发,必须配合自定义 Trigger 使用。

注:图片来源于网上
stream.keyBy(Order::getUserId).window(GlobalWindows.create())// 每累积100条数据触发一次.trigger(CountTrigger.of(100)).sum("amount");

外卖场景:每个用户累计下了100单,触发一次统计。

扩展:把按键分和按照窗口分配数据的规则分类组合起来看

滚动滑动会话全局
Keyed✅ 最常用
Non-Keyed慎用慎用慎用慎用

实际生产中基本都是Keyed + 滚动/滑动/会话这三种组合。

2.3 按照驱动类型分

  • 时间窗口(Time Window):以时间点来定义窗口的开始和结束。>定点发车
  • 计数窗口(Count Window):以事件的个数来截取数据,达到固定的个数(预置)就出发计算并关闭窗口。>人齐发车

三、窗口函数

窗口收集到数据后,用窗口函数来计算。

3.1 增量聚合函数

数据来一条处理一条,不存储原始数据,内存占用小。

3.1.1 ReduceFunction

stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).reduce((a,b)->{a.setAmount(a.getAmount()+b.getAmount());returna;});

3.1.2 AggregateFunction

更灵活,输入输出类型可以不同

stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).aggregate(newAggregateFunction<Order,Tuple2<Double,Integer>,Double>(){@OverridepublicTuple2<Double,Integer>createAccumulator(){returnTuple2.of(0.0,0);// (总金额, 订单数)}@OverridepublicTuple2<Double,Integer>add(Orderorder,Tuple2<Double,Integer>acc){returnTuple2.of(acc.f0+order.getAmount(),acc.f1+1);}@OverridepublicDoublegetResult(Tuple2<Double,Integer>acc){returnacc.f0/acc.f1;// 返回平均金额}@OverridepublicTuple2<Double,Integer>merge(Tuple2<Double,Integer>a,Tuple2<Double,Integer>b){returnTuple2.of(a.f0+b.f0,a.f1+b.f1);}});

3.2 全量窗口函数

窗口触发时才处理,能拿到窗口内所有数据和窗口元信息。

3.2.1 ProcessWindowFunction

stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).process(newProcessWindowFunction<Order,String,String,TimeWindow>(){@Overridepublicvoidprocess(StringuserId,Contextctx,Iterable<Order>orders,Collector<String>out){longwindowStart=ctx.window().getStart();longwindowEnd=ctx.window().getEnd();doubletotal=StreamSupport.stream(orders.spliterator(),false).mapToDouble(Order::getAmount).sum();out.collect(String.format("用户%s 在 %s~%s 消费了 %.2f 元",userId,windowStart,windowEnd,total));}});

3.2.2 WindowFunction

stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).process(newProcessWindowFunction<Order,String,String,TimeWindow>(){@Overridepublicvoidprocess(StringuserId,Contextctx,Iterable<Order>orders,Collector<String>out){doubletotal=0;for(Orderorder:orders){total+=order.getAmount();}out.collect(userId+" 消费 "+total);}});

3.3 增量和全量结合(推荐使用)

用 AggregateFunction 做增量聚合,再用 ProcessWindowFunction 拿窗口元信息,兼顾性能和灵活性:

stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).aggregate(newOrderAggregateFunction(),newOrderProcessWindowFunction());

四、触发器(Trigger)

决定窗口什么时候触发计算,默认不需要手动配置。

// 内置触发器EventTimeTrigger.create()// 默认,Watermark 超过窗口结束时间触发ProcessingTimeTrigger.create()// 处理时间触发CountTrigger.of(100)// 累积100条触发PurgingTrigger.of(...)// 触发后清空窗口数据// 自定义触发器.trigger(newTrigger<Order,TimeWindow>(){@OverridepublicTriggerResultonElement(Orderorder,longtimestamp,TimeWindowwindow,TriggerContextctx){// 每来一条数据都触发(实时输出,但开销大)returnTriggerResult.FIRE;}@OverridepublicTriggerResultonEventTime(longtime,TimeWindowwindow,TriggerContextctx){returnTriggerResult.FIRE_AND_PURGE;// 触发并清空}@OverridepublicTriggerResultonProcessingTime(longtime,TimeWindowwindow,TriggerContextctx){returnTriggerResult.CONTINUE;}});

TriggerResult 的四种结果:

结果含义
CONTINUE什么都不做
FIRE触发计算,保留数据
PURGE清空数据,不触发
FIRE_AND_PURGE触发计算并清空数据

五、迟到数据处理

那就使用侧输出流

OutputTag<Order>lateTag=newOutputTag<Order>("late-orders"){};SingleOutputStreamOperator<String>result=stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).allowedLateness(Time.minutes(1))// 窗口关闭后再等1分钟.sideOutputLateData(lateTag)// 超过1分钟的数据发到侧输出.process(newOrderProcessWindowFunction());// 单独处理极端迟到的数据result.getSideOutput(lateTag).print("极端迟到的订单");

六、整体流程总结

数据流 → keyBy() 按 key 分区 → window() 指定窗口类型(滚动/滑动/会话/全局) → trigger() 指定触发条件(可选,有默认值) → allowedLateness() 处理迟到数据(可选) → aggregate/process 窗口函数计算结果

七、怎么选窗口类型

固定周期统计(每小时、每天) → 滚动窗口 实时趋势、移动平均 → 滑动窗口 用户行为分析、会话统计 → 会话窗口 按条数或自定义条件触发 → 全局窗口 + 自定义 Trigger

八、Flink SQL中的窗口使用

TVF(Table-Valued Function)窗口

8.1 滚动窗口 TUMBLE

-- 每10分钟统计一次各用户的订单量和总金额SELECTwindow_start,window_end,user_id,COUNT(*)ASorder_cnt,SUM(amount)AStotal_amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL'10'MINUTE))GROUPBYwindow_start,window_end,user_id;

8.2 滑动窗口 HOP

-- 窗口大小10分钟,每5分钟滑动一次SELECTwindow_start,window_end,user_id,COUNT(*)ASorder_cntFROMTABLE(HOP(TABLEorders,DESCRIPTOR(order_time),INTERVAL'5'MINUTE,INTERVAL'10'MINUTE))GROUPBYwindow_start,window_end,user_id;

8.3 会话窗口 SESSION

-- 超过30分钟没有数据,关闭窗口SELECTwindow_start,window_end,user_id,COUNT(*)ASorder_cntFROMTABLE(SESSION(TABLEorders,DESCRIPTOR(order_time),INTERVAL'30'MINUTE))GROUPBYwindow_start,window_end,user_id;

8.4 累计窗口 CUMULATE(TVF 特有)

这是 TVF 新增的窗口类型,DataStream API 没有对应实现。

-- 每天从0点开始,每1小时输出一次当天累计订单量-- 比如 01:00 输出 0~1 点的累计,02:00 输出 0~2 点的累计SELECTwindow_start,window_end,user_id,COUNT(*)AScumulative_cntFROMTABLE(CUMULATE(TABLEorders,DESCRIPTOR(order_time),INTERVAL'1'HOUR,INTERVAL'1'DAY))GROUPBYwindow_start,window_end,user_id;

CUMULATE 参数:表名、时间字段、累计步长、最大窗口大小

效果:
window_start=00:00 window_end=01:00 → 统计 0~1 点
window_start=00:00 window_end=02:00 → 统计 0~2 点
window_start=00:00 window_end=03:00 → 统计 0~3 点

window_start=00:00 window_end=24:00 → 统计全天

外卖场景:大屏展示"今日累计订单量",每小时刷新一次。

九、举两个Flink SQL的例子

9.1 窗口 TopN

统计每10分钟内,下单金额最高的前3名用户:

SELECT*FROM(SELECTwindow_start,window_end,user_id,total_amount,ROW_NUMBER()OVER(PARTITIONBYwindow_start,window_endORDERBYtotal_amountDESC)ASrnFROM(SELECTwindow_start,window_end,user_id,SUM(amount)AStotal_amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL'10'MINUTE))GROUPBYwindow_start,window_end,user_id))WHERErn<=3;

9.2 窗口 Join

订单表和用户表按相同窗口做 Join,Join不太清楚的,可以看之前的文章:Flink知识点(一)|Flink中的双流关联

SELECTo.window_start,o.window_end,o.user_id,u.user_name,o.total_amountFROM(SELECTwindow_start,window_end,user_id,SUM(amount)AStotal_amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL'10'MINUTE))GROUPBYwindow_start,window_end,user_id)oJOIN(SELECTwindow_start,window_end,user_id,user_nameFROMTABLE(TUMBLE(TABLEusers,DESCRIPTOR(login_time),INTERVAL'10'MINUTE))GROUPBYwindow_start,window_end,user_id,user_name)uONo.window_start=u.window_startANDo.window_end=u.window_endANDo.user_id=u.user_id;
http://www.jsqmd.com/news/510684/

相关文章:

  • 2026年知名的光轴厂家推荐:油缸光轴/实心光轴/不锈钢光轴厂家选择参考建议 - 行业平台推荐
  • AI 时代的 Git 进阶术:如何优雅地让多个 Agent 并行开发
  • SiameseUIE Anaconda环境配置:Python虚拟环境最佳实践
  • 2026年评价高的Gcr15圆钢厂家推荐:45#钢圆钢/剥皮圆钢行业内口碑厂家推荐 - 行业平台推荐
  • GHelper:华硕笔记本硬件控制的轻量级解决方案
  • 3分钟搞定vLLM+Docker部署:从镜像构建到多卡推理全流程(附常见报错解决)
  • UE5-MCP:AI驱动的游戏开发效率提升解决方案
  • 100+中文词向量:构建智能语义理解的核心引擎
  • 2026年比较好的免炖即食燕窝公司推荐:即食燕窝代工/余姚即食燕窝/孕妇滋补即食燕窝公司口碑哪家靠谱 - 行业平台推荐
  • api工具apifox、apipost选择
  • 2026年口碑好的正品溯源燕窝盏品牌推荐:干挑溯源燕窝盏源头厂家推荐几家 - 行业平台推荐
  • 5大维度精通DocRED:文档级关系抽取实战指南
  • Pixel Dimension Fissioner保姆级教学:侧边栏参数调控+实时HUD解读
  • 2026年比较好的怡宝深圳送水公司推荐:哇哈哈深圳送水/深圳送水桶装水配送厂家推荐哪家好 - 行业平台推荐
  • Ai元人文:从自感痕迹论到伦理中间件——情境智慧中的价值原语化方法论(未展开)
  • 每周一个开源项目#1:MiroFish —— 一个试图“预测未来”的AI系统
  • 2026年靠谱的气膜结构厂家推荐:膜结构球场/膜结构停车棚用户好评厂家推荐 - 行业平台推荐
  • 【Hot 100 刷题计划】 LeetCode 763. 划分字母区间 | C++ 贪心算法题解
  • 2026年靠谱的3-氟-4-氨基苯酚厂家推荐:3-氟-4-氨基苯酚盐酸盐/高纯度3-氟-4-氨基苯酚/医药用3-氟-4-氨基苯酚厂家推荐参考 - 品牌宣传支持者
  • 56:XSS攻防博弈:从CSP策略到Filter绕过的实战推演
  • QuickBMS深度解析:游戏资源提取与逆向工程的瑞士军刀
  • 2026年热门的景观膜结构车棚品牌推荐:污水池膜结构车棚/自行车膜结构车棚/停车场膜结构车棚高评价厂家推荐 - 行业平台推荐
  • 踩坑复盘:弃MySQL选PostgreSQL,地理数据存储终于不头疼了
  • 2026年比较好的KCB齿轮油泵厂家推荐:YCB齿轮油泵/LQB沥青齿轮油泵/NCB高粘度内齿轮油泵人气实力厂商推荐 - 行业平台推荐
  • Pixel Dimension Fissioner开源镜像:免编译部署,支持A10/A100/V100全适配
  • 如何借助开源字体实现专业级排版?——EB Garamond 12复古字体全维度应用指南
  • C++ 基础核心知识
  • 【Python基础入门】第四课: 函数
  • 国家级认证 信息系统项目管理师(软高)一站式通关课程
  • 有哪些机构可以颁发信创产品评估证书?