共计 3670 个字符,预计需要花费 10 分钟才能阅读完成。
这篇文章主要介绍“MR 程序的组件 combiner 怎么使用”,在日常操作中,相信很多人在 MR 程序的组件 combiner 怎么使用问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”MR 程序的组件 combiner 怎么使用”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!
用一句简单的话语描述 combiner 组件作用:降低 map 任务输出, 减少 reduce 任务数量, 从而降低网络负载
工作机制:
Map 任务允许在提交给 Reduce 任务之前在本地执行一次汇总的操作,那就是 combiner 组件,combiner 组件的行为模式和 Reduce 一样,都是接收 key/values,产生 key/value 输出
注意:
1、combiner 的输出是 reduce 的输入
2、如果 combiner 是可插拔的,那么 combiner 绝不能改变最终结果
3、combiner 是一个优化组件, 但是并不是所有地方都能用到, 所以 combiner 只能用于 reduce 的输入、输出 key/value 类型完全一致且不影响最终结果的场景。
例子:WordCount 程序中, 通过统计每一个单词出现的次数, 我们可以首先通过 Map 任务本地进行一次汇总 (Combiner),然后将汇总的结果交给 Reduce,完成各个 Map 任务存在相同 KEY 的数据进行一次总的汇总,图:
Combiner 代码:
Combiner 类,直接打开 Combiner 类源码是直接继承 Reducer 类,所以我们直接继承 Reducer 类即可,最终在提交时指定咱们定义的 Combiner 类即可
package com.itheima.hadoop.mapreduce.combiner;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountCombiner extends
Reducer Text, LongWritable, Text, LongWritable {
@Override
protected void reduce(Text key, Iterable LongWritable values, Context context)
throws IOException, InterruptedException {
long count = 0 ;
for (LongWritable value : values) { count += value.get();
}
context.write(key, new LongWritable(count));
}
}
Mapper 类:
package com.itheima.hadoop.mapreduce.mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountCombinerMapper extends
Mapper LongWritable, Text, Text, LongWritable { public void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
String line = value.toString(); // 获取一行数据
String[] words = line.split( // 获取各个单词
for (String word : words) {
// 将每一个单词写出去
context.write(new Text(word), new LongWritable(1));
}
}
}
驱动类:
package com.itheima.hadoop.drivers;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import com.itheima.hadoop.mapreduce.combiner.WordCountCombiner;
import com.itheima.hadoop.mapreduce.mapper.WordCountCombinerMapper;
public class WordCountCombinerDriver extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
/**
* 提交五重奏: * 1、产生作业
* 2、指定 MAP/REDUCE
* 3、指定 MAPREDUCE 输出数据类型
* 4、指定路径
* 5、提交作业
*/
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountCombinerDriver.class);
job.setMapperClass(WordCountCombinerMapper.class);
/*** 此处中间小插曲:combiner 组件 ***/
job.setCombinerClass(WordCountCombiner.class);
/*** 此处中间小插曲:combiner 组件 ***/
//reduce 逻辑和 combiner 逻辑一致且 combiner 又是 reduce 的子类
job.setReducerClass(WordCountCombiner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}
主类:
package com.itheima.hadoop.runner;
import org.apache.hadoop.util.ToolRunner;
import com.itheima.hadoop.drivers.WordCountCombinerDriver;
public class WordCountCombinerRunner { public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new WordCountCombinerDriver(), args);
System.exit(res);
}
}
运行结果:
到此,关于“MR 程序的组件 combiner 怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!