当前位置: 首页 > news >正文

Spark动态优化机制:AQE与DPP

1 AQE

1.1 AQE诞生的背景

Spark 2.x 在遇到有数据倾斜的任务时,需要人为地去优化任务,比较费时费力;如果任务在Reduce阶段,Reduce Task 数据分布参差不齐,会造成各个excutor节点资源利用率不均衡,影响任务的执行效率;Spark 3新特性AQE极大地优化了以上任务的执行效率。

RBO(Rule Based Optimization,基于规则的优化),它往往基于一些规则和策略实现,如谓词下推、列裁剪,这些规则和策略来源于数据库领域已有的应用经验。RBO实际上算是一种经验主义。经验主义的弊端就是对待相似的问题和场景都使用同一类套路。Spark 社区正是因为意识到了 RBO 的局限性,因此在 2.x 版本中推出了CBO(Cost Based Optimization,基于成本的优化)。

CBO 是基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。CBO 支持的统计信息很丰富,比如数据表的行数、每列的基数(Cardinality)、空值数、最大值、最小值等。因为有统计数据做支持,所以 CBO 选择的优化策略往往优于 RBO 选择的优化规则。
但是,CBO 也有三个方面的不足:
(1)、适用面太窄,CBO 仅支持注册到 Hive Metastore 的数据表,但在其他的应用场景中,数据源往往是存储在分布式文件系统的各类文件,如 Parquet、ORC、CSV 等。
(2)、统计信息的搜集效率比较低。对于注册到 Hive Metastore 的数据表,开发者需要调用 ANALYZE TABLE COMPUTE STATISTICS 语句收集统计信息,而各类信息的收集会消耗大量时间。
(3)、静态优化,RBO、CBO执行计划一旦制定完成,就会按照该计划坚定不移地执行;如果在运行时数据分布发生动态变化,先前制定的执行计划并不会跟着调整、适配。

考虑到 RBO 和 CBO 的种种限制,Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution,自适应查询执行)。用一句话来概括,AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。
AQE 赖以优化的统计信息与 CBO 不同,这些统计信息并不是关于某张表或是哪个列,而是 Shuffle Map 阶段输出的中间文件。

1.2 AQE如何使用?

AQE三大特性:自动分区合并 、自动数据倾斜处理、Join 策略调整。

1.2.1 自动分区合并(Partition Coalescing)‌

‌场景‌:当Shuffle后产生大量小分区(如文件数过多或数据分布不均),导致任务调度开销过大或资源浪费。
‌示例‌:对日志数据进行GROUP BY聚合操作时,若原始数据被分为1000个小文件,AQE会动态合并相邻小分区,减少下游Task数量,避免因Task过多导致资源争抢或调度延迟‌。
-- 启用AQE后,Spark自动合并小分区 SET spark.sql.adaptive.enabled=true; SELECT user_id, COUNT(*) FROM logs GROUP BY user_id;

1.2.2 Join策略动态切换(Switch Join Strategy)‌

‌场景‌:静态优化器选择的Join策略(如Sort-Merge Join)在运行时因数据量变化不再高效。
‌示例‌:
表A(1亿条)与表B(过滤后仅1万条)进行Join时,AQE在运行时检测到表B数据量极小,自动将Sort-Merge Join切换为Broadcast Join,减少Shuffle开销‌。
-- 动态切换为Broadcast Join SELECT /*+ MERGE(a, b) */ * FROM large_table a JOIN small_table b ON a.id = b.id;

1.2.3 倾斜Join优化(Optimize Skew Join)‌

‌场景‌:Join操作中某些Key的数据量极大(如热门商品或用户行为数据倾斜),导致单个Task处理时间过长。
‌示例‌:用户行为表与商品表Join时,商品ID为“hot_item_123”的记录占总量80%。AQE将该Key对应的数据拆分为多个子分区,并行处理以平衡负载‌。
-- 自动拆分倾斜Key SET spark.sql.adaptive.skewJoin.enabled=true; SELECT a.user_id, b.item_name FROM user_actions a JOIN items b ON a.item_id = b.item_id;

1.2.4 动态优化聚合操作‌

