当前位置: 首页 > news >正文

实验5 MapReduce初级编程实践

实验步骤

(一)编程实现文件合并和去重操作

对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。下面是输入文件和输出文件的一个样例供参考。

输入文件A的样例如下:

 

20170101     x

20170102     y

20170103     x

20170104     y

20170105     z

20170106     x

 

输入文件B的样例如下:

20170101      y

20170102      y

20170103      x

20170104      z

20170105      y

 

根据输入文件A和B合并得到的输出文件C的样例如下:

20170101      x

20170101      y

20170102      y

20170103      x

20170104      y

20170104      z

20170105      y

20170105      z

20170106      x

 

(二)编写程序实现对输入文件的排序

现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。

输入文件1的样例如下:

33

37

12

40

 

输入文件2的样例如下:

4

16

39

5

 

输入文件3的样例如下:

1

45

25

 

根据输入文件1、2和3得到的输出文件如下:

1 1

2 4

3 5

4 12

5 16

6 25

7 33

8 37

9 39

10 40

11 45

       

(三)对给定的表格进行信息挖掘

下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。

输入文件内容如下:

child          parent

Steven        Lucy

Steven        Jack

Jone         Lucy

Jone         Jack

Lucy         Mary

Lucy         Frank

Jack         Alice

Jack         Jesse

David       Alice

David       Jesse

Philip       David

Philip       Alma

 Mark       David

Mark       Alma

 

输出文件内容如下:

grandchild       grandparent

Steven          Alice

Steven          Jesse

Jone            Alice

Jone            Jesse

Steven          Mary

Steven          Frank

Jone            Mary

Jone            Frank

Philip           Alice

Philip           Jesse

Mark           Alice

Mark           Jesse

具体代码

导入依赖

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>step1</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.4.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.4</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>

任务一

