当前位置: 首页 > news >正文

MapReduce使用和原理(三)

Combiner预聚合

Combiner是一个可选的优化步骤,在Map任务输出结果后、Reduce输入前执行。其作用是对Map任务的输出进行局部合并,将具有相同键的键值对合并为一个,以减少需要传输到Reduce节点的数据量,降低网络开销,并提高整体性能。

Combiner实际上是一种轻量级的Reduce操作,用于减少数据在网络传输过程中的负担。需要注意的是,Combiner的执行并不是强制的,而是由开发人员根据具体情况决定是否使用,一些情况下不适合使用Combiner,例如:对数据进行均值计算场景。

在MapReduce中使用Combiner预聚合需要两个步骤:

1. 自定义类实现Reducer,实现reduce方法,完成聚合逻辑

2. 在Driver中设置“job.setCombinerClass(YourCombiner.class)”在Map端使用Combiner预聚合

下面对WordCount案例进行改造,实现Map端进行相同单词的预聚合。

1) 自定义类WordCountCombiner类实现Reducer类

import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> { //创建写出的value IntWritable total = new IntWritable(); //每组key会调用一次 @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; //累加 for (IntWritable value : values) { sum += value.get(); } //设置当前key对应value结果值 total.set(sum); //结果写出 context.write(key,total); } }

自定义Reduce端分组比较器

默认在MapReduce Reduce端每个key对应一组数据,一个Redcue Task可以处理多组key,默认哪些数据分配到相同的组就是按照key是否相等决定的。我们也可以通过在自定义分组比较器来决定将哪些数据看成同一个组进行处理(相同key)。

使用自定义Redcue端分组比较器需要如下两个步骤:

1) 自定义Reduce端分组比较器

2) 在Driver中通过”job.setGroupingComparatorClass(YourGroupingComparator.class)”进行设置。

案例需求

不使用自定义分组比较器实现

1) Temperature

import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 温度实体类 */ public class Temperature implements WritableComparable<Temperature> { private String year; private String month; private String day; private Integer temp; //空构造 public Temperature() { } //有参构造 public Temperature(String year, String month, String day, Integer temp) { this.year = year; this.month = month; this.day = day; this.temp = temp; } //getter setter public String getYear() { return year; } public void setYear(String year) { this.year = year; } public String getMonth() { return month; } public void setMonth(String month) { this.month = month; } public String getDay() { return day; } public void setDay(String day) { this.day = day; } public Integer getTemp() { return temp; } public void setTemp(Integer temp) { this.temp = temp; } //toString() @Override public String toString() { return year + "-" +month + "-" +day + "\t" +temp; } //序列化与反序列化 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(this.year); dataOutput.writeUTF(this.month); dataOutput.writeUTF(this.day); dataOutput.writeInt(this.temp); } @Override public void readFields(DataInput dataInput) throws IOException { year = dataInput.readUTF(); month = dataInput.readUTF(); day = dataInput.readUTF(); temp = dataInput.readInt(); } //两个对象如何比较数据 @Override public int compareTo(Temperature o) { //按照相同的年月、温度降序排序 int yearCompare = this.getYear().compareTo(o.getYear()); int monthCompare = this.getMonth().compareTo(o.getMonth()); if(yearCompare==0){ if(monthCompare==0){ //按照温度大的降序排序 return this.temp > o.temp ? -1:1; } return monthCompare; } return yearCompare; } }

4) TemperatureReducer

