共计 8921 个字符,预计需要花费 23 分钟才能阅读完成。
这篇文章主要介绍“ReceiverSupervisorImpl 实例化怎么实现”,在日常操作中,相信很多人在 ReceiverSupervisorImpl 实例化怎么实现问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”ReceiverSupervisorImpl 实例化怎么实现”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!
先回顾下 在 Executor 执行的具体的方法
实例化 ReceiverSupervisorImpl
start 之后等待 awaitTermination
// ReceiverTracker.scala line 564
val startReceiverFunc: Iterator[Receiver[_]] = Unit =
(iterator: Iterator[Receiver[_]]) = { if (!iterator.hasNext) {
throw new SparkException( Could not start receiver as object not found.)
}
if (TaskContext.get().attemptNumber() == 0) { val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl( receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It s restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
看下 ReceiverSupervisorImpl 的父类 ReceiverSupervisor 的构造。
成员变量赋值、将当前 supervisor 与 receiver 关联 ( receiver.attachSupervisor(this) )
注释也很清晰:在 Worker 上负责监督 Receiver。提供所需所有 处理从 receiver 接收到的数据 的接口
// ReceiverSupervisor.scala line 31
* Abstract class that is responsible for supervising a Receiver in the worker.
* It provides all the necessary interfaces for handling the data received by the receiver.
*/
private[streaming] abstract class ReceiverSupervisor( receiver: Receiver[_],
conf: SparkConf
) extends Logging {
/** Enumeration to identify current state of the Receiver */
object ReceiverState extends Enumeration {
type CheckpointState = Value
val Initialized, Started, Stopped = Value
}
import ReceiverState._
// Attach the supervisor to the receiver
receiver.attachSupervisor(this) // 将 receiver 与 supervisor 关联
private val futureExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool( receiver-supervisor-future , 128))
/** Receiver id */
protected val streamId = receiver.streamId
/** Has the receiver been marked for stop. */
private val stopLatch = new CountDownLatch(1)
/** Time between a receiver is stopped and started again */
private val defaultRestartDelay = conf.getInt(spark.streaming.receiverRestartDelay , 2000)
/** The current maximum rate limit for this receiver. */
private[streaming] def getCurrentRateLimit: Long = Long.MaxValue
/** Exception associated with the stopping of the receiver */
@volatile protected var stoppingError: Throwable = null
/** State of the receiver */
@volatile private[streaming] var receiverState = Initialized
// 一些方法,其实就是 数据处理接口
}
ReceiverSupervisorImpl 的实例化
实例化了 BlockManagerBasedBlockHandler,用于将数据发送到 BlockManager
实例化 RpcEndpoint
实例化 BlockGenerator
实例化 BlockGeneratorListener 监听器
// ReceiverSupervisorImpl.scala line 43
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
* which provides all the necessary functionality for handling the data received by
* the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
* object that is used to divide the received data stream into blocks of data.
*/
private[streaming] class ReceiverSupervisorImpl( receiver: Receiver[_],
env: SparkEnv,
hadoopConf: Configuration,
checkpointDirOption: Option[String]
) extends ReceiverSupervisor(receiver, env.conf) with Logging {
private val host = SparkEnv.get.blockManager.blockManagerId.host
private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
private val receivedBlockHandler: ReceivedBlockHandler = { if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { // 默认是不开启
if (checkpointDirOption.isEmpty) {
throw new SparkException(
Cannot enable receiver write-ahead log without checkpoint directory set. +
Please use streamingContext.checkpoint() to set the checkpoint directory. +
See documentation for more details. )
}
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else { new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
/** Remote RpcEndpointRef for the ReceiverTracker */
private val trackerEndpoint = RpcUtils.makeDriverRef(ReceiverTracker , env.conf, env.rpcEnv)
/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint( Receiver- + streamId + - + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
override val rpcEnv: RpcEnv = env.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case StopReceiver =
logInfo(Received stop signal)
ReceiverSupervisorImpl.this.stop(Stopped by driver , None)
case CleanupOldBlocks(threshTime) =
logDebug(Received delete old batch signal)
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =
logInfo(s Received a new rate limit: $eps.)
registeredBlockGenerators.foreach { bg =
bg.updateRate(eps)
}
}
})
/** Unique block ids if one wants to add blocks directly */
private val newBlockId = new AtomicLong(System.currentTimeMillis())
private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator] // 典型的面包模式
with mutable.SynchronizedBuffer[BlockGenerator]
/** Divides received data records into data blocks for pushing in BlockManager. */
private val defaultBlockGeneratorListener = new BlockGeneratorListener { def onAddData(data: Any, metadata: Any): Unit = { }
def onGenerateBlock(blockId: StreamBlockId): Unit = { }
def onError(message: String, throwable: Throwable) { reportError(message, throwable)
}
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { pushArrayBuffer(arrayBuffer, None, Some(blockId))
}
}
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
// ... 一些方法
/** Store an ArrayBuffer of received data as a data block into Spark s memory. */
def pushArrayBuffer( arrayBuffer: ArrayBuffer[_],
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) { pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
/** Store block and report it to driver */
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) { val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s Pushed block $blockId in ${(System.currentTimeMillis - time)} ms )
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s Reported block $blockId)
}
看看 BlockGenerator
注释很清晰,有两个线程
周期性的 将上一批数据 作为一个 block,并新建下一个批次的数据;RecurringTimer 类,内部有 Thread
将数据 push 到 BlockManager
//
* Generates batches of objects received by a
* [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
* named blocks at regular intervals. This class starts two threads,
* one to periodically start a new batch and prepare the previous batch of as a block,
* the other to push the blocks into the block manager.
*
* Note: Do not create BlockGenerator instances directly inside receivers. Use
* `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
*/
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
conf: SparkConf,
clock: Clock = new SystemClock()
) extends RateLimiter(conf) with Logging{private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
* The BlockGenerator can be in 5 possible states, in the order as follows.
*
* - Initialized: Nothing has been started
* - Active: start() has been called, and it is generating blocks on added data.
* - StoppedAddingData: stop() has been called, the adding of data has been stopped,
* but blocks are still being generated and pushed.
* - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
* they are still being pushed.
* - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
*/
private object GeneratorState extends Enumeration {
type GeneratorState = Value
val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
import GeneratorState._
private val blockIntervalMs = conf.getTimeAsMs(spark.streaming.blockInterval , 200ms)
require(blockIntervalMs 0, s spark.streaming.blockInterval should be a positive value)
private val blockIntervalTimer =
new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, BlockGenerator) // 周期性线程
private val blockQueueSize = conf.getInt(spark.streaming.blockQueueSize , 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } // 负责将数据 push 的
@volatile private var currentBuffer = new ArrayBuffer[Any]
@volatile private var state = Initialized
//...
}
至此,ReceiverSupervisorImpl 实例化完成。不过,截至目前为止 Receiver 还未启动。
到此,关于“ReceiverSupervisorImpl 实例化怎么实现”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!