当前位置: 首页 > news >正文

Spark SQL 读取 MySQL 表进行 ETL 数据加工,再把结果反向写入数据库

package main.scala.M

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Small Spark SQL ETL job: loads MySQL tables over JDBC, registers them as
 * temporary views, runs a few aggregate queries against those views, and
 * appends the final aggregate back into MySQL.
 *
 * Steps performed by [[main]]:
 *  1. Build a Hive-enabled local `SparkSession`.
 *  2. Build the MySQL JDBC connection properties.
 *  3. Register each table in `SourceTables` as a temporary view of the same name.
 *  4. Run and print the aggregate queries.
 *  5. Append the per-`Gender` counts to the MySQL table `room_cnt`.
 *
 * The session is always stopped, even when one of the steps throws.
 */
object amJobTask {

  /** JDBC URL of the MySQL database used both as source and as sink. */
  private val JdbcUrl: String =
    "jdbc:mysql://localhost:3306/hivedata?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf-8"

  /** MySQL tables that are exposed to Spark SQL as temporary views. */
  private val SourceTables: List[String] = List("room1", "room2")

  /**
   * Entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = buildSession()
    // Stop the session in `finally` so a failing JDBC read, query or write
    // does not leave the local Spark context running.
    try runJob(spark)
    finally spark.stop()
  }

  /** Creates the Hive-enabled local SparkSession used by the job. */
  private def buildSession(): SparkSession =
    SparkSession
      .builder()
      .appName("job_name_id")
      .master("local[10]")
      // Optional remote Hive metastore; map the IP in the Windows hosts file
      // first (192.168.1.1 master):
      // .config("hive.metastore.uris", "thrift://hadoop01:9083")
      // Optional remote HDFS warehouse location for data files:
      // .config("spark.sql.warehouse.dir", "hdfs://master:9000/opt/hiveDataFile")
      .enableHiveSupport()
      .getOrCreate()

  /**
   * Builds the JDBC connection properties for MySQL.
   *
   * The user and password are taken from the `MYSQL_USER` and `MYSQL_PASSWORD`
   * environment variables when they are set; otherwise the original defaults
   * ("root" / "123456") are used so existing local setups keep working.
   */
  private def mysqlProperties(): java.util.Properties = {
    val props = new java.util.Properties()
    props.setProperty("user", sys.env.getOrElse("MYSQL_USER", "root"))
    props.setProperty("password", sys.env.getOrElse("MYSQL_PASSWORD", "123456"))
    // Legacy Connector/J 5.x driver class, kept for reference:
    // props.setProperty("driver", "com.mysql.jdbc.Driver")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    props
  }

  /** Reads every table in `SourceTables` over JDBC and registers it as a temp view of the same name. */
  private def registerSourceViews(spark: SparkSession, props: java.util.Properties): Unit =
    SourceTables.foreach { table =>
      val mysqlDF: DataFrame = spark.read.jdbc(JdbcUrl, table, props)
      mysqlDF.createOrReplaceTempView(table)
    }

  /** Returns the names of all temporary views listed by the session catalog. */
  private def temporaryViewNames(spark: SparkSession): Array[String] =
    spark.catalog
      .listTables()
      .filter(_.isTemporary)
      .select("name")
      .collect()
      .map(_.getString(0))

