What Is the Spark Driver Startup Flow?

This article walks through how the Spark Driver starts up, following the relevant source code step by step.

SparkContext.scala

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  _taskScheduler.start()

Now let's look at what SparkContext.createTaskScheduler actually does:

  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)

We can see that _taskScheduler is an instance of TaskSchedulerImpl and _schedulerBackend is an instance of StandaloneSchedulerBackend, and that _schedulerBackend is handed to _taskScheduler through scheduler.initialize.
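
To make that hand-off concrete, here is a minimal self-contained sketch of the same wiring pattern. The class names are made up for illustration and are not the real Spark classes: the scheduler stores the backend reference in initialize() and delegates to it in start().

  // Toy versions of the scheduler/backend pair, showing only the wiring.
  trait SchedulerBackend { def start(): Unit }

  class TaskScheduler {
    private var backend: SchedulerBackend = _

    // initialize() just stores the backend reference for later use
    def initialize(b: SchedulerBackend): Unit = { backend = b }

    // start() delegates to the backend, mirroring what TaskSchedulerImpl.start() does
    def start(): Unit = backend.start()
  }

  class StandaloneBackend(masterUrls: Seq[String]) extends SchedulerBackend {
    def start(): Unit = println(s"connecting to masters: ${masterUrls.mkString(",")}")
  }

  object WiringDemo extends App {
    val scheduler = new TaskScheduler
    val backend   = new StandaloneBackend(Seq("spark://localhost:7077"))
    scheduler.initialize(backend)   // same hand-off as scheduler.initialize(backend) above
    scheduler.start()               // ends up calling backend.start()
  }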

Next, let's see what _taskScheduler.start() actually does:

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
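
Besides delegating to backend.start(), the snippet above also schedules a periodic speculation check. The scheduling pattern itself is an ordinary scheduled executor firing at a fixed rate; here is a small self-contained illustration using plain java.util.concurrent, with a made-up interval standing in for SPECULATION_INTERVAL_MS:

  import java.util.concurrent.{Executors, TimeUnit}

  object SpeculationTimerDemo extends App {
    // single-threaded scheduler, similar in spirit to speculationScheduler above
    val scheduler  = Executors.newSingleThreadScheduledExecutor()
    val intervalMs = 100L   // stand-in for SPECULATION_INTERVAL_MS

    scheduler.scheduleAtFixedRate(new Runnable {
      // in Spark this body would call checkSpeculatableTasks()
      override def run(): Unit = println("checking for speculatable tasks ...")
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)

    Thread.sleep(350)       // let a few ticks fire, then stop
    scheduler.shutdownNow()
  }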

The first thing it does is call backend.start(). We can find the implementation of start in StandaloneSchedulerBackend:

override def start() {
  super.start()
  launcherBackend.connect()

  // The endpoint for executors to talk to us
  val driverUrl = RpcEndpointAddress(
    sc.conf.get("spark.driver.host"),
    sc.conf.get("spark.driver.port").toInt,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the *-provided profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  // If we're using dynamic allocation, set our initial executor limit to 0 for now.
  // ExecutorAllocationManager will send the real initial limit to the Master later.
  val initialExecutorLimit =
    if (Utils.isDynamicAllocationEnabled(conf)) {
      Some(0)
    } else {
      None
    }
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}

Let's now look at what client.start() does:

  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

endpoint is an AtomicReference, and rpcEnv.setupEndpoint does two things: it registers an endpoint and returns a reference to it. But where does anything actually get "started" here? We know execution must somehow end up in ClientEndpoint's onStart method, but how exactly does it get there?

The code below was already covered in the article on the Rpc mechanism. The key line is receivers.offer(data), which we did not pay much attention to at the time. Looking at it now, whenever an endpoint is registered with the RpcEnv, its OnStart event is automatically triggered.

  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val data = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }
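
To see why that offer ends up invoking onStart, here is a toy model of the mechanism (not Spark's actual Dispatcher/Inbox classes): registering an endpoint enqueues it, and a dispatcher thread delivers a synthetic OnStart message to it before any ordinary message.

  import java.util.concurrent.LinkedBlockingQueue

  sealed trait InboxMessage
  case object OnStart extends InboxMessage
  final case class Plain(body: String) extends InboxMessage

  class ToyEndpoint(val name: String) {
    def onStart(): Unit = println(s"$name started")
    def receive(msg: String): Unit = println(s"$name got: $msg")
  }

  object ToyDispatcher extends App {
    // queue of (endpoint, message) pairs, standing in for `receivers`
    val receivers = new LinkedBlockingQueue[(ToyEndpoint, InboxMessage)]()

    // registering an endpoint simply enqueues an OnStart for it -- "for the OnStart message"
    def register(e: ToyEndpoint): Unit = receivers.offer(e -> OnStart)

    // the dispatch loop drains the queue and routes messages to endpoints
    val loop = new Thread(new Runnable {
      override def run(): Unit = while (true) {
        val (endpoint, msg) = receivers.take()
        msg match {
          case OnStart     => endpoint.onStart()
          case Plain(body) => endpoint.receive(body)
        }
      }
    })
    loop.setDaemon(true)
    loop.start()

    val client = new ToyEndpoint("AppClient")
    register(client)                           // triggers onStart, like setupEndpoint does
    receivers.offer(client -> Plain("hello"))
    Thread.sleep(200)
  }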

With that in mind, let's find ClientEndpoint (an inner class of StandaloneAppClient) and look at its onStart method:

  override def onStart(): Unit = {
    try {
      registerWithMaster(1)
    } catch {
      case e: Exception =>
        logWarning("Failed to connect to master", e)
        markDisconnected()
        stop()
    }
  }

registerWithMaster is also recursive, but the recursion is only there to implement retries, so let's go straight to tryRegisterAllMasters().

  private def registerWithMaster(nthRetry: Int) {
    registerMasterFutures.set(tryRegisterAllMasters())
    registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
      override def run(): Unit = {
        if (registered.get) {
          registerMasterFutures.get.foreach(_.cancel(true))
          registerMasterThreadPool.shutdownNow()
        } else if (nthRetry >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          registerMasterFutures.get.foreach(_.cancel(true))
          registerWithMaster(nthRetry + 1)
        }
      }
    }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
  }
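
Before moving on to tryRegisterAllMasters, the bounded-retry loop above is easier to follow in isolation. Here is a simplified, self-contained model of the same idea, with hypothetical names and a short delay standing in for REGISTRATION_TIMEOUT_SECONDS / REGISTRATION_RETRIES:

  import java.util.concurrent.{Executors, TimeUnit}
  import java.util.concurrent.atomic.AtomicBoolean

  object RetryDemo extends App {
    val registered = new AtomicBoolean(false)   // flipped to true once the master answers
    val maxRetries = 3                          // stand-in for REGISTRATION_RETRIES
    val timer = Executors.newSingleThreadScheduledExecutor()

    def tryRegister(): Unit = println("sending RegisterApplication ...")

    def registerWithMaster(nthRetry: Int): Unit = {
      tryRegister()
      // after a timeout, either stop (registered / retries exhausted) or try again
      timer.schedule(new Runnable {
        override def run(): Unit =
          if (registered.get) {
            println("registered, stop retrying"); timer.shutdownNow()
          } else if (nthRetry >= maxRetries) {
            println("all masters are unresponsive, giving up"); timer.shutdownNow()
          } else {
            registerWithMaster(nthRetry + 1)
          }
      }, 200, TimeUnit.MILLISECONDS)            // stand-in for REGISTRATION_TIMEOUT_SECONDS
    }

    registerWithMaster(1)
    Thread.sleep(1000)                          // give the demo time to exhaust its retries
  }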

  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    for (masterAddress <- masterRpcAddresses) yield {
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = try {
          if (registered.get) {
            return
          }
          logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
          val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          masterRef.send(RegisterApplication(appDescription, self))
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      })
    }
  }

The two key lines are rpcEnv.setupEndpointRef, which obtains a reference to the master endpoint, and masterRef.send, which sends the registration message. This master endpoint lives on the master node of the Spark cluster, so from the point of view of the RpcEnv on the driver it is a remote endpoint.

Let's open Master.scala and start with its class declaration:

private[deploy] class Master(
  override val rpcEnv: RpcEnv,
  address: RpcAddress,
  webUiPort: Int,
  val securityMgr: SecurityManager,
  val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

Then find its receive method:

override def receive: PartialFunction[Any, Unit]

We only need to look at one case in it:

  case RegisterApplication(description, driver) =>
    // TODO Prevent repeated registrations from some driver
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      logInfo("Registering app " + description.name)
      val app = createApplication(description, driver)
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      persistenceEngine.addApplication(app)
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }

It does five things: 1. create the app, 2. register the app, 3. persist the app, 4. send a message back to the driver's endpoint, 5. call schedule().

In step 4, driver is the endpoint reference that arrived along with the Rpc message; the master uses it to send a registration response back to the driver.

Back in ClientEndpoint.receive:

  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredApplication(appId_, masterRef) =>
      // FIXME How to handle the following cases?
      // 1. A master receives multiple registrations and sends back multiple
      // RegisteredApplications due to an unstable network.
      // 2. Receive multiple RegisteredApplication from different masters because the master is
      // changing.
      appId.set(appId_)
      registered.set(true)
      master = Some(masterRef)
      listener.connected(appId.get)

As for step 5, let's see what schedule() does:

  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

launchDriver(worker, driver) can be understood as telling that worker to launch the current driver.
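
The round-robin placement in schedule() can be reproduced with plain data structures. Here is a self-contained sketch using toy Worker / DriverReq case classes instead of the Master's WorkerInfo and DriverInfo:

  import scala.util.Random

  final case class Worker(id: String, var coresFree: Int, var memoryFree: Int)
  final case class DriverReq(id: String, cores: Int, mem: Int)

  object DriverPlacementDemo extends App {
    // "alive" workers, shuffled as in schedule()
    val workers = Random.shuffle(Seq(
      Worker("w1", coresFree = 4, memoryFree = 4096),
      Worker("w2", coresFree = 2, memoryFree = 2048)))
    var waitingDrivers = List(DriverReq("d1", cores = 2, mem = 1024))

    var curPos = 0
    for (driver <- waitingDrivers) {
      var launched = false
      var numWorkersVisited = 0
      // walk the shuffled workers round-robin until one has enough free cores and memory
      while (numWorkersVisited < workers.size && !launched) {
        val w = workers(curPos)
        numWorkersVisited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          println(s"launchDriver(${w.id}, ${driver.id})")
          w.coresFree -= driver.cores
          w.memoryFree -= driver.mem
          waitingDrivers = waitingDrivers.filterNot(_ == driver)
          launched = true
        }
        curPos = (curPos + 1) % workers.size
      }
    }
  }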

Now let's look at startExecutorsOnWorkers():

  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    for (app <- waitingApps if app.coresLeft > 0) {
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }

  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val exec = app.addExecutor(worker, coresToAssign)
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }

  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

1. Add the exec to the local worker record.

2. Notify the worker to launch an executor.

3. Notify the driver that an executor has been added.
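
To close, here is a quick worked example of the cores split performed in allocateWorkerResourceToExecutors, with hypothetical numbers in place of the real ApplicationInfo/WorkerInfo objects:

  object CoresSplitDemo extends App {
    val assignedCores = 8                          // cores granted on one worker
    val coresPerExecutor: Option[Int] = Some(4)    // e.g. spark.executor.cores=4

    // 8 assigned cores at 4 cores/executor => 2 executors of 4 cores each;
    // with coresPerExecutor = None we would get one executor grabbing all 8 cores.
    val numExecutors  = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)

    for (i <- 1 to numExecutors)
      println(s"LaunchExecutor #$i with $coresToAssign cores")  // followed by ExecutorAdded to the driver
  }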

That concludes this walkthrough of the Spark Driver startup flow. Thanks for reading.
