当前位置: 首页 > news >正文

Spark SQL详解(二):RDD转换DataFrame与Spark SQL读写数据库

摘要:本文深入讲解Spark SQL中RDD与DataFrame的互转机制,包括反射推断模式和编程式定义模式两种转换方式。同时系统讲解Spark SQL通过JDBC连接MySQL数据库的完整流程,涵盖依赖导入、数据读取、数据写入等实战操作,配合完整的Scala代码示例和常见错误排查。


一、RDD转换DataFrame

Spark提供了两种方法将RDD转换为DataFrame:利用反射机制推断RDD模式,以及使用编程方式定义RDD模式。两种方法各有适用场景,开发者可根据实际情况选择。

1.1 方法一:利用反射机制推断RDD模式

原理:通过定义case class,利用Spark的隐式转换机制,自动将RDD[CaseClass]转换为DataFrame。

核心特点

  • 简洁高效,代码量少
  • 自动推断字段名和类型
  • 必须使用case class(普通class不支持)
  • case class必须定义在main方法之外

完整代码实现:

importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,SparkSession}objectRDDToDFByReflection{// case class必须放到main方法之外,伴生对象下// 因为隐式转换时会通过 伴生对象名.case类名 来调用caseclassPerson(name:String,age:Long)defmain(args:Array[String]):Unit={valspark=SparkSession.builder().master("local[*]").appName("RDD-To-DF-Reflection").getOrCreate()// 导入隐式转换,这里的spark是SparkSession对象,不是org.apache.spark包importspark.implicits._// 1. 读取文本文件,解析为RDD[Person]valrdd:RDD[Person]=spark.sparkContext.textFile("data/sql/people.txt").map(line=>line.split(",")).map(t=>Person(t(0).trim,t(1).trim.toLong))// 2. 隐式转换:RDD[Person] -> DataFramevaldf:DataFrame=rdd.toDF()// 3. 注册临时视图,执行SQL查询df.createOrReplaceTempView("people")spark.sql("SELECT * FROM people WHERE age > 20").show()spark.stop()}}

输入数据(people.txt):

Tom, 21 Mike, 25 Andy, 18

运行结果:

+----+---+ |name|age| +----+---+ | Tom| 21| |Mike| 25| +----+---+

关键注意事项:

注意点说明错误后果
case class位置必须放在main方法之外,伴生对象下编译报错,找不到case类
implicits导入import spark.implicits._中的spark是SparkSession对象导入错误将无法隐式转换
数据类型匹配case class字段类型需与数据匹配类型转换异常
空值处理数值类型建议用Long/DoubleInt可能溢出

数据流转图解:

文本文件: "Tom, 21" "Mike, 25" "Andy, 18" ↓ textFile + map + map RDD[Person]: Person("Tom", 21) Person("Mike", 25) Person("Andy", 18) ↓ toDF() (隐式转换) DataFrame: +----+---+ |name|age| +----+---+ | Tom| 21| |Mike| 25| |Andy| 18| +----+---+

1.2 方法二:使用编程方式定义RDD模式

原理:通过StructType定义Schema(表头),通过Row定义每条记录,最后调用createDataFrame将两者拼接。

核心特点

  • 无需定义case class,更灵活
  • 适合动态Schema场景(字段不确定)
  • 代码稍繁琐,但不易出错
  • 运行时类型安全

完整代码实现:

importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,Row,SparkSession}importorg.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}objectRDDToDFByProgramming{defmain(args:Array[String]):Unit={valspark=SparkSession.builder().master("local[*]").appName("RDD-To-DF-Programming").getOrCreate()// Step 1: 制作表头 - 定义Schema结构valschema:StructType=StructType(Array(StructField("name",StringType,nullable=true),StructField("age",IntegerType,nullable=true)))// Step 2: 制作表中记录 - 读取文件生成RDD[Row]valrowRDD:RDD[Row]=spark.sparkContext.textFile("data/sql/people.txt").map(_.split(",")).map(attr=>Row(attr(0).trim,attr(1).trim.toInt))// Step 3: 拼接表头和记录 - 创建DataFramevalpeopleDF:DataFrame=spark.createDataFrame(rowRDD,schema)// 注册临时视图并查询peopleDF.createOrReplaceTempView("people")spark.sql("SELECT * FROM people WHERE age > 20").show()spark.stop()}}

运行结果(同上):

+----+---+ |name|age| +----+---+ | Tom| 21| |Mike| 25| +----+---+

三个核心步骤详解:

步骤操作代码作用
1制作表头StructType(Array(StructField(...)))定义字段名、类型、可空性
2制作记录RDD[Row]将原始数据转换为Row对象
3拼接合并spark.createDataFrame(rowRDD, schema)将Schema和RowRDD合并为DataFrame

Row对象的创建方式:

// 方式1:按位置传入值(需与Schema顺序一致)valrow1=Row("Tom",21)valrow2=Row("Mike",25)// 方式2:通过索引访问值valname=row1.getString(0)// "Tom"valage=row1.getInt(1)// 21// 方式3:类型安全的获取(推荐)valname=row1.getAs[String](0)valage=row1.getAs[Int](1)

1.3 两种方法对比

特性反射推断模式编程式定义模式
代码量较多
灵活性低(需预定义case class)高(动态定义Schema)
类型安全编译时检查运行时检查
适用场景字段固定的结构化数据字段动态变化的数据
性能相同(底层都转为RDD)相同
错误排查相对困难相对容易
case class必须不需要

选择建议:

  • 字段固定、类型明确 → 反射推断模式(代码简洁)
  • 字段动态、Schema不确定 → 编程式定义模式(灵活可控)

二、Spark SQL读写MySQL数据库

Spark SQL通过JDBC连接器可以方便地读写关系型数据库,本节以MySQL为例进行讲解。

2.1 导入依赖

在项目的pom.xml中添加MySQL JDBC驱动依赖:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.31</version></dependency>

版本注意事项:

MySQL版本JDBC驱动类URL格式
MySQL 5.xcom.mysql.jdbc.Driverjdbc:mysql://host:3306/db
MySQL 8.xcom.mysql.cj.jdbc.Driverjdbc:mysql://host:3306/db?serverTimezone=UTC

注意:MySQL 8.0必须使用com.mysql.cj.jdbc.Driver,使用旧驱动会报错。


2.2 读取MySQL数据

通过spark.read.format("jdbc")读取数据库表数据。

完整代码:

importorg.apache.spark.sql.{DataFrame,SparkSession}objectReadMySQL{defmain(args:Array[String]):Unit={valspark=SparkSession.builder().master("local[*]").appName("Read-MySQL").getOrCreate()// 方式1:使用format("jdbc") + option链式配置valmysqlDF:DataFrame=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","student").option("user","root").option("password","your_password").load()// 方式2:使用jdbc()方法(更简洁)valmysqlDF2=spark.read.jdbc("jdbc:mysql://localhost:3306/spark","student",properties)mysqlDF.show()spark.stop()}}

JDBC常用配置选项:

选项必填说明示例
urlJDBC连接URLjdbc:mysql://localhost:3306/spark
driverJDBC驱动类名com.mysql.cj.jdbc.Driver
dbtable表名或SQL子查询student(SELECT * FROM student WHERE age>20) tmp
user数据库用户名root
password数据库密码123456
partitionColumn分区列(用于并行读取)id
lowerBound分区下界1
upperBound分区上界10000
numPartitions并行分区数4
fetchsize每次获取行数1000

运行结果:

+---+----+---+---+ | id|name|age|sex| +---+----+---+---+ | 1| Tom| 21| 男| | 2|Andy| 20| 女| +---+----+---+---+

并行读取优化:

// 通过分区列实现并行读取,提升大数据量读取性能valdf=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","student").option("user","root").option("password","123456").option("partitionColumn","id")// 按id列分区.option("lowerBound","1")// 最小id.option("upperBound","1000")// 最大id.option("numPartitions","4")// 分4个分区并行读取.load()

2.3 向MySQL写入数据

通过df.write.mode().jdbc()将DataFrame数据写入数据库。

完整代码:

importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame,Row,SparkSession}importorg.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}importjava.util.PropertiesobjectWriteMySQL{defmain(args:Array[String]):Unit={valspark=SparkSession.builder().master("local[*]").appName("Write-MySQL").getOrCreate()// Step 1: 准备要写入的数据(从RDD创建)valrdd:RDD[Array[String]]=spark.sparkContext.parallelize(Array("3 Mike 22 男","4 Cindy 23 女")).map(_.split(" "))// Step 2: 定义Schema(表头)valschema:StructType=StructType(Array(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("age",IntegerType,true),StructField("sex",StringType,true)))// Step 3: 创建Row RDD(记录)valrowRDD:RDD[Row]=rdd.map(stu=>Row(stu(0).toInt,stu(1),stu(2).toInt,stu(3)))// Step 4: 创建DataFramevaldf:DataFrame=spark.createDataFrame(rowRDD,schema)// Step 5: 配置JDBC连接参数valprop=newProperties()prop.put("user","root")prop.put("password","your_password")prop.put("driver","com.mysql.cj.jdbc.Driver")// Step 6: 写入数据(append模式追加)df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)// 验证写入结果valresult=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","spark.student").option("user","root").option("password","your_password").load()result.show()spark.stop()}}

写入模式说明:

模式说明适用场景
append追加数据到已有表增量写入
overwrite先删除表数据再写入全量覆盖
ignore表存在则忽略,不写入避免重复写入
errorIfExists表存在则报错(默认)防止误操作

写入前数据库表结构:

CREATETABLEspark.student(idINTPRIMARYKEY,nameVARCHAR(50),ageINT,sexVARCHAR(10));

写入后数据:

+---+-----+---+---+ | id| name|age|sex| +---+-----+---+---+ | 1| Tom| 21| 男| | 2| Andy| 20| 女| | 3| Mike| 22| 男| | 4|Cindy| 23| 女| +---+-----+---+---+

2.4 读写数据库完整流程图

读取流程: MySQL数据库 ↓ JDBC连接 (url, driver, user, password) ↓ spark.read.format("jdbc").option(...).load() ↓ DataFrame ↓ 数据处理/分析 写入流程: 原始数据 (RDD/集合/文件) ↓ 定义Schema + 创建Row RDD ↓ spark.createDataFrame(rowRDD, schema) ↓ DataFrame ↓ df.write.mode("append").jdbc(url, table, properties) ↓ MySQL数据库

三、常见问题排查

3.1 ClassNotFoundException: com.mysql.cj.jdbc.Driver

原因:缺少MySQL JDBC驱动依赖,或驱动类名错误。

解决

  1. 确认pom.xml中已添加mysql-connector-java依赖
  2. 确认MySQL 8.x使用com.mysql.cj.jdbc.Driver,5.x使用com.mysql.jdbc.Driver
  3. 提交集群任务时,使用--jars参数携带驱动jar包
spark-submit--jarsmysql-connector-java-8.0.31.jar your_app.jar

3.2 时区错误:The server time zone value ‘xxx’ is unrecognized

原因:MySQL 8.0默认时区与JDBC驱动不匹配。

解决:在URL中添加时区参数

.option("url","jdbc:mysql://localhost:3306/spark?serverTimezone=UTC")

3.3 写入时表不存在

原因:目标表未提前创建。

解决

  • 方式1:提前在MySQL中创建表
  • 方式2:使用df.write.mode("overwrite").option("createTableOptions", "...").jdbc(...)自动创建

3.4 数据类型不匹配

原因:DataFrame字段类型与数据库表字段类型不兼容。

解决

  • 检查Schema定义与数据库表结构是否一致
  • 注意Spark的IntegerType对应MySQL的INTLongType对应BIGINT
  • 字符串长度不足时,调整MySQL字段的VARCHAR长度

四、总结

本文系统讲解了RDD与DataFrame的转换以及Spark SQL的数据库操作:

核心知识点

  1. RDD转DataFrame两种方法

    • 反射推断模式:定义case class +import spark.implicits._+rdd.toDF()
    • 编程式定义模式StructType定义Schema +RDD[Row]创建记录 +createDataFrame()拼接
  2. Spark SQL读取MySQL

    • 导入mysql-connector-java依赖
    • 使用spark.read.format("jdbc").option(...).load()
    • 关键参数:url、driver、dbtable、user、password
  3. Spark SQL写入MySQL

    • 准备数据为DataFrame格式
    • 使用df.write.mode("append").jdbc(url, table, properties)
    • 支持append/overwrite/ignore/errorIfExists四种模式

方法选择指南

场景推荐方法原因
字段固定、类型明确反射推断模式代码简洁,自动推断
字段动态、Schema不确定编程式定义模式灵活可控,运行时安全
读取数据库全表format(“jdbc”)标准JDBC方式
大数据量读取JDBC + 分区参数并行读取,提升性能
增量写入数据库write.mode(“append”)不破坏已有数据
全量覆盖写入write.mode(“overwrite”)替换旧数据
http://www.jsqmd.com/news/943909/

相关文章:

  • 如何高效批量下载抖音直播回放:开源工具终极指南
  • Windows 11终极优化指南:用Win11Debloat一键提升51%系统性能,轻松解决系统卡顿问题
  • 2026工业农村医院印染废水一体化污水处理设备厂家盘点 - 栗子测评
  • 2026年全球ODM电脑代工企业综合实力排行盘点 - 奔跑123
  • Windows 11 WLAN图标消失别慌!手把手教你用设备管理器‘回滚’驱动(保姆级图文)
  • 【AI风控融合实战指南】:20年专家亲授3大落地陷阱与5步集成方法论
  • lazarus鸿蒙开发5:编译ohos_hap_project
  • 从限速困境到下载自由:一个开源工具如何改变你的文件传输体验
  • 【工业级AI仓储整合白皮书】:基于127家客户数据,提炼9个不可绕过的数据治理关卡
  • 苹果香蕉梨葡萄四类水果新鲜度分级图像数据集(3-4级标注,含train/val划分)
  • 外贸网站建站哪家专业?靠谱建站公司筛选指南 - 麦麦唛
  • 2026年主流ODM电脑代工公司综合实力排行 - 奔跑123
  • 伊犁草原游旅行社盘点 聚焦资源与服务适配性 - 互联网科技品牌测评
  • OpCore-Simplify终极指南:30分钟完成专业级OpenCore EFI配置
  • Switch手柄电脑使用终极指南:BetterJoy让你轻松搞定Windows/macOS适配
  • 全自动脚本自动收藏成功--------我发现以前有人给我刷过短视频点赞收藏转发
  • 解放华硕笔记本性能潜力:轻量级控制工具G-Helper革新硬件管理体验
  • 基于Arduino与LM35的温度响应装置:从传感器到步进电机的创客实践
  • 革命性智能桌面自动化控制:UI-TARS桌面应用终极指南
  • 如何用Gazebo Sim解决机器人开发难题:从零到精通的实战指南
  • 2026全球MiniPC代工公司实力排行及选型指南 - 奔跑123
  • 长标题:威海全屋定制哪家好?2026威海本地靠谱装修/房屋装饰优秀厂家盘点推荐 - 栗子测评
  • 2026年 广东/湖南大功率柴油发电机厂家推荐:发电机组优质品牌与稳定动力口碑之选 - 品牌企业推荐师(官方)
  • 保姆级教程:在CentOS 7上从零部署Openfire 4.5.2即时通讯服务器(含MySQL 8.0配置)
  • 基于Arduino的智能温控风扇:从传感器到PWM控制的完整实践
  • 2026厦门工商注册公司推荐厦门注册记账报税哪家好厦门办营业执照公司甄选 - 栗子测评
  • 2026谁家薄膜生产在线质控薄膜试验仪精度高?主流品牌实测对比推荐 - 品牌推荐大师1
  • 抖音无水印下载神器:3分钟搞定批量视频保存与智能管理
  • AI工具如何真正听懂用户?揭秘智能反馈整合的7层信号处理链路与实时校准公式
  • 基于555定时器的PWM直流电机调速电路设计与实践