当前位置: 首页 > news >正文

实验7 Spark初级编程实践(Scala版)

(1)Spark读取文件系统的数据

编写独立应用程序(推荐使用Scala语言),读取HDFS系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;通过sbt工具将整个应用程序编译打包成 JAR包,并将生成的JAR包通过 spark-submit 提交到 Spark 中运行命令。

(2)编写独立应用程序实现数据去重

对于两个输入文件A和B,编写Spark独立应用程序(推荐使用Scala语言),对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。

输入文件A的样例如下:

20170101    x

20170102    y

20170103    x

20170104    y

20170105    z

20170106    z

输入文件B的样例如下:

20170101    y

20170102    y

20170103    x

20170104    z

20170105    y

根据输入的文件A和B合并得到的输出文件C的样例如下:

20170101    x

20170101    y

20170102    y

20170103    x

20170104    y

20170104    z

20170105    y

20170105    z

20170106    z

(3)编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm成绩:

小明 92

小红 87

小新 82

小丽 90

Database成绩:

小明 95

小红 81

小新 89

小丽 85

Python成绩:

小明 82

小红 83

小新 94

小丽 91

平均成绩如下:

(小红,83.67)

(小新,88.33)

(小明,89.67)

(小丽,88.67)

具体代码:

导入依赖:

build.sbt
name := "spark1"
version := "1.0-SNAPSHOT"
scalaVersion := "2.12.17"// Spark依赖 - 使用provided,因为运行时Spark环境会提供
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.0" % "provided"

步骤一:

