hadoop map

82次阅读
没有评论

共计 2577 个字符,预计需要花费 7 分钟才能阅读完成。

本篇内容主要讲解“hadoop map-reduce 中的文件并发操作介绍”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“hadoop map-reduce 中的文件并发操作介绍”吧!

这样的操作在 map 端或者 reduce 端均可。下面以一个实际业务场景中的例子来简要说明。

问题简要描述:

假如 reduce 输入的 key 是 Text(String),value 是 BytesWritable(byte[]), 不同 key 的种类为 100 万个,value 的大小平均为 30k 左右,每个 key 大概对应 100 个 value, 要求对每一个 key 建立两个文件,一个用来不断添加 value 中的二进制数据,一个用来记录各个 value 在文件中的位置索引。(大量的小文件会影响 HDFS 的性能,所以最好对这些小文件进行拼接)

当文件数量较小时,可以考虑使用 MultipleOutput 来进行 key-value 的分流,可以按照 key 的不同,将其输出到不同的文件或者目录中。但是 reduce 的数量只能为 1, 不然每个 reduce 都会生成相同的目录或者文件,不能达到最终的目的。此外最重要的是,操作系统对每个进程打开的文件数量的限制,默认为 1024,集群的各个 datanode 可能会配置更高的值,但最多在几万左右,仍然是一个限制因素。不能满足百万文件的需求。

reduce 的主要目的是用来归并 key-value 并输出到 HDFS 上,我们当然也可以在 reduce 中进行其他的操作,比如文件读写。因为默认的 partitioner 保证同一个 key 的数据肯定会在同一个 reduce 中,所以在每个 reduce 中只用打开两个文件进行读写即可 (一个索引文件,一个数据文件)。并发度由 reduce 数量决定,将 reduce 数量设为 256,那我们就可以同时处理 256 个 key 的数据 (partioner 保证了不同 reduce 处理的 key 不同,不会引起文件读写冲突)。这样的并发度的效率是很客观的,可以在较短的时间内完成需求。

思路是这样,但同时由于 hdfs 的特性以及 hadoop 的任务调度,在文件读写过程中,仍有可能会出现很多问题,下面简要说些一些常见的会碰到的问题。

1.org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException 异常

这可能是最经常碰到的一个问题。可能的原因如下:

(1) 文件流冲突。

一般创建文件时都会打开一个供写入的文件流。而我们希望是追加,所以如果使用了错误的 API,就有可能引起上述问题。以 FileSystem 类为例,如果使用 create() 方法之后再调用 append() 方法,就会抛出上述异常。所以最好使用 createNewFile 方法,只创建文件,不打开流。

(2)mapreduce 推测执行机制

mapreduce 为了提高效率,会在一个任务启动之后,同时启动一些相同的任务 (attempt),其中有一个 attempt 成功完成之后,视为整个 task 完成,其结果 作为最终结果,并且杀掉那些较慢的 attempt。集群一般会开启此选项以优化性能 (以空间换时间)。但在本问题环境下推测执行却不太合适。因为我们一般希望一个 task 用来处理一个文件,但如果启动推测执行,会有几个 attempt 同时试图操作同一个文件,就会引发异常。所以最好关掉此选项,将 mapred.reduce.max.attempts 设为 1, 或者将 mapred.reduce.tasks.speculative.execution 设为 false.

但此时仍有可能会出现问题。因为如果一个 task 的唯一 attempt 出现问题,在被 kill 掉之后,task 仍会另起一个 attempt,此时因为前一个 attempt 异常终止,仍有可能会影响到新起的 attempt 的文件操作,引发异常。所以最安全的方法是,借鉴推测执行的机制 (每个 attempt 各自生成自己的结果,最终选择一个作为最终结果),以每个 attempt 的 id 号为后缀附加到所操作的文件上,同时捕获所有文件操作的异常并处理,这样可避免文件的读写冲突。Context 可以用来获取运行时的一些上下文信息,可以很容易得到 attempt 的 id 号。注意,此时如果开启推测执行也可以,但是会生成很多相同的文件 (每个 attempt 一份), 仍然不是最好的解决方法。

同时,我们可以利用 reduce 的输出来记录运行“不正常的”key. 这些 task 大多数是 attempt_0 被杀掉而重启了一个 attempt_1,所以下面的文件一般为两份。可以对这些情况的 key 输出 (文件异常或者 attemptID 0),并进行一些后续处理,比如文件重命名,或者紧对这些 key 重新写入。因为此种情况的 key 一般只占极少数,所以并不影响总体的效率。

2. 文件异常处理

最好能将 mapreduce 中的所有文件操作都设置好异常处理。不然一个文件异常就有可能会使整个 job 失败。所以从效率来讲,最好是在文件发生异常时将其 key 作为 reduce 的输出以进行记录。因为同时 mapreduce 会重启一个 task attempts 重新进行文件读写,可保证我们得到最终的数据,最后所需的只是对那些异常的 key 进行一些简单的文件重命名操作即可。

3. 多目录以及文件拼接

如果我们将 key 的种类设为 1000 万,上述方法会生成太多的小文件从而影响 hdfs 的性能,另外,因为所有文件都在同一个目录下,会导致同一个目录下文件数目过多而影响访问效率。

在创建文件的同时建立多个子目录,一个有用的方法是以 reduce 的 taskid 来建立子目录。这样有多少个 reduce 就可以建立多少个子目录,不会有文件冲突。同一个 reduce 处理的 key 都会在同一个目录下。

文件拼接要考虑的一个索引的问题。为了将文件索引建立的尽量简单,应该尽量保证同一个 key 的所有数据都在同一个大文件中。这可以利用 key 的 hashCode 来实现。如果我们想在每个目录下建立 1000 个文件,只需将 hashCode 对 1000 取余即可。

到此,相信大家对“hadoop map-reduce 中的文件并发操作介绍”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-04发表,共计2577字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)