当前位置: 首页 > news >正文

数据倾斜的各种原因及处理方案

数据倾斜的本质是Shuffle 过程中 key 分布极度不均,导致个别 Task 处理的数据量远超其他 Task,成为整个作业的短板。

一、业务数据本身分布不均(热点 Key)

例子:搜索日志中统计每个搜索词的点击量,像热门词(如“天气”、“淘宝”)每天上亿次搜索,而其他的只有几十次。
处理

1. 加盐两阶段聚合

人为将同一个热点 Key 拆成多个“子 Key”,使它们分散到不同的 Reduce 分区先局部聚合,然后再将各局部结果合并为最终结果。局部聚合(加盐分散)+ 全局聚合

-- 第一阶段:加盐局部聚合 WITH salted_agg AS ( select salted_query, COUNT(1) AS local_cnt from ( SELECT -- 热点词加随机后缀 (0~99),非热点词保持不变 CASE WHEN query IN ('天气', '淘宝') THEN CONCAT(query, '_', CAST(FLOOR(RAND() * 100) AS INT)) ELSE query END AS salted_query FROM search_log ) as tmp group by salted_query ) -- 第二阶段:去后缀全局聚合 SELECT CASE WHEN salted_query LIKE '%\\_%' THEN split(salted_query,'_')[0] ELSE salted_query END AS query, SUM(local_cnt) AS total_click FROM salted_agg GROUP BY CASE WHEN salted_query LIKE '%\\_%' THEN split(salted_query,'_')[0] ELSE salted_query END;

2. Combiner

在 Map 端预先聚合,减少传输量,但不能根治倾斜。

3. 优化 Reduce 端内存与并行度

增大 Reduce Task 的内存堆空间、提高 shuffle 缓冲区比例、增加并行拷贝线程数等,只能让倾斜任务跑得“不那么容易 OOM”,但无法改变它处理的数据量是别的 Task 几十倍的事实,所以是辅助手段。


二、Join 时连接键严重偏斜

例子:订单表(10 亿行)关联用户表,其中用户user_id=10086拥有 5 亿条订单,其余用户只有几十条。
处理

1. 小表广播(大表join小表)

如果是大表join小表的情况,可以将小表进行广播到各节点,在 Map 端完成 Join,消除 Shuffle 和倾斜。