LineCount.scala
package com.exampleimport org.apache.spark.sql.SparkSessionobject LineCount {def main(args: Array[String]): Unit = {// 检查参数if (args.length < 1) {println("用法: LineCount <hdfs文件路径>")println("示例: LineCount hdfs://node1:8020/user/hadoop/test.txt")sys.exit(1)}val filePath = args(0)// 创建SparkSessionval spark = SparkSession.builder().appName("LineCount Application").getOrCreate()val sc = spark.sparkContexttry {println(s"正在读取文件: $filePath")// 读取HDFS文件val lines = sc.textFile(filePath)// 统计行数val lineCount = lines.count()// 输出结果println("=" * 50)println(s"文件路径: $filePath")println(s"文件总行数: $lineCount")println("=" * 50)// 可选:显示前几行内容println("文件前5行内容:")lines.take(5).foreach(println)println("=" * 50)} catch {case e: Exception =>println(s"错误: ${e.getMessage}")e.printStackTrace()} finally {// 关闭SparkSessionspark.stop()println("SparkSession已关闭")}}
}

步骤二:

DeduplicateApp.scala
package com.example
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDDobject DeduplicateApp {def main(args: Array[String]): Unit = {// 检查参数数量if (args.length < 3) {println("用法: DeduplicateApp <输入文件A路径> <输入文件B路径> <输出文件路径>")println("示例: DeduplicateApp hdfs://node1:8020/spark/A.txt hdfs://node1:8020/spark/B.txt hdfs://node1:8020/spark/C")sys.exit(1)}val inputPathA = args(0)val inputPathB = args(1)val outputPath = args(2)// 创建SparkSessionval spark = SparkSession.builder().appName("Data Deduplication Application").getOrCreate()val sc = spark.sparkContexttry {println("=" * 60)println(s"输入文件A: $inputPathA")println(s"输入文件B: $inputPathB")println(s"输出文件: $outputPath")println("=" * 60)// 读取文件A和文件Bval rddA: RDD[String] = sc.textFile(inputPathA)val rddB: RDD[String] = sc.textFile(inputPathB)// 显示原始文件内容println("\n原始文件A内容:")rddA.take(10).foreach(println)println("\n原始文件B内容:")rddB.take(10).foreach(println)// 关键修改:规范化数据格式 - 将多个空格/制表符替换为单个制表符def normalizeLine(line: String): String = {// 去除首尾空格,然后将一个或多个空白字符(空格、制表符)替换为单个制表符line.trim.replaceAll("\\s+", "\t")}// 规范化两个RDDval normalizedRddA: RDD[String] = rddA.map(normalizeLine).filter(_.nonEmpty)val normalizedRddB: RDD[String] = rddB.map(normalizeLine).filter(_.nonEmpty)// 显示规范化后的文件内容println("\n规范化后文件A内容:")normalizedRddA.take(10).foreach(println)println("\n规范化后文件B内容:")normalizedRddB.take(10).foreach(println)// 合并两个RDD并去重val combinedRDD: RDD[String] = normalizedRddA.union(normalizedRddB)val distinctRDD: RDD[String] = combinedRDD.distinct()// 按日期和值排序val sortedRDD: RDD[String] = distinctRDD.map(line => {val parts = line.split("\t")if (parts.length >= 2) {(parts(0), parts(1), line) // (日期, 值, 完整行)} else {(line, "", line) // 如果分割后只有一部分,则作为日期,值为空}}).sortBy(tuple => (tuple._1, tuple._2)).map(tuple => tuple._3)// 统计去重前后的数据量val countA = rddA.count()val countB = rddB.count()val countCombined = combinedRDD.count()val countDistinct = distinctRDD.count()println("\n" + "=" * 60)println("统计信息:")println(s"原始文件A行数: $countA")println(s"原始文件B行数: $countB")println(s"合并后总行数: $countCombined")println(s"去重后行数: $countDistinct")println(s"去重掉的行数: ${countCombined - countDistinct}")println("=" * 60)// 显示去重后的前几行数据println("\n去重后数据(前20行):")sortedRDD.take(20).foreach(println)// 保存结果到HDFS(使用制表符分隔,确保格式一致)println(s"\n正在保存结果到: $outputPath")// 先删除已存在的输出目录,避免错误try {val hadoopConf = sc.hadoopConfigurationval fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)val output = new org.apache.hadoop.fs.Path(outputPath)if (fs.exists(output)) {fs.delete(output, true)println(s"已删除已存在的输出目录: $outputPath")}} catch {case e: Exception => println(s"清理输出目录时警告: ${e.getMessage}")}sortedRDD.saveAsTextFile(outputPath)// 验证保存的文件println(s"\n已保存到HDFS,可以通过以下命令查看结果:")println(s"hdfs dfs -cat $outputPath/part-* | head -20")} catch {case e: Exception =>println(s"错误: ${e.getMessage}")e.printStackTrace()} finally {// 关闭SparkSessionspark.stop()println("\nSparkSession已关闭")}}
}

步骤三:

AverageScoreApp.scala
package com.exampleimport org.apache.spark.sql.SparkSessionobject AverageScoreApp {def main(args: Array[String]): Unit = {if (args.length < 2) {println("用法: AverageScoreApp <输入文件目录> <输出文件路径>")println("示例: AverageScoreApp hdfs://node1:8020/spark/ hdfs://node1:8020/spark/average_scores")sys.exit(1)}val inputDir = args(0)val outputPath = args(1)val spark = SparkSession.builder().appName("Student Average Score Calculator").getOrCreate()val sc = spark.sparkContexttry {println("=" * 60)println("学生平均成绩计算应用")println("=" * 60)// 读取目录下所有文件println(s"读取目录: $inputDir")val allFilesRDD = sc.textFile(s"$inputDir/*")// 显示部分原始数据println("\n原始数据示例:")allFilesRDD.take(10).foreach(println)// 解析数据并转换为 (姓名, (总分, 科目数))val studentScorePairs = allFilesRDD.map(_.trim).filter(_.nonEmpty).flatMap { line =>val parts = line.split("\\s+")if (parts.length >= 2) {try {val name = parts(0)val score = parts(1).toDoubleSome((name, (score, 1)))  // (姓名, (成绩, 1))} catch {case _: NumberFormatException => None}} else {None}}// 使用reduceByKey合并每个学生的成绩val studentTotalAndCount = studentScorePairs.reduceByKey { (a, b) =>(a._1 + b._1, a._2 + b._2)  // 累加总分和科目数}// 计算平均成绩val averageScores = studentTotalAndCount.map { case (name, (total, count)) =>val average = total / count(name, BigDecimal(average).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble)}// 按平均成绩降序排序,成绩相同按姓名升序val sortedResults = averageScores.sortBy { case (name, avg) => (-avg, name) }// 转换为输出格式val formattedResults = sortedResults.map { case (name, avg) =>f"($name,$avg%.2f)"}// 显示结果println("\n" + "=" * 60)println("平均成绩如下:")formattedResults.collect().foreach(println)// 保存结果println(s"\n正在保存结果到: $outputPath")// 清理输出目录try {val fs = org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration)val path = new org.apache.hadoop.fs.Path(outputPath)if (fs.exists(path)) fs.delete(path, true)} catch {case e: Exception => // 忽略}formattedResults.coalesce(1).saveAsTextFile(outputPath)println("\n作业完成!")} catch {case e: Exception =>println(s"错误: ${e.getMessage}")e.printStackTrace()} finally {spark.stop()}}
}
http://www.jsqmd.com/news/57599/

相关文章:

  • 2025 黑珍珠一钻认证!无味舒食属于什么档次?中高端素食标杆揭秘
  • 在测试领域,如何写一个更好的prompt来进行测试提效
  • 2025最新测评:无味舒食怎么样吗?口味、服务与性价比深度分析
  • go安装配置
  • 2025最新探店报告:无味舒食餐厅评价如何?近期口碑怎么样?
  • 2025热门美食解析:无味舒食起源地与菜品特色解读
  • 2025年度盘点:无味舒食是否值得推荐?四大维度深度解析
  • 2025 实测无味舒食素食怎么样?健康疗愈 + 山海本味值得试吗?
  • 2025广东企业数字化转型服务商最新TOP5评测:引领企业变革新航向
  • 2025年广东飞书服务商综合实力TOP5:赋能企业智能化协作新生态
  • 2025最新青岛防水补漏施工单位推荐 堵漏检修守护建筑安全防线
  • 测试面试经验1
  • 2025年合肥笔记本电脑售后维修点推荐:联想华硕戴尔微软惠普宏碁三星如何选择?多维度对比与排名指南
  • 2025年沈阳维修点推荐:哪个性价比更高?联想华硕戴尔微软惠普宏碁三星排名解析
  • 2025最新沈阳防水补漏工程施工单位口碑推荐 防水堵漏检测全流程专业可靠
  • 四川省宏正大耐火材料有限公司联系方式:产品类型与施工服务介绍
  • 2025年南通笔记本电脑售后维修点推荐:联想华硕戴尔微软惠普宏碁三星选择指南:专业技术与用户满意度测评
  • 2025年沈阳笔记本电脑售后维修点推荐:联想华硕戴尔微软惠普宏碁三星维修点怎么选?深度调查与选择指南
  • 2025年辽宁朋友圈广告公司最新综合实力评测top5:社交营销生态的领军者与创新者,腾讯全域广告投放
  • 四川省宏正大耐火材料有限公司联系方式:材料选购与施工注意事项
  • 2025专业防水补漏公司推荐—尤卉防水,连锁企业,上海/青岛/沈阳/沧州等多城市首选品牌
  • 2025年太原笔记本电脑售后维修点推荐:联想华硕戴尔微软惠普宏碁三星哪家维修更高效?客户评价与服务质量解析
  • 2025辽宁网络推广公司最新TOP5推荐:企业数字化增长新引擎
  • 2025年南通笔记本电脑售后维修点推荐:哪家服务更可靠?联想华硕戴尔微软惠普宏碁三星维修口碑深度评测
  • 2025年济南笔记本电脑售后维修点推荐:哪家性价比更高?联想华硕戴尔微软惠普宏碁三星排名指南
  • 2025年福州笔记本电脑售后维修点推荐:哪家性价比更高?实测数据与用户评价解析
  • 2025年合肥笔记本电脑售后维修点推荐:哪个性价比更高?全方位维修案例与用户评价解析
  • 2025年成都笔记本电脑售后维修点推荐:联想华硕戴尔微软惠普宏碁三星服务哪家强?技术实力与维修案例比对
  • rust基础第四篇:#[derive()]使用
  • 2025年东莞笔记本电脑售后维修点推荐:联想华硕戴尔微软惠普宏碁三星服务如何?权威评测与用户满意度分析