Spark SQL 窗口函数完整技术文档
1. 文档概述
本文档系统讲解Apache Spark SQL窗口函数的核心原理、语法体系、实战应用、性能调优及常见问题排查。不仅涵盖基础函数说明,更深入剖析窗口函数的执行机制,提供生产环境中可直接复用的代码模板和问题解决方案。
2. 窗口函数核心原理
2.1 什么是窗口函数
窗口函数是一种行级计算函数,它为每一行定义一个"窗口"(与当前行相关的行集合),在窗口内执行计算并返回结果。与聚合函数的本质区别在于:聚合函数将多行合并为一行,窗口函数保留所有原始行,为每行附加计算结果。
核心价值:
- 无需自连接即可实现跨行比较和计算
- 一行代码完成排名、累计、移动平均等复杂分析
- 比传统SQL写法性能提升3~10倍
- 支持分区内的精细化数据处理
2.2 窗口的三要素
一个完整的窗口由以下三个不可分割的部分组成:
| 要素 | 作用 | 可选性 | 默认值 |
|---|---|---|---|
| 分区(PARTITION BY) | 将数据集划分为多个独立的计算单元 | 可选 | 整个数据集作为一个分区 |
| 排序(ORDER BY) | 定义分区内行的相对顺序 | 可选(排名类函数必须) | 无排序 |
| 框架(FRAME) | 定义当前行的计算范围 | 可选 | 有ORDER BY时:RANGE UNBOUNDED PRECEDING TO CURRENT ROW无ORDER BY时: ROWS UNBOUNDED PRECEDING TO UNBOUNDED FOLLOWING |
2.3 窗口函数的执行流程
- 分区阶段:Spark根据PARTITION BY子句将数据分发到不同的Executor
- 排序阶段:每个Executor内对分区数据按ORDER BY子句排序
- 窗口计算阶段:为每一行创建窗口框架,执行函数计算
- 结果合并阶段:将各分区计算结果合并返回
3. 完整语法与高级用法
3.1 标准语法
window_function([expression[,expression...]])OVER([PARTITIONBYpartition_col1,partition_col2,...][ORDERBYorder_col1[ASC|DESC][NULLSFIRST|LAST],order_col2...][ROWS|RANGEBETWEENframe_startANDframe_end])3.2 框架子句详解
框架是窗口函数最容易出错的部分,必须精确理解其含义。
3.2.1 ROWS框架(基于行号)
按物理行号定义窗口范围,与行的值无关。
-- 包含当前行及前2行(共3行)ROWSBETWEEN2PRECEDINGANDCURRENTROW-- 包含当前行及后1行(共2行)ROWSBETWEENCURRENTROWAND1FOLLOWING-- 包含整个分区ROWSBETWEENUNBOUNDEDPRECEDINGANDUNBOUNDEDFOLLOWING3.2.2 RANGE框架(基于值)
按排序列的值定义窗口范围,包含所有值在指定区间内的行。
-- 包含所有工资在当前员工工资±500范围内的员工RANGEBETWEEN500PRECEDINGAND500FOLLOWING-- 包含所有日期在当前日期前30天内的记录RANGEBETWEENINTERVAL30DAYSPRECEDINGANDCURRENTROW重要区别:当排序列有重复值时,ROWS框架只包含指定数量的行,而RANGE框架会包含所有值相同的行。
3.3 命名窗口与窗口继承
当多个窗口函数使用相同的窗口定义时,使用WINDOW子句可以大幅提高代码可读性和执行效率。
SELECTproduct_id,sale_date,amount,SUM(amount)OVERmonthly_windowASmonthly_total,AVG(amount)OVER(monthly_windowORDERBYsale_dateROWS6PRECEDING)ASweekly_avgFROMsales WINDOW monthly_windowAS(PARTITIONBYproduct_id,EXTRACT(MONTHFROMsale_date)),daily_windowAS(monthly_windowORDERBYsale_date);-- 窗口继承4. 窗口函数分类与实战示例
4.1 排名类函数
用于为分区内的行分配排名,是最常用的窗口函数类型。
| 函数 | 功能 | 特点 | 适用场景 |
|---|---|---|---|
ROW_NUMBER() | 分配唯一连续整数 | 即使值相同,排名也不同 | 去重、取Top N |
RANK() | 跳跃式排名 | 值相同排名相同,后续排名跳跃 | 比赛排名、销售排名 |
DENSE_RANK() | 密集式排名 | 值相同排名相同,后续排名连续 | 等级划分、薪资分级 |
NTILE(n) | 分桶函数 | 将数据均匀分成n个桶 | 数据抽样、百分位划分 |
PERCENT_RANK() | 相对排名 | 取值范围[0,1] | 计算百分比排名 |
完整示例:
SELECTname,department,salary,ROW_NUMBER()OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)ASrow_num,RANK()OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)ASrank,DENSE_RANK()OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)ASdense_rank,NTILE(4)OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)ASquartile,PERCENT_RANK()OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)ASpercent_rankFROMemployees;4.2 聚合类窗口函数
将普通聚合函数用作窗口函数,在指定窗口范围内执行聚合计算。
常用函数:SUM(),AVG(),COUNT(),MAX(),MIN(),STDDEV(),VARIANCE()
高级示例:
SELECTsale_date,amount,-- 累计销售额SUM(amount)OVER(ORDERBYsale_date)AScumulative_sales,-- 7天移动平均AVG(amount)OVER(ORDERBYsale_dateROWS6PRECEDING)ASmoving_avg_7d,-- 30天滚动总和SUM(amount)OVER(ORDERBYsale_date RANGEINTERVAL30DAYSPRECEDING)ASrolling_30d,-- 与部门平均工资的差值salary-AVG(salary)OVER(PARTITIONBYdepartment)ASsalary_diff_from_avgFROMsales;4.3 偏移类函数
用于获取当前行前后行的值,无需自连接。
| 函数 | 功能 | 参数说明 |
|---|---|---|
LAG(col, n, default) | 获取当前行之前第n行的值 | n: 偏移量(默认1) default: 超出范围时的默认值 |
LEAD(col, n, default) | 获取当前行之后第n行的值 | 同上 |
FIRST_VALUE(col) | 获取窗口内第一行的值 | - |
LAST_VALUE(col) | 获取窗口内最后一行的值 | - |
NTH_VALUE(col, n) | 获取窗口内第n行的值 | n: 行号(从1开始) |
注意:LAST_VALUE()和NTH_VALUE()默认只计算到当前行,必须显式指定框架才能获取整个窗口的结果。
正确用法:
SELECTname,department,salary,FIRST_VALUE(name)OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)AShighest_paid,LAST_VALUE(name)OVER(PARTITIONBYdepartmentORDERBYsalaryDESCROWSBETWEENUNBOUNDEDPRECEDINGANDUNBOUNDEDFOLLOWING)ASlowest_paid,NTH_VALUE(name,2)OVER(PARTITIONBYdepartmentORDERBYsalaryDESCROWSBETWEENUNBOUNDEDPRECEDINGANDUNBOUNDEDFOLLOWING)ASsecond_highestFROMemployees;4.4 统计类函数
用于进行更复杂的统计分析。
| 函数 | 功能 |
|---|---|
CUME_DIST() | 累计分布函数 |
PERCENTILE_CONT(p) | 连续百分位数 |
PERCENTILE_DISC(p) | 离散百分位数 |
5. 生产级应用场景
5.1 同比环比计算
SELECTyear,month,sales,-- 环比增长率ROUND((sales-LAG(sales)OVER(ORDERBYyear,month))/LAG(sales)OVER(ORDERBYyear,month)*100,2)ASmom_growth,-- 同比增长率ROUND((sales-LAG(sales,12)OVER(ORDERBYyear,month))/LAG(sales,12)OVER(ORDERBYyear,month)*100,2)ASyoy_growthFROMmonthly_sales;5.2 连续行为分析
-- 找出连续登录7天以上的用户WITHlogin_groupsAS(SELECTuser_id,login_date,DATE_SUB(login_date,INTERVALROW_NUMBER()OVER(PARTITIONBYuser_idORDERBYlogin_date)DAY)ASgroup_idFROMuser_logins)SELECTuser_id,MIN(login_date)ASstart_date,MAX(login_date)ASend_date,COUNT(*)ASconsecutive_daysFROMlogin_groupsGROUPBYuser_id,group_idHAVINGCOUNT(*)>=7;5.3 分组Top N查询
-- 每个类别销售额前3的商品WITHranked_productsAS(SELECTcategory,product_name,total_sales,ROW_NUMBER()OVER(PARTITIONBYcategoryORDERBYtotal_salesDESC)ASrankFROMproduct_sales)SELECTcategory,product_name,total_salesFROMranked_productsWHERErank<=3;5.4 数据去重
-- 保留每个用户最新的一条记录WITHranked_recordsAS(SELECT*,ROW_NUMBER()OVER(PARTITIONBYuser_idORDERBYupdate_timeDESC)ASrnFROMuser_info)SELECT*EXCEPT(rn)FROMranked_recordsWHERErn=1;6. 性能优化最佳实践
6.1 分区优化
- 选择高基数分区键:使数据均匀分布在各个Executor
- 避免数据倾斜:如果某个分区数据量过大,考虑拆分或加盐
- 控制分区大小:每个分区建议128MB~1GB,过小会增加调度开销,过大会导致OOM
6.2 排序优化
- 减少排序列数量:只保留必要的排序列
- 利用已有排序:如果数据已经按排序列存储(如分区表),Spark会自动跳过排序
- 设置合理的并行度:通过
spark.sql.shuffle.partitions调整shuffle并行度
6.3 窗口复用
- 使用命名窗口:多个函数使用相同窗口定义时,Spark只计算一次
- 避免重复定义:不要为相同的窗口定义写多次OVER子句
6.4 框架优化
- 精确指定框架范围:不要使用默认的全分区框架
- 优先使用ROWS框架:比RANGE框架性能更好
- 避免大窗口:窗口越大,内存消耗越高
7. 常见错误与解决方法
7.1 语法错误类
错误1:排名函数缺少ORDER BY子句
错误信息:
org.apache.spark.sql.AnalysisException: Window function RANK() requires an ORDER BY clause.原因:RANK(),DENSE_RANK(),ROW_NUMBER()等排名函数必须指定ORDER BY子句。
解决方法:添加ORDER BY子句定义排名顺序。
-- 错误SELECTRANK()OVER(PARTITIONBYdepartment)FROMemployees;-- 正确SELECTRANK()OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)FROMemployees;错误2:在WHERE子句中使用窗口函数
错误信息:
org.apache.spark.sql.AnalysisException: Window functions are not allowed in WHERE clause.原因:SQL执行顺序中,WHERE子句在窗口函数之前执行,无法引用窗口函数的结果。
解决方法:使用子查询或CTE先计算窗口函数,再在外部查询中过滤。
-- 错误SELECTname,salaryFROMemployeesWHERERANK()OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)<=3;-- 正确WITHranked_employeesAS(SELECTname,salary,RANK()OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)ASrankFROMemployees)SELECTname,salaryFROMranked_employeesWHERErank<=3;错误3:在GROUP BY子句中使用窗口函数
错误信息:
org.apache.spark.sql.AnalysisException: Window functions are not allowed in GROUP BY clause.原因:GROUP BY在窗口函数之前执行,无法引用窗口函数的结果。
解决方法:先执行窗口函数,再进行分组聚合。
7.2 逻辑错误类
错误4:LAST_VALUE()返回当前行而非最后一行
问题现象:LAST_VALUE()总是返回当前行的值。
原因:默认框架是RANGE UNBOUNDED PRECEDING TO CURRENT ROW,窗口只包含到当前行。
解决方法:显式指定框架为整个分区。
-- 错误SELECTLAST_VALUE(name)OVER(PARTITIONBYdepartmentORDERBYsalaryDESC)FROMemployees;-- 正确SELECTLAST_VALUE(name)OVER(PARTITIONBYdepartmentORDERBYsalaryDESCROWSBETWEENUNBOUNDEDPRECEDINGANDUNBOUNDEDFOLLOWING)FROMemployees;错误5:RANK()和ROW_NUMBER()混淆使用
问题现象:去重时保留了多条相同值的记录,或Top N查询结果不符合预期。
原因:RANK()会为相同值分配相同排名,而ROW_NUMBER()总是分配唯一排名。
解决方法:
- 去重时使用
ROW_NUMBER() - 需要保留并列排名时使用
RANK()或DENSE_RANK()
错误6:RANGE框架使用不当导致结果异常
问题现象:窗口包含的行数比预期多。
原因:RANGE框架基于值而非行号,当排序列有重复值时,会包含所有值相同的行。
解决方法:如果需要精确控制行数,使用ROWS框架。
7.3 性能问题类
错误7:窗口函数执行缓慢
可能原因:
- 分区键选择不当导致数据倾斜
- 没有使用命名窗口导致重复计算
- 窗口框架过大导致内存消耗高
- shuffle并行度设置不合理
解决方法:
- 检查数据分布,调整分区键
- 使用WINDOW子句复用窗口定义
- 缩小窗口框架范围
- 增加
spark.sql.shuffle.partitions的值
错误8:Executor OOM(内存溢出)
可能原因:
- 单个分区数据量过大
- 窗口框架包含整个分区
- 同时执行多个大窗口函数
解决方法:
- 拆分大分区,增加分区数
- 优化窗口框架,只包含必要的行
- 拆分查询,分步骤执行
7.4 数据倾斜问题
错误9:某个分区执行时间过长
问题现象:大部分任务很快完成,但有1~2个任务一直卡在99%。
原因:分区键分布不均匀,某些分区数据量远大于其他分区。
解决方法:
- 加盐法:给倾斜的分区键添加随机后缀,拆分后再合并
- 过滤空值:如果大量行的分区键为null,先过滤再处理
- 动态分区:使用
DISTRIBUTE BY RAND()随机分发数据
加盐法示例:
-- 处理用户ID倾斜的情况WITHsalted_dataAS(SELECT*,-- 给倾斜的user_id添加0~9的随机后缀CONCAT(user_id,'_',FLOOR(RAND()*10))ASsalted_user_idFROMuser_behaviorWHEREuser_id='skewed_user_id'UNIONALLSELECT*,user_idASsalted_user_idFROMuser_behaviorWHEREuser_id!='skewed_user_id')SELECTCASEWHENsalted_user_idLIKE'skewed_user_id_%'THEN'skewed_user_id'ELSEsalted_user_idENDASuser_id,COUNT(*)AScntFROMsalted_dataGROUPBYsalted_user_id;8. 总结
Spark SQL窗口函数是数据分析的强大工具,掌握它可以大幅提升数据处理效率和代码质量。在使用过程中,需要特别注意:
- 理解窗口三要素(分区、排序、框架)的作用
- 避免常见的语法和逻辑错误
- 关注性能优化,特别是数据倾斜问题
- 优先使用命名窗口提高代码可读性和执行效率
