共计 5267 个字符,预计需要花费 14 分钟才能阅读完成。
这篇文章主要介绍“spark shuffle 调优的方法是什么”,在日常操作中,相信很多人在 spark shuffle 调优的方法是什么问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”spark shuffle 调优的方法是什么”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!
什么情况下会发生 shuffle,然后 shuffle 的原理是什么?
在 spark 中,主要是以下几个算子:groupByKey、reduceByKey、countByKey、join,等等。
groupByKey,要把分布在集群各个节点上的数据中的同一个 key,对应的 values,都给集中到一块儿,集中到集群中同一个节点上,更严密一点说,就是集中到一个节点的一个 executor 的一个 task 中。然后呢,集中一个 key 对应的 values 之后,才能交给我们来进行处理,key, Iterable value;reduceByKey,算子函数去对 values 集合进行 reduce 操作,最后变成一个 value;countByKey,需要在一个 task 中,获取到一个 key 对应的所有的 value,然后进行计数,统计总共有多少个 value;join,RDD key, value,RDD key, value,只要是两个 RDD 中,key 相同对应的 2 个 value,都能到一个节点的 executor 的 task 中,给我们进行处理。
问题在于,同一个单词,比如说(hello, 1),可能散落在不同的节点上;对每个单词进行累加计数,就必须让所有单词都跑到同一个节点的一个 task 中,给一个 task 来进行处理;
每一个 shuffle 的前半部分 stage 的 task,每个 task 都会创建下一个 stage 的 task 数量相同的文件,比如下一个 stage 会有 100 个 task,那么当前 stage 每个 task 都会创建 100 份文件;会将同一个 key 对应的 values,一定是写入同一个文件中的;
shuffle 的后半部分 stage 的 task,每个 task 都会从各个节点上的 task 写的属于自己的那一份文件中,拉取 key, value 对;然后 task 会有一个内存缓冲区,然后会用 HashMap,进行 key, values 的汇聚;(key ,values);
task 会用我们自己定义的聚合函数,比如 reduceByKey(_+_),把所有 values 进行一对一的累加,聚合出来最终的值。就完成了 shuffle;
shuffle,一定是分为两个 stage 来完成的。因为这其实是个逆向的过程,不是 stage 决定 shuffle,是 shuffle 决定 stage。
reduceByKey(_+_),在某个 action 触发 job 的时候,DAGScheduler,会负责划分 job 为多个 stage。划分的依据,就是,如果发现有会触发 shuffle 操作的算子,比如 reduceByKey,就将这个操作的前半部分,以及之前所有的 RDD 和 transformation 操作,划分为一个 stage;shuffle 操作的后半部分,以及后面的,直到 action 为止的 RDD 和 transformation 操作,划分为另外一个 stage;
shuffle 前半部分的 task 在写入数据到磁盘文件之前,都会先写入一个一个的内存缓冲,内存缓冲满溢之后,再 spill 溢写到磁盘文件中。
如果不合并 map 端输出文件的话,会怎么样?
减少网络传输、disk io、减少 reduce 端内存缓冲
实际生产环境的条件:
100 个节点(每个节点一个 executor):100 个 executor,每个 executor:2 个 cpu core,总共 1000 个 task:每个 executor 平均 10 个 task,上游 1000 个 task,下游 1000 个 task,每个节点,10 个 task,每个节点或者说每一个 executor 会输出多少份 map 端文件?10 * 1000= 1 万个文件(M*R)
总共有多少份 map 端输出文件?100 * 10000 = 100 万。
问题来了:默认的这种 shuffle 行为,对性能有什么样的恶劣影响呢?
shuffle 中的写磁盘的操作,基本上就是 shuffle 中性能消耗最为严重的部分。
通过上面的分析,一个普通的生产环境的 spark job 的一个 shuffle 环节,会写入磁盘 100 万个文件。
磁盘 IO 对性能和 spark 作业执行速度的影响,是极其惊人和吓人的。
基本上,spark 作业的性能,都消耗在 shuffle 中了,虽然不只是 shuffle 的 map 端输出文件这一个部分,但是这里也是非常大的一个性能消耗点。
new SparkConf().set( spark.shuffle.consolidateFiles , true)
开启 shuffle map 端输出文件合并的机制;默认情况下,是不开启的,就是会发生如上所述的大量 map 端输出文件的操作,严重影响性能。
开启了 map 端输出文件的合并机制之后:
第一个 stage,同时就运行 cpu core 个 task,比如 cpu core 是 2 个,并行运行 2 个 task;
每个 task 都创建下一个 stage 的 task 数量个文件;
第一个 stage,并行运行的 2 个 task 执行完以后,就会执行另外两个 task;
另外 2 个 task 不会再重新创建输出文件;而是复用之前的 task 创建的 map 端输出文件,将数据写入上一批 task 的输出文件中;
第二个 stage,task 在拉取数据的时候,就不会去拉取上一个 stage 每一个 task 为自己创建的那份输出文件了;
提醒一下(map 端输出文件合并):
只有并行执行的 task 会去创建新的输出文件;
下一批并行执行的 task,就会去复用之前已有的输出文件;
但是有一个例外,比如 2 个 task 并行在执行,但是此时又启动要执行 2 个 task(不是同一批次);
那么这个时候的话,就无法去复用刚才的 2 个 task 创建的输出文件了;
而是还是只能去创建新的输出文件。
要实现输出文件的合并的效果,必须是一批 task 先执行,然后下一批 task 再执行,
才能复用之前的输出文件;负责多批 task 同时起来执行,还是做不到复用的。
开启了 map 端输出文件合并机制之后,生产环境上的例子,会有什么样的变化?
实际生产环境的条件:
100 个节点(每个节点一个 executor):100 个 executor
每个 executor:2 个 cpu core
总共 1000 个 task:每个 executor 平均 10 个 task
上游 1000 个 task,下游 1000 个 task
每个节点,2 个 cpu core,有多少份输出文件呢?2 * 1000 = 2000 个(C*R)
总共 100 个节点,总共创建多少份输出文件呢?100 * 2000 = 20 万个文件
相比较开启合并机制之前的情况,100 万个
map 端输出文件,在生产环境中,立减 5 倍!
合并 map 端输出文件,对咱们的 spark 的性能有哪些方面的影响呢?
map task 写入磁盘文件的 IO,减少:100 万文件 – 20 万文件
第二个 stage,原本要拉取第一个 stage 的 task 数量份文件,1000 个 task,第二个 stage 的每个 task,都要拉取 1000 份文件,走网络传输;合并以后,100 个节点,每个节点 2 个 cpu core,第二个 stage 的每个 task,主要拉取 1000 * 2 = 2000 个文件即可;网络传输的性能消耗是不是也大大减少分享一下,实际在生产环境中,使用了 spark.shuffle.consolidateFiles 机制以后,实际的性能调优的效果:对于上述的这种生产环境的配置,性能的提升,还是相当的客观的。
spark 作业,5 个小时 – 2~3 个小时。
大家不要小看这个 map 端输出文件合并机制。实际上,在数据量比较大,你自己本身做了前面的性能调优,
executor 上去 - cpu core 上去 - 并行度(task 数量)上去,shuffle 没调优,shuffle 就很糟糕了;
大量的 map 端输出文件的产生。对性能有比较恶劣的影响。
这个时候,去开启这个机制,可以很有效的提升性能。
spark.shuffle.manager hash M*R 个小文件
spark.shuffle.manager sort C*R 个小文件 (默认的 shuffle 管理机制)
spark.shuffle.file.buffer,默认 32k
spark.shuffle.memoryFraction,0.2
默认情况下,shuffle 的 map task,输出到磁盘文件的时候,统一都会先写入每个 task 自己关联的一个内存缓冲区。这个缓冲区大小,默认是 32kb。每一次,当内存缓冲区满溢之后,才会进行 spill 操作,溢写操作,溢写到磁盘文件中去 reduce 端 task,在拉取到数据之后,会用 hashmap 的数据格式,来对各个 key 对应的 values 进行汇聚。针对每个 key 对应的 values,执行我们自定义的聚合函数的代码,比如_ + _(把所有 values 累加起来)reduce task,在进行汇聚、聚合等操作的时候,实际上,使用的就是自己对应的 executor 的内存,executor(jvm 进程,堆),默认 executor 内存中划分给 reduce task 进行聚合的比例,是 0.2。问题来了,因为比例是 0.2,所以,理论上,很有可能会出现,拉取过来的数据很多,那么在内存中,放不下;这个时候,默认的行为,就是说,将在内存放不下的数据,都 spill(溢写)到磁盘文件中去。
原理说完之后,来看一下,默认情况下,不调优,可能会出现什么样的问题?
默认,map 端内存缓冲是每个 task,32kb。
默认,reduce 端聚合内存比例,是 0.2,也就是 20%。
如果 map 端的 task,处理的数据量比较大,但是呢,你的内存缓冲大小是固定的。
可能会出现什么样的情况?
每个 task 就处理 320kb,32kb,总共会向磁盘溢写 320 / 32 = 10 次。
每个 task 处理 32000kb,32kb,总共会向磁盘溢写 32000 / 32 = 1000 次。
在 map task 处理的数据量比较大的情况下,而你的 task 的内存缓冲默认是比较小的,32kb。可能会造成多次的 map 端往磁盘文件的 spill 溢写操作,发生大量的磁盘 IO,从而降低性能。
reduce 端聚合内存,占比。默认是 0.2。如果数据量比较大,reduce task 拉取过来的数据很多,那么就会频繁发生 reduce 端聚合内存不够用,频繁发生 spill 操作,溢写到磁盘上去。而且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操作的时候,很可能会多次读取磁盘中的数据,进行聚合。
默认不调优,在数据量比较大的情况下,可能频繁地发生 reduce 端的磁盘文件的读写。
这两个点之所以放在一起讲,是因为他们俩是有关联的。数据量变大,map 端肯定会出点问题;
reduce 端肯定也会出点问题;出的问题是一样的,都是磁盘 IO 频繁,变多,影响性能。
调优:
调节 map task 内存缓冲:spark.shuffle.file.buffer,默认 32k(spark 1.3.x 不是这个参数,
后面还有一个后缀,kb;spark 1.5.x 以后,变了,就是现在这个参数)
调节 reduce 端聚合内存占比:spark.shuffle.memoryFraction,0.2
在实际生产环境中,我们在什么时候来调节两个参数?
看 Spark UI,如果你的公司是决定采用 standalone 模式,那么很简单,你的 spark 跑起来,会显示一个 Spark UI 的地址,4040 的端口,进去看,依次点击进去,可以看到,你的每个 stage 的详情,有哪些 executor,有哪些 task,每个 task 的 shuffle write 和 shuffle read 的量,shuffle 的磁盘和内存,读写的数据量;如果是用的 yarn 模式来提交,课程最前面,从 yarn 的界面进去,点击对应的 application,进入 Spark UI,查看详情。
如果发现 shuffle 磁盘的 write 和 read,很大,可以调节这两个参数
调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高 0.1,看看效果。不能调节的太大,太大了以后过犹不及,因为内存资源是有限的,你这里调节的太大了,其他环节的内存使用就会有问题了。
调节了以后,效果?map task 内存缓冲变大了,减少 spill 到磁盘文件的次数;reduce 端聚合内存变大了,
减少 spill 到磁盘的次数,而且减少了后面聚合读取磁盘文件的数量。
到此,关于“spark shuffle 调优的方法是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!