RDD API 学习
📊 RDD vs DataFrame 对比
| 特性 | RDD | DataFrame |
|---|---|---|
| API 风格 | 函数式(Scala/Java) | 声明式(SQL) |
| 性能 | 较慢 | 更快(Catalyst 优化) |
| 类型安全 | 编译时 | 运行时 |
| 内存管理 | 手动(JVM) | 自动(Tungsten) |
| 适用场景 | 复杂 ETL/算法 | 结构化查询 |
学习目标:理解 Spark 底层原理,处理复杂数据转换。
🔧 RDD 基础操作
1.1 从 DataFrame 转换为 RDD
scala
// 将 DataFrame 转为 RDD val rdd = df.rdd // 查看 RDD 类型 println(rdd.getClass) // 查看分区数 println(s"RDD 分区数: ${rdd.getNumPartitions}") // 查看第一个元素 rdd.first()1.2 创建 RDD 的几种方式
scala
// 方式1:从集合创建 val dataRDD = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9,10)) dataRDD.collect().foreach(print)// 方式2:从文件创建 val textRDD = spark.sparkContext.textFile("/opt/data/user_behavior.csv") println(s"文件 RDD 行数: ${textRDD.count()}")
1.3 RDD 基本操作:Transformation
scala
// map:逐行转换 val userRDD = textRDD.map(line => { val arr = line.split(",") (arr(0).toLong, arr(3)) // (user_id, behavior_type) }) userRDD.take(10).foreach(println)
// filter:过滤 val pvRDD = userRDD.filter(_._2 == "pv") println(s"PV 数量: ${pvRDD.count()}")
// distinct:去重 val distinctUsers = userRDD.map(_._1).distinct() println(s"去重用户数: ${distinctUsers.count()}")![]()
🔧 RDD 聚合操作
2.1 reduceByKey:按 Key 聚合
scala
// 统计每个用户的行为次数 val userBehaviorCount = userRDD.map(x => (x._1, 1)) .reduceByKey(_ + _) .sortBy(_._2, false) println("行为次数 Top 10 用户:") userBehaviorCount.take(10).foreach(println)2.2 groupByKey vs reduceByKey
scala
// groupByKey(不推荐,性能差) val grouped = userRDD.groupByKey() println("groupByKey 结果示例:") grouped.mapValues(_.size).take(5).foreach(println) // reduceByKey(推荐,预聚合) val reduced = userRDD.mapValues(_ => 1).reduceByKey(_ + _) println("reduceByKey 结果示例:") reduced.take(5).foreach(println)1. groupByKey
先把所有数据 shuffle 拉过去,再分组、再计算
数据全部在网络传输
中间不做任何合并
容易OOM、卡死、倾斜
2. reduceByKey
先在本地预聚合(Map 端聚合),再 shuffle
网络传输数据量大大减少
速度快、不占内存、不倾斜
2.3 aggregateByKey:自定义聚合
scala
// 统计每个用户的行为类型分布 val behaviorAgg = userRDD.map(x => (x._1, Seq[String](x._2))) .aggregateByKey(Seq.empty[String])( (seq, value) => seq ++ Seq(value), // seqOp: 分区内合并 (seq1, seq2) => seq1 ++ seq2 // combOp: 分区间合并 ) behaviorAgg.take(5).foreach(println)
🔧 RDD 高级操作
3.1 join:关联两个 RDD
scala
// 创建用户维度 RDD val userDimRDD = spark.sparkContext.parallelize(Seq( (3987L, "活跃用户"), (8527L, "新用户"), (124L, "高活用户"), (7450L, "付费用户") ))// 行为 RDD val behaviorRDD = userRDD.take(100).map(x => (x._1, x._2))
// 执行 Join val joinedRDD = behaviorRDD.join(userDimRDD) joinedRDD.foreach(println)
// 遇到的报错
val joinedRDD = behaviorRDD.join(userDimRDD)
<console>:24: error: value join is not a member of Array[(Long, String)]
val joinedRDD = behaviorRDD.join(userDimRDD)
之前val behaviorRDD = userRDD.take(100).map(x => (x._1, x._2))
take操作使得behaviorRDD变成普通数组,普通数组没有join方法
更改
val behaviorRDD = spark.sparkContext.parallelize( userRDD.take(100) )
先take取数据再转成rdd
3.2 mapPartitions:分区级别操作
scala
// 每个分区内计算 val partitionCounts = rdd.mapPartitions(iter => { var count = 0 while (iter.hasNext) { count += 1 iter.next() } Iterator(count) }).collect() println(s"每个分区的数据量: ${partitionCounts.mkString(", ")}")3.3 持久化 RDD
scala
// 缓存 RDD val cachedRDD = userRDD.cache() println(s"缓存后触发计算: ${cachedRDD.count()}") // 查看存储级别 println(cachedRDD.getStorageLevel) // 查看缓存状态 spark.sparkContext.getExecutorMemoryStatus.foreach(println)📊 RDD 操作分类总结
| 类型 | 操作 | 说明 |
|---|---|---|
| Transformation | map,filter,flatMap | 懒执行,返回新 RDD |
| 聚合 | reduceByKey,groupByKey | 按 Key 聚合 |
| 排序 | sortBy,sortByKey | 全局排序 |
| 关联 | join,leftOuterJoin | 两个 RDD 关联 |
| 分区 | repartition,coalesce | 调整分区数 |
| Action | count,collect,take | 触发计算 |
