共计 4670 个字符,预计需要花费 12 分钟才能阅读完成。
今天就跟大家聊聊有关如何理解 Receiver 启动以及启动源码分析,可能很多人都不太了解,为了让大家更加了解,丸趣 TV 小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
为什么要 Receiver?
Receiver 不断持续接收外部数据源的数据,并把数据汇报给 Driver 端,这样我们每隔 BatchDuration 会把汇报数据生成不同的 Job,来执行 RDD 的操作。
Receiver 是随着应用程序的启动而启动的。
Receiver 和 InputDStream 是一一对应的。
RDD[Receiver] 只有一个 Partition,一个 Receiver 实例。
Spark Core 并不知道 RDD[Receiver] 的特殊性,依然按照普通 RDD 对应的 Job 进行调度,就有可能在同样一个 Executor 上启动多个 Receiver,会导致负载不均衡,会导致 Receiver 启动失败。
Receiver 在 Executor 启动的方案:
1,启动不同 Receiver 采用 RDD 中不同 Partiton 的方式,不同的 Partiton 代表不同的 Receiver,在执行层面就是不同的 Task,在每个 Task 启动时就启动 Receiver。
这种方式实现简单巧妙,但是存在弊端启动可能失败,运行过程中 Receiver 失败,会导致 TaskRetry,如果 3 次失败就会导致 Job 失败,会导致整个 Spark 应用程序失败。因为 Receiver 的故障,导致 Job 失败,不能容错。
2. 第二种方式就是 Spark Streaming 采用的方式。
在 ReceiverTacker 的 start 方法中,先实例化 Rpc 消息通信体 ReceiverTrackerEndpoint,再调用
launchReceivers 方法。
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException(ReceiverTracker already started)
}
if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
ReceiverTracker , new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) launchReceivers()
logInfo(ReceiverTracker started)
trackerState = Started
}
}
在 launchReceivers 方法中,先对每一个 ReceiverInputStream 获取到对应的一个 Receiver,然后发送 StartAllReceivers 消息。Receiver 对应一个数据来源。
/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def launchReceivers(): Unit = {
val receivers = receiverInputStreams.map(nis = {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
runDummySparkJob()
logInfo(Starting + receivers.length + receivers)
endpoint.send(StartAllReceivers(receivers))
}
ReceiverTrackerEndpoint 接收到 StartAllReceivers 消息后,先找到 Receiver 运行在哪些 Executor 上,然后调用 startReceiver 方法。
override def receive: PartialFunction[Any, Unit] = {
// Local messages
case StartAllReceivers(receivers) =
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver – receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
}
startReceiver 方法在 Driver 层面自己指定了 TaskLocation,而不用 Spark Core 来帮我们选择 TaskLocation。其有以下特点: 终止 Receiver 不需要重启 Spark Job;第一次启动 Receiver,不会执行第二次;为了启动 Receiver 而启动了一个 Spark 作业,一个 Spark 作业启动一个 Receiver。每个 Receiver 启动触发一个 Spark 作业,而不是每个 Receiver 是在一个 Spark 作业的一个 Task 来启动。当提交启动 Receiver 的作业失败时发送 RestartReceiver 消息,来重启 Receiver。
/**
* Start a receiver along with its scheduled executors
*/
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It s okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
}
val receiverId = receiver.streamId
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] = Unit =
(iterator: Iterator[Receiver[_]]) = {
if (!iterator.hasNext) {
throw new SparkException(
Could not start receiver as object not found. )
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It s restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver – preferredLocations))
}
receiverRDD.setName(s Receiver $receiverId)
ssc.sparkContext.setJobDescription(s Streaming job running receiver $receiverId)
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) = Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logInfo(s Restarting Receiver $receiverId)
self.send(RestartReceiver(receiver))
}
case Failure(e) =
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError(Receiver has been stopped. Try to restart it. , e)
logInfo(s Restarting Receiver $receiverId)
self.send(RestartReceiver(receiver))
}
}(submitJobThreadPool)
logInfo(s Receiver ${receiver.streamId} started )
}
看完上述内容,你们对如何理解 Receiver 启动以及启动源码分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注丸趣 TV 行业资讯频道,感谢大家的支持。