提高MapReduce性能的方法有哪些

85次阅读
没有评论

共计 7451 个字符,预计需要花费 19 分钟才能阅读完成。

本篇内容介绍了“提高 MapReduce 性能的方法有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

  Cloudera 提供给客户的服务内容之一就是调整和优化 MapReduce job 执行性能。MapReduce 和 HDFS 组成一个复杂的分布式系统,并且它们运行着各式各样用户的代码,这样导致没有一个快速有效的规则来实现优化代码性能的目的。在我看来,调整 cluster 或 job 的运行更像一个医生对待病人一样,找出关键的“症状”,对于不同的症状有不同的诊断和处理方式。

  在医学领域,没有什么可以代替一位经验丰富的医生;在复杂的分布式系统上,这个道理依然正确—有经验的用户和操作者在面对很多常见问题上都会有“第六感”。我曾经为 Cloudera 不同行业的客户解决过问题,他们面对的工作量、数据集和 cluster 硬件有很大区别,因此我在这方面积累了很多的经验,并且想把这些经验分享给诸位。

  在这篇 blog 里,我会高亮那些提高 MapReduce 性能的建议。前面的一些建议是面向整个 cluster 的,这可能会对 cluster 操作者和开发者有帮助。后面一部分建议是为那些用 Java 编写 MapReduce job 的开发者而提出。在每一个建议中,我列出一些“症状”或是“诊断测试”来说明一些针对这些问题的改进措施,可能会对你有所帮助。

  请注意,这些建议中包含很多我以往从各种不同场景下总结出来的直观经验。它们可能不太适用于你所面对的特殊的工作量、数据集或 cluster,如果你想使用它,就需要测试使用前和使用后它在你的 cluster 环境中的表现。对于这些建议,我会展示一些对比性的数据,数据产生的环境是一个 4 个节点的 cluster 来运行 40GB 的 Wordcount job。应用了我以下所提到的这些建议后,这个 job 中的每个 map task 大概运行 33 秒,job 总共执行了差不多 8 分 30 秒。

第一点   正确地配置你的 Cluster
诊断结果 / 症状:
1. Linux top 命令的结果显示 slave 节点在所有 map 和 reduce slot 都有 task 运行时依然很空闲。
2. top 命令显示内核的进程,如 RAID(mdX_raid*) 或 pdflush 占去大量的 CPU 时间。
3. Linux 的平均负载通常是系统 CPU 数量的 2 倍。
4. 即使系统正在运行 job,Linux 平均负载总是保持在系统 CPU 数量的一半的状态。
5. 一些节点上的 swap 利用率超过几 MB

  优化你的 MapReduce 性能的第一步是确保你整个 cluster 的配置文件被调整过。对于新手,请参考这里关于配置参数的一篇 blog:配置参数。除了这些配置参数,在你想修改 job 参数以期提高性能时,你应该参照下我这里的一些你应该注意的项:

1.  确保你正在 DFS 和 MapReduce 中使用的存储 mount 被设置了 noatime 选项。这项如果设置就不会启动对磁盘访问时间的记录,会显著提高 IO 的性能。

2. 避免在 TaskTracker 和 DataNode 的机器上执行 RAID 和 LVM 操作,这通常会降低性能

3. 在这两个参数 mapred.local.dir 和 dfs.data.dir 配置的值应当是分布在各个磁盘上目录,这样可以充分利用节点的 IO 读写能力。运行 Linux sysstat 包下的 iostat -dx 5 命令可以让每个磁盘都显示它的利用率。

4. 你应该有一个聪明的监控系统来监控磁盘设备的健康状态。MapReduce job 的设计是可容忍磁盘失败,但磁盘的异常会导致一些 task 重复执行而使性能下降。如果你发现在某个 TaskTracker 被很多 job 中列入黑名单,那么它就可能有问题。

5. 使用像 Ganglia 这样的工具监控并绘出 swap 和网络的利用率图。如果你从监控的图看出机器正在使用 swap 内存,那么减少 mapred.child.java.opts 属性所表示的内存分配。

基准测试:
  很遗憾我不能为这个建议去生成一些测试数据,因为这需要构建整个 cluster。如果你有相关的经验,请把你的建议及结果附到下面的留言区。

