当前位置: 首页 > news >正文

RDD API 学习

📊 RDD vs DataFrame 对比

特性RDDDataFrame
API 风格函数式(Scala/Java)声明式(SQL)
性能较慢更快(Catalyst 优化)
类型安全编译时运行时
内存管理手动(JVM)自动(Tungsten)
适用场景复杂 ETL/算法结构化查询

学习目标:理解 Spark 底层原理,处理复杂数据转换。

🔧 RDD 基础操作

1.1 从 DataFrame 转换为 RDD

scala

// 将 DataFrame 转为 RDD val rdd = df.rdd // 查看 RDD 类型 println(rdd.getClass) // 查看分区数 println(s"RDD 分区数: ${rdd.getNumPartitions}") // 查看第一个元素 rdd.first()

1.2 创建 RDD 的几种方式

scala

// 方式1:从集合创建 val dataRDD = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9,10)) dataRDD.collect().foreach(print)// 方式2:从文件创建 val textRDD = spark.sparkContext.textFile("/opt/data/user_behavior.csv") println(s"文件 RDD 行数: ${textRDD.count()}")

1.3 RDD 基本操作:Transformation

scala

// map:逐行转换 val userRDD = textRDD.map(line => { val arr = line.split(",") (arr(0).toLong, arr(3)) // (user_id, behavior_type) }) userRDD.take(10).foreach(println)// filter:过滤 val pvRDD = userRDD.filter(_._2 == "pv") println(s"PV 数量: ${pvRDD.count()}")// distinct:去重 val distinctUsers = userRDD.map(_._1).distinct() println(s"去重用户数: ${distinctUsers.count()}")

🔧 RDD 聚合操作

2.1 reduceByKey:按 Key 聚合

scala

// 统计每个用户的行为次数 val userBehaviorCount = userRDD.map(x => (x._1, 1)) .reduceByKey(_ + _) .sortBy(_._2, false) println("行为次数 Top 10 用户:") userBehaviorCount.take(10).foreach(println)

2.2 groupByKey vs reduceByKey

scala

// groupByKey(不推荐,性能差) val grouped = userRDD.groupByKey() println("groupByKey 结果示例:") grouped.mapValues(_.size).take(5).foreach(println) // reduceByKey(推荐,预聚合) val reduced = userRDD.mapValues(_ => 1).reduceByKey(_ + _) println("reduceByKey 结果示例:") reduced.take(5).foreach(println)

1. groupByKey

  • 先把所有数据 shuffle 拉过去,再分组、再计算

  • 数据全部在网络传输

  • 中间不做任何合并

  • 容易OOM、卡死、倾斜

2. reduceByKey

  • 先在本地预聚合(Map 端聚合),再 shuffle

  • 网络传输数据量大大减少

  • 速度快、不占内存、不倾斜

2.3 aggregateByKey:自定义聚合

scala

// 统计每个用户的行为类型分布 val behaviorAgg = userRDD.map(x => (x._1, Seq[String](x._2))) .aggregateByKey(Seq.empty[String])( (seq, value) => seq ++ Seq(value), // seqOp: 分区内合并 (seq1, seq2) => seq1 ++ seq2 // combOp: 分区间合并 ) behaviorAgg.take(5).foreach(println)


🔧 RDD 高级操作

3.1 join:关联两个 RDD

scala

// 创建用户维度 RDD val userDimRDD = spark.sparkContext.parallelize(Seq( (3987L, "活跃用户"), (8527L, "新用户"), (124L, "高活用户"), (7450L, "付费用户") ))// 行为 RDD val behaviorRDD = userRDD.take(100).map(x => (x._1, x._2))// 执行 Join val joinedRDD = behaviorRDD.join(userDimRDD) joinedRDD.foreach(println)

// 遇到的报错

val joinedRDD = behaviorRDD.join(userDimRDD)
<console>:24: error: value join is not a member of Array[(Long, String)]
val joinedRDD = behaviorRDD.join(userDimRDD)

之前val behaviorRDD = userRDD.take(100).map(x => (x._1, x._2))

