如何实现RecordReader按行读取

119次阅读
没有评论

共计 4921 个字符,预计需要花费 13 分钟才能阅读完成。

丸趣 TV 小编给大家分享一下如何实现 RecordReader 按行读取,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

public class CustomLineRecordReader 
 extends RecordReader LongWritable, Text  {
 
 private long start;
 private long pos;
 private long end;
 private LineReader in;
 private int maxLineLength;
 private LongWritable key = new LongWritable();
 private Text value = new Text();
 
 private static final Log LOG = LogFactory.getLog( CustomLineRecordReader.class);
 
 /**
 * From Design Pattern, O Reilly...
 * This method takes as arguments the map task’s assigned InputSplit and
 * TaskAttemptContext, and prepares the record reader. For file-based input
 * formats, this is a good place to seek to the byte position in the file to
 * begin reading.
 */
 @Override
 public void initialize(
 InputSplit genericSplit, 
 TaskAttemptContext context)
 throws IOException {
 
 // This InputSplit is a FileInputSplit
 FileSplit split = (FileSplit) genericSplit;
 
 // Retrieve configuration, and Max allowed
 // bytes for a single record
 Configuration job = context.getConfiguration();
 this.maxLineLength = job.getInt(
  mapred.linerecordreader.maxlength ,
 Integer.MAX_VALUE);
 
 // Split  S  is responsible for all records
 // starting from  start  and  end  positions
 start = split.getStart();
 end = start + split.getLength();
 
 // Retrieve file containing Split  S 
 final Path file = split.getPath();
 FileSystem fs = file.getFileSystem(job);
 FSDataInputStream fileIn = fs.open(split.getPath());
 
 // If Split  S  starts at byte 0, first line will be processed
 // If Split  S  does not start at byte 0, first line has been already
 // processed by  S-1  and therefore needs to be silently ignored
 boolean skipFirstLine = false;
 if (start != 0) {
 skipFirstLine = true;
 // Set the file pointer at  start - 1  position.
 // This is to make sure we won t miss any line
 // It could happen if  start  is located on a EOL
 --start;
 fileIn.seek(start);
 }
 
 in = new LineReader(fileIn, job);
 
 // If first line needs to be skipped, read first line
 // and stores its content to a dummy Text
 if (skipFirstLine) { Text dummy = new Text();
 // Reset  start  to  start + line offset 
 start += in.readLine(dummy, 0,
 (int) Math.min( (long) Integer.MAX_VALUE, 
 end - start));
 }
 
 // Position is the actual start
 this.pos = start;
 
 }
 
 /**
 * From Design Pattern, O Reilly...
 * Like the corresponding method of the InputFormat class, this reads a
 * single key/ value pair and returns true until the data is consumed.
 */
 @Override
 public boolean nextKeyValue() throws IOException {
 
 // Current offset is the key
 key.set(pos);
 
 int newSize = 0;
 
 // Make sure we get at least one record that starts in this Split
 while (pos   end) {
 
 // Read first line and store its content to  value 
 newSize = in.readLine(value, maxLineLength,
 Math.max((int) Math.min( Integer.MAX_VALUE, end - pos),
 maxLineLength));
 
 // No byte read, seems that we reached end of Split
 // Break and return false (no key / value)
 if (newSize == 0) {
 break;
 }
 
 // Line is read, new position is set
 pos += newSize;
 
 // Line is lower than Maximum record line size
 // break and return true (found key / value)
 if (newSize   maxLineLength) {
 break;
 }
 
 // Line is too long
 // Try again with position = position + line offset,
 // i.e. ignore line and go to next one
 // TODO: Shouldn t it be LOG.error instead ??
 LOG.info( Skipped line of size   + 
 newSize +   at pos  
 + (pos - newSize));
 }
 
 
 if (newSize == 0) {
 // We ve reached end of Split
 key = null;
 value = null;
 return false;
 } else {
 // Tell Hadoop a new line has been found
 // key / value will be retrieved by
 // getCurrentKey getCurrentValue methods
 return true;
 }
 }
 
 /**
 * From Design Pattern, O Reilly...
 * This methods are used by the framework to give generated key/value pairs
 * to an implementation of Mapper. Be sure to reuse the objects returned by
 * these methods if at all possible!
 */
 @Override
 public LongWritable getCurrentKey() throws IOException,
 InterruptedException {
 return key;
 }
 
 /**
 * From Design Pattern, O Reilly...
 * This methods are used by the framework to give generated key/value pairs
 * to an implementation of Mapper. Be sure to reuse the objects returned by
 * these methods if at all possible!
 */
 @Override
 public Text getCurrentValue() throws IOException, InterruptedException {
 return value;
 }
 
 /**
 * From Design Pattern, O Reilly...
 * Like the corresponding method of the InputFormat class, this is an
 * optional method used by the framework for metrics gathering.
 */
 @Override
 public float getProgress() throws IOException, InterruptedException { if (start == end) {
 return 0.0f;
 } else { return Math.min(1.0f, (pos - start) / (float) (end - start));
 }
 }
 
 /**
 * From Design Pattern, O Reilly...
 * This method is used by the framework for cleanup after there are no more
 * key/value pairs to process.
 */
 @Override
 public void close() throws IOException { if (in != null) { in.close();
 }
 }
 
}

以上是“如何实现 RecordReader 按行读取”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注丸趣 TV 行业资讯频道!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计4921字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)