当前位置: 首页 > news >正文

Spark 行动算子(Action)全面解析

Spark 行动算子(Action)全面解析

摘要:本文系统梳理 Spark 中的行动算子,涵盖触发机制、常用 API 分类、执行原理与实际使用场景,帮助你真正理解 Action 背后发生了什么。


一、为什么要讲行动算子?

在 Spark 的编程模型中,算子分为两大类:

类型特点是否触发计算
转换算子(Transformation)惰性求值,返回新 RDD❌ 不触发
行动算子(Action)触发 DAG 执行,返回结果或写出数据✅ 触发

核心理解:每一次调用 Action,Spark 就会向 Driver 提交一个Job,Driver 将 DAG 切分成若干Stage,Stage 内部拆分为Task并分发到各 Executor 执行。

调用 Action └─► 提交 Job └─► DAG Scheduler 切分 Stage └─► Task Scheduler 分发 Task └─► Executor 执行 → 结果返回 Driver

二、行动算子分类总览

Action 算子 ├── 聚合类 collect / count / countByKey / countByValue ├── 取值类 first / take / takeOrdered / takeSample ├── 归约类 reduce / fold / aggregate ├── 遍历类 foreach / foreachPartition ├── 保存类 saveAsTextFile / saveAsObjectFile / saveAsSequenceFile └── 统计类 max / min / sum / mean / variance / stdev

三、常用行动算子详解

3.1 collect()

将 RDD 中所有数据收集到 Driver 端,返回Array

valrdd=sc.parallelize(List(1,2,3,4,5))valresult=rdd.collect()// result: Array[Int] = Array(1, 2, 3, 4, 5)

⚠️注意:生产环境中慎用!数据量过大会导致 Driver OOM。仅适合数据量可控的场景(如测试、调试)。


3.2 count()

返回 RDD 中元素的总个数。

valrdd=sc.parallelize(List("a","b","c","a"))println(rdd.count())// 4

3.3 countByKey()

仅适用于PairRDD (K, V),统计每个 Key 出现的次数,返回Map[K, Long]

valrdd=sc.parallelize(List(("a",1),("b",2),("a",3)))valresult=rdd.countByKey()// result: Map(a -> 2, b -> 1)

3.4 countByValue()

统计 RDD 中每个元素出现的次数,返回Map[T, Long]

valrdd=sc.parallelize(List("apple","banana","apple","orange"))valresult=rdd.countByValue()// result: Map(apple -> 2, banana -> 1, orange -> 1)

3.5 first()

返回 RDD 中的第一个元素,等价于take(1)(0)

valrdd=sc.parallelize(List(10,20,30))println(rdd.first())// 10

3.6 take(n)

返回 RDD 前 n 个元素组成的数组,不保证顺序(按分区顺序扫描)。

valrdd=sc.parallelize(List(5,3,1,4,2))rdd.take(3)// Array(5, 3, 1)

3.7 takeOrdered(n)

返回 RDD 中最小的 n 个元素(升序),可自定义排序规则。

valrdd=sc.parallelize(List(5,3,1,4,2))rdd.takeOrdered(3)// Array(1, 2, 3)rdd.takeOrdered(3)(Ordering[Int].reverse)// Array(5, 4, 3) 降序

take的区别:takeOrdered会在每个 Partition 局部排序后再归并,效率优于sortBy + take


3.8 reduce(func)

通过一个二元函数对 RDD 所有元素进行归约,要求函数满足交换律和结合律

valrdd=sc.parallelize(List(1,2,3,4,5))valsum=rdd.reduce((a,b)=>a+b)// 15valmax=rdd.reduce((a,b)=>if(a>b)aelseb)// 5

执行过程:先在每个 Partition 内部归约,再将各 Partition 结果汇总到 Driver 做最终归约。


3.9 aggregate(zeroValue)(seqOp, combOp)

aggregate是最通用的归约算子,允许返回值类型与输入类型不同

  • seqOp:分区内的聚合函数(U, T) => U
  • combOp:分区间的合并函数(U, U) => U
// 同时计算总和与元素个数,从而得到平均值valrdd=sc.parallelize(List(1,2,3,4,5),2)val(sum,count)=rdd.aggregate((0,0))((acc,num)=>(acc._1+num,acc._2+1),// seqOp(a,b)=>(a._1+b._1,a._2+b._2)// combOp)valavg=sum.toDouble/count// 3.0

3.10 foreach(func)

对 RDD 每个元素执行函数,在 Executor 端执行,不返回值。常用于写入外部系统。

rdd.foreach(x=>println(x))// 输出在 Executor 端,Driver 不可见

3.11 foreachPartition(func)

分区为单位执行函数,每个分区调用一次。适合需要建立连接的场景(如数据库写入),避免每条数据都创建连接。

