Understanding How Receivers Start: A Source Code Analysis


This article looks at how Receivers are started in Spark Streaming and walks through the startup path in the source code.

Why do we need a Receiver?

A Receiver continuously receives data from an external data source and reports it to the Driver. Every BatchDuration, the reported data is used to generate a new Job that runs the RDD operations.

Receivers are started when the application itself starts.

Receivers and InputDStreams correspond one-to-one.
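To make that correspondence concrete, here is a minimal sketch of a Spark Streaming application (the host, port, and word-count logic are illustrative): calling socketTextStream creates one ReceiverInputDStream, which yields exactly one Receiver running on an Executor.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverDemo {
  def main(args: Array[String]): Unit = {
    // local[2]: at least one core for the Receiver plus one for processing
    val conf = new SparkConf().setAppName("ReceiverDemo").setMaster("local[2]")
    // BatchDuration: a new Job is generated every 2 seconds from the reported data
    val ssc = new StreamingContext(conf, Seconds(2))

    // One input DStream -> one Receiver (here a SocketReceiver on an Executor)
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    ssc.start()            // this is what eventually starts the Receiver
    ssc.awaitTermination()
  }
}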

An RDD[Receiver] has only one Partition, holding a single Receiver instance.

Spark Core knows nothing special about RDD[Receiver]; it schedules the corresponding Job like any ordinary RDD's Job. As a result, several Receivers may end up launched on the same Executor, causing load imbalance and possibly Receiver startup failures.

There are two candidate schemes for starting Receivers on Executors:

1. Start the Receivers through different Partitions of a single RDD: each Partition represents one Receiver, which at the execution level means one Task per Receiver, and each Task starts its Receiver when it launches.

This approach is simple and clever, but it has serious drawbacks: startup can fail, and if a Receiver fails while running, its Task is retried; once the retry limit is exhausted, the Job fails, and the whole Spark application fails with it. In other words, a Receiver fault kills the Job, so this scheme is not fault tolerant.
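For reference, the retry threshold involved here is Spark's generic task-failure limit, spark.task.maxFailures (4 attempts by default). A minimal sketch of raising it; the value shown is purely illustrative, and note that this only delays the problem for scheme 1 rather than adding fault tolerance:

import org.apache.spark.SparkConf

// Hypothetical tuning: allow more task attempts before the job is aborted
val conf = new SparkConf()
  .setAppName("StreamingApp")
  .set("spark.task.maxFailures", "8")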

2. The second scheme is the one Spark Streaming actually adopts.

In ReceiverTracker's start method, the RPC endpoint ReceiverTrackerEndpoint is instantiated first, and then the launchReceivers method is called.

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
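For orientation, this start method is invoked from JobScheduler.start() when the application calls StreamingContext.start(). The sketch below is simplified from the Spark source, with unrelated lines elided:

// Simplified from JobScheduler.start(): the tracker above is created and
// started here, before the JobGenerator begins producing per-batch jobs
def start(): Unit = synchronized {
  // ...
  receiverTracker = new ReceiverTracker(ssc)
  receiverTracker.start()   // sets up ReceiverTrackerEndpoint, launches receivers
  jobGenerator.start()      // generates a Job every BatchDuration
  // ...
}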

In the launchReceivers method, each ReceiverInputDStream is first asked for its corresponding Receiver, and then the StartAllReceivers message is sent to the endpoint. Each Receiver corresponds to one data source.

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
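Note the runDummySparkJob() call above: before any Receiver is scheduled, the tracker runs a throwaway job so that all Executors have registered, letting the Receivers be spread across them instead of piling onto the first Executor to come up. Roughly, that helper looks like this in the same file (simplified from the Spark source):

// Runs a trivial job to force executor registration before receivers are scheduled
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}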

When ReceiverTrackerEndpoint receives the StartAllReceivers message, it first works out which Executors each Receiver should run on, and then calls the startReceiver method.

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  // ... other message cases elided
}
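The balancing itself is delegated to schedulingPolicy.scheduleReceivers (a ReceiverSchedulingPolicy), which honours each Receiver's preferredLocation and spreads the rest evenly across Executors. The snippet below is an illustrative round-robin approximation of the idea, not Spark's actual implementation:

// Illustrative only: assign receivers to executors round-robin, ignoring the
// preferred locations and load weights that the real policy accounts for
def roundRobinSchedule(
    streamIds: Seq[Int],
    executors: Seq[String]): Map[Int, Seq[String]] = {
  require(executors.nonEmpty, "need at least one registered executor")
  streamIds.zipWithIndex.map { case (streamId, i) =>
    streamId -> Seq(executors(i % executors.length))
  }.toMap
}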

The startReceiver method specifies the TaskLocation at the Driver level itself, rather than letting Spark Core choose it. This design has several consequences. Terminating a Receiver does not require restarting a Spark Job. A Receiver that has started successfully once is not started a second time. A dedicated Spark job is submitted just to start the Receiver, so each Receiver is started by its own Spark job, rather than all Receivers running as Tasks of one Spark job. And when the job that starts a Receiver completes or fails while the tracker is still running, a RestartReceiver message is sent to restart the Receiver.

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
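The restart loop closes through the RestartReceiver message: the endpoint reschedules the Receiver and calls startReceiver again, so the receiver job keeps being resubmitted until the tracker stops. The handler below is a simplified sketch of that case, condensed from the Spark source (the real handler also reuses still-alive previously scheduled executors before rescheduling):

// Simplified sketch of the RestartReceiver handler in ReceiverTrackerEndpoint:
// pick fresh candidate executors, then go through startReceiver again
case RestartReceiver(receiver) =>
  val scheduledLocations = schedulingPolicy.rescheduleReceiver(
    receiver.streamId,
    receiver.preferredLocation,
    receiverTrackingInfos,
    getExecutors)
  updateReceiverScheduledExecutors(receiver.streamId, scheduledLocations)
  startReceiver(receiver, scheduledLocations)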

