This article explains how to read and write RCFile with the MapReduce API, with a complete working example for each direction.
RCFile is a record-columnar storage format developed by Facebook that combines a high compression ratio with efficient reads. In Hive, a Text table can usually be converted with a plain insert-select, but sometimes it is useful to read and write RCFile directly from a MapReduce job. The examples use the following Maven dependencies:
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.5.0-cdh5.2.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-serde</artifactId>
  <version>0.13.1-cdh5.2.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hive.hcatalog</groupId>
  <artifactId>hive-hcatalog-core</artifactId>
  <version>0.13.1-cdh5.2.1</version>
</dependency>
Reading an RCFile with MapReduce and writing it out as text
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceInputFormat;
import java.io.IOException;
public class RcFileReaderJob {
static class RcFileMapper extends Mapper<Object, BytesRefArrayWritable, Text, NullWritable> {
@Override
protected void map(Object key, BytesRefArrayWritable value,
Context context)
throws IOException, InterruptedException {
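// Each input value is one row; each BytesRefWritable inside it holds one column's raw bytes.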
Text txt = new Text();
StringBuffer sb = new StringBuffer();
for (int i = 0; i < value.size(); i++) {
BytesRefWritable v = value.get(i);
txt.set(v.getData(), v.getStart(), v.getLength());
if (i == value.size() - 1) {
sb.append(txt.toString());
} else {
sb.append(txt.toString() + "\t");
}
}
context.write(new Text(sb.toString()), NullWritable.get());
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
super.cleanup(context);
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
}
}
static class RcFileReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static boolean runLoadMapReduce(Configuration conf, Path input, Path output) throws IOException,
ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(conf);
job.setJarByClass(RcFileReaderJob.class);
job.setJobName("RcFileReaderJob");
job.setNumReduceTasks(1);
job.setMapperClass(RcFileMapper.class);
job.setReducerClass(RcFileReduce.class);
job.setInputFormatClass(RCFileMapReduceInputFormat.class);
// MultipleInputs.addInputPath(job, input, RCFileInputFormat.class);
RCFileMapReduceInputFormat.addInputPath(job, input);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, output);
return job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
if (args.length != 2) {
System.err.println("Usage: rcfile <in> <out>");
System.exit(2);
}
RcFileReaderJob.runLoadMapReduce(conf, new Path(args[0]), new Path(args[1]));
}
}
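For a quick local smoke test of this job, a minimal driver along these lines can be used. This is a sketch, not part of the original code: the class name and the two paths are hypothetical placeholders, and fs.defaultFS is pointed at the local filesystem only for testing.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
public class RcFileReaderLocalTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumption: run against the local filesystem rather than HDFS.
        conf.set("fs.defaultFS", "file:///");
        // Hypothetical paths: /tmp/rcfile_in must already contain an RCFile; /tmp/text_out must not exist yet.
        boolean ok = RcFileReaderJob.runLoadMapReduce(
                conf, new Path("/tmp/rcfile_in"), new Path("/tmp/text_out"));
        System.exit(ok ? 0 : 1);
    }
}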
Reading a text file with MapReduce and writing it out as an RCFile
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceOutputFormat;
import java.io.IOException;
public class RcFileWriterJob extends Configured implements Tool{
public static class Map extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable> {
private byte[] fieldData;
private int numCols;
private BytesRefArrayWritable bytes;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
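// This key is populated by RCFileMapReduceOutputFormat.setColumnNumber() in the driver below.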
numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0);
bytes = new BytesRefArrayWritable(numCols);
}
public void map(Object key, Text line, Context context
) throws IOException, InterruptedException {
bytes.clear();
String[] cols = line.toString().split("\t", -1);
System.out.println("SIZE : " + cols.length);
for (int i = 0; i < numCols; i++) {
fieldData = cols[i].getBytes("UTF-8");
BytesRefWritable cu = new BytesRefWritable(fieldData, 0, fieldData.length);
bytes.set(i, cu);
}
context.write(NullWritable.get(), bytes);
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("Usage: " +
"hadoop jar RCFileLoader.jar <main class> " +
"-tableName <tableName> -numCols <numberOfColumns> -input <input path> " +
"-output <output path> -rowGroupSize <rowGroupSize> -ioBufferSize <ioBufferSize>");
System.out.println("For test:");
System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
"-tableName test1 -numCols 10 -input RCFileLoaderTest/test1 " +
"-output RCFileLoaderTest/RCFile_test1");
System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
"-tableName test2 -numCols 5 -input RCFileLoaderTest/test2 " +
"-output RCFileLoaderTest/RCFile_test2");
return 2;
}
String tableName = "";
int numCols = 0;
String inputPath = "";
String outputPath = "";
int rowGroupSize = 16 *1024*1024;
int ioBufferSize = 128*1024;
for (int i = 0; i < otherArgs.length - 1; i++) {
if ("-tableName".equals(otherArgs[i])) {
tableName = otherArgs[i + 1];
} else if ("-numCols".equals(otherArgs[i])) {
numCols = Integer.parseInt(otherArgs[i + 1]);
} else if ("-input".equals(otherArgs[i])) {
inputPath = otherArgs[i + 1];
} else if ("-output".equals(otherArgs[i])) {
outputPath = otherArgs[i + 1];
} else if ("-rowGroupSize".equals(otherArgs[i])) {
rowGroupSize = Integer.parseInt(otherArgs[i + 1]);
} else if ("-ioBufferSize".equals(otherArgs[i])) {
ioBufferSize = Integer.parseInt(otherArgs[i + 1]);
}
}
conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize);
conf.setInt("io.file.buffer.size", ioBufferSize);
Job job = Job.getInstance(conf);
job.setJobName("RcFileWriterJob");
job.setJarByClass(RcFileWriterJob.class);
job.setMapperClass(Map.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(BytesRefArrayWritable.class);
job.setNumReduceTasks(0); // map-only job: the mappers write the RCFile output directly, avoiding a needless shuffle
FileInputFormat.addInputPath(job, new Path(inputPath));
job.setOutputFormatClass(RCFileMapReduceOutputFormat.class);
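// Tell the output format how many columns each row carries; this must match what the mapper emits.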
RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols);
RCFileMapReduceOutputFormat.setOutputPath(job, new Path(outputPath));
RCFileMapReduceOutputFormat.setCompressOutput(job, false);
System.out.println("Loading table " + tableName + " from " + inputPath + " to RCFile located at " + outputPath);
System.out.println("number of columns: " + job.getConfiguration().get("hive.io.rcfile.column.number.conf"));
System.out.println("RCFile row group size: " + job.getConfiguration().get("hive.io.rcfile.record.buffer.size"));
System.out.println("io buffer size: " + job.getConfiguration().get("io.file.buffer.size"));
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new RcFileWriterJob(), args);
System.exit(res);
}
}
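As an end-to-end sanity check, the two jobs can be chained to round-trip a tab-separated file through RCFile and back to text. This is a minimal sketch, assuming both classes above are on the classpath; the class name, table name, column count, and paths are hypothetical placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
public class RcFileRoundTrip {
    public static void main(String[] args) throws Exception {
        // Step 1: text -> RCFile. -numCols must match the tab-separated column count of the input.
        int rc = ToolRunner.run(new Configuration(), new RcFileWriterJob(), new String[] {
                "-tableName", "test1", "-numCols", "5",
                "-input", "/tmp/text_in", "-output", "/tmp/rcfile_out" });
        if (rc != 0) {
            System.exit(rc);
        }
        // Step 2: RCFile -> text, using the reader job from the first listing.
        boolean ok = RcFileReaderJob.runLoadMapReduce(
                new Configuration(), new Path("/tmp/rcfile_out"), new Path("/tmp/text_back"));
        System.exit(ok ? 0 : 1);
    }
}
If the round trip succeeds, /tmp/text_back should contain the same tab-separated rows as /tmp/text_in.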