This article walks through how the Spark Driver starts up in standalone mode, following the code from SparkContext all the way to the Master launching executors on the workers.
SparkContext.scala
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
Let's look at what SparkContext.createTaskScheduler actually does:
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
Here _taskScheduler is an instance of TaskSchedulerImpl and _schedulerBackend is an instance of StandaloneSchedulerBackend; the backend is handed to the scheduler via scheduler.initialize(backend).
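To make this concrete, here is a minimal sketch of a driver program (the master address is hypothetical): a spark://host:port URL is exactly what makes createTaskScheduler hit the SPARK_REGEX branch above and build a StandaloneSchedulerBackend.

import org.apache.spark.{SparkConf, SparkContext}

object DriverStartupDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("driver-startup-demo")
      .setMaster("spark://master-host:7077") // hypothetical standalone master URL
    // createTaskScheduler and _taskScheduler.start() both run inside this constructor
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).count() // submit a job so the executors have work to do
    sc.stop()
  }
}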
Next, let's see what _taskScheduler.start() actually does:
override def start() {
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
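As a side note, the speculation branch above only runs when spark.speculation is enabled; a minimal sketch of turning it on (the values are illustrative):

val conf = new SparkConf()
  .setAppName("speculation-demo")
  .set("spark.speculation", "true")           // enables the periodic checkSpeculatableTasks() call
  .set("spark.speculation.interval", "100ms") // how often the speculation check runs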
Back to start(): the first thing it does is call backend.start(), whose implementation we can find in StandaloneSchedulerBackend:
override def start() {
super.start()
launcherBackend.connect()
// The endpoint for executors to talk to us
val driverUrl = RpcEndpointAddress(
sc.conf.get("spark.driver.host"),
sc.conf.get("spark.driver.port").toInt,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// When testing, expose the parent class path to the child. This is processed by
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the *-provided profiles enabled.
val testingClassPath =
if (sys.props.contains("spark.testing")) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
}
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
// If we're using dynamic allocation, set our initial executor limit to 0 for now.
// ExecutorAllocationManager will send the real initial limit to the Master later.
val initialExecutorLimit =
if (Utils.isDynamicAllocationEnabled(conf)) {
Some(0)
} else {
None
}
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
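All of the configuration keys read in start() above are set on the driver side; a minimal sketch (the values are made up):

val conf = new SparkConf()
  .set("spark.executor.cores", "2")                           // -> coresPerExecutor
  .set("spark.executor.memory", "2g")                         // -> sc.executorMemory
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")     // -> extraJavaOpts
  .set("spark.executor.extraClassPath", "/opt/extra/lib.jar") // -> classPathEntries

These values end up inside the ApplicationDescription, and the {{EXECUTOR_ID}}, {{CORES}} and similar placeholders in args are filled in on the worker side when the CoarseGrainedExecutorBackend process is actually launched.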
Now let's see what client.start() does:
def start() {
// Just launch an rpcEndpoint; it will call back into the listener.
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
endpoint is an AtomicReference, and rpcEnv.setupEndpoint does two things: it registers an endpoint and returns a reference to it. But where does anything actually "start" here? Execution must eventually reach ClientEndpoint's onStart method, so how exactly does it get there?
The following code was covered in the article on the RPC mechanism. The key line (receivers.offer(data), flagged "for the OnStart message") did not get much attention back then, but it is what matters here: whenever an endpoint is registered with the RpcEnv, its onStart event is triggered automatically.
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
receivers.offer(data) // for the OnStart message
}
endpointRef
}
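To illustrate the lifecycle, here is a minimal sketch of an endpoint whose onStart fires as soon as it is registered. RpcEnv and RpcEndpoint are private[spark], so this is purely illustrative and assumes the code lives somewhere with access to those classes:

import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

class DemoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // Invoked automatically after registration: registerRpcEndpoint enqueues the
  // EndpointData (receivers.offer(data)) and the dispatcher delivers an OnStart message.
  override def onStart(): Unit = println("DemoEndpoint started")

  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"got $msg")
  }
}

Calling rpcEnv.setupEndpoint("Demo", new DemoEndpoint(rpcEnv)) would register it and trigger onStart, exactly as setupEndpoint("AppClient", ...) does for ClientEndpoint.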
With that in mind, let's find ClientEndpoint (an inner class of StandaloneAppClient) and look at its onStart method:
override def onStart(): Unit = {
try {
registerWithMaster(1)
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
stop()
}
}
registerWithMaster also calls itself recursively, but the recursion only exists to implement retries, so we can go straight to tryRegisterAllMasters().
private def registerWithMaster(nthRetry: Int) {
registerMasterFutures.set(tryRegisterAllMasters())
registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
override def run(): Unit = {
if (registered.get) {
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
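The retry logic is a generic pattern: fire off the attempts, then schedule a single check after a timeout that either stops (success), gives up (too many retries), or recurses into the next round. A stripped-down sketch of that pattern, with made-up names and constants rather than Spark's:

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object RetrySketch {
  private val registered = new AtomicBoolean(false)
  private val retryThread = Executors.newSingleThreadScheduledExecutor()
  private val MaxRetries = 3
  private val TimeoutSeconds = 20L

  def registerWithMaster(nthRetry: Int): Unit = {
    tryRegister() // kick off the actual registration attempt(s)
    retryThread.schedule(new Runnable {
      override def run(): Unit = {
        if (registered.get) retryThread.shutdownNow()         // success: stop retrying
        else if (nthRetry >= MaxRetries) println("giving up") // markDead in the real code
        else registerWithMaster(nthRetry + 1)                 // cancel and try the next round
      }
    }, TimeoutSeconds, TimeUnit.SECONDS)
  }

  private def tryRegister(): Unit = { /* send RegisterApplication to each master here */ }
}

Now for tryRegisterAllMasters() itself: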
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
masterRef.send(RegisterApplication(appDescription, self))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
})
}
}
The two key lines set up a reference to the master endpoint and send it a RegisterApplication message. This master endpoint lives on the master node of the Spark cluster, so from the point of view of the driver's RpcEnv it is a remote endpoint.
Let's open Master.scala and start with its class declaration:
private[deploy] class Master(
override val rpcEnv: RpcEnv,
address: RpcAddress,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
Then find its receive method:
override def receive: PartialFunction[Any, Unit]
We only need to look at one case:
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
val app = createApplication(description, driver)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app)
driver.send(RegisteredApplication(app.id, self))
schedule()
}
The handler does five things: 1. create the app, 2. register the app, 3. persist the app, 4. send a message to the driver's endpoint, 5. call schedule().
In step 4, driver is the endpoint reference that arrived together with the RPC message; the Master uses it to send the driver a RegisteredApplication response.
Back in ClientEndpoint.receive:
override def receive: PartialFunction[Any, Unit] = {
case RegisteredApplication(appId_, masterRef) =>
// FIXME How to handle the following cases?
// 1. A master receives multiple registrations and sends back multiple
// RegisteredApplications due to an unstable network.
// 2. Receive multiple RegisteredApplication from different masters because the master is
// changing.
appId.set(appId_)
registered.set(true)
master = Some(masterRef)
listener.connected(appId.get)
Step 5 is schedule(); let's see what it does:
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}
// Drivers take strict precedence over executors
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}
launchDriver(worker, driver) can be understood as starting a thread for the current driver on that worker.
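Here is a standalone sketch of that round-robin placement, with made-up workers and a made-up driver (none of the names or numbers come from Spark):

object RoundRobinSketch extends App {
  case class FakeWorker(id: String, var memoryFree: Int, var coresFree: Int)
  case class FakeDriver(id: String, mem: Int, cores: Int)

  val workers = scala.util.Random.shuffle(Seq(
    FakeWorker("w1", memoryFree = 1024, coresFree = 1),
    FakeWorker("w2", memoryFree = 8192, coresFree = 4)))
  val waitingDrivers = Seq(FakeDriver("d1", mem = 2048, cores = 2))

  var curPos = 0
  for (driver <- waitingDrivers) {
    var launched = false
    var visited = 0
    while (visited < workers.size && !launched) {
      val w = workers(curPos)
      visited += 1
      if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
        println(s"would launch ${driver.id} on ${w.id}") // launchDriver(worker, driver)
        w.memoryFree -= driver.mem
        w.coresFree -= driver.cores
        launched = true
      }
      curPos = (curPos + 1) % workers.size
    }
  }
}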
Next, look at startExecutorsOnWorkers():
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map {assignedCores / _}.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
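A quick worked example of the arithmetic above, with hypothetical numbers (suppose scheduleExecutorsOnWorkers assigned 8 cores on this worker):

val assignedCores = 8
val coresPerExecutor: Option[Int] = Some(2) // i.e. spark.executor.cores = 2
val numExecutors  = coresPerExecutor.map(assignedCores / _).getOrElse(1) // 8 / 2 = 4 executors
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)            // 2 cores each
// With coresPerExecutor = None, numExecutors would be 1 and that single
// executor would grab all 8 assignedCores on this worker.

launchExecutor then does the actual work for each of those executors: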
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
1. Add an exec to the local worker bookkeeping (worker.addExecutor)
2. Tell the worker to launch an executor (the LaunchExecutor message)
3. Tell the driver that an executor has been added (the ExecutorAdded message)
That wraps up the Spark Driver startup flow: from SparkContext creating the TaskScheduler and StandaloneSchedulerBackend, through the StandaloneAppClient registering the application with the Master, to the Master scheduling drivers and executors onto the workers.