  /** Runs the queries against the registered views and appends the final result to `room_cnt`. */
  private def runJob(spark: SparkSession): Unit = {
    // Hive smoke test, kept for reference:
    //   val df = spark.sql("select * from test_db.room2")
    //   df.printSchema()
    //   df.show()

    // 2. MySQL connection parameters.
    val mysqlProps = mysqlProperties()

    // 3. Alternative: read an aggregate directly from MySQL by passing a
    //    sub-query in place of the table name:
    //   val querSql: String = "(SELECT r.Gender,count(*) as cnt from room1 r group by r.Gender) as tmp"
    //   val mysqlDF: DataFrame = spark.read.jdbc(JdbcUrl, querSql, mysqlProps)
    //   mysqlDF.show()

    // 4. Register the source tables as views, then query them with Spark SQL.
    registerSourceViews(spark, mysqlProps)

    // Filtered single-table variant, kept for reference:
    //   val rls: DataFrame = spark.sql("SELECT r.Gender,count(*) as cnt from room1 r " +
    //     "   where length(r.Gender)>0   and r.address like  '%上海%' group by r.Gender")
    //   rls.show()

    // Per-Gender row counts of room1 followed by those of room2.
    val rls1: DataFrame = spark.sql(
      "SELECT r.Gender,count(*) as cnt from room1 r group by r.Gender \n" +
        "union all\n" +
        "SELECT r.Gender,count(*) as cnt from room2 r group by r.Gender "
    )
    rls1.show(100)

    // Print all session-level temporary views.
    temporaryViewNames(spark).foreach(println)

    // Global temporary views, kept for reference:
    //   val globalTempViews = spark.catalog.listTables("global_temp")
    //     .select("name")
    //     .collect()
    //     .map(_.getString(0))
    //
    //   println("Global temporary views:")
    //   globalTempViews.foreach(println)

    // Total row count over ten UNION ALL branches (room1/room2 alternating);
    // each branch carries a constant predicate written as always-true.
    spark.sql(
      "select COUNT(1) CNT from (\n" +
        "select *  from room1 where 1>0\n" +
        "union all\n" +
        "select *  from room2 where 20>3\n" +
        "union all\n" +
        "select *  from room1 where 10>5\n" +
        "union all\n" +
        "select *  from room2 where 223>5\n" +
        "union all\n" +
        "select *  from room1 where 12>9\n" +
        "union all\n" +
        "select *  from room2 where 4>1.5\n" +
        "union all\n" +
        "select *  from room1 where 9>8\n" +
        "union all\n" +
        "select *  from room2 where 7>6\n" +
        "union all\n" +
        "select *  from room1 where 4>2\n" +
        "union all\n" +
        "select *  from room2 where 7>6\n" +
        ")"
    ).show()

    // Simple per-table count variant, kept for reference:
    //   spark.sql("select count(*) as cnt from room1\nunion all\nselect count(*) as cnt from room2").show()

    // Per-Gender counts over eight UNION ALL branches, restricted to
    // Gender in ('M','F') and to rows whose first ten Version characters
    // compare >= '1910-01-01'.
    val datacnt: DataFrame = spark.sql(
      "select T.Gender,COUNT(1) as CNT from (\n" +
        "select *  from room1\n" +
        "union all\n" +
        "select *  from room2\n" +
        "union all\n" +
        "select *  from room1\n" +
        "union all\n" +
        "select *  from room2\n" +
        "union all\n" +
        "select *  from room1\n" +
        "union all\n" +
        "select *  from room2\n" +
        "union all\n" +
        "select *  from room1\n" +
        "union all\n" +
        "select *  from room2\n" +
        ") t where t.Gender in('M','F')\n" +
        "\tand SUBSTR(T.Version,1,10)>= '1910-01-01'\n" +
        "group by T.Gender"
    )
    datacnt.show()

    // Write the aggregate back into MySQL, appending to the existing table.
    datacnt.write.mode("append").jdbc(JdbcUrl, "room_cnt", mysqlProps)
  }
}

  

http://www.jsqmd.com/news/792200/

相关文章:

  • 跨境电商物流解决方案-恒盛通国际快递服务 - 恒盛通物流
  • day05补发补充
  • 2026 年豆包开启付费订阅,中国 AI 大模型商业化迎来大考!
  • 时序数据库详解
  • 软工5月10号
  • Display Driver Uninstaller (DDU):彻底清理显卡驱动的终极解决方案
  • STM32 SDIO+PCM5102成功播放《义妹》
  • day04补发
  • 深入了解Python并发编程
  • 如何通过Noto Emoji实现跨平台表情符号统一:技术原理与应用实践
  • Qt/C++实战:手把手教你用QCustomPlot实现动态刷新热力图(模拟实时数据)
  • MySQL高级特性:索引优化详解
  • 2026年4月优质的初中效袋式过滤器批发厂家推荐,防潮设计适应潮湿环境 - 品牌推荐师
  • Redis数据结构与性能优化详解
  • 使用本地浏览器打开远程服务器生成的网页——详细教程
  • 打破语言壁垒:Translumo屏幕实时翻译工具的终极使用指南
  • 2026 年 Q1 全球互联网中断报告:断网、停电与战争
  • 20253221 2025-2026-2 《Python程序设计》实验3报告
  • Python函数中的全局变量详解
  • 量子计算机来了,你的企业网络隧道还安全吗?
  • PostgreSQL高级特性详解
  • Redis学习8 Redis数据结构(1)
  • 基于Vue.js与AI对话的智能思维导图生成器开发实践
  • 终极解决方案:如何快速批量转换GBK到UTF-8编码文件
  • 一次例行密钥轮换,让数百万德国域名集体蒸发
  • 独立开发者工具箱:2026年全栈与AI应用高效开发技术栈指南
  • MongoDB聚合与查询优化详解
  • 如何在 Docker 容器中部署企业微信机器人服务保证高可用
  • 31_AI短片实战第四弹:主观视角空间控制与分屏快速剪辑的AI生成策略(附提示词)
  • 高管求职渠道公司实测:4家机构核心能力对比评测 - 得赢