当前位置: 首页 > news >正文

大数据Spark(八十):Action行动算子fold和aggregate使用案例

文章目录

Action行动算子fold和aggregate使用案例

一、fold使用案例

二、aggregate使用案例


Action行动算子fold和aggregate使用案例

一、fold使用案例

fold用于对RDD中的元素进行聚合操作,最终返回一个结果。类似reduce算子,但与reduce不同的是其可以对每个分区中的数据提供一个初始值,让分区中的数据与该初始值进行聚合,最终该初始值还会与各个分区的结果再次聚合。

fold的函数签名如下:

def fold(zeroValue: T)(op: (T, T) => T): T
  • zeroValue:聚合操作的初始值,类型为 T。
  • op:用于合并元素的二元操作函数。

fold的工作原理:在每个分区内,fold 使用初始值 zeroValue 和二元操作函数 op,将该分区内的所有元素进行聚合。在所有分区内的聚合完成后,fold 将各分区的结果与初始值 zeroValue 一起,使用相同的二元操作函数 op 进行全局聚合,得到最终结果。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("FoldTest"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a","b","c","d","e","f"), 3); rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { @Override public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception { ArrayList<String> list = new ArrayList<>(); while (iter.hasNext()) { String next = iter.next(); list.add("rdd partition index: " + index + " current value: " + next); } return list.iterator(); } },true).foreach(x-> System.out.println(x)); /** * 0号分区:a b * 1号分区:c d * 2号分区:e f * * 0号分区:hello~a~b * 1号分区:hello~c~d * 2号分区:hello~e~f * * 最终结果:hello~hello~a~b~hello~c~d~hello~e~f */ String str = rdd.fold("hello", new Function2<String, String, String>() { @Override public String call(String v1, String v2) throws Exception { return v1 + "~" + v2; } }); System.out.println(str); sc.stop();

Scala代码:

val conf = new SparkConf().setMaster("local").setAppName("FoldTest") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 3) rdd.mapPartitionsWithIndex((index, iter) => { val list = new ListBuffer[String]() while (iter.hasNext) { list.append(s"rdd partition index: $index ,current value: ${iter.next()}") } list.iterator }).foreach(println) /** * 0号分区:a b * 1号分区:c d * 2号分区:e f * map端聚合: * 0号分区:hello~a~b * 1号分区:hello~c~d * 2号分区:hello~e~f * * 最终结果:hello~hello~a~b~hello~c~d~hello~e~f */ val result: String = rdd.fold("hello")((v1, v2) => { v1 + "~" + v2 }) println(result) sc.stop()

二、aggregate使用案例

aggregate用于对RDD中的元素进行聚合操作,最终返回一个结果。与 fold 和 reduce 等算子不同,aggregate 允许用户分别定义分区内和分区间的聚合函数,提供了更大的灵活性。

aggregate函数签名如下:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  • zeroValue:聚合操作的初始值,类型为 U。
  • seqOp:分区内的聚合函数,用于将分区内的元素与累加器进行合并,类型为 (U, T) => U。
  • combOp:分区间的聚合函数,用于将不同分区的累加器结果进行合并,类型为 (U, U) => U

