ReceiverTracker 可以以 Driver 中具体的算法计算出在具体的 executor 上启动 Receiver。启动 Receiver 的方法是封装在一个 task 中运行,这个 task 是 job 中唯一的 task。实质上讲,ReceiverTracker 启动 Receiver 时封装成一个又一个的 job。启动 Receiver 的方法中有一个 ReceiverSupervisorImpl,ReceiverSupervisorImpl 的 start 方法会导致 Receiver 早 work 节点上真正的执行。转过来通过 BlockGenerator 把接收到的数据放入 block 中,并通过 ReceiverSupervisorImpl 把 block 进行存储,然后把数据的元数据汇报给 ReceiverTracker。

下面就讲 ReceiverTracker 在接收到数据之后具体怎么处理。

ReceiverSupervisorImpl 把 block 进行存储是通过 receivedBlockHandler 来写的。

private val receivedBlockHandler: ReceivedBlockHandler = {
 if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
 new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
 receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
 } else {
 new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)

一种是通过 WAL 的方式,一种是通过 BlockManager 的方式。

/** Store block and report it to driver */
def pushAndReportBlock(
 receivedBlock: ReceivedBlock,
 metadataOption: Option[Any],
 blockIdOption: Option[StreamBlockId]
 ) {
 val blockId = blockIdOption.getOrElse(nextBlockId)
 val time = System.currentTimeMillis
 val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
 logDebug(s Pushed block $blockId in ${(System.currentTimeMillis – time)} ms )
 val numRecords = blockStoreResult.numRecords
 val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
 logDebug(s Reported block $blockId)

把数据存储起来切向 ReceiverTracker 汇报。汇报的时候是元数据。

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
 streamId: Int,
 numRecords: Option[Long],
 metadataOption: Option[Any],
 blockStoreResult: ReceivedBlockStoreResult

Sealed 关键字的意思就是所有的子类都在当前的文件中

ReceiverTracker 管理 Receiver 的启动、回收、接收汇报的元数据。ReceiverTracker 在实例化之前必须所有的 input stream 都已经被 added 和 streamingcontext.start()。因为 ReceiverTracker 要为每个 input stream 启动一个 Receiver。

ReceiverTracker 中有所有的输入数据来源和 ID。

private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamIds = { }

ReceiverTracker 的状态

/** Enumeration to identify current state of the ReceiverTracker */
object TrackerState extends Enumeration {
 type TrackerState = Value
 val Initialized, Started, Stopping, Stopped = Value

下面看一下 ReceiverTracker 在接收到 ReceiverSupervisorImpl 发送的 AddBlock 的消息后的处理。

case AddBlock(receivedBlockInfo) =
 if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
 walBatchingThreadPool.execute(new Runnable {
 override def run(): Unit = Utils.tryLogNonFatalError {
 if (active) {
 } else {
 throw new IllegalStateException(ReceiverTracker RpcEndpoint shut down.)
 } else {

先判断一下是不是 WAL 得方式,如果是就用线程池中的一个线程来回复 addBlock,因为 WAL 非常消耗性能。否则就直接回复 addBlock。

  让后交给 receiverBlockTracker 进行处理

/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {

ReceiverBlockTracker 是在 Driver 端管理 blockInfo 的。

/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
 try {
 val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
 if (writeResult) {
 synchronized {
 getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
 logDebug(s Stream ${receivedBlockInfo.streamId} received  +
 s block ${receivedBlockInfo.blockStoreResult.blockId} )
 } else {
 logDebug(s Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving  +
 s block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log. )
 } catch {
 case NonFatal(e) =
 logError(s Error adding block $receivedBlockInfo , e)

writeToLog 的代码很简单,首先判断是不是 WAL 得方式,如果是就把 blockInfo 写入到日志中,用于以后恢复数据。否则的话就直接返回 true。然后就把 block 的信息放入 streamIdToUnallocatedBlockQueues 中。

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

这个数据结构很精妙,key 是 streamid,value 是一个队列,把每一个 stream 接收的 block 信息分开存储。这样 ReceiverBlockTracker 就有了所有 stream 接收到的 block 信息。

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
 if (isWriteAheadLogEnabled) {
 logTrace(s Writing record: $record)
 try {
 } catch {
 case NonFatal(e) =
 logWarning(s Exception thrown while writing record: $record to the WriteAheadLog. , e)
 } else {

详细看一下 ReceiverBlockTracker 的注释。这个 class 会追踪所有接收到的 blocks,并把他们按 batch 分配,如果有需要这个 class 接收的所有 action 都可以写 WAL 中,如果指定了 checkpoint 的目录,当 Driver 崩溃了,ReceiverBlockTracker 的状态(包括接收的 blocks 和分配的 blocks)都可以恢复。如果实例化这个 class 的时候指定了 checkpoint,就会从中读取之前保存的信息。

 * Class that keep track of all the received blocks, and allocate them to batches
 * when required. All actions taken by this class can be saved to a write ahead log
 * (if a checkpoint directory has been provided), so that the state of the tracker
 * (received blocks and block-to-batch allocations) can be recovered after driver failure.
 * Note that when any instance of this class is created with a checkpoint directory,
 * it will try reading events from logs in the directory.
private[streaming] class ReceivedBlockTracker(

下面看一下 ReceiverTracker 接收到 CleanupOldBlocks 后的处理。

case c: CleanupOldBlocks =

ReceiverTracker 接收到这条消息后会给它管理的每一个 Receiver 发送这个消息。ReceiverSupervisorImpl 接收到消息后使用 receivedBlockHandler 清理数据。

private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = {
 logDebug(s Cleaning up blocks older then $cleanupThreshTime)

ReceiverTracker 还可以随时调整某一个 streamID 接收数据的速度,向对应的 ReceiverSupervisorImpl 发送 UpdateRateLimit 的消息。

case UpdateReceiverRateLimit(streamUID, newRate) =
 for (info – receiverTrackingInfos.get(streamUID); eP – info.endpoint) {

ReceiverSupervisorImpl 接收到消息后。

case UpdateRateLimit(eps) =
 logInfo(s Received a new rate limit: $eps.)
 registeredBlockGenerators.foreach {bg =

 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 * @param newRate A new rate in events per second. It has no effect if it s 0 or negative.
private[receiver] def updateRate(newRate: Long): Unit =
 if (newRate  0) {
 if (maxRateLimit   0) {
 } else {

ReceiverTracker 是一个门面设计模式,看似调用的是 ReceiverTracker 的功能,其实调用的是别的类的功能。

