当前位置: 首页 > news >正文

Flink SQL 性能调优MiniBatch、两阶段聚合、Distinct 拆分、MultiJoin 与 Delta Join 一文打通

1. 为什么 Flink SQL 会慢:状态与放大效应

Flink Table/SQL 的性能瓶颈高频出现在两类算子:

1)聚合(Group Aggregation / Window TVF Aggregation)
默认逐条处理:读 state → 更新 accumulator → 写回 state。RocksDB 场景下尤其“读写上瘾”,数据倾斜时还会出现热点 key,轻松 backpressure。

2)常规 Join(Regular Join)
同样逐条处理:查对侧 state → 更新本侧 state → 产出 join 结果。多表级联 join 时,中间结果会“记录放大”,state 变大、反查更慢、checkpoint 更重,作业稳定性直线下降。

接下来我们用一组优化把这两个痛点逐个拆掉。

2. MiniBatch 聚合:把“每条一次状态读写”变成“一批一次”

2.1 核心思想

MiniBatch 聚合会先把输入缓存到算子内部 buffer,触发时再批量处理。同一个 key 在一个 batch 内可以被折叠,状态访问从“每条一次”降低到“每个 key 一次”。

收益
显著减少 state 访问次数,提高吞吐,尤其是 RocksDB StateBackend。

代价
会引入额外延迟(因为要攒一批再算),吞吐与延迟的典型 trade-off。

2.2 开启方式(Java 配置)

// instantiate table environmentTableEnvironmenttEnv=...;// access flink configurationTableConfigconfiguration=tEnv.getConfig();configuration.set("table.exec.mini-batch.enabled","true");// 开启 mini-batchconfiguration.set("table.exec.mini-batch.allow-latency","5 s");// 允许缓存 5sconfiguration.set("table.exec.mini-batch.size","5000");// 每个 task 缓存最大记录数

2.3 一个经验值怎么选

  • allow-latency:先按 1s~5s 试,目标是“吞吐明显上升但业务还能接受延迟”
  • size:按单条记录大小与并发估算内存,5000/10000 常见,越大越容易提升吞吐但越吃内存、延迟越高
  • RocksDB 场景通常更值得开(state 读写成本更高)

2.4 Window TVF Aggregation 的特殊点

Window TVF 聚合默认总是开启 MiniBatch,而且它使用托管内存(managed memory)缓存,不走 JVM 堆,GC/OOM 风险小一些。Group Aggregation 则需要你显式开启。

3. Local-Global 两阶段聚合:专治数据倾斜与热点 key

3.1 为什么“两阶段”能治倾斜

GROUP BY的 key 倾斜时(比如某个 color/day 的数据量巨大),某些聚合实例会变成热点。两阶段聚合把聚合拆成:

  • Local 聚合:上游先在本地做一次预聚合(类似 MapReduce 的 Combine)
  • Global 聚合:下游再把各个 local 的 accumulator 合并

这样网络 shuffle 的数据量减少了,state 访问也减少了,热点压力被分摊。

3.2 注意:它依赖 MiniBatch

Local-Global 的“攒一波再合并”依赖 MiniBatch 的触发节奏,所以必须先开启 MiniBatch。

3.3 开启方式(Java 配置)

TableEnvironmenttEnv=...;TableConfigconfiguration=tEnv.getConfig();configuration.set("table.exec.mini-batch.enabled","true");configuration.set("table.exec.mini-batch.allow-latency","5 s");configuration.set("table.exec.mini-batch.size","5000");// 两阶段聚合:TWO_PHASEconfiguration.set("table.optimizer.agg-phase-strategy","TWO_PHASE");

3.4 适用场景

  • SUM / COUNT / MAX / MIN / AVG 等普通聚合 + 明显倾斜
  • 需要降低 shuffle 与 RocksDB state 读写

不太适用

  • DISTINCT 聚合(下一节讲)

4. Split Distinct Aggregation:让 COUNT(DISTINCT) 也能水平扩展

4.1 为什么 DISTINCT 聚合难搞

COUNT(DISTINCT user_id)如果 user_id 很稀疏,Local-Global 并不能有效减少数据:local accumulator 里几乎还是“原始全集”,全压到 global 上,global 仍是瓶颈。

4.2 解决思路:加一个 bucket key

把 distinct 拆成两层:

第一层:按group key + bucket key聚合
bucket key 由MOD(HASH_CODE(distinct_key), BUCKET_NUM)得到,默认 BUCKET_NUM=1024,可配置。

第二层:按原 group key 再聚合,把各 bucket 的结果 SUM 起来。

等价性
同一个 distinct_key 会落在同一个 bucket,去重逻辑不变,但热点被 1024 个桶分摊,吞吐更稳定。

4.3 自动改写示例

原 SQL:

SELECTday,COUNT(DISTINCTuser_id)FROMTGROUPBYday;

开启后会被等价改写为类似:

SELECTday,SUM(cnt)FROM(SELECTday,COUNT(DISTINCTuser_id)AScntFROMTGROUPBYday,MOD(HASH_CODE(user_id),1024))GROUPBYday;

