共计 5039 个字符,预计需要花费 13 分钟才能阅读完成。
这篇文章主要讲解了“spark 性能调优的方法是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“spark 性能调优的方法是什么”吧!
分配哪些资源?executor、cpu per executor、memory per executor、driver memory
在哪里分配这些资源?在我们在生产环境中,提交 spark 作业时,用的 spark-submit shell 脚本,里面调整对应的参数
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置 executor 的数量
--driver-memory 100m \ 配置 driver 的内存(影响很大)--executor-memory 100m \ 配置每个 executor 的内存大小
--executor-cores 3 \ 配置每个 executor 的 cpu core 数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
调节到多大,算是最大呢?
第一种,Spark Standalone,公司集群上,搭建了一套 Spark 集群,你心里应该清楚每台机器还能够给你使用的,大概有多少内存,多少 cpu core;那么,设置的时候,就根据这个实际的情况,去调节每个 spark 作业的资源分配。比如说你的每台机器能够给你使用 4G 内存,2 个 cpu core;20 台机器;executor,20;平均每个 executor:4G 内存,2 个 cpu core。
第二种,Yarn。资源队列。资源调度。应该去查看,你的 spark 作业,要提交到的资源队列,大概有多少资源?500G 内存,100 个 cpu core;executor,50;平均每个 executor:10G 内存,2 个 cpu core。
设置队列名称:spark.yarn.queue default
一个原则,你能使用的资源有多大,就尽量去调节到最大的大小(executor 的数量,几十个到上百个不等;executor 内存;executor cpu core)
为什么调节了资源以后,性能可以提升?
增加 executor:
如果 executor 数量比较少,那么,能够并行执行的 task 数量就比较少,就意味着,我们的 Application 的并行执行的能力就很弱。比如有 3 个 executor,每个 executor 有 2 个 cpu core,那么同时能够并行执行的 task,就是 6 个。6 个执行完以后,再换下一批 6 个 task。增加了 executor 数量以后,那么,就意味着,能够并行执行的 task 数量,也就变多了。比如原先是 6 个,现在可能可以并行执行 10 个,甚至 20 个,100 个。那么并行能力就比之前提升了数倍,数十倍。相应的,性能(执行的速度),也能提升数倍~ 数十倍。
有时候数据量比较少,增加大量的 task 反而性能会降低,为什么?(想想就明白了,你用多了,别人用的就少了。。。。)
增加每个 executor 的 cpu core:
也是增加了执行的并行能力。原本 20 个 executor,每个才 2 个 cpu core。能够并行执行的 task 数量,就是 40 个 task。现在每个 executor 的 cpu core,增加到了 5 个。能够并行执行的 task 数量,就是 100 个 task。执行的速度,提升了 2.5 倍。
SparkContext,DAGScheduler,TaskScheduler,会将我们的算子,切割成大量的 task,
提交到 Application 的 executor 上面去执行。
增加每个 executor 的内存量:
增加了内存量以后,对性能的提升,有三点:
1、如果需要对 RDD 进行 cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘 IO。
2、对于 shuffle 操作,reduce 端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给 executor 分配更多内存以后,就有更少的数据,需要写入磁盘,
甚至不需要写入磁盘。减少了磁盘 IO,提升了性能。
3、对于 task 的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致 JVM 堆内存满了,然后频繁 GC,垃圾回收,minor GC 和 full GC。(速度很慢)。内存加大以后,带来更少的 GC,垃圾回收,避免了速度变慢,速度变快了。
Spark 并行度指的是什么?
Spark 作业,Application,Jobs,action(collect)触发一个 job,1 个 job;每个 job 拆成多个 stage,
发生 shuffle 的时候,会拆分出一个 stage,reduceByKey。
stage0
val lines = sc.textFile(hdfs://)
val words = lines.flatMap(_.split( ))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)
stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()
reduceByKey,stage0 的 task,在最后,执行到 reduceByKey 的时候,会为每个 stage1 的 task,都创建一份文件(也可能是合并在少量的文件里面);每个 stage1 的 task,会去各个节点上的各个 task 创建的属于自己的那一份文件里面,拉取数据;每个 stage1 的 task,拉取到的数据,一定是相同 key 对应的数据。对相同的 key,对应的 values,才能去执行我们自定义的 function 操作(_ + _)
并行度:其实就是指的是,Spark 作业中,各个 stage 的 task 数量,也就代表了 Spark 作业的在各个阶段 (stage) 的并行度。
如果不调节并行度,导致并行度过低,会怎么样?
task 没有设置,或者设置的很少,比如就设置了,100 个 task。50 个 executor,每个 executor 有 3 个 cpu core,也就是说,你的 Application 任何一个 stage 运行的时候,都有总数在 150 个 cpu core,可以并行运行。但是你现在,只有 100 个 task,平均分配一下,每个 executor 分配到 2 个 task,ok,那么同时在运行的 task,只有 100 个,每个 executor 只会并行运行 2 个 task。每个 executor 剩下的一个 cpu core,就浪费掉了。
你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源;比如上面的例子,总共集群有 150 个 cpu core,可以并行运行 150 个 task。那么就应该将你的 Application 的并行度,至少设置成 150,才能完全有效的利用你的集群资源,让 150 个 task,并行执行;而且 task 增加到 150 个以后,即可以同时并行运行,还可以让每个 task 要处理的数据量变少;比如总共 150G 的数据要处理,如果是 100 个 task,每个 task 计算 1.5G 的数据;现在增加到 150 个 task,可以并行运行,而且每个 task 主要处理 1G 的数据就可以。
很简单的道理,只要合理设置并行度,就可以完全充分利用你的集群计算资源,并且减少每个 task 要处理的数据量,最终,就是提升你的整个 Spark 作业的性能和运行速度。
task 数量,至少设置成与 Spark application 的总 cpu core 数量相同(最理想情况,比如总共 150 个 cpu core,分配了 150 个 task,一起运行,差不多同一时间运行完毕)
官方是推荐,task 数量,设置成 spark application 总 cpu core 数量的 2~3 倍,比如 150 个 cpu core,基本要设置 task 数量为 300~500;实际情况,与理想情况不同的,有些 task 会运行的快一点,比如 50s 就完了,有些 task,可能会慢一点,要 1 分半才运行完,所以如果你的 task 数量,刚好设置的跟 cpu core 数量相同,可能还是会导致资源的浪费,因为,比如 150 个 task,10 个先运行完了,剩余 140 个还在运行,但是这个时候,有 10 个 cpu core 就空闲出来了,就导致了浪费。那如果 task 数量设置成 cpu core 总数的 2~3 倍,那么一个 task 运行完了以后,另一个 task 马上可以补上来,就尽量让 cpu core 不要空闲,同时也是尽量提升 spark 作业运行的效率和速度,提升性能。
如何设置一个 Spark Application 的并行度?
spark.default.parallelism
SparkConf conf = new SparkConf().set( spark.default.parallelism , 500)
默认情况下,多次对一个 RDD 执行算子,去获取不同的 RDD;都会对这个 RDD 以及之前的父 RDD,全部重新计算一次;读取 HDFS- RDD1- RDD2-RDD4 这种情况,是绝对绝对,一定要避免的,一旦出现一个 RDD 重复计算的情况,就会导致性能急剧降低。比如,HDFS- RDD1-RDD2 的时间是 15 分钟,那么此时就要走两遍,变成 30 分钟
RDD 架构重构与优化尽量去复用 RDD,差不多的 RDD,可以抽取称为一个共同的 RDD,供后面的 RDD 计算时,反复使用。
公共 RDD 一定要实现持久化。就好比北方吃饺子,现包现煮。你人来了,要点一盘饺子。馅料 + 饺子皮 + 水 - 包好的饺子,对包好的饺子去煮,煮开了以后,才有你需要的熟的,热腾腾的饺子。现实生活中,饺子现包现煮,当然是最好的了。但是 Spark 中,RDD 要去“现包现煮”,那就是一场致命的灾难。对于要多次计算和使用的公共 RDD,一定要进行持久化。持久化,也就是说,将 RDD 的数据缓存到内存中 / 磁盘中,(BlockManager),以后无论对这个 RDD 做多少次计算,那么都是直接取这个 RDD 的持久化的数据,比如从内存中或者磁盘中,直接提取一份数据。
持久化,是可以进行序列化的如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许,会导致 OOM 内存溢出。当纯内存无法支撑公共 RDD 数据完全存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将 RDD 的每个 partition 的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减少内存的空间占用。序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。如果序列化纯内存方式,还是导致 OOM,内存溢出;就只能考虑磁盘的方式,内存 + 磁盘的普通方式(无序列化)。内存 + 磁盘,序列化。
为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。这种方式,仅仅针对你的内存资源极度充足.
持久化,很简单,就是对 RDD 调用 persist()方法,并传入一个持久化级别
如果是 persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就可以用 cache()方法来替代
StorageLevel.MEMORY_ONLY_SER(),第二选择
StorageLevel.MEMORY_AND_DISK(),第三选择
StorageLevel.MEMORY_AND_DISK_SER(),第四选择
StorageLevel.DISK_ONLY(),第五选择
如果内存充足,要使用双副本高可靠机制, 选择后缀带_2 的策略
StorageLevel.MEMORY_ONLY_2()
感谢各位的阅读,以上就是“spark 性能调优的方法是什么”的内容了,经过本文的学习后,相信大家对 spark 性能调优的方法是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!