import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.*; public class TemperatureReducer extends Reducer<Temperature, Temperature, Temperature,NullWritable> { int cnt ; String year; String month; String day; //用来标记某个分区中是否处理过相同日期数据,map中key为年月,value:<day,年月计数> HashMap<String, String> flagMap = new HashMap<>(); //相同的key分为一组,这里需要将分区中所有的数据拿在一起最后比较获取日期最大的数据 ArrayList<Temperature> list = new ArrayList<>(); @Override protected void reduce(Temperature key, Iterable<Temperature> values, Reducer<Temperature, Temperature, Temperature, NullWritable>.Context context) throws IOException, InterruptedException { Iterator<Temperature> iterator = values.iterator(); while(iterator.hasNext()){ Temperature next = iterator.next(); list.add(next); } //最后比较得到温度较高的两条数据,日期不能相同 for (Temperature temperature : list) { year = temperature.getYear(); month = temperature.getMonth(); day = temperature.getDay(); //第一次处理某个年月日数据 if(!flagMap.containsKey(year+"-"+month)){ cnt = 1 ; context.write(temperature,NullWritable.get()); flagMap.put(year+"-"+month,day+","+cnt); } //如果flagMap中包含年月数据,判断value是不是同一日期,是同一日期不输出,不是同一日期输出数据 if(flagMap.containsKey(year+"-"+month)&&!day.equals(flagMap.get(year+"-"+month).split(",")[0])){ //获取当前年月记录的条数 cnt = Integer.valueOf(flagMap.get(year + "-" + month).split(",")[1]); cnt +=1; //说明当前年月下不够2条数据 if(cnt == 2){ context.write(temperature,NullWritable.get()); } flagMap.put(year+"-"+month,day+","+cnt); } } } }

使用自定义分组比较器实现

相比以上代码,使用自定义分区比较器首先需要自定义类继承WritableComparator抽象类并实现构造和compare方法,在构造方法中需要调用父类构造传入排序对象类型及是否创建实例,在compare方法中实现决定将哪些数据放入同一组的比较逻辑。

自定义输出格式

在MapReduce中Reduce写出数据时根据不同的OutputFormat格式化类来决定数据如何写出,OutputFormat格式化类中通过getRecordWriter

方法获取RecordWriter对象进而将数据通过RecordWriter.write()方法写出到外部系统,默认写出格式类为TextOutputFormat(该类继承自抽象类FileOutputFormat,FileOutputFormat又继承自顶级的OutputFormat抽象类),即:一行行将数据写出到外部text文件中,生成的文件名称为part-r-00000、part-r-00001... 如果我们想要改变写出文件名称也可以通过定义类继承FileOutputFormat抽象类并实现对应方法即可。

自定义OutputFormat及使用自定义输出格式步骤如下:

1) 自定义类继承FileOutputFormat并实现getRecordWriter方法

2) 在getRecordWriter方法中返回自定义RecordWriter类,该类需要集成RecordWriter对象实现对应的数据写出逻辑。

3) 在Driver中设置“job.setOutputFormatClass(YourOutputFormat.class)”使用自定义outputFormat。

案例:学生成绩数据studentscore.txt,内容如下:

/** * 学员信息 */ public class StudentInfo implements WritableComparable<StudentInfo> { private String name; private int score; // 无参构造方法 public StudentInfo() { } // 带参构造方法 public StudentInfo(String name, int score) { this.name = name; this.score = score; } // Getter和Setter方法 public String getName() { return name; } public void setName(String name) { this.name = name; } public int getScore() { return score; } public void setScore(int score) { this.score = score; } @Override public String toString() { return "StudentInfo{" + "name='" + name + '\'' + ", score=" + score + '}'; } // 实现序列化方法 @Override public void write(DataOutput out) throws IOException { out.writeUTF(name); out.writeInt(score); } // 实现反序列化方法 @Override public void readFields(DataInput in) throws IOException { name = in.readUTF(); score = in.readInt(); } @Override public int compareTo(StudentInfo o) { if(this.score > o.score){ return -1; }else if(this.score < o.score){ return 1; }else{ return 0; } } }

4) MyOutputFormat

MyOutputFormat类需要继承FileOutputFormat,并实现getRecoreWriter方法,返回RecordWriter对象完成自定义数据输出。

import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class MyOutputFormat extends FileOutputFormat<StudentInfo, NullWritable> { // 获取RecordWriter对象 @Override public RecordWriter<StudentInfo, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // 根据job来创建文件输出流,需要传入job MyRecordWriter myRecordWriter = new MyRecordWriter(job); return myRecordWriter; } } class MyRecordWriter extends RecordWriter<StudentInfo,NullWritable>{ private FSDataOutputStream passOutputStream; private FSDataOutputStream failOutputStream; //根据job来创建文件输出流 public MyRecordWriter(TaskAttemptContext job) throws IOException { FileSystem fileSystem = FileSystem.get(job.getConfiguration()); // 创建及格成绩输出流 passOutputStream = fileSystem.create(new Path("D:\\mapreduce\\pass.txt")); // 创建不及格成绩输出流 failOutputStream = fileSystem.create(new Path("D:\\mapreduce\\fail.txt")); } //写出数据 @Override public void write(StudentInfo key, NullWritable value) throws IOException, InterruptedException { int score = key.getScore(); if(score >=80){ passOutputStream.writeBytes(score+"\n"); }else{ failOutputStream.writeBytes(score+"\n"); } } //关闭资源 @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭输出流并释放资源 IOUtils.closeStreams(passOutputStream,failOutputStream); } }

5) Driver