4.4 开启方式

tEnv.getConfig().set("table.optimizer.distinct-agg.split.enabled","true");

可调 bucket 数:

  • table.optimizer.distinct-agg.split.bucket-num

4.5 限制点

目前不支持包含用户自定义 AggregateFunction 的聚合(distinct 拆分无法保证通用等价)。

5. DISTINCT 多维 UV:用 FILTER 替代 CASE WHEN,省 state、提性能

很多人写多维 UV 喜欢这样:

COUNT(DISTINCTCASEWHENflagIN(...)THENuser_idELSENULLEND)

更推荐用标准 SQL 的 FILTER:

SELECTday,COUNT(DISTINCTuser_id)AStotal_uv,COUNT(DISTINCTuser_id)FILTER(WHEREflagIN('android','iphone'))ASapp_uv,COUNT(DISTINCTuser_id)FILTER(WHEREflagIN('wap','other'))ASweb_uvFROMTGROUPBYday;

关键收益
优化器能识别“同一 distinct_key(user_id)+ 不同 filter 条件”,从而复用状态:原本可能要 3 份 distinct state,现在可以共享一份,state 大小与访问次数都下降,特别适合高基数 UV 指标。

6. MiniBatch Regular Join:减少 state IO + 抑制冗余输出

6.1 常规 Join 的痛点

逐条 join 会导致:

  • 频繁查对侧 state(RocksDB 更慢)
  • 级联 join 时记录放大严重,中间结果爆炸

6.2 MiniBatch Join 做了两件事

1)buffer 内折叠记录:join 前先把同 key 的变更合并,减少参与 join 的数据量
2)尽量抑制冗余结果:buffer 处理时尽可能不输出多余的中间变更

6.3 开启方式(SQL 例子)

SET'table.exec.mini-batch.enabled'='true';SET'table.exec.mini-batch.allow-latency'='5S';SET'table.exec.mini-batch.size'='5000';SELECTa.idASa_id,a.a_content,b.idASb_id,b.b_contentFROMaLEFTJOINbONa.id=b.id;

说明
Regular Join 的 MiniBatch 默认是关闭的,需要显式开启(同聚合一样的三项参数)。

6.4 适用建议

  • Join 两侧是 Upsert / CDC 场景、同 key 变更频繁,buffer 折叠收益巨大
  • 级联 join 的作业,先上 mini-batch join 往往能立刻看到 state 与吞吐改善

7. 多表 Regular Join:MultiJoin(Flink 2.1)把“中间 state”直接砍掉

当你的 SQL 是多表非时态 regular join,最常见的故障模式是:state 越跑越大,checkpoint 越来越慢,作业越来越不稳。

Flink 2.1 引入 MultiJoin Operator,核心目标是:

零中间 state(zero intermediate state)
不再为 join 链上的每个二元 join 存中间结果状态,而是把多个流同时在一个算子里 join,显著减少 state。

7.1 什么时候启用最划算

如果满足两点,就非常值得试:

  • 作业有多个 join,且 join 条件共享至少一个公共 join key(能按同一 key 分区)
  • 你观察到:中间 join 的 state 比输入表 state 还大(典型记录放大链路)

7.2 开启方式

SET'table.optimizer.multi-join.enabled'='true';

7.3 支持与限制要点(很重要)

  • 当前处于实验状态(可能有优化与 breaking change)

  • 目前支持 streaming INNER / LEFT joins

  • RIGHT join 计划支持(但你上线前要以实际版本为准)

  • 分区要求:至少有一条可以把多表一起 partition 的 key

    • 支持:A JOIN B ON A.key = B.key JOIN C ON A.key = C.key
    • 支持:A JOIN B ON A.key = B.key JOIN C ON B.key = C.key(传递性)
    • 不支持:A.key1=B.key1 且 B.key2=C.key2(没有统一 key,会拆成多个 MultiJoin)

7.4 经验结论

  • 记录放大越明显,MultiJoin 越能“越跑越稳”
  • 如果 join 链反而让 state 变小(较少见),二元 join 可能更快,但 MultiJoin 通常 still 更省总 state

8. Delta Join:用“外部索引 + 双向查表”替代“大 state”,稳定性直接起飞

8.1 为什么 Delta Join 能把 state 压下去

传统 regular join 必须把两侧历史数据都存进 Flink state,以确保对侧迟到时还能匹配。

Delta join 的思路是:
不在 Flink state 里囤全量数据,而是借助外部存储系统的索引能力(例如 Apache Fluss 提供索引信息),直接对外部系统做高效索引查询来完成匹配。这样 Flink state 与外部存储之间不会重复存储同一份数据。

效果
state 大幅缩小,checkpoint 压力下降,作业长期运行更稳。

8.2 默认策略

Delta join 默认开启;当满足条件时 regular join 会自动被优化为 delta join。

如需关闭:

SET'table.optimizer.delta-join.strategy'='NONE';