‌场景‌:聚合操作(如GROUP BY)因数据分布不均导致部分Task内存不足。
‌示例‌:统计每个城市的订单总金额时,若北上广深数据量远超其他城市,AQE会动态调整Hash聚合为Sort聚合,减少内存压力‌。
‌启用AQE的核心配置‌
`
-- 基础配置
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.coalescePartitions.enabled=true; -- 合并小分区
SET spark.sql.adaptive.skewJoin.enabled=true; -- 倾斜Join优化

-- 高级调优(根据集群规模调整)
SET spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB; -- 分区合并目标大小
SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB; -- 倾斜判定阈值
`

1.3 AQE适用场景

场景 问题特征 AQE优化动作 触发条件 配置参数
小文件过多 大量小分区导致Task数量激增,调度开销大 动态合并相邻分区,减少Task数量 spark.sql.adaptive.coalescePartitions.enabled=true
分区大小 < spark.sql.adaptive.advisoryPartitionSizeInBytes
spark.sql.adaptive.coalescePartitions.enabled
spark.sql.adaptive.advisoryPartitionSizeInBytes
spark.sql.adaptive.coalescePartitions.parallelismFirst
Join策略选择错误 大表Join小表时静态优化器未选择广播Join 运行时检测小表大小,切换为Broadcast Join 小表大小 ≤ spark.sql.adaptive.localShuffleReader.enabled
Join类型适合广播
spark.sql.adaptive.localShuffleReader.enabled
spark.sql.autoBroadcastJoinThreshold
数据倾斜严重 单个Task处理时间远高于其他Task,存在长尾任务 拆分倾斜Key数据到多个Task处理 Task处理时间 > 中位数 × N倍
数据倾斜度超过阈值
spark.sql.adaptive.skewJoin.enabled
spark.sql.adaptive.skewJoin.skewedPartitionFactor
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
聚合内存压力大 哈希聚合导致OOM,内存使用过高 切换为Sort聚合或调整分区大小 聚合内存使用超过阈值
分区数据分布不均匀
spark.sql.adaptive.hashAggregate.enabled

1.4 AQE配置调优参考表

优化目标 关键配置参数 推荐值 说明
小文件合并 spark.sql.adaptive.coalescePartitions.enabled true 启用分区合并
spark.sql.adaptive.advisoryPartitionSizeInBytes 64MB 目标分区大小
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 最小分区大小
spark.sql.adaptive.coalescePartitions.parallelismFirst false 优先保证并行度
数据倾斜处理 spark.sql.adaptive.skewJoin.enabled true 启用倾斜Join优化
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5 倾斜分区判定因子
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 倾斜分区阈值
spark.sql.adaptive.forceOptimizeSkewedJoin true 强制优化倾斜Join
Join策略优化 spark.sql.adaptive.localShuffleReader.enabled true 启用本地Shuffle读取
spark.sql.autoBroadcastJoinThreshold 10MB 广播Join阈值
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 本地Map Join阈值
通用优化 spark.sql.adaptive.enabled true 启用AQE
spark.sql.adaptive.logLevel INFO AQE日志级别
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.2 非空分区比例阈值

1.5 AQE版本演进特性

Spark版本 新增AQE特性 解决的问题
Spark 3.0 • 动态合并Shuffle分区
• 动态切换Join策略
• 动态优化数据倾斜
初步解决静态优化器的局限性
Spark 3.1 • 动态分区剪裁(DPP)
• 优化Shuffle分区数
进一步提升复杂查询性能
Spark 3.2 • 运行时过滤优化
• 增强的倾斜处理
优化Join和过滤性能
Spark 3.3 • 自适应查询合并
• 更智能的广播超时处理
减少重复计算,增强稳定性
Spark 3.4+ • AI驱动的优化建议
• 更细粒度的自适应控制
智能化优化决策

‌注‌:AQE在Spark 3.0+版本默认启用,需结合集群资源和数据特征调整参数以最大化性能‌。

2 DPP

2.1 什么是DPP

