当前位置: 首页 > news >正文

Spark 的文件处理与状态跟踪(Java+Python+Scala三版本)

下面提供使用Spark读取目录文件、判断已处理文件并逐个输出内容的PySpark和Java示例。示例采用简单的持久化记录(文件列表)来跟踪已处理文件,并演示了核心逻辑。


思路说明

  1. 从指定目录读取所有文件(使用wholeTextFiles,每个文件作为一个键值对,键为文件路径,值为文件内容)。

  2. 从记录文件中加载已处理文件路径集合。

  3. 过滤出未处理过的文件。

  4. 逐个处理新文件(此处仅打印文件名和内容,可替换为实际业务逻辑)。

  5. 将本次处理的文件路径追加到记录文件中,便于下次运行时跳过。

注意wholeTextFiles适用于小文件场景,会将整个文件内容加载到内存。对于大文件,应改用其他方式(如先列出文件列表,再逐个使用textFile读取),本例为简化采用前者。


1. PySpark 版本

from pyspark import SparkContext, SparkConf import os import sys def process_new_files(directory_path, processed_files_path): conf = SparkConf().setAppName("ProcessNewFiles-PySpark") sc = SparkContext(conf=conf) # 读取已处理文件列表 processed = set() if os.path.exists(processed_files_path): with open(processed_files_path, 'r') as f: processed = set(line.strip() for line in f if line.strip()) # 读取目录下所有文件 (key: 文件绝对路径, value: 文件内容) files_rdd = sc.wholeTextFiles(directory_path) # 过滤掉已处理文件 new_files_rdd = files_rdd.filter(lambda kv: kv[0] not in processed) # 处理每个新文件:打印内容,并返回文件路径 def process_file(kv): file_path, content = kv print(f"Processing file: {file_path}") print("Content:") print(content) # 这里可加入实际处理逻辑 return file_path # 收集本次处理的所有文件路径 newly_processed = new_files_rdd.map(process_file).collect() # 将新处理的文件路径追加到记录文件 if newly_processed: with open(processed_files_path, 'a') as f: for path in newly_processed: f.write(path + '\n') sc.stop() if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: process_new_files.py <directory_path> <processed_files_path>") sys.exit(1) process_new_files(sys.argv[1], sys.argv[2])

2. Java 版本