aggregate工作原理:在每个分区内,使用初始值 zeroValue 和函数 seqOp,将该分区内的所有元素进行聚合。在所有分区内的聚合完成后,使用初始值 zeroValue 和函数 combOp,将各分区的结果进行全局聚合,得到最终结果。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("AggregateTest"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a","b","c","d","e","f"), 3); rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { @Override public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception { ArrayList<String> list = new ArrayList<>(); while (iter.hasNext()) { String next = iter.next(); list.add("rdd partition index: " + index + " current value: " + next); } return list.iterator(); } },true).foreach(x-> System.out.println(x)); /** * 0号分区:a b * 1号分区:c d * 2号分区:e f * * map端聚合: * 0号分区:hello~a~b * 1号分区:hello~c~d * 2号分区:hello~e~f * * 最终结果:hello@hello~a~b@hello~c~d@hello~e~f */ String result = rdd.aggregate("hello", new Function2<String, String, String>() { @Override public String call(String s1, String s2) throws Exception { return s1 + "~" + s2; } }, new Function2<String, String, String>() { @Override public String call(String s1, String s2) throws Exception { return s1 + "@" + s2; } }); System.out.println(result); sc.stop();

Scala代码:

val conf = new SparkConf().setMaster("local").setAppName("FoldTest") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 3) rdd.mapPartitionsWithIndex((index, iter) => { val list = new ListBuffer[String]() while (iter.hasNext) { list.append(s"rdd partition index: $index ,current value: ${iter.next()}") } list.iterator }).foreach(println) val result: String = rdd.aggregate("hello")( (v1, v2) => { v1 + "~" + v2 }, (v1, v2) => { v1 + "@" + v2 } ) println(result) sc.stop()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
http://www.jsqmd.com/news/356868/

相关文章:

  • 连云港成宏口腔牙齿矫正真实反馈|3位牙友亲测,30年老牌机构,圆我自信笑容梦 - 品牌鉴赏师
  • 2026年知名的航天钎焊炉/飞机发动机钎焊炉哪家强公司实力参考(精选) - 品牌宣传支持者
  • 2026年靠谱的AI液冷板钎焊炉哪家强生产厂家实力参考 - 品牌宣传支持者
  • 2026西安楼市品质之选:三大专业开发商深度解析 - 2026年企业推荐榜
  • 2026年陕西基本农田调整技术服务机构综合实力TOP5 - 2026年企业推荐榜
  • 2026年靠谱的立体桁架真实参考销售厂家参考怎么选 - 品牌宣传支持者
  • 2026年比较好的认证机构本地口碑推荐 - 品牌宣传支持者
  • 网络复习篇——网络基础(一)
  • 2026年一站式认证辅导本地口碑推荐 - 品牌宣传支持者
  • 2026年初重庆专业极压齿轮油品牌评测与选型指南 - 2026年企业推荐榜
  • 四川家装电线市场价格趋势与优质供应商综合评估 - 2026年企业推荐榜
  • 2026铝单板门头选购指南:数据驱动的优质厂家选择标准 - 2026年企业推荐榜
  • 危化仓储高风险作业点人员到位与作业半径空间校验场景
  • 郑州合成高温润滑油可靠厂家盘点与采购指南 - 2026年企业推荐榜
  • 2026年安徽杀菌剂直销工厂综合评估与选购指南 - 2026年企业推荐榜
  • 危化品库区异常停留、违规进入行为的三维空间识别场景
  • 危险化工仓储区空间结构透视与人员作业行为协同管控场景
  • 2026年国内智能母线槽优质生产商联系与选型指南 - 2026年企业推荐榜
  • 危化品作业区域人员—设备安全距离的三维空间监测场景
  • 军储库区异常进入、违规停留行为的三维空间识别与追溯场景
  • 军储仓库作业区人车交叉冲突的空间级预警与态势研判场景
  • 2026年专业的认证品牌综合实力推荐 - 品牌宣传支持者
  • 2026年河北市场优质吉林白石材厂家综合评测 - 2026年企业推荐榜
  • 2026年持续改善合规达标工厂布局/工业园区工厂布局实际效果推荐企业 - 品牌宣传支持者
  • 2026年抗皱紧致护肤品推荐,国货新锐品牌抗皱实力实测解读 - 品牌鉴赏师
  • 2026年初,湖北顶尖循环水药剂厂家专业度深度解析 - 2026年企业推荐榜
  • 2026年小麦除草剂实力厂家综合评测与选购指南 - 2026年企业推荐榜
  • 2026年抗皱紧致护肤品单品推荐,专业测评与高效抗皱选购指南 - 品牌鉴赏师
  • 2026年ai人工智能搜索公司最新推荐,AI核心算法检索实力与创新能力综合解析 - 品牌鉴赏师
  • 2026年初天津工程采购必看:高性价比路边石批发商深度评测 - 2026年企业推荐榜