DPP 是 Spark 针对分区表 Join / 过滤的优化机制,核心是在运行时动态推导分区裁剪条件,只扫描符合条件的分区,避免全表扫描。对比静态分区裁剪(仅能基于 SQL 中显式的过滤条件裁剪),DPP 的核心优势:

  • 静态裁剪:仅能识别 where dt='2026-01-01' 这类显式条件
  • DPP 裁剪:若查询是 select * from A join B on A.dt = B.dt where A.id=100,DPP 会先执行 A 表的过滤(id=100),得到 A 表的 dt 取值(如 2026-01-01),再将该条件动态应用到 B 表的分区裁剪,只扫描 B 表 dt=2026-01-01 的分区

2.2 开启DPP

spark.sql.optimizer.dynamicPartitionPruning.enabled

2.3 触发条件

注意:这个动态分区裁剪操作默认是开启的,但是触发动态分区裁剪是需要一些条件的。
(1)需要裁剪的表必须是分区表,并且分区字段必须在 Join 中的 on 条件里面
(2)Join 类型必须是 Inner Join、Left Semi Join、Left Outer Join 或者 Right Outer Join

  • 针对 Inner Join:Join 操作左边的表、右边的表可以都是分区表,或者只有某一个表是分区表,至少要有一个表是分区表,这样才能支持裁剪
  • 针对 Left Semi Join:需要保证 Join 操作左边的表是分区表,这样才能支持裁剪
  • 针对 Left Outer Join:需要保证 Join 操作右边的表是分区表,这样才能支持裁剪
  • 针对 Right Outer Join:需要保证 Join 操作左边的表是分区表,这样才能支持裁剪
    (3)另一张表里面需要至少存在一个过滤条件

https://ggwujun.github.io/blog-architect/spark性能调优实战/04.spark-sql性能调优篇/05/
https://www.cnblogs.com/yeyuzhuanjia/p/18791016
https://cloud.tencent.com/developer/article/1922959
https://juejin.cn/post/7504968076557205513
https://www.hangge.com/blog/cache/detail_3633.html

http://www.jsqmd.com/news/194949/

相关文章:

  • 手机号码归属地查询终极指南:快速定位地理位置的开源解决方案
  • 如何快速掌握BetterJoy:让Switch手柄在PC上完美运行的完整指南
  • 闲鱼自动化终极指南:3步搭建智能运营系统
  • Universal-x86-Tuning-Utility终极指南:快速掌握x86设备性能调优
  • DownKyi视频下载神器:从零基础到高效管理的终极指南
  • Downkyi哔哩下载神器:新手必备的8大实用技巧
  • LeaguePrank终极指南:5分钟掌握英雄联盟客户端个性化技巧
  • 视频下载工具终极指南:5大场景解锁批量管理新境界
  • League Akari英雄联盟智能助手:5大核心功能深度评测与实战指南
  • Unity自动翻译黑科技:XUnity.AutoTranslator闪电部署与实战指南
  • 百度网盘解析工具:3步获取高速下载地址,告别限速困扰
  • 碧蓝航线自动化脚本实战指南:从零基础到精通配置
  • 终极星露谷物语XNB文件编辑指南:从入门到精通
  • 终极指南:如何通过手机号码实现精准地理位置追踪
  • 揭秘DownKyi:3分钟掌握B站8K视频下载与智能管理技巧
  • 英雄联盟智能游戏管家:懒人玩家的专属神器
  • 特价股票投资中的情绪因子分析与应用
  • 如何突破百度网盘限速:免费解析工具让下载速度飙升10倍
  • 3分钟搞定Windows右键菜单:ContextMenuManager终极清理指南 [特殊字符]
  • uniapp 苹果支付
  • DownKyi视频获取工具完全操作手册:专业级媒体内容管理方案
  • 终极解决方案:Alas碧蓝航线自动化脚本完全配置手册
  • 英雄联盟全能助手LeagueAkari:5大实用功能提升你的游戏体验
  • 英雄联盟辅助工具LeagueAkari:新手玩家的智能游戏伙伴完全指南
  • 深度学习计算机毕设之基于人工智能CBAM-CNN的故障汽车检测
  • Dotenv 环境变量管理完全指南
  • 闲鱼自动化终极指南:3分钟搭建系统,每天稳定获取200闲鱼币
  • LAV Filters终极指南:为什么说它是Windows播放体验的完美解决方案?
  • XUnity自动翻译插件:3分钟掌握Unity游戏无障碍翻译终极方案
  • 突破语言壁垒:XUnity自动翻译让外文游戏秒变中文版