数据倾斜的各种原因及处理方案
数据倾斜的本质是Shuffle 过程中 key 分布极度不均,导致个别 Task 处理的数据量远超其他 Task,成为整个作业的短板。
一、业务数据本身分布不均(热点 Key)
例子:搜索日志中统计每个搜索词的点击量,像热门词(如“天气”、“淘宝”)每天上亿次搜索,而其他的只有几十次。
处理:
1. 加盐两阶段聚合
人为将同一个热点 Key 拆成多个“子 Key”,使它们分散到不同的 Reduce 分区先局部聚合,然后再将各局部结果合并为最终结果。局部聚合(加盐分散)+ 全局聚合
-- 第一阶段:加盐局部聚合 WITH salted_agg AS ( select salted_query, COUNT(1) AS local_cnt from ( SELECT -- 热点词加随机后缀 (0~99),非热点词保持不变 CASE WHEN query IN ('天气', '淘宝') THEN CONCAT(query, '_', CAST(FLOOR(RAND() * 100) AS INT)) ELSE query END AS salted_query FROM search_log ) as tmp group by salted_query ) -- 第二阶段:去后缀全局聚合 SELECT CASE WHEN salted_query LIKE '%\\_%' THEN split(salted_query,'_')[0] ELSE salted_query END AS query, SUM(local_cnt) AS total_click FROM salted_agg GROUP BY CASE WHEN salted_query LIKE '%\\_%' THEN split(salted_query,'_')[0] ELSE salted_query END;2. Combiner
在 Map 端预先聚合,减少传输量,但不能根治倾斜。
3. 优化 Reduce 端内存与并行度
增大 Reduce Task 的内存堆空间、提高 shuffle 缓冲区比例、增加并行拷贝线程数等,只能让倾斜任务跑得“不那么容易 OOM”,但无法改变它处理的数据量是别的 Task 几十倍的事实,所以是辅助手段。
二、Join 时连接键严重偏斜
例子:订单表(10 亿行)关联用户表,其中用户user_id=10086拥有 5 亿条订单,其余用户只有几十条。
处理:
1. 小表广播(大表join小表)
如果是大表join小表的情况,可以将小表进行广播到各节点,在 Map 端完成 Join,消除 Shuffle 和倾斜。
| 维度 | Hive(Map Join) | Spark SQL(Broadcast Hash Join) |
|---|---|---|
| 自动转换开关 | hive.auto.convert.join=true | spark.sql.autoBroadcastJoinThreshold非负值 |
| 阈值控制 | hive.mapjoin.smalltable.filesize(25MB) | autoBroadcastJoinThreshold(10MB) |
| 手动 Hint | /*+ MAPJOIN(t) */ | /*+ BROADCAST(t) */ |
| 合并多个小表 | hive.auto.convert.join.noconditionaltask(如果多张表的大小总和不超过阈值,允许将多个 Map Join 合并成一个 Map-Only 任务,进一步提升效率。) | 每个 Join 单独广播,但 AQE 可优化 |
| 底层实现 | Map 端内存哈希表 | Executor 内存广播变量 + 哈希表 |
2. 倾斜键分离(大表 Join 大表)
步骤如下:
步骤 1:识别倾斜 Key
通过采样找出出现次数远超平均值的连接键。例如,发现user_id = 10086是倾斜 Key。
步骤 2:分离数据
将 orders 表和 user 表中的记录按“是否包含倾斜 Key”分别过滤,形成:
倾斜部分:orders_skew (user_id=10086) 和 user_skew (user_id=10086)
正常部分:orders_normal (user_id≠10086) 和 user_normal (user_id≠10086)
步骤 3:正常部分常规 Join
正常部分没有热点,直接走普通的 Shuffle Join 或者 Map Join(如果 user_normal 足够小),不会产生倾斜。
步骤 4:倾斜部分特殊处理——加盐膨胀
对于倾斜部分,采用加盐的思路,但针对 Join 的特点需要将小表膨胀:
对 user_skew 中的那条
user_id=10086记录,复制 N 份(N 如 100),每份加上一个随机后缀,变成user_id_salt = 10086_0, 10086_1, ..., 10086_99。这样小表由 1 条膨胀为 100 条。同时对 orders_skew 中
user_id=10086的订单记录,每条也加上 0~99 之间的随机后缀,生成user_id_salt。现在,两表在
user_id_salt字段上 Join,由于加上了随机后缀,原来的 3 亿条订单被均匀打散到 100 个 Reduce 上,每个 Reduce 处理约 300 万条订单和 1 条用户记录,负载均衡。
步骤 5:合并结果
将 Step 3 和 Step 4 的输出用 Union All 合并,得到完整的 Join 结果。
3. Hive参数调优
hive.optimize.skewjoin=true:自动检测倾斜键,将其分离并启动独立的 Task 处理。hive.skewjoin.key:指定倾斜键阈值(默认 100000),超过的记录数视为倾斜。
4. Spark AQE 自适应优化
spark.sql.adaptive.enabled=true
作用:开启 AQE 总开关。
原理:Spark 在 Shuffle 完成后,不再使用固定的并行度和计划,而是根据每个分区的实际数据量动态进行运行时优化,包括:
合并小分区:将多个数据量很小的分区合并成一个,减少 Task 数量,避免资源浪费。
自动切换 Join 策略:如果发现某张表的数据量在 Shuffle 后变得很小,可实时将 Sort Merge Join 切换为 Broadcast Hash Join。
倾斜分区拆分:允许下面的
skewJoin优化生效。
效果:无需人工调参,让 Spark 根据真实数据流动态决策,提高稳定性和性能。
spark.sql.adaptive.skewJoin.enabled=true
作用:开启 AQE 下的倾斜 Join 自动拆分优化(依赖总开关已开启)。
原理:当 Spark 检测到某个 Join 分区的大小明显超过中位数分区大小的5倍(默认,可配置)时,会将该倾斜分区拆分成多个小分区,并与另一张表对应的数据分别 Join,然后将结果合并。
效果:自动解决 Join 时由于热点键导致某个 Task 数据量过大的数据倾斜问题,无需手动加盐或设计分桶表。
5. 分桶 SMB Join(Sort-Merge Bucket Join)
分桶 SMB Join 是 Hive 中专门为大表 Join 大表设计的一种极致优化手段。它的核心思想是:提前把数据按连接键分桶且排序,Join 时直接按桶进行归并,彻底消除 Shuffle 和 Reduce 阶段。
SMB Join 是一种Map-Only 操作(没有 Reduce),它的执行过程很直接:
表必须是分桶表,且按 Join Key 排序
建表时,用CLUSTERED BY (join_key) SORTED BY (join_key) INTO N BUCKETS定义。这样每个桶内部的数据已经按连接键排好序了。两表的分桶数必须一致或成比例
最常见是两个表桶数相同,这样每个桶 ID 一一对应,直接配对 Join。Join 时,Hive 启动 Map Task
每个 Map Task 会读取两张表中相同桶 ID 的文件(如 orders 的 bucket 0 和 users 的 bucket 0)。因为桶内数据已按 key 排序,Map Task 只需将它们进行归并排序式的归并连接(就像归并两个有序链表),边读边匹配,内存占用很低,且纯顺序 I/O。
示意图:
orders 桶 0 和 users 桶 0 → Map Task 0
orders 桶 1 和 users 桶 1 → Map Task 1
...
没有 Shuffle,没有 Reduce,所有工作都在 Map 端完成。
注意:
两张表都必须是分桶排序表,且分桶键和排序键需是 Join 键。
桶的数量必须相等,或者一方的桶数是另一方的整数倍(此时多个小桶对应一个大桶)。生产上通常设为相同桶数。
如果桶数相等,Join 的桶列数据类型和分桶算法必须一致(避免逻辑桶号不匹配)。
表的数据不能频繁更新/追加,否则需要重新分桶排序维护。
三、分组聚合(Group By)时 Key 不均
例子:按城市分组统计人口,大城如北京、上海的人口是普通县城的千倍,聚合时对应 Reduce 数据量巨大。
处理:
1. Combiner 预聚合(Map 端合并)
Combiner 在 Map 端对<province, 1>做局部求和,输出<province, 局部总数>。这样每个 Map Task 只发少量汇总记录给 Reducer,大幅减少 Shuffle 数据量和倾斜 Reducer 的输入。
2. 加盐两阶段聚合
第一阶段 city 加随机后缀做局部聚合,第二阶段去掉后缀做全局聚合。
3. Hive参数调优
hive.groupby.skewindata是 Hive 中一个专门用于应对Group By 聚合操作造成的数据倾斜的优化参数。当它被设置为true(默认是false)时,Hive 会自动将存在倾斜的聚合操作拆分成两个 MapReduce Job来执行,通过“分步聚合”的方式平衡 Reduce 端的负载
4.Spark AQE 自适应优化
spark.sql.adaptive.enabled=true:开启 AQE 总开关。
四、去重/计数(Distinct)产生的倾斜
例子:SELECT COUNT(DISTINCT user_id) FROM orders;在大表上执行,所有同一 user_id 的去重任务会聚集到一个 Reduce,数据量大时严重倾斜。
处理:
1. 改写为 Group By
COUNT(DISTINCT col)之所以能通过改写为GROUP BY解决倾斜,核心原因在于执行计划的本质差异:前者通常限制在单任务做全局去重,后者可以自然分散到多任务并行,并利用 Map 端预聚合减少数据量。
| 方式 | 典型执行计划 | 数据流向 | 是否可预聚合 |
|---|---|---|---|
COUNT(DISTINCT col) | 所有数据汇集到一个 Reducer,严格去重后输出总条数 | 全量数据单点汇集 | 否(去重必须看到全部数据) |
SELECT COUNT(*) FROM (SELECT col FROM t GROUP BY col) | Map 端先局部去重组合并,Shuffle 到多个 Reducer 做全局分组,最后再统计分组个数 | 数据分散到多 Reducer | 是(Map 端 Combiner 可局部去重) |
2. 加盐两阶段聚合
同前面,局部聚合(加盐分散)+ 全局聚合
3. Hive参数调优
hive.optimize.distinct.rewrite=true(Hive 1.2.0+)
这个参数是为了解决早期版本中COUNT(DISTINCT)将所有数据拉到单个 Reducer 导致性能瓶颈的问题。
🔄作用:它会对查询逻辑进行重写,效果类似于手动将
COUNT(DISTINCT col)改写为SELECT COUNT(*) FROM (SELECT col FROM table GROUP BY col) t。↔️分组场景:如果
GROUP BY字段的基数很高,查询会被拆分为多个 MapReduce 任务,虽然能避免单点瓶颈,但也可能因任务增加而引入额外开销。✅适用场景:主要针对没有 Group By或分组粒度较粗的查询,可以有效缓解单Reducer过载问题。
hive.optimize.countdistinct=true(Hive 3.0.0+,默认开启)
这是Hive 3.0引入的更高级优化,是对前者的补充和增强。
🧠作用:它会根据数据量和基数信息进行基于成本的优化,自动选择最优执行计划。即使原始查询中有多个
count(distinct col),它也能智能地进行多次重写和聚合。📈多维优化:能够处理多列去重、多Grouping Sets等复杂场景,通过多级聚合模式,实现比单一重写规则更好的优化效果。
🐞兼容性:已知在部分版本中,同时开启
hive.optimize.countdistinct与hive.groupby.skewindata可能导致结果错误,如果遇到问题可考虑关闭其中一个参数。
4. Spark AQE自适应优化
spark.sql.adaptive.skewJoin.enabled=true
5.approx_count_distinct
(近似函数):业务允许误差(如1-2%)时,使用approx_count_distinct()完全避开 Shuffle 和倾斜,在 Hive 和 Spark 中均适用且性能极佳。
五、大量空值或脏数据引起倾斜
例子:日志中user_id字段很多为 NULL,导致所有 NULL 记录进入同一 Reduce,数据量巨大。
处理:
1. 直接过滤空值或脏数据
若业务允许,直接过滤掉空值或脏数据。
2.加盐两阶段聚合
若还是需要这部分数据,还是采用加盐两阶段聚合的方式。
六、总结口诀
能过滤的直接过滤(空值、脏数据)
能预聚合的先 Combiner(求和、计数)
小表直接广播(Map Join 消除 Shuffle)
大表加盐打散(两阶段聚合、倾斜键分离膨胀)
引擎有自动优化的优先用(Hive skewindata/skewjoin、Spark AQE)