在Driver中通过设置“job.setOutputFormatClass(MyOutputFormat.class)”指定自定义outputFormat,在实现中指定了数据写出的文件,另外FileOutputFormat.setOutputPath(...)指定的路径中会存放“_SUCCESS”标志文件。

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class ScoreDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //1.获取配置信息及job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2.设置Driver 程序对应的jar/类 job.setJarByClass(ScoreDriver.class); //3.设置Mapper和Reducer对应的类 job.setMapperClass(ScoreMapper.class); job.setReducerClass(ScoreReducer.class); //4.设置Mapper输出key、value类型 job.setMapOutputKeyClass(StudentInfo.class); job.setMapOutputValueClass(Text.class); //5.设置最终输出K,V类型 job.setOutputKeyClass(StudentInfo.class); job.setOutputValueClass(NullWritable.class); //设置自定义outputFormat job.setOutputFormatClass(MyOutputFormat.class); //6.设置数据输入和结果写出路径 FileInputFormat.setInputPaths(job,new Path("data/studentscore.txt")); //使用了自定义输出类,结果数据会写入自定义输出类中指定的路径,这里设置的目录只是最后写出的_success标记文件路径 FileOutputFormat.setOutputPath(job,new Path("output6/")); //7.运行任务,运行成功返回true boolean success = job.waitForCompletion(true); if (success) { // 任务执行成功的逻辑 System.out.println("任务执行成功"); } else { // 任务执行失败的逻辑 System.out.println("任务执行失败"); } } }
http://www.jsqmd.com/news/852120/

相关文章:

  • 3分钟学会AutoMdxBuilder:零基础制作专业MDX词典的完整指南
  • 淘系,淘宝1688滑块,x82y,x5sec滑块,直滑,窗帘滑块FAIL_SYS_USER_VALIDATE::哎哟喂,被挤爆啦,请稍后重试
  • 2026水处理设备厂家:反渗透、超纯水、软化水设备靠谱品牌推荐 - 深度智识库
  • 我写了个Python脚本,把全城1045座加油站的底价扒干净了——还带环比监控!
  • 嘉兴黄金回收三大正规品牌,闪明钻排行第一,价格透明全城上门安心 - 润富黄金珠宝行
  • DistroAV完全指南:5步打造专业级OBS网络视频系统
  • 怎样高效配置开源风扇控制软件:FanControl终极实战指南与技巧
  • 5分钟解锁Windows字体自由:No!! MeiryoUI个性化指南
  • 2026TOP5黄石市铁山区黄金,白银,铂金回收门店推荐及联系方式权威发布 - 前途无量YY
  • Light Chaser:现代数据可视化设计平台的技术探索与实践指南
  • 2026年JAVA语言要学习多久能去工作 学多久能应付工作
  • ME-Mamba:多通道生存分析的多专家Mamba及其知识获取与融合
  • 企业乔迁家庭搬家选搬家公司的4条实用经验 - 速递信息
  • Hermes Agent 如何配置 custom provider 以接入 Taotoken 聚合服务
  • 2026年宁波同城实体店短视频引流与GEO精准获客完全指南 - 优质企业观察收录
  • 新加坡家具物流哪家服务商亲测靠谱?
  • 2026TOP5黄石市西塞山区黄金,白银,铂金回收门店推荐及联系方式权威发布 - 前途无量YY
  • Taotoken 用量看板如何帮助开发者清晰掌握项目资源消耗
  • 从A类到C类:PA工作点怎么选?聊聊手机射频前端和基站功放背后的效率与线性度权衡
  • 2026年云南房屋加固与昆明旧房改造市场深度横评:从危旧建筑到城市更新的全链条解决方案 - 企业名录优选推荐
  • 南京宝玑腕表维保到底多贵?传世系列洗油费用大揭秘:这样做每年能省下一笔保养基金 ——“宝玑传世,一块表顶一辆车,保养一次要多少钱?” - 亨得利官方维修中心
  • 嵌入式IAP远程升级方案:从Bootloader设计到安全固件更新实战
  • 号易项目开放合作,官方邀请码08888助你快速入局,创业新机遇 - 号易官方邀请码08888
  • 小程序制作平台选型踩坑记录:2026年五大主流方案横向对比 - 速递信息
  • 2026TOP5黄石市下陆区黄金,白银,铂金回收门店推荐及联系方式权威发布 - 前途无量YY
  • 2026嘉兴黄金回收口碑梯队,三大正规店铺全上榜,全城覆盖实时报价 - 润富黄金珠宝行
  • 从原理图到高直通车量产:如何通过一体化 PCB 设计服务破解“交期与良率”隐痛
  • 从Bing搜索日志到AI基准:MS MARCO数据集的前世今生与实战应用避坑指南
  • 广州注册五金批发公司代办机构top4推荐 首选正规机构 高效省心办执照 - 速递信息
  • 2026TOP5惠州市惠城区黄金,白银,铂金回收门店推荐及联系方式权威发布 - 前途无量YY