MR程序的组件combiner怎么使用

77次阅读
没有评论

共计 3670 个字符,预计需要花费 10 分钟才能阅读完成。

这篇文章主要介绍“MR 程序的组件 combiner 怎么使用”,在日常操作中,相信很多人在 MR 程序的组件 combiner 怎么使用问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”MR 程序的组件 combiner 怎么使用”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

  用一句简单的话语描述 combiner 组件作用:降低 map 任务输出, 减少 reduce 任务数量, 从而降低网络负载

  工作机制:

    Map 任务允许在提交给 Reduce 任务之前在本地执行一次汇总的操作,那就是 combiner 组件,combiner 组件的行为模式和 Reduce 一样,都是接收 key/values,产生 key/value 输出

 

  注意:

 1、combiner 的输出是 reduce 的输入

 2、如果 combiner 是可插拔的,那么 combiner 绝不能改变最终结果

 3、combiner 是一个优化组件, 但是并不是所有地方都能用到, 所以 combiner 只能用于 reduce 的输入、输出 key/value 类型完全一致且不影响最终结果的场景。

  例子:WordCount 程序中, 通过统计每一个单词出现的次数, 我们可以首先通过 Map 任务本地进行一次汇总 (Combiner),然后将汇总的结果交给 Reduce,完成各个 Map 任务存在相同 KEY 的数据进行一次总的汇总,图:

 

Combiner 代码:

 Combiner 类,直接打开 Combiner 类源码是直接继承 Reducer 类,所以我们直接继承 Reducer 类即可,最终在提交时指定咱们定义的 Combiner 类即可

package com.itheima.hadoop.mapreduce.combiner;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountCombiner extends
 Reducer Text, LongWritable, Text, LongWritable  {
 @Override
 protected void reduce(Text key, Iterable LongWritable  values, Context context)
 throws IOException, InterruptedException {
 long count = 0 ;
 for (LongWritable value : values) { count += value.get();
 }
 context.write(key, new LongWritable(count));
 }
}

Mapper 类:

package com.itheima.hadoop.mapreduce.mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountCombinerMapper extends
 Mapper LongWritable, Text, Text, LongWritable  { public void map(LongWritable key, Text value, Context context)
 throws java.io.IOException, InterruptedException {
 
 String line = value.toString(); // 获取一行数据
 String[] words = line.split(    // 获取各个单词
 for (String word : words) {
 //  将每一个单词写出去
 context.write(new Text(word), new LongWritable(1));
 }
 
 
 
 }
}

驱动类:

package com.itheima.hadoop.drivers;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import com.itheima.hadoop.mapreduce.combiner.WordCountCombiner;
import com.itheima.hadoop.mapreduce.mapper.WordCountCombinerMapper;
public class WordCountCombinerDriver extends Configured implements Tool{
 @Override
 public int run(String[] args) throws Exception {
 /**
 *  提交五重奏: * 1、产生作业
 * 2、指定 MAP/REDUCE
 * 3、指定 MAPREDUCE 输出数据类型
 * 4、指定路径
 * 5、提交作业
 */
 Configuration conf = new Configuration();
 Job job = Job.getInstance(conf);
 job.setJarByClass(WordCountCombinerDriver.class);
 job.setMapperClass(WordCountCombinerMapper.class);
 
 /*** 此处中间小插曲:combiner 组件 ***/
 job.setCombinerClass(WordCountCombiner.class);
 /*** 此处中间小插曲:combiner 组件 ***/
 
 //reduce 逻辑和 combiner 逻辑一致且 combiner 又是 reduce 的子类
 job.setReducerClass(WordCountCombiner.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(LongWritable.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(LongWritable.class);
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 FileOutputFormat.setOutputPath(job, new Path(args[1]));
 return job.waitForCompletion(true) ? 0 : 1;
 }
}

主类:

package com.itheima.hadoop.runner;
import org.apache.hadoop.util.ToolRunner;
import com.itheima.hadoop.drivers.WordCountCombinerDriver;
public class WordCountCombinerRunner { public static void main(String[] args) throws Exception {
 
 int res = ToolRunner.run(new WordCountCombinerDriver(), args);
 System.exit(res);
 }
}

运行结果:

到此,关于“MR 程序的组件 combiner 怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计3670字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)