8.3 可调缓存参数(调优入口)

  • table.exec.delta-join.cache-enabled
  • table.exec.delta-join.left.cache-size
  • table.exec.delta-join.right.cache-size

8.4 支持与限制(上线前务必自查)

支持

  • INSERT-only source 表
  • 没有 DELETE 的 CDC source 表
  • delta join 前的 projection / filter
  • 算子内部缓存

限制:出现以下任一情况就不能优化成 delta join

  • 表的 index key 必须在 join 等值条件中
  • 目前只支持 INNER JOIN
  • 下游必须能处理重复变更(例如 UPSERT sink 且没有 upsertMaterialize 时可能不行)
  • CDC 场景:join key 必须是主键的一部分
  • CDC 场景:所有 filter 必须应用在 upsert key 上
  • filter / projection 中不允许非确定性函数

9. 一套“按症下药”的调优落地清单

9.1 你看到 backpressure + RocksDB state 读写很重

优先做

  • 开 MiniBatch(聚合/Join 都考虑)
  • 聚合倾斜明显就上 TWO_PHASE(Local-Global)

9.2 你有 COUNT DISTINCT 且 group key 热点

优先做

  • 开 distinct-agg split(bucket 拆分)
  • UV 多维统计用 FILTER 替换 CASE WHEN,争取共享 state

9.3 你有多表级联 join,state 越跑越大

优先做

  • 开 mini-batch join(先抑制记录放大)
  • Flink 2.1 且满足公共 key 分区条件:尝试 MultiJoin
  • 如果 source 外部系统具备索引能力且满足限制:让 regular join 自动转 delta join(或检查为何没转)

10. 结语:把“默认策略”切到“更适合生产负载的策略”

Flink SQL 的默认执行策略是通用稳妥型,但生产负载往往更偏“状态密集 + 倾斜 + 多表 join + CDC 变更频繁”。你这份调优组合拳的核心路线很清晰:

  • MiniBatch:用吞吐换少量延迟,换来 state IO 大幅下降
  • Local-Global:治倾斜、减 shuffle
  • Distinct 拆分 + FILTER:让 UV 指标也能扩展、还能省 state
  • MiniBatch Join:减少中间结果与冗余输出
  • MultiJoin:多表 join 直接砍中间 state
  • Delta Join:把大 state 变成外部索引查表,长期稳定运行
http://www.jsqmd.com/news/192283/

相关文章:

  • 气血不足免疫力低下?补气血吃什么最好最快?红参+阿胶双效调理,女人秋冬补气血的正确方法?和悦怡深度滋养 - 博客万
  • ReadyPlayerMe创建角色后如何用于HeyGem合成?
  • HeyGem数字人系统部署常见问题解答:网络、浏览器与存储注意事项
  • 制作马头琴音乐节奏游戏,跟着马头琴音乐的节奏点击屏幕。得分高的解锁新曲目。
  • 企业级预报名管理系统管理系统源码|SpringBoot+Vue+MyBatis架构+MySQL数据库【完整版】
  • 新闻播报自动化尝试:将文字转语音+数字人视频一键生成
  • [精品]基于微信小程序的社区论坛系统 UniApp
  • 【IC】多die设计的bump和TSV规划方法
  • 链表专题(一):以退为进的智慧——「移除链表元素」
  • 内置式永磁同步电机IPMSM的最大转矩电流比MTPA控制仿真模型探索
  • Acid Pro循环音乐制作+HeyGem教育内容生产
  • C#系统部署实战精要(从开发到运维的9个关键细节)
  • 揭秘C# 交错数组修改难题:5种实战场景下的最佳解决方案
  • 如何用一行Lambda重构冗长代码?老码农的秘密武器曝光
  • 编写民间艺术知识答题小程序,随机出题,答对积累积分,兑换文创产品优惠券。
  • HeyGem + GPU加速:大幅提升AI数字人视频生成效率的技术方案
  • 开源项目二次开发案例:科哥如何改造原始模型为HeyGem系统
  • C#企业级应用部署难题:如何在3步内完成生产环境零故障发布
  • 腾讯微云存储HeyGem培训素材方便随时取用
  • [精品]基于微信小程序的 任务打卡系统UniApp
  • GESP认证C++编程真题解析 | P14918 [GESP202512 五级] 相等序列
  • 揭秘C#跨平台权限验证难题:5个关键步骤实现安全合规访问
  • 网盘直链下载助手助力HeyGem资源分发:实现快速共享输出视频
  • 解锁本科论文新境界:书匠策AI——你的学术隐形导航仪
  • 人工智能测试工程师,需要掌握哪些真正「能落地」的技能?
  • [精品]基于微信小程序的宠物领养平台 UniApp
  • C# 12顶级语句实战指南(告别传统Main方法的时代)
  • 京东搜索关键词
  • Ableton Live创作旋律+HeyGem生成解说视频配套
  • 从“憋不出致谢”到“逻辑自洽闭环”:一位本科毕业生如何用AI工具重构论文写作流程