Writing and Reading RCFile with the MapReduce API

This article walks through how to write and read RCFile with the MapReduce API. The examples are fairly detailed and should serve as a useful reference.

RCFile is a row-columnar storage format developed by Facebook that offers a high compression ratio and efficient reads. In Hive you can usually convert a Text table into an RCFile table with a simple insert-select, but sometimes you want to read and write RCFile directly from a MapReduce job. The following Maven dependencies are used:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.5.0-cdh6.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-serde</artifactId>
            <version>0.13.1-cdh6.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
            <version>0.13.1-cdh6.2.1</version>
        </dependency>
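
Both jobs below move rows around as BytesRefArrayWritable, the columnar row representation from hive-serde: one BytesRefWritable per column, each pointing at a byte range. As a quick orientation, here is a standalone sketch (not part of either job; the toRow/fromRow helpers are just illustrative names) of how a row of strings maps to that structure and back:

import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;

public class BytesRefArrayDemo {
    // Pack one row of string columns into a BytesRefArrayWritable.
    static BytesRefArrayWritable toRow(String[] cols) throws Exception {
        BytesRefArrayWritable row = new BytesRefArrayWritable(cols.length);
        for (int i = 0; i < cols.length; i++) {
            byte[] data = cols[i].getBytes("UTF-8");
            row.set(i, new BytesRefWritable(data, 0, data.length));
        }
        return row;
    }

    // Unpack it again: each entry is a (data, start, length) byte range.
    static String[] fromRow(BytesRefArrayWritable row) throws Exception {
        String[] cols = new String[row.size()];
        for (int i = 0; i < row.size(); i++) {
            BytesRefWritable ref = row.get(i);
            cols[i] = new String(ref.getData(), ref.getStart(), ref.getLength(), "UTF-8");
        }
        return cols;
    }

    public static void main(String[] args) throws Exception {
        BytesRefArrayWritable row = toRow(new String[]{"1", "alice", "beijing"});
        for (String col : fromRow(row)) {
            System.out.println(col);
        }
    }
}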

Reading an RCFile and producing a text-format file with MapReduce

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceInputFormat;

import java.io.IOException;

public class RcFileReaderJob {
    static class RcFileMapper extends Mapper<Object, BytesRefArrayWritable, Text, NullWritable> {
        @Override
        protected void map(Object key, BytesRefArrayWritable value,
                           Context context)
                throws IOException, InterruptedException {
            Text txt = new Text();
            StringBuffer sb = new StringBuffer();
            for (int i = 0; i < value.size(); i++) {
                BytesRefWritable v = value.get(i);
                txt.set(v.getData(), v.getStart(), v.getLength());
                if (i == value.size() - 1) {
                    sb.append(txt.toString());
                } else {
                    sb.append(txt.toString() + "\t");
                }
            }
            context.write(new Text(sb.toString()), NullWritable.get());
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);

        }
    }

    static class RcFileReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                              Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static boolean runLoadMapReduce(Configuration conf, Path input, Path output) throws IOException,
            ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(conf);
        job.setJarByClass(RcFileReaderJob.class);
        job.setJobName("RcFileReaderJob");
        job.setNumReduceTasks(1);
        job.setMapperClass(RcFileMapper.class);
        job.setReducerClass(RcFileReduce.class);
        job.setInputFormatClass(RCFileMapReduceInputFormat.class);
//        MultipleInputs.addInputPath(job, input, RCFileInputFormat.class);
        RCFileMapReduceInputFormat.addInputPath(job, input);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: rcfile <in> <out>");
            System.exit(2);
        }
        RcFileReaderJob.runLoadMapReduce(conf, new Path(args[0]), new Path(args[1]));
    }
}  
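
Since RCFile is columnar on disk, the record reader can skip columns that the job never asks for; the mapper above simply concatenates every column of each row. If you only need a few columns, the usual approach is to declare the wanted column ids on the job configuration before submitting, e.g. from runLoadMapReduce. The sketch below is an assumption on my part rather than something the original article shows: it relies on ColumnProjectionUtils from hive-serde, whose exact methods vary across Hive versions (newer releases deprecate setReadColumnIDs in favour of appendReadColumns), so verify it against your Hive release.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

public class ColumnProjectionExample {
    // Hypothetical helper: restrict RCFile reads to the given column ids by
    // setting the projection on the Configuration that the job will use.
    public static void readOnlyColumns(Configuration conf, Integer... columnIds) {
        ColumnProjectionUtils.setReadColumnIDs(conf, Arrays.asList(columnIds));
    }
}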

Reading a text file and producing an RCFile-format file with MapReduce

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceOutputFormat;

import java.io.IOException;

public class RcFileWriterJob extends Configured implements Tool {
   public static class Map extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable> {
     private byte[] fieldData;
     private int numCols;
     private BytesRefArrayWritable bytes;
     
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
       numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0);
       bytes = new BytesRefArrayWritable(numCols);
     }
     
     public void map(Object key, Text line, Context context
                ) throws IOException, InterruptedException {
       bytes.clear();
       String[] cols = line.toString().split("\t", -1);
       System.out.println("SIZE : " + cols.length);
       for (int i = 0; i < numCols; i++) {
           fieldData = cols[i].getBytes("UTF-8");
           BytesRefWritable cu = new BytesRefWritable(fieldData, 0, fieldData.length);
           bytes.set(i, cu);
       }
       context.write(NullWritable.get(), bytes);
     }
   }
   
   public int run(String[] args) throws Exception {
     Configuration conf = new Configuration();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     if (otherArgs.length < 2) {
         System.out.println("Usage: " +
              "hadoop jar RCFileLoader.jar <main class> " +
              "-tableName <tableName> -numCols <numberOfColumns> -input <input path> " +
              "-output <output path> -rowGroupSize <rowGroupSize> -ioBufferSize <ioBufferSize>");
         System.out.println("For test");
         System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
              "-tableName test1 -numCols 10 -input RCFileLoaderTest/test1 " +
              "-output RCFileLoaderTest/RCFile_test1");
         System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
              "-tableName test2 -numCols 5 -input RCFileLoaderTest/test2 " +
              "-output RCFileLoaderTest/RCFile_test2");
         return 2;
       }

     String tableName = "";
     int numCols = 0;
     String inputPath = "";
     String outputPath = "";
     int rowGroupSize = 16 * 1024 * 1024;
     int ioBufferSize = 128 * 1024;
       for (int i = 0; i < otherArgs.length - 1; i++) {
           if ("-tableName".equals(otherArgs[i])) {
               tableName = otherArgs[i + 1];
           } else if ("-numCols".equals(otherArgs[i])) {
               numCols = Integer.parseInt(otherArgs[i + 1]);
           } else if ("-input".equals(otherArgs[i])) {
               inputPath = otherArgs[i + 1];
           } else if ("-output".equals(otherArgs[i])) {
               outputPath = otherArgs[i + 1];
           } else if ("-rowGroupSize".equals(otherArgs[i])) {
               rowGroupSize = Integer.parseInt(otherArgs[i + 1]);
           } else if ("-ioBufferSize".equals(otherArgs[i])) {
               ioBufferSize = Integer.parseInt(otherArgs[i + 1]);
           }
       }
       
       conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize);
       conf.setInt("io.file.buffer.size", ioBufferSize);

     Job job = Job.getInstance(conf);
     job.setJobName("RcFileWriterJob");
       job.setJarByClass(RcFileWriterJob.class);
       job.setMapperClass(Map.class);
       job.setMapOutputKeyClass(NullWritable.class);
       job.setMapOutputValueClass(BytesRefArrayWritable.class);
//       job.setNumReduceTasks(0);
       
       FileInputFormat.addInputPath(job, new Path(inputPath));
       
       job.setOutputFormatClass(RCFileMapReduceOutputFormat.class);
       RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols);
       RCFileMapReduceOutputFormat.setOutputPath(job, new Path(outputPath));
       RCFileMapReduceOutputFormat.setCompressOutput(job, false);

       System.out.println("Loading table " + tableName + " from " + inputPath + " to RCFile located at " + outputPath);
       System.out.println("number of columns: " + job.getConfiguration().get("hive.io.rcfile.column.number.conf"));
       System.out.println("RCFile row group size: " + job.getConfiguration().get("hive.io.rcfile.record.buffer.size"));
       System.out.println("io buffer size: " + job.getConfiguration().get("io.file.buffer.size"));
       
       return (job.waitForCompletion(true) ? 0 : 1);
   }
   
   public static void main(String[] args) throws Exception {
       int res = ToolRunner.run(new Configuration(), new RcFileWriterJob(), args);
       System.exit(res);
   }

}
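
The writer job above leaves compression off (setCompressOutput(job, false)). RCFileMapReduceOutputFormat extends FileOutputFormat, so enabling compressed row groups should just be the standard output-compression settings; a minimal sketch, assuming GzipCodec is acceptable in your cluster (any installed CompressionCodec works the same way):

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceOutputFormat;

public class CompressedRcFileOutput {
    // Sketch: turn on compressed RCFile output for a job configured like
    // RcFileWriterJob above, using the FileOutputFormat settings it inherits.
    public static void enableCompression(Job job) {
        RCFileMapReduceOutputFormat.setCompressOutput(job, true);
        RCFileMapReduceOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }
}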

That is all for "Writing and Reading RCFile with the MapReduce API". Thanks for reading, and I hope it helps.