rdd.foreachPartition{iter=>valconn=createDBConnection()// 每个分区只建一次连接iter.foreach{record=>conn.write(record)}conn.close()}

最佳实践:凡是涉及外部资源(数据库、消息队列、缓存),优先用foreachPartition而非foreach,显著降低连接开销。


3.12 saveAsTextFile(path)

将 RDD 保存为文本文件,每个元素调用toString写为一行。分区数决定输出文件数。

rdd.saveAsTextFile("hdfs://namenode/output/result")// 输出: /output/result/part-00000, part-00001, ...

四、行动算子执行原理深入

4.1 宽依赖与 Stage 划分

rdd1 ──map──► rdd2 ──filter──► rdd3 ──reduceByKey──► rdd4 ──collect() ▲ Shuffle 边界 ◄── Stage 0 ──┤──── Stage 1 ───►
  • mapfilter是窄依赖,同属一个 Stage
  • reduceByKey触发 Shuffle,产生 Stage 边界
  • 调用collect()触发整个 Job 执行

4.2 Action 与 Job 的关系

valrdd=sc.textFile("data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)rdd.count()// Job 1rdd.collect()// Job 2,DAG 重新执行(除非 cache)

每次 Action 都是一个独立 Job。如果 RDD 会被多次使用,务必调用cache()persist()避免重复计算。


五、常见误区与最佳实践

❌ 误区 1:在循环中多次调用 Action

// 错误:每次循环都触发一个 Jobfor(i<-1to10){println(rdd.count())// 触发 10 个 Job!}// 正确:缓存后复用valcached=rdd.cache()valtotal=cached.count()

❌ 误区 2:用 collect() 处理大数据集

// 危险:数据全量拉到 Driverrdd.collect().foreach(process)// 安全:在 Executor 端处理rdd.foreach(process)// 或写出到存储rdd.saveAsTextFile(outputPath)

✅ 最佳实践总结

场景推荐算子
调试/验证少量数据take(n)first()
统计元素数量count()
全局聚合(同类型)reduce()
全局聚合(跨类型)aggregate()
写入外部存储foreachPartition()
落地到 HDFSsaveAsTextFile()
RDD 多次复用cache(),再 Action

六、总结

行动算子是 Spark 程序的"触发器",理解它的核心在于:

  1. 惰性求值:Transformation 只是构建 DAG,Action 才真正触发计算
  2. Job 粒度:每个 Action 对应一个 Job,合理减少 Action 调用次数
  3. 数据位置collect/take将数据拉回 Driver,foreach/save在 Executor 端处理
  4. 缓存策略:多次复用同一 RDD 时,配合cache()避免重算

掌握行动算子的选择与优化,是写出高性能 Spark 程序的基础。


如有问题欢迎在评论区交流,也欢迎关注后续 Spark 系列文章 🚀

http://www.jsqmd.com/news/960870/

相关文章:

  • PHP多维数组操作与聚合分析
  • 2026 西安价格实惠厕所天花板漏水处理公司 TOP4:厨卫漏水修缮甄选榜单 专业防水公司排名推荐(2026年5月防水补漏最新TOP权威排名) - 冠盾建筑修缮
  • 手把手教你用STM32CubeMX和HAL库驱动ILI9341屏幕(附Proteus仿真文件)
  • 南京如景装饰材料:高淳专业的玻璃隔断安装公司有哪些 - LYL仔仔
  • Chromatic:如何像外科手术一样精准修改Chromium/V8应用?
  • CSDN发布文章 markdown格式语法
  • 保姆级教程:在Windows 10/11上用JDK 8/11成功安装BurpSuite Community 2024(附浏览器代理配置避坑指南)
  • 保定 8 区县全套文案(全区统一固定标题:2026 上海防水补漏 + 瓷砖空鼓修复推荐,苏易修缮本土直营,老城老房漏水、瓷砖翘边拱起就近微创修) - 苏易修缮
  • 【RT-DETR实战】156、改进六:设计轻量级混合编码器(MobileViT思想)
  • 算法复杂度的统计特征与实验验证的技术8
  • 聊城本地黄金回收|正规店铺报价与上门服务全指南 - 余生黄金回收
  • Lakehouse重构数据基建:ACID事务与统一治理如何让数据湖真正可信可用
  • 2026郑州黄金回收榜首榜单收的顶龙头领跑,全国连锁高价回收行业标杆 - 奢侈品回收评测
  • 告别理论!用Proteus仿真直观理解PID算法:以51单片机温控为例
  • UNNPK终极指南:高效解压网易游戏NPK文件的完整教程
  • 横河DLM2054示波器远程控制全攻略:用Xwirepuller软件在电脑上‘隔空’操作示波器
  • 保姆级教程:威纶通MT8071ip触摸屏与正点原子STM32F103的Modbus接线实战(附避坑清单)
  • 别再只用它开空调了!深度挖掘涂鸦万能红外遥控器的DIY模式:手把手教你学习并控制家里所有红外设备
  • BBDown:基于.NET的哔哩哔哩视频下载器架构解析与技术实现
  • 2026最新诚信优选深圳全市黄金回收铂金彩金白银回收靠谱商家TOP实测排行榜及联系方式推荐 - 余生黄金回收
  • 从一块硅片到一颗芯片:保姆级图解12个关键制造步骤(附工艺名词对照)
  • 别只盯着准确率!用PyTorch玩转MNIST:可视化训练过程与手写数字预测的趣味实践
  • 读懂上海黄金回收行情2026 优质合规机构权威盘点 - 开心测评
  • 从“彩票假设”到智能体学习:深度网络剪枝的前沿玩法与未来猜想
  • 【工具推荐】手机上直接查看 CAN Log!iOS App「CANviewer」—— 汽车工程师的随身 CAN 分析工具
  • 基于 S7-1200 的隧道综合监控系统模块化 PLC 编程设计
  • 2026最新诚信优选长春市黄金回收白银回收铂金回收彩金回收高口碑靠谱门店TOP5权威排行榜+联系方式推荐 - 前途无量YY
  • 基于OpenCV调用OpenPose MobileNet的人体关键点检测工具(支持摄像头实时识别与图片分析)
  • 校园资源整合视角下大学生创业者的多元盈利模式探索
  • 常州市天宁区黄金回收指南:金价高企如何安全变现? - 黄金上门回收