import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.io.*; import java.util.HashSet; import java.util.List; import java.util.Set; public class ProcessNewFiles { public static void main(String[] args) { if (args.length != 2) { System.err.println("Usage: ProcessNewFiles <directory_path> <processed_files_path>"); System.exit(1); } String directoryPath = args[0]; String processedFilesPath = args[1]; SparkConf conf = new SparkConf().setAppName("ProcessNewFiles-Java"); JavaSparkContext sc = new JavaSparkContext(conf); // 读取已处理文件列表 Set<String> processed = new HashSet<>(); try (BufferedReader br = new BufferedReader(new FileReader(processedFilesPath))) { String line; while ((line = br.readLine()) != null) { line = line.trim(); if (!line.isEmpty()) { processed.add(line); } } } catch (FileNotFoundException e) { // 文件不存在则忽略(首次运行) } catch (IOException e) { e.printStackTrace(); sc.close(); return; } // 读取目录下所有文件 JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles(directoryPath); // 过滤已处理文件 JavaPairRDD<String, String> newFilesRDD = filesRDD.filter( (Function<Tuple2<String, String>, Boolean>) kv -> !processed.contains(kv._1()) ); // 处理每个新文件:打印内容,返回文件路径 JavaPairRDD<String, String> processedInThisRun = newFilesRDD.mapToPair( (PairFunction<Tuple2<String, String>, String, String>) kv -> { String filePath = kv._1(); String content = kv._2(); System.out.println("Processing file: " + filePath); System.out.println("Content:"); System.out.println(content); return new Tuple2<>(filePath, ""); // 仅保留路径用于记录 } ); // 收集文件路径到Driver List<String> newlyProcessed = processedInThisRun.keys().collect(); // 追加到记录文件 if (!newlyProcessed.isEmpty()) { try (BufferedWriter bw = new BufferedWriter(new FileWriter(processedFilesPath, true))) { for (String path : newlyProcessed) { bw.write(path); bw.newLine(); } } catch (IOException e) { e.printStackTrace(); } } sc.close(); } }

3.Scala版本

import org.apache.spark.{SparkConf, SparkContext} import java.io.{BufferedWriter, FileWriter, File, PrintWriter} import scala.io.Source object ProcessNewFiles { def main(args: Array[String]): Unit = { if (args.length != 2) { println("Usage: ProcessNewFiles <directory_path> <processed_files_path>") System.exit(1) } val directoryPath = args(0) val processedFilesPath = args(1) // 初始化 Spark val conf = new SparkConf().setAppName("ProcessNewFiles-Scala") val sc = new SparkContext(conf) // 1. 从记录文件中加载已处理文件路径集合 val processedSet = loadProcessedFiles(processedFilesPath) // 2. 读取目录下所有文件,生成 (filePath, content) 对 val filesRDD = sc.wholeTextFiles(directoryPath) // 3. 过滤掉已处理文件 val newFilesRDD = filesRDD.filter { case (path, _) => !processedSet.contains(path) } // 4. 处理每个新文件:打印文件名和内容(可替换为实际逻辑) val newPaths = newFilesRDD.map { case (path, content) => println(s"Processing file: $path") println("Content:") println(content) // 这里可以加入实际业务处理代码 path // 返回文件路径以便记录 }.collect() // 收集到 Driver // 5. 将本次处理的文件路径追加到记录文件 if (newPaths.nonEmpty) { appendProcessedFiles(processedFilesPath, newPaths) } sc.stop() } /** * 从记录文件中读取已处理的文件路径,返回 Set[String] */ def loadProcessedFiles(path: String): Set[String] = { val file = new File(path) if (file.exists()) { Source.fromFile(file).getLines().map(_.trim).filter(_.nonEmpty).toSet } else { Set.empty[String] } } /** * 将新处理的文件路径追加到记录文件 */ def appendProcessedFiles(path: String, paths: Array[String]): Unit = { val writer = new BufferedWriter(new FileWriter(path, true)) try { paths.foreach { p => writer.write(p) writer.newLine() } } finally { writer.close() } } }

4. 准备 Sample 数据并运行

创建测试文件

在终端执行以下命令,创建包含两个小文件的目录,并初始化记录文件:

mkdir -p /tmp/sample_data echo "Hello, this is file1" > /tmp/sample_data/file1.txt echo "Content of file2" > /tmp/sample_data/file2.txt touch /tmp/processed_files.txt # 初始为空
运行 PySpark 版本

假设脚本保存为process_new_files.py,执行:

spark-submit process_new_files.py /tmp/sample_data /tmp/processed_files.txt

首次运行会处理 file1.txt 和 file2.txt,输出内容并更新/tmp/processed_files.txt

再次运行相同命令,将跳过已处理文件,不会输出任何内容(因为两个文件都已记录)。

运行 Java 版本

编译并打包成 JAR(例如process-new-files.jar),然后执行:

spark-submit --class ProcessNewFiles process-new-files.jar /tmp/sample_data /tmp/processed_files.txt

行为与 PySpark 版本一致。

运行 Scala 版本

假设项目结构如下:

text

ProcessNewFiles/ ├── build.sbt └── src/main/scala/ProcessNewFiles.scala

build.sbt内容示例:

name := "ProcessNewFiles" version := "1.0" scalaVersion := "2.12.17" // 请根据你的 Spark 版本选择合适的 Scala 版本 libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.8" % "provided"

然后执行sbt package,生成的 JAR 位于target/scala-2.12/processnewfiles_2.12-1.0.jar

提交运行

spark-submit \ --class ProcessNewFiles \ --master local[2] \ target/scala-2.12/processnewfiles_2.12-1.0.jar \ /tmp/sample_data \ /tmp/processed_files.txt

5. 注意事项

  • 文件路径标识wholeTextFiles返回的键包含file:前缀,记录文件中保存的也是完整路径,确保一致性。

  • 小文件场景:上述代码适合文件较小的情况。若文件很大,建议先用 Hadoop FileSystem 列出文件名,再逐一用textFile读取。

  • 并发写入记录文件:示例在 Driver 端追加写入,适用于单次批处理。若需多应用并发,应改用数据库或分布式锁。

http://www.jsqmd.com/news/423815/

相关文章:

  • 剖析Vera Rubin,读懂NVIDIA的下一个十年!
  • Ubuntu系统上安装Spark3.5.8+Hadoop3单节点运行环境
  • 基于NSGA-Ⅲ优化算法的梯级水电和火电机组的联合多目标调度研究附Matlab代码
  • 顶会FAST26最佳论文|阿里云本地存储的过去、现在与未来
  • MWC:苹果在6G领域从跟随者转身成为引领者
  • 基于MOEAD 和 NSGA-II多目标优化算法解决柔性车间调度问题附Python代码
  • 【路径规划】一种新型的基于采样的运动规划算法,集成了ADD-RRT、RRV和改进型Bridge Test复杂环境的优化改进附matlab代码
  • 基于NMPC的静态与动态障碍物环境下点镇定问题研究附Matlab代码、Simulink仿真
  • Kindle电子书阅读器的十个实用技巧
  • 风机在“摸鱼”你知道吗?风电功率预测最大的盲区:模型看见了风,却没看见设备在偷懒
  • 亚马逊、英伟达和软银向OpenAI投资1100亿美元
  • CANN NEXT学习周 - 面向下一代硬件的算子编程必修课(Ascend C)
  • MiniMax接入OpenClaw,我搭建了一支AI顾问团队
  • 智能体平台或将大幅降低SaaS软件许可成本
  • NVIDIA团队打造“罗马速建师“:一分钟重建千张照片的3D世界
  • 好写作AI:从“题眼”到手,当代大学生为何拥抱AI辅助论文?
  • LinkedIn突破:智能识别与纠正AI训练中的“自信错误”陷阱
  • AI智能体需要编排管理而非仅仅智能化
  • 上海有哪些专业做运动仿真服务的公司?2026原创优选指南 - 冠顶工业设备
  • 基于NSGA-III算法求解微电网多目标优化调度研究附Matlab代码
  • 2026锌包钢接地排名 - 非研科技
  • QA之三 - 变异测试 -- PITest
  • 后缀数组与马拉车学习笔记
  • 改稿速度拉满AI论文工具,千笔 VS WPS AI,本科生专属利器
  • 从此告别拖延,AI论文软件千笔AI VS Checkjie,MBA写作更高效!
  • 钛合金水质探头定制多少钱,德川电子价格合理吗 - 工业品牌热点
  • 基于Java+Springboot+Vue开发的网上服装销售管理系统源码+运行步骤+计算机技术
  • 【UI自动化测试】5_APP自动化测试 _Appium入门示例(重要)
  • Vibe Coding:AI驱动的心流编程,如何重塑开发者体验
  • AI写论文新选择,这4款AI论文写作工具精准攻克论文各环节!