MergeDeduplicateMapper
package com.example;import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class MergeDeduplicateMapper extends Mapper<LongWritable, Text, Text, Text> {private final Text outputKey = new Text();private final Text outputValue = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString().trim();// 跳过空行if (line.isEmpty()) {return;}// 按制表符或空格分割String[] parts = line.split("\\s+");if (parts.length >= 2) {// 以日期和值的组合作为key,这样可以去除重复的记录String date = parts[0];String val = parts[1];// 创建复合键:日期+值String compositeKey = date + "\t" + val;outputKey.set(compositeKey);outputValue.set("");  // 值为空,因为我们只需要keycontext.write(outputKey, outputValue);}}
}
MergeDeduplicateReducer
package com.example;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class MergeDeduplicateReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {// 由于在mapper阶段已经用复合键去重,这里每个key只会出现一次// 直接输出键(包含日期和值)String compositeKey = key.toString();String[] parts = compositeKey.split("\t");if (parts.length == 2) {Text outputKey = new Text(parts[0]);Text outputValue = new Text(parts[1]);context.write(outputKey, outputValue);}}
}
MergeDeduplicateDriver
package com.example;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class MergeDeduplicateDriver {public static void main(String[] args) throws Exception {if (args.length != 3) {System.err.println("Usage: MergeDeduplicate <input path A> <input path B> <output path>");System.exit(-1);}Configuration conf = new Configuration();Job job = Job.getInstance(conf, "Merge and Deduplicate Files");// 设置Jar类job.setJarByClass(MergeDeduplicateDriver.class);// 设置Mapper和Reducer类job.setMapperClass(MergeDeduplicateMapper.class);job.setReducerClass(MergeDeduplicateReducer.class);// 设置输出键值类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 设置输入输出格式job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);// 设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));  // 文件AFileInputFormat.addInputPath(job, new Path(args[1]));  // 文件BFileOutputFormat.setOutputPath(job, new Path(args[2])); // 输出文件C// 设置Reducer任务数量job.setNumReduceTasks(1);// 等待作业完成System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

任务二

SortMapper
package com.example;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class SortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {private final IntWritable number = new IntWritable();private static final IntWritable one = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString().trim();// 跳过空行if (line.isEmpty()) {return;}try {// 将字符串转换为整数int num = Integer.parseInt(line);number.set(num);// 输出 (数字, 1),其中1是占位符context.write(number, one);} catch (NumberFormatException e) {// 如果遇到非数字内容,跳过System.err.println("跳过非数字内容: " + line);}}
}
SortReducer
package com.example;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;public class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {private final IntWritable rank = new IntWritable(1);@Overrideprotected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {// 由于MapReduce已经按key(数字)排序,我们只需要按顺序输出排名for (IntWritable value : values) {context.write(rank, key);// 排名递增rank.set(rank.get() + 1);}}
}
SortDriver
package com.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SortDriver {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: SortDriver <input path> <output path>");System.exit(-1);}Configuration conf = new Configuration();Job job = Job.getInstance(conf, "Number Sort with Rank");// 设置Jar类job.setJarByClass(SortDriver.class);// 设置Mapper和Reducer类job.setMapperClass(SortMapper.class);job.setReducerClass(SortReducer.class);// 设置Mapper输出键值类型job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(IntWritable.class);// 设置最终输出键值类型job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);// 设置输入输出格式job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);// 设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));  // 输入目录(包含所有输入文件)FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出目录// 设置Reducer任务数量为1,确保全局排序job.setNumReduceTasks(1);// 等待作业完成System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

任务三

GrandParentMapper
package com.example;import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class GrandParentMapper extends Mapper<LongWritable, Text, Text, Text> {private final Text outputKey = new Text();private final Text outputValue = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString().trim();// 跳过空行和标题行if (line.isEmpty() || line.startsWith("child") || line.startsWith("grandchild")) {return;}// 按制表符或空格分割String[] parts = line.split("\\s+");if (parts.length >= 2) {String child = parts[0];String parent = parts[1];// 输出两种关系:// 1. 作为子代关系:key=子, value="1:"+父 (1表示这是子代关系)// 2. 作为父代关系:key=父, value="2:"+子 (2表示这是父代关系)outputKey.set(child);outputValue.set("1:" + parent);context.write(outputKey, outputValue);outputKey.set(parent);outputValue.set("2:" + child);context.write(outputKey, outputValue);}}
}
GrandParentReducer
package com.example;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class GrandParentReducer extends Reducer<Text, Text, Text, Text> {private final Text outputKey = new Text();private final Text outputValue = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {// 存储该人的子女列表和父母列表List<String> children = new ArrayList<>();List<String> parents = new ArrayList<>();for (Text value : values) {String valStr = value.toString();if (valStr.startsWith("1:")) {// 这是父母关系:1:父母名parents.add(valStr.substring(2));} else if (valStr.startsWith("2:")) {// 这是子女关系:2:子女名children.add(valStr.substring(2));}}// 生成祖孙关系:该人的父母(祖父母) × 该人的子女(孙子)for (String parent : parents) {for (String child : children) {outputKey.set(child);        // 孙子outputValue.set(parent);     // 祖父母context.write(outputKey, outputValue);}}}
}
GrandParentDriver
package com.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class GrandParentDriver {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: GrandParentDriver <input path> <output path>");System.exit(-1);}Configuration conf = new Configuration();Job job = Job.getInstance(conf, "Find Grandparent Relationships");// 设置Jar类job.setJarByClass(GrandParentDriver.class);// 设置Mapper和Reducer类job.setMapperClass(GrandParentMapper.class);job.setReducerClass(GrandParentReducer.class);// 设置输出键值类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 设置输入输出格式job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);// 设置输入和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 设置Reducer任务数量job.setNumReduceTasks(1);// 等待作业完成System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
http://www.jsqmd.com/news/53073/

相关文章:

  • 2025年燃气低氮热水锅炉加工厂权威推荐榜单:家庭燃气热水锅炉/立式卧式燃气热水锅炉/半吨燃气热水锅炉设备源头厂家精选
  • 08.入门篇-Java程序运行原理
  • rust关键字unsafe
  • 完整教程:TouchDIVER Pro 触觉手套:Weart把火星岩石触感、手术操作感搬进 XR
  • 2025 年液化气泵厂家最新推荐榜,聚焦技术创新与质量保障的优质品牌深度解析无密封/磁力/倒罐/双端面机械密封/屏蔽/增压液化气泵公司推荐
  • 【水印检查】字符串处理和矩阵的存入
  • 高品质牛肉品牌推荐:安心之选,守护家庭餐桌
  • 06.入门篇-AI编程助手
  • 中药品牌十强排名彰显实力,好医生以完整产业链布局未来
  • 2025年11月电线电缆最新推荐厂家,高压电缆、中压电缆、低压电缆、铜芯电缆、铝芯电缆、铝合金电缆多维度综合考量
  • 从零部署网站客服系统:我踩过的域名和服务器坑,帮你省下几千块!
  • U634637 Star way to heaven
  • 【51单片机】【protues仿真】基于51单片机自动浇花强大的系统
  • 2025 年不锈钢水管厂家最新推荐榜,深度剖析品牌技术实力与市场口碑的核心竞争力薄壁/沟槽/卫生级/环压/快装/316/卡压式不锈钢水管/不锈钢水管工程/不锈钢水管管件/不锈钢水管安装公司推荐
  • 产学研融合!2025年中成药品牌排行榜10强好医生集团的创新引擎
  • FrameWork4.5 项目下使用EF6 同一项目操作多种数据库
  • 微波烘干设备厂家技术实力与行业应用解析
  • 2025年定期排污扩容器生产商权威推荐榜单:电厂疏水扩容器/定连排疏水扩容器/定期排污疏水扩容器源头厂家精选
  • 2025 年最新推荐激光切管机厂家排行榜:聚焦高效高精度设备,助力企业提升金属管材加工品质高速 / 高精度 / 零尾料 / 免画图 / 全自动 / 三卡盘激光切管机公司推荐
  • 2025 年升降柱机芯厂家最新推荐榜,技术实力与市场口碑深度解析,筛选高性能可靠货源IP68 升降柱机芯 / 防撞升降柱机芯 / 低压升降柱机芯 / 液压升降柱机芯 / 路障机升降柱机芯公司推荐
  • 不只是制药!中药品牌排行榜10强好医生,用石榴谱写产业富民传奇
  • java 上转型对象调用
  • 比较好吸收的奶粉怎么选?这篇文章里有答案
  • PostgreSQL 18 - 时间约束 (Temporal Constraints)
  • 深入解析:Angular【基础语法】
  • 微波烘干设备哪家好?国内优质企业及业务解析
  • U635097 有向图
  • 升级Win11专业工作站版密钥
  • 多线程+asyncio端口扫描器
  • U635735 Treap=Tree+Heap