如何进行Apache Spark源码分析Job的提交与运行

61次阅读
没有评论

共计 2959 个字符,预计需要花费 8 分钟才能阅读完成。

如何进行 Apache Spark 源码分析 Job 的提交与运行,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

下面以 wordCount 为例,详细说明 spark 创建和运行 job 的过程,重点是在进程及线程的创建。

实验环境搭建

在进行后续操作前,确保下列条件已满足。

1. 下载 spark binary 0.9.1

2. 安装 scala

3. 安装 sbt

4. 安装 java

启动 spark-shell 单机模式运行,即 local 模式

local 模式运行非常简单,只要运行以下命令即可,假设当前目录是 $SPARK_HOME

MASTER=local bin/spark-shell

MASTER=local 就是表明当前运行在单机模式

local cluster 方式运行

localcluster 模式是一种伪 cluster 模式,在单机环境下模拟 standalone 的集群,启动顺序分别如下

1. 启动 master

2. 启动 worker

3. 启动 spark-shell

master$SPARK_HOME/sbin/start-master.sh

注意运行时的输出,日志默认保存在 $SPARK_HOME/logs 目录。

master 主要是运行类  org.apache.spark.deploy.master.Master,在 8080 端口启动监听,日志如下图所示

修改配置

1. 进入 $SPARK_HOME/conf 目录

2. 将 spark-env.sh.template 重命名为 spark-env.sh

3. 修改 spark-env.sh,添加如下内容

export SPARK_MASTER_IP=localhostexport SPARK_LOCAL_IP=localhost 运行 workerbin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1 -c 1 -m 512M

worker 启动完成,连接到 master。打开 maser 的 webui 可以看到连接上来的 worker. Master WEb UI 的监听地址是 http://localhost:8080

启动 spark-shellMASTER=spark://localhost:7077 bin/spark-shell

如果一切顺利,将看到下面的提示信息。

Created spark context..Spark context available as sc.

可以用浏览器打开 localhost:4040 来查看如下内容

1. stages

2. storage

3. environment

4. executors

wordcount

上述环境准备妥当之后,我们在 sparkshell 中运行一下最简单的例子,在 spark-shell 中输入如下代码

scala sc.textFile(README.md).filter(_.contains( Spark)).count

上述代码统计在 README.md 中含有 Spark 的行数有多少

部署过程详解

Spark 布置环境中组件构成如下图所示。

 

Driver Program  简要来说在 spark-shell 中输入的 wordcount 语句对应于上图的 Driver Program.

Cluster Manager  就是对应于上面提到的 master,主要起到 deploy management 的作用

Worker Node  与 Master 相比,这是 slave node。上面运行各个 executor,executor 可以对应于线程。executor 处理两种基本的业务逻辑,一种就是 driver   programme, 另一种就是 job 在提交之后拆分成各个 stage,每个 stage 可以运行一到多个 task

Notes:  在集群 (cluster) 方式下, Cluster Manager 运行在一个 jvm 进程之中,而 worker 运行在另一个 jvm 进程中。在 local cluster 中,这些 jvm 进程都在同一台机器中,如果是真正的 standalone 或 Mesos 及 Yarn 集群,worker 与 master 或分布于不同的主机之上。

JOB 的生成和运行

job 生成的简单流程如下

1. 首先应用程序创建 SparkContext 的实例,如实例为 sc

2. 利用 SparkContext 的实例来创建生成 RDD

3. 经过一连串的 transformation 操作,原始的 RDD 转换成为其它类型的 RDD

4. 当 action 作用于转换之后 RDD 时,会调用 SparkContext 的 runJob 方法

5. sc.runJob 的调用是后面一连串反应的起点,关键性的跃变就发生在此处

调用路径大致如下

1. sc.runJob- dagScheduler.runJob- submitJob

2. DAGScheduler::submitJob 会创建 JobSummitted 的 event 发送给内嵌类 eventProcessActor

3. eventProcessActor 在接收到 JobSubmmitted 之后调用 processEvent 处理函数

4. job 到 stage 的转换,生成 finalStage 并提交运行,关键是调用 submitStage

5. 在 submitStage 中会计算 stage 之间的依赖关系,依赖关系分为宽依赖和窄依赖两种

6. 如果计算中发现当前的 stage 没有任何依赖或者所有的依赖都已经准备完毕,则提交 task

7. 提交 task 是调用函数 submitMissingTasks 来完成

8. task 真正运行在哪个 worker 上面是由 TaskScheduler 来管理,也就是上面的 submitMissingTasks 会调用 TaskScheduler::submitTasks

9. TaskSchedulerImpl 中会根据 Spark 的当前运行模式来创建相应的 backend, 如果是在单机运行则创建 LocalBackend

10. LocalBackend 收到 TaskSchedulerImpl 传递进来的 ReceiveOffers 事件

11. receiveOffers- executor.launchTask- TaskRunner.run

代码片段 executor.lauchTask

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {   val tr = new TaskRunner(context, taskId, serializedTask)    runningTasks.put(taskId, tr)    threadPool.execute(tr) }

说了这么一大通,也就是讲最终的逻辑处理切切实实是发生在 TaskRunner 这么一个 executor 之内。

运算结果是包装成为 MapStatus 然后通过一系列的内部消息传递,反馈到 DAGScheduler,这一个消息传递路径不是过于复杂。

看完上述内容,你们掌握如何进行 Apache Spark 源码分析 Job 的提交与运行 的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注丸趣 TV 行业资讯频道,感谢各位的阅读!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计2959字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)