维度Hive(Map Join)Spark SQL(Broadcast Hash Join)
自动转换开关hive.auto.convert.join=truespark.sql.autoBroadcastJoinThreshold非负值
阈值控制hive.mapjoin.smalltable.filesize(25MB)autoBroadcastJoinThreshold(10MB)
手动 Hint/*+ MAPJOIN(t) *//*+ BROADCAST(t) */
合并多个小表hive.auto.convert.join.noconditionaltask(如果多张表的大小总和不超过阈值,允许将多个 Map Join 合并成一个 Map-Only 任务,进一步提升效率。每个 Join 单独广播,但 AQE 可优化
底层实现Map 端内存哈希表Executor 内存广播变量 + 哈希表

2. 倾斜键分离(大表 Join 大表)

步骤如下

步骤 1:识别倾斜 Key
通过采样找出出现次数远超平均值的连接键。例如,发现user_id = 10086是倾斜 Key。

步骤 2:分离数据
将 orders 表和 user 表中的记录按“是否包含倾斜 Key”分别过滤,形成:

  • 倾斜部分:orders_skew (user_id=10086) 和 user_skew (user_id=10086)

  • 正常部分:orders_normal (user_id≠10086) 和 user_normal (user_id≠10086)

步骤 3:正常部分常规 Join
正常部分没有热点,直接走普通的 Shuffle Join 或者 Map Join(如果 user_normal 足够小),不会产生倾斜。

步骤 4:倾斜部分特殊处理——加盐膨胀
对于倾斜部分,采用加盐的思路,但针对 Join 的特点需要将小表膨胀

  • 对 user_skew 中的那条user_id=10086记录,复制 N 份(N 如 100),每份加上一个随机后缀,变成user_id_salt = 10086_0, 10086_1, ..., 10086_99。这样小表由 1 条膨胀为 100 条。

  • 同时对 orders_skew 中user_id=10086的订单记录,每条也加上 0~99 之间的随机后缀,生成user_id_salt

  • 现在,两表在user_id_salt字段上 Join,由于加上了随机后缀,原来的 3 亿条订单被均匀打散到 100 个 Reduce 上,每个 Reduce 处理约 300 万条订单和 1 条用户记录,负载均衡。

步骤 5:合并结果
将 Step 3 和 Step 4 的输出用 Union All 合并,得到完整的 Join 结果。

3. Hive参数调优

  • hive.optimize.skewjoin=true:自动检测倾斜键,将其分离并启动独立的 Task 处理。
  • hive.skewjoin.key:指定倾斜键阈值(默认 100000),超过的记录数视为倾斜。

4. Spark AQE 自适应优化

spark.sql.adaptive.enabled=true
  • 作用:开启 AQE 总开关。

  • 原理:Spark 在 Shuffle 完成后,不再使用固定的并行度和计划,而是根据每个分区的实际数据量动态进行运行时优化,包括:

    • 合并小分区:将多个数据量很小的分区合并成一个,减少 Task 数量,避免资源浪费。

    • 自动切换 Join 策略:如果发现某张表的数据量在 Shuffle 后变得很小,可实时将 Sort Merge Join 切换为 Broadcast Hash Join。

    • 倾斜分区拆分:允许下面的skewJoin优化生效。

  • 效果:无需人工调参,让 Spark 根据真实数据流动态决策,提高稳定性和性能。

spark.sql.adaptive.skewJoin.enabled=true
  • 作用:开启 AQE 下的倾斜 Join 自动拆分优化(依赖总开关已开启)。

  • 原理:当 Spark 检测到某个 Join 分区的大小明显超过中位数分区大小的5倍(默认,可配置)时,会将该倾斜分区拆分成多个小分区,并与另一张表对应的数据分别 Join,然后将结果合并。

  • 效果:自动解决 Join 时由于热点键导致某个 Task 数据量过大的数据倾斜问题,无需手动加盐或设计分桶表。

5. 分桶 SMB Join(Sort-Merge Bucket Join)

分桶 SMB Join 是 Hive 中专门为大表 Join 大表设计的一种极致优化手段。它的核心思想是:提前把数据按连接键分桶且排序,Join 时直接按桶进行归并,彻底消除 Shuffle 和 Reduce 阶段

SMB Join 是一种Map-Only 操作(没有 Reduce),它的执行过程很直接:

  1. 表必须是分桶表,且按 Join Key 排序
    建表时,用CLUSTERED BY (join_key) SORTED BY (join_key) INTO N BUCKETS定义。这样每个桶内部的数据已经按连接键排好序了。

  2. 两表的分桶数必须一致或成比例
    最常见是两个表桶数相同,这样每个桶 ID 一一对应,直接配对 Join。

  3. Join 时,Hive 启动 Map Task
    每个 Map Task 会读取两张表中相同桶 ID 的文件(如 orders 的 bucket 0 和 users 的 bucket 0)。因为桶内数据已按 key 排序,Map Task 只需将它们进行归并排序式的归并连接(就像归并两个有序链表),边读边匹配,内存占用很低,且纯顺序 I/O。

示意图

  • orders 桶 0 和 users 桶 0 → Map Task 0

  • orders 桶 1 和 users 桶 1 → Map Task 1

  • ...

没有 Shuffle,没有 Reduce,所有工作都在 Map 端完成。

注意:

  • 两张表都必须是分桶排序表,且分桶键和排序键需是 Join 键。

  • 桶的数量必须相等,或者一方的桶数是另一方的整数倍(此时多个小桶对应一个大桶)。生产上通常设为相同桶数

  • 如果桶数相等,Join 的桶列数据类型和分桶算法必须一致(避免逻辑桶号不匹配)。

  • 表的数据不能频繁更新/追加,否则需要重新分桶排序维护。


三、分组聚合(Group By)时 Key 不均

例子:按城市分组统计人口,大城如北京、上海的人口是普通县城的千倍,聚合时对应 Reduce 数据量巨大。
处理

1. Combiner 预聚合(Map 端合并)

Combiner 在 Map 端对<province, 1>做局部求和,输出<province, 局部总数>。这样每个 Map Task 只发少量汇总记录给 Reducer,大幅减少 Shuffle 数据量和倾斜 Reducer 的输入。

2. 加盐两阶段聚合

第一阶段 city 加随机后缀做局部聚合,第二阶段去掉后缀做全局聚合。

3. Hive参数调优

hive.groupby.skewindata是 Hive 中一个专门用于应对Group By 聚合操作造成的数据倾斜的优化参数。当它被设置为true(默认是false)时,Hive 会自动将存在倾斜的聚合操作拆分成两个 MapReduce Job来执行,通过“分步聚合”的方式平衡 Reduce 端的负载

4.Spark AQE 自适应优化

spark.sql.adaptive.enabled=true:开启 AQE 总开关。


四、去重/计数(Distinct)产生的倾斜

例子SELECT COUNT(DISTINCT user_id) FROM orders;在大表上执行,所有同一 user_id 的去重任务会聚集到一个 Reduce,数据量大时严重倾斜。
处理

1. 改写为 Group By

COUNT(DISTINCT col)之所以能通过改写为GROUP BY解决倾斜,核心原因在于执行计划的本质差异:前者通常限制在单任务做全局去重,后者可以自然分散到多任务并行,并利用 Map 端预聚合减少数据量。

方式典型执行计划数据流向是否可预聚合
COUNT(DISTINCT col)所有数据汇集到一个 Reducer,严格去重后输出总条数全量数据单点汇集(去重必须看到全部数据)
SELECT COUNT(*) FROM (SELECT col FROM t GROUP BY col)Map 端先局部去重组合并,Shuffle 到多个 Reducer 做全局分组,最后再统计分组个数数据分散到多 Reducer(Map 端 Combiner 可局部去重)

2. 加盐两阶段聚合

同前面,局部聚合(加盐分散)+ 全局聚合

3. Hive参数调优

hive.optimize.distinct.rewrite=true(Hive 1.2.0+)

这个参数是为了解决早期版本中COUNT(DISTINCT)将所有数据拉到单个 Reducer 导致性能瓶颈的问题。

  • 🔄作用:它会对查询逻辑进行重写,效果类似于手动将COUNT(DISTINCT col)改写为SELECT COUNT(*) FROM (SELECT col FROM table GROUP BY col) t

  • ↔️分组场景:如果GROUP BY字段的基数很高,查询会被拆分为多个 MapReduce 任务,虽然能避免单点瓶颈,但也可能因任务增加而引入额外开销。

  • 适用场景:主要针对没有 Group By或分组粒度较粗的查询,可以有效缓解单Reducer过载问题。

hive.optimize.countdistinct=true(Hive 3.0.0+,默认开启)

这是Hive 3.0引入的更高级优化,是对前者的补充和增强。

  • 🧠作用:它会根据数据量和基数信息进行基于成本的优化,自动选择最优执行计划。即使原始查询中有多个count(distinct col),它也能智能地进行多次重写和聚合。

  • 📈多维优化:能够处理多列去重、多Grouping Sets等复杂场景,通过多级聚合模式,实现比单一重写规则更好的优化效果。

  • 🐞兼容性:已知在部分版本中,同时开启hive.optimize.countdistincthive.groupby.skewindata可能导致结果错误,如果遇到问题可考虑关闭其中一个参数。

4. Spark AQE自适应优化

spark.sql.adaptive.skewJoin.enabled=true

5.approx_count_distinct

(近似函数):业务允许误差(如1-2%)时,使用approx_count_distinct()完全避开 Shuffle 和倾斜,在 Hive 和 Spark 中均适用且性能极佳。


五、大量空值或脏数据引起倾斜

例子:日志中user_id字段很多为 NULL,导致所有 NULL 记录进入同一 Reduce,数据量巨大。
处理

1. 直接过滤空值或脏数据

若业务允许,直接过滤掉空值或脏数据。

2.加盐两阶段聚合

若还是需要这部分数据,还是采用加盐两阶段聚合的方式。


六、总结口诀

  • 能过滤的直接过滤(空值、脏数据)

  • 能预聚合的先 Combiner(求和、计数)

  • 小表直接广播(Map Join 消除 Shuffle)

  • 大表加盐打散(两阶段聚合、倾斜键分离膨胀)

  • 引擎有自动优化的优先用(Hive skewindata/skewjoin、Spark AQE)

http://www.jsqmd.com/news/780347/

相关文章:

  • 数字电源深度标准化:从PMBus到系统互操作的技术挑战与路径
  • 企业官网技术演进路径:从静态展示到数据驱动获客的架构升级
  • MacBook Touch Bar Windows驱动完全指南:解锁跨系统触控交互的终极方案
  • ARM Core Tile与仿真基板系统架构解析
  • 企业级AI智能体开发实战:基于Astron Agent的工作流编排与RPA集成
  • 视频人脸打码软件工具
  • 基于大语言模型的AI论文审阅助手ChatReviewer:从原理到部署实践
  • 基于 Grafana 探索云端监控的艺术:从零开始的实战演练
  • GdUnit3嵌入式单元测试框架:在Godot引擎中实现高效代码验证
  • Go语言四层负载均衡器Nekot:云原生环境下的高性能流量分发实践
  • MRC(多路径可靠连接)协议
  • Product Hunt 每日热榜 | 2026-05-08
  • 一键安装 OpenClaw 全程图文教程 | 无需命令行
  • Figma中文界面插件:让全球顶尖设计工具真正为你所用
  • 基于MCP协议构建苹果官方文档智能查询系统
  • 头歌MySQL-基于电影、演员及票房应用的数据查询(Select)
  • 顶俏模式商城系统开发 单层直推积分流转架构解析
  • ARM链接器核心概念与优化实践指南
  • GEO优化干货分享:GEO品牌优化很重要,请收藏!
  • 1瓦x86处理器在嵌入式系统的低功耗实战
  • JAVA-实战8 Redis实战项目—雷神点评(12)UV统计
  • 传奇游戏|热血传奇|复古传奇|电脑版传奇网页游戏|复古传奇游戏玩与攻略|602游戏剖析
  • 嵌入式系统电源优化:CMOS功耗分析与DVFS技术实践
  • AI编程助手高效配置指南:Cursor与Claude Code专属工具箱实战
  • Ubuntu下载地址
  • 从2D到3D NAND:存储技术演进、控制器挑战与未来展望
  • Qoder Reset工具:彻底清除AI编程助手本地身份与指纹数据
  • Redis别再只当缓存用!8种常用数据结构+实战选型,一看就会
  • Suno Style API 集成教程
  • 从硬连线到软定义:可编程逻辑器件(PAL/CPLD/FPGA)演进史与技术解析