第二点   使用 LZO 压缩
诊断结果 / 症状:
1. 对 job 的中间结果数据使用压缩是很好的想法。
2. MapReduce job 的输出数据大小是不可忽略的。
3. 在 job 运行时,通过 linux top 和 iostat 命令可以看出 slave 节点的 iowait 利用率很高。

  几乎每个 Hadoop job 都可以通过对 map task 输出的中间数据做 LZO 压缩获得较好的空间效益。尽管 LZO 压缩会增加一些 CPU 的负载,但在 shuffle 过程中会减少磁盘 IO 的数据量,总体上总是可以节省时间的。

  当一个 job 需要输出大量数据时,应用 LZO 压缩可以提高输出端的输出性能。这是因为默认情况下每个文件的输出都会保存 3 个幅本,1GB 的输出文件你将要保存 3GB 的磁盘数据,当采用压缩后当然更能节省空间并提高性能。

  为了使 LZO 压缩有效,请设置参数 mapred.compress.map.output 值为 true。

基准测试:
  在我的 cluster 里,Wordcount 例子中不使用 LZO 压缩的话,job 的运行时间只是稍微增加。但 FILE_BYTES_WRITTEN 计数器却从 3.5GB 增长到 9.2GB,这表示压缩会减少 62% 的磁盘 IO。在我的 cluster 里,每个数据节点上磁盘数量对 task 数量的比例很高,但 Wordcount job 并没有在整个 cluster 中共享,所以 cluster 中 IO 不是瓶颈,磁盘 IO 增长不会有什么大的问题。但对于磁盘因很多并发活动而受限的环境来说,磁盘 IO 减少 60% 可以大幅提高 job 的执行速度。

第三点   调整 map 和 reduce task 的数量到合适的值

自己的经验,一般来说,不可能跑一个 job,改变整个集群的 hdfs block 的大小。通常提交 job 时候设置参数。

  job.setMapOutputValueClass(IntWritable.class);
       job.setNumReduceTasks(1);
       // 设置最小分片为 512M
       FileInputFormat.setMinInputSplitSize(job, 1024*1024*512);
       FileInputFormat.addInputPath(job, new Path( /usr/keyword/input));

诊断结果 / 症状:
1. 每个 map 或 reduce task 的完成时间少于 30 到 40 秒。
2. 大型的 job 不能完全利用 cluster 中所有空闲的 slot。
3. 大多数 map 或 reduce task 被调度执行了,但有一到两个 task 还在准备状态,在其它 task 完成之后才单独执行

  调整 job 中 map 和 reduce task 的数量是一件很重要且常常被忽略的事情。下面是我在设置这些参数时的一些直观经验:

1. 如果每个 task 的执行时间少于 30 到 40 秒,就减少 task 的数量。Task 的创建与调度一般耗费几秒的时间,如果 task 完成的很快,我们就是在浪费时间。同时,设置 JVM 重用也可以解决这个问题。

2. 如果一个 job 的输入数据大于 1TB,我们就增加 block size 到 256 或者 512,这样可以减少 task 的数量。你可以使用这个命令去修改已存在文件的 block size: hadoop distcp -Ddfs.block.size=$[256*1024*1024] /path/to/inputdata  /path/to/inputdata-with/largeblocks。在执行完这个命令后,你就可以删除原始的输入文件了 (/path/to/inputdata)。

3. 只要每个 task 运行至少 30 到 40 秒,那么就增加 map task 的数量,增加到整个 cluster 上 map slot 总数的几倍。如果你的 cluster 中有 100 个 map slot,那就避免运行一个有 101 个 map task 的 job — 如果运行的话,前 100 个 map 同时执行,第 101 个 task 会在 reduce 执行之前单独运行。这个建议对于小型 cluste 和小型 job 是很重要的。

4. 不要调度太多的 reduce task — 对于大多数 job 来说,我们推荐 reduce task 的数量应当等于或是略小于 cluster 中 reduce slot 的数量。

基准测试:
  为了让 Wordcount job 有很多的 task 运行,我设置了如下的参数:Dmapred.max.split.size=$[16*1024*1024]。以前默认会产生 360 个 map task,现在就会有 2640 个。当完成这个设置之后,每个 task 执行耗费 9 秒,并且在 JobTracker 的 Cluster Summar 视图中可以观看到,正在运行的 map task 数量在 0 到 24 之间浮动。job 在 17 分 52 秒之后结束,比原来的执行要慢两倍多。

