丸趣 TV 小编给大家分享一下 hadoop-reduce 的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
Map 的结果,会通过 partition 分发到 Reducer 上,Reducer 做完 Reduce 操作后,通过 OutputFormat,进行输出。
* Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce;
import java.io.IOException;
* Reduces a set of intermediate values which share a key to a smaller set of
public class Reducer KEYIN,VALUEIN,KEYOUT,VALUEOUT {
public class Context
extends ReduceContext KEYIN,VALUEIN,KEYOUT,VALUEOUT {
public Context(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
Counter inputKeyCounter,
Counter inputValueCounter,
RecordWriter KEYOUT,VALUEOUT output,
OutputCommitter committer,
StatusReporter reporter,
RawComparator KEYIN comparator,
Class KEYIN keyClass,
Class VALUEIN valueClass
) throws IOException, InterruptedException {
super(conf, taskid, input, inputKeyCounter, inputValueCounter,
output, committer, reporter,
comparator, keyClass, valueClass);
}
}
/**
* Called once at the start of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings(unchecked)
protected void reduce(KEYIN key, Iterable VALUEIN values, Context context
) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException { setup(context);
while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context);
}
cleanup(context);
}
}
Mapper 的结果,可能送到可能的 Combiner 做合并,Combiner 在系统中并没有自己的基类,而是用 Reducer 作为 Combiner 的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。
Mapper 最终处理的结果对 key, value,是需要送到 Reducer 去合并的,合并的时候,有相同 key 的键 / 值对会送到同一个 Reducer 那,哪个 key 到哪个 Reducer 的分配过程,是由 Partitioner 规定的,它只有一个方法,输入是 Map 的结果对 key, value 和 Reducer 的数目,输出则是分配的 Reducer(整数编号)。系统缺省的 Partitioner 是 HashPartitioner,它以 key 的 Hash 值对 Reducer 的数目取模,得到对应的 Reducer。
* Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce;
* Partitions the key space.
public abstract class Partitioner KEY, VALUE {
/**
* Get the partition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
*
* p Typically a hash function on a all or a subset of the key. /p
*
* @param key the key to be partioned.
* @param value the entry value.
* @param numPartitions the total number of partitions.
* @return the partition number for the code key /code .
*/
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
* Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce.lib.partition;
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner K, V extends Partitioner K, V { /** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) { return (key.hashCode() Integer.MAX_VALUE) % numReduceTasks;
}
}
Reducer 是所有用户定制 Reducer 类的基类,和 Mapper 类似,它也有 setup,reduce,cleanup 和 run 方法,其中 setup 和 cleanup 含义和 Mapper 相同,reduce 是真正合并 Mapper 结果的地方,它的输入是 key 和这个 key 对应的所有 value 的一个迭代器,同时还包括 Reducer 的上下文。系统中定义了两个非常简单的 Reducer,IntSumReducer 和 LongSumReducer,分别用于对整形 / 长整型的 value 求和。
* Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce.lib.reduce;
import java.io.IOException;
public class IntSumReducer Key extends Reducer Key,IntWritable,
Key,IntWritable { private IntWritable result = new IntWritable();
public void reduce(Key key, Iterable IntWritable values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) { sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Reduce 的结果,通过 Reducer.Context 的方法 collect 输出到文件中,和输入类似,Hadoop 引入了 OutputFormat。OutputFormat 依赖两个辅助接口:RecordWriter 和 OutputCommitter,来处理输出。RecordWriter 提供了 write 方法,用于输出 key, value 和 close 方法,用于关闭对应的输出。OutputCommitter 提供了一系列方法,用户通过实现这些方法,可以定制 OutputFormat 生存期某些阶段需要的特殊操作。我们在 TaskInputOutputContext 中讨论过这些方法(明显,TaskInputOutputContext 是 OutputFormat 和 Reducer 间的桥梁)。
OutputFormat 和 RecordWriter 分别对应着 InputFormat 和 RecordReader,系统提供了空输出 NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter 只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件 FileOutputFormat 的 SequenceFileOutputFormat 和 TextOutputFormat 输出。
基于文件的输出 FileOutputFormat 利用了一些配置项配合工作,包括 mapred.output.compress:是否压缩;mapred.output.compression.codec:压缩方法;mapred.output.dir:输出路径;mapred.work.output.dir:输出工作路径。FileOutputFormat 还依赖于 FileOutputCommitter,通过 FileOutputCommitter 提供一些和 Job,Task 相关的临时文件管理功能。如 FileOutputCommitter 的 setupJob,会在输出路径下创建一个名为_temporary 的临时目录,cleanupJob 则会删除这个目录。
SequenceFileOutputFormat 输出和 TextOutputFormat 输出分别对应输入的 SequenceFileInputFormat 和 TextInputFormat
以上是“hadoop-reduce 的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注丸趣 TV 行业资讯频道!