take操作使得behaviorRDD变成普通数组,普通数组没有join方法

更改

val behaviorRDD = spark.sparkContext.parallelize( userRDD.take(100) )

先take取数据再转成rdd

3.2 mapPartitions:分区级别操作

scala

// 每个分区内计算 val partitionCounts = rdd.mapPartitions(iter => { var count = 0 while (iter.hasNext) { count += 1 iter.next() } Iterator(count) }).collect() println(s"每个分区的数据量: ${partitionCounts.mkString(", ")}")

3.3 持久化 RDD

scala

// 缓存 RDD val cachedRDD = userRDD.cache() println(s"缓存后触发计算: ${cachedRDD.count()}") // 查看存储级别 println(cachedRDD.getStorageLevel) // 查看缓存状态 spark.sparkContext.getExecutorMemoryStatus.foreach(println)


📊 RDD 操作分类总结

类型操作说明
Transformationmap,filter,flatMap懒执行,返回新 RDD
聚合reduceByKey,groupByKey按 Key 聚合
排序sortBy,sortByKey全局排序
关联join,leftOuterJoin两个 RDD 关联
分区repartition,coalesce调整分区数
Actioncount,collect,take触发计算
http://www.jsqmd.com/news/755715/

相关文章:

  • RT-Thread 开发踩坑记:Cortex-M7 HardFault 现场如何完整“取证”?
  • 保姆级教程:在Ubuntu 22.04上,用rknn-toolkit2把PyTorch的ResNet18变成RK3588能跑的RKNN模型
  • 人类真理宣言—— 告别旧范式的守灵者,成为真理范式的开启者(Veritas Humana Manifesto)
  • Hugging Face模型加载超快
  • 世界模型如何提升LLM智能体决策能力
  • 2025年实时影响因子:中国期刊(26.5.3更新)
  • PromptBridge技术:实现跨大模型提示词无缝迁移
  • 手机号定位神器:一键查询陌生来电归属地,地图精准展示位置
  • 超导神经元原理与生物神经元模拟技术解析
  • 第1章 Nginx 简介与架构【20260503】-001篇
  • 怎样构建高效B站视频下载系统:DownKyi专业解决方案实战
  • 端到端GUI智能体UI-Venus-1.5:革新自动化测试与RPA
  • FastClaw:一键在Mac上创建预装OpenClaw的Linux虚拟机
  • EH-TEMPO算法:开放量子系统模拟的高效解决方案
  • Claude桌面应用效率增强:claude-hooks钩子机制详解与实战
  • Claude配置编辑器:可视化定制AI助手行为,提升工作效率
  • SPATIALGEN:智能3D场景生成框架解析与应用
  • 2026年4月有名的锁紧螺母生产厂家推荐,导轨压块/锁紧螺母/径向锁紧螺母/止退螺母/丝杠锁紧螺母,锁紧螺母公司推荐 - 品牌推荐师
  • 从‘三元悖论’到现实选择:用蒙代尔-弗莱明模型看懂央行政策困境(以近期热点为例)
  • dotclaude:基于Agent Skills标准的AI编码代理技能库实战指南
  • C++27范围库扩展开发倒计时:ISO正式FDIS投票仅剩117天,这份企业级迁移路线图已被12家头部嵌入式厂商内部采用
  • 第1章 Nginx 简介与架构【20260503】-002篇-Nginx日志切割
  • Copr命令行工具实战:从RPM打包到自动化构建发布
  • TSMaster实战:手把手教你将A2L标定变量和DBC信号录进同一个BLF文件
  • 开源三指机械爪OpenClaw-CN实践指南:从欠驱动原理到ROS控制
  • 探索Taotoken平台在应对突发性API流量激增时的路由表现
  • 从Program.cs到可维护微服务:C# 13顶级语句驱动的模块化分层架构,立即提升代码复用率47%
  • 避坑指南:SolidWorks模型导入MATLAB Simscape时,插件安装失败、连接错误的常见原因与解决方案
  • JavaSE-07
  • 实战应用:基于快马平台开发可数据交互的产区标准图分析系统