第四点   为 job 添加一个 Combiner
诊断结果 / 症状:
1. job 在执行分类的聚合时,REDUCE_INPUT_GROUPS 计数器远小于 REDUCE_INPUT_RECORDS 计数器。
2. job 执行一个大的 shuffle 任务 (例如,map 的输出数据每个节点就是好几个 GB)。
3. 从 job 计数器中看出,SPILLED_RECORDS 远大于 MAP_OUTPUT_RECORDS。

  如果你的算法涉及到一些分类的聚合,那么你就可以使用 Combiner 来完成数据到达 reduce 端之前的初始聚合工作。MapReduce 框架很明智地运用 Combiner 来减少写入磁盘以及通过网络传输到 reduce 端的数据量。

基准测试:
  我删去 Wordcount 例子中对 setCombinerClass 方法的调用。仅这个修改就让 map task 的平均运行时间由 33 秒增长到 48 秒,shuffle 的数据量也从 1GB 提高到 1.4GB。整个 job 的运行时间由原来的 8 分 30 秒变成 15 分 42 秒,差不多慢了两倍。这次测试过程中开启了 map 输出结果的压缩功能,如果没有开启这个压缩功能的话,那么 Combiner 的影响就会变得更加明显。

第五点   为你的数据使用最合适和简洁的 Writable 类型
诊断 / 症状:
1. Text 对象在非文本或混合数据中使用。
2. 大部分的输出值很小的时候使用 IntWritable 或 LongWritable 对象。

  当一个开发者是初次编写 MapReduce,或是从开发 Hadoop Streaming 转到 Java MapReduce,他们会经常在不必要的时候使用 Text 对象。尽管 Text 对象使用起来很方便,但它在由数值转换到文本或是由 UTF8 字符串转换到文本时都是低效的,且会消耗大量的 CPU 时间。当处理那些非文本的数据时,可以使用二进制的 Writable 类型,如 IntWritable,FloatWritable 等。

  除了避免文件转换的消耗外,二进制 Writable 类型作为中间结果时会占用更少的空间。当磁盘 IO 和网络传输成为大型 job 所遇到的瓶颈时,减少些中间结果的大小可以获得更好的性能。在处理整形数值时,有时使用 VIntWritable 或 VLongWritable 类型可能会更快些—这些实现了变长整形编码的类型在序列化小数值时会更节省空间。例如,整数 4 会被序列化成单字节,而整数 10000 会被序列化成两个字节。这些变长类型用在统计等任务时更加有效,在这些任务中我们只要确保大部分的记录都是一个很小的值,这样值就可以匹配一或两个字节。

  如果 Hadoop 自带的 Writable 类型不能满足你的需求,你可以开发自己的 Writable 类型。这应该是挺简单的,可能会在处理文本方面更快些。如果你编写了自己的 Writable 类型,请务必提供一个 RawComparator 类—你可以以内置的 Writable 类型做为例子。

基准测试:
  对于 Wordcount 例子,我修改了它在 map 计数时的中间变量,由 IntWritable 改为 Text。并且在 reduce 统计最终和时使用 Integer.parseString(value.toString) 来转换出真正的数值。这个版本比原始版本要慢近 10%—整个 job 完成差不多超过 9 分钟,且每个 map task 要运行 36 秒,比之前的 33 秒要慢。尽量看起来整形转换还是挺快的,但这不说明什么情况。在正常情况下,我曾经看到过选用合适的 Writable 类型可以有 2 到 3 倍的性能提升的例子。

第六点   重用 Writable 类型
诊断 / 症状:
1. 在 mapred.child.java.opts 参数上增加 -verbose:gc -XX:+PriintGCDetails,然后查看一些 task 的日志。如果垃圾回收频繁工作且消耗一些时间,你需要注意那些无用的对象。
2. 在你的代码中搜索 new Text 或 new IntWritable。如果它们出现在一个内部循环或是 map/reduce 方法的内部时,这条建议可能会很有用。
3. 这条建议在 task 内存受限的情况下特别有用。

  很多 MapReduce 用户常犯的一个错误是,在一个 map/reduce 方法中为每个输出都创建 Writable 对象。例如,你的 Wordcout mapper 方法可能这样写:

Java 代码

 

public void map(…) { 

 … 

 for (String word : words) { 

 output.collect(new Text(word), new IntWritable(1)); 

 } 

  这样会导致程序分配出成千上万个短周期的对象。Java 垃圾收集器就要为此做很多的工作。更有效的写法是:

Java 代码

 

class MyMapper … { 

 Text wordText = new Text(); 

 IntWritable one = new IntWritable(1); 

 public void map(…) { 

 for (String word: words) { 

 wordText.set(word); 

 output.collect(wordText, one); 

 } 

 } 

基准测试:
  当我以上面的描述修改了 Wordcount 例子后,起初我发现 job 运行时与修改之前没有任何不同。这是因为在我的 cluster 中默认为每个 task 都分配一个 1GB 的堆大小,所以垃圾回收机制没有启动。当我重新设置参数,为每个 task 只分配 200MB 的堆时,没有重用 Writable 对象的这个版本执行出现了很严重的减缓 —job 的执行时间由以前的大概 8 分 30 秒变成现在的超过 17 分钟。原始的那个重用 Writable 的版本,在设置更小的堆时还是保持相同的执行速度。因此重用 Writable 是一个很简单的问题修正,我推荐大家总是这样做。它可能不会在每个 job 的执行中获得很好的性能,但当你的 task 有内存限制时就会有相当大的区别。

第七点   使用简易的剖析方式查看 task 的运行
  这是我在查看 MapReduce job 性能问题时常用的一个小技巧。那些不希望这些做的人就会反对说这样是行不通的,但是事实是摆在面前。

  为了实现简易的剖析,可以当 job 中一些 task 运行很慢时,用 ssh 工具连接上 task 所在的那台 task tracker 机器。执行 5 到 10 次这个简单的命令 sudo killall -QUIT java(每次执行间隔几秒)。别担心,不要被命令的名字吓着,它不会导致任何东西退出。然后使用 JobTracker 的界面跳转到那台机器上某个 task 的 stdout 文件上,或者查看正在运行的机器上 /var/log/hadoop/userlogs/ 目录中那个 task 的 stdout 文件。你就可以看到当你执行那段命令时,命令发送到 JVM 的 SIGQUIT 信号而产生的栈追踪信息的 dump 文件。([译] 在 JobTracker 的界面上有 Cluster Summary 的表格,进入 Nodes 链接,选中你执行上面命令的 server,在界面的最下方有 Local Logs, 点击 LOG 进入,然后选择 userlogs 目录,这里可以看到以 server 执行过的 jobID 命名的几个目录,不管进入哪个目录都可以看到很多 task 的列表,每个 task 的 log 中有个 stdout 文件,如果这个文件不为空,那么这个文件就是作者所说的栈信息文件 )

  解析处理这个输出文件需要一点点以经验,这里我介绍下平时是怎样处理的:
对于栈信息中的每个线程,很快地查找你的 java 包的名字 (假如是 com.mycompany.mrjobs)。如果你当前线程的栈信息中没有找到任何与你的代码有关的信息,那么跳到另外的线程再看。

  如果你在某些栈信息中看到你查找的代码,很快地查阅并大概记下它在做什么事。假如你看到一些与 NumberFormat 相关的信息,那么此时你需要记下它,暂时不需要关注它是代码的哪些行。

  转到日志中的下一个 dump,然后也花一些时间做类似的事情然后记下些你关注的内容。

  在查阅了 4 到 5 个栈信息后,你可能会意识到在每次查阅时都会有一些似曾相识的东西。如果这些你意识到的问题是阻碍你的程序变快的原因,那么你可能就找到了程序真正的问题。假如你取到 10 个线程的栈信息,然后从 5 个里面看到过 NumberFormat 类似的信息,那么可能意味着你将 50% 的 CPU 浪费在数据格式转换的事情上了。

  当然,这没有你使用真正的分析程序那么科学。但我发现这是一种有效的方法,可以在不需要引入其它工具的时候发现那些明显的 CPU 瓶颈。更重要的是,这是一种让你会变的更强的技术,你会在实践中知道一个正常的和有问题的 dump 是啥样子。

  通过这项技术我发现了一些通常出现在性能调优方面的误解,列出在下面。
1. NumberFormat 相当慢,尽量避免使用它。
2. String.split—不管是编码或是解码 UTF8 的字符串都是慢的超出你的想像— 参照上面提到的建议,使用合适的 Writable 类型。
3. 使用 StringBuffer.append 来连接字符串

“提高 MapReduce 性能的方法有哪些”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计7451字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)