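·  First, look at the fault tolerance of ReceiverTracker, which mainly consists of writing the metadata of received blocks to the WAL (write-ahead log). See the addBlock method that ReceiverTracker calls (it is defined in ReceivedBlockTracker); the code is as follows: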

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    // Write the event to the WAL first
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      // Only after a successful write is the block queued in memory
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}




The writeToLog method performs the actual WAL write. Its code is:

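private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      // Serialize the event and append it to the log with the current time
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    // WAL disabled: nothing to write, report success
    true
  }
}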




It first checks whether the WAL is enabled, based on the value of isWriteAheadLogEnabled:

private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty

Next, look at writeAheadLogOption:

private val writeAheadLogOption = createWriteAheadLog()

Then look at the createWriteAheadLog() method:

private def createWriteAheadLog(): Option[WriteAheadLog] = {
  checkpointDirOption.map { checkpointDir =>
    val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
    WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
  }
}



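The log directory is derived from the configured checkpoint directory. This shows that the WAL gets a directory of its own under the checkpoint location, so a checkpoint location can contain more than one directory. If no checkpoint directory is configured, checkpointDirOption is empty and no WAL is created.
Only after the WAL write has completed is receivedBlockInfo put into the in-memory queue returned by getReceivedBlockQueue.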
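The write-then-enqueue ordering described above can be illustrated with a small standalone sketch. It does not use Spark: InMemoryLog and SketchTracker are hypothetical names invented for this example, and a string record stands in for the serialized event.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for a write-ahead log: it only remembers what was written.
class InMemoryLog(failWrites: Boolean = false) {
  val records = mutable.ArrayBuffer.empty[String]
  def write(record: String): Unit = {
    if (failWrites) throw new RuntimeException("simulated log failure")
    records += record
  }
}

// Hypothetical tracker: a block id is queued only after the log write succeeds.
class SketchTracker(log: Option[InMemoryLog]) {
  private val queues = mutable.HashMap.empty[Int, mutable.Queue[String]]

  private def writeToLog(record: String): Boolean = log match {
    case Some(l) =>
      try { l.write(record); true }
      catch { case NonFatal(_) => false }
    case None => true // logging disabled: nothing to persist, treat as success
  }

  def addBlock(streamId: Int, blockId: String): Boolean = {
    val written = writeToLog(s"add:$streamId:$blockId")
    if (written) {
      synchronized {
        queues.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
      }
    }
    written
  }

  def queued(streamId: Int): Seq[String] = synchronized {
    queues.get(streamId).map(_.toList).getOrElse(Nil)
  }
}
```

With a log whose writes fail, addBlock returns false and the queue stays empty, so the in-memory state never runs ahead of what was logged.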

·  Second, look at the allocateBlocksToBatch method of ReceivedBlockTracker. The code is as follows:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // Drain the queue of every stream into this batch
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation only occurs during recovery.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}



It first takes the ReceivedBlockQueue of each receiver from getReceivedBlockQueue and assigns the result to streamIdToBlocks, then wraps it:

val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

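allocatedBlocks is the batch of metadata selected by time. It is handed to the job of the corresponding batchDuration, which can use it when it runs. Before it is used, the allocation is written to the WAL, so that if the job fails, recovery can tell how far the computation had progressed: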

val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
  timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
  lastAllocatedBatchTime = batchTime
} else {
  logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
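A simplified, Spark-free sketch of this allocation step follows. SketchAllocator, its logWrite callback, and the use of Long batch times and String block ids are all assumptions made for illustration. Like the excerpt above, it drains the queues before attempting the log write, so in this sketch a failed write leaves the drained blocks unrecorded.

```scala
import scala.collection.mutable

// Hypothetical, simplified allocator: batch times are plain Longs and blocks are Strings.
class SketchAllocator(streamIds: Seq[Int], logWrite: String => Boolean) {
  private val queues = mutable.HashMap.empty[Int, mutable.Queue[String]]
  private val allocated = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]
  private var lastAllocated: Option[Long] = None

  def addBlock(streamId: Int, blockId: String): Unit = synchronized {
    queues.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
  }

  def allocate(batchTime: Long): Unit = synchronized {
    // Only a batch time newer than the last allocated one gets an allocation.
    if (lastAllocated.forall(batchTime > _)) {
      // Drain every stream's queue into an immutable snapshot for this batch.
      val snapshot: Map[Int, Seq[String]] = streamIds.map { id =>
        val q = queues.getOrElseUpdate(id, mutable.Queue.empty[String])
        id -> q.dequeueAll(_ => true).toList
      }.toMap
      // Record the allocation first; update in-memory state only on success.
      if (logWrite(s"allocate:$batchTime:$snapshot")) {
        allocated.put(batchTime, snapshot)
        lastAllocated = Some(batchTime)
      }
    }
  }

  def blocksOf(batchTime: Long): Map[Int, Seq[String]] = synchronized {
    allocated.getOrElse(batchTime, Map.empty[Int, Seq[String]])
  }
}
```

Calling allocate twice with the same batch time leaves the first allocation untouched, which corresponds to the else branch that only logs a message.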


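·  Third, look at the cleanupOldBatches method. It removes batch metadata that is no longer needed from memory and then deletes the corresponding WAL data. Before deleting anything, it also records the batches to be deleted in the WAL: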

def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
  // Batches strictly older than the threshold are candidates for cleanup
  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
    timeToAllocatedBlocks --= timesToCleanup
    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}
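The same log-before-delete ordering can be shown on a plain map. This is a minimal sketch, not Spark code: CleanupSketch, cleanupOld, and the logWrite callback are hypothetical names, and batch times are plain Long values.

```scala
import scala.collection.mutable

object CleanupSketch {
  // Hypothetical helper: drop every batch strictly older than `threshold`,
  // but only after the cleanup has been recorded through `logWrite`.
  def cleanupOld(
      batches: mutable.HashMap[Long, Seq[String]],
      threshold: Long,
      logWrite: String => Boolean): Seq[Long] = {
    val timesToCleanup = batches.keys.filter(_ < threshold).toList.sorted
    if (logWrite(s"cleanup:${timesToCleanup.mkString(",")}")) {
      batches --= timesToCleanup
      timesToCleanup
    } else {
      Nil // log write failed: keep the in-memory state untouched
    }
  }
}
```

The function returns the batch times it actually removed, which makes the effect of a failed log write (nothing removed) easy to observe.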



·  To summarize, the three WAL writes above correspond to the three events below. This is the fault tolerance of ReceiverTracker:

/** Trait representing any event in the ReceivedBlockTracker that updates its state. */

private[streaming] sealed trait ReceivedBlockTrackerLogEvent

private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
  extends ReceivedBlockTrackerLogEvent

private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
  extends ReceivedBlockTrackerLogEvent

private[streaming] case class BatchCleanupEvent(times: Seq[Time])
  extends ReceivedBlockTrackerLogEvent
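To see why logging these three events is enough to rebuild the tracker's state, here is a hedged sketch of replaying such a log in write order. The event classes and RecoveredState below are simplified, hypothetical stand-ins (Long times, String block ids); this is not presented as Spark's recovery code.

```scala
import scala.collection.mutable

// Hypothetical, simplified event types mirroring the three kinds of log records.
sealed trait TrackerEvent
case class BlockAdded(streamId: Int, blockId: String) extends TrackerEvent
case class BatchAllocated(time: Long, blocks: Map[Int, Seq[String]]) extends TrackerEvent
case class BatchesCleaned(times: Seq[Long]) extends TrackerEvent

// State rebuilt by replaying the log in write order.
class RecoveredState {
  val unallocated = mutable.HashMap.empty[Int, mutable.Queue[String]]
  val allocated = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]
  var lastAllocated: Option[Long] = None

  def replay(events: Seq[TrackerEvent]): Unit = events.foreach {
    case BlockAdded(streamId, blockId) =>
      unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
    case BatchAllocated(time, blocks) =>
      // The allocated blocks were taken from the queues, so the queues are emptied.
      unallocated.values.foreach(_.clear())
      allocated.put(time, blocks)
      lastAllocated = Some(time)
    case BatchesCleaned(times) =>
      allocated --= times
  }
}
```

After replay, blocks added since the last allocation are back in the queues, allocated batches that were not cleaned up are still known, and lastAllocated tells which batch times must not be allocated again.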

·  Now look at the fault tolerance of DStreamGraph and JobGenerator, starting from generateJobs:

private def generateJobs(time: Time) {
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Allocate the received blocks to this batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // Generate jobs using the allocated blocks
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      // Fetch the input metadata for this batch
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // Submit the JobSet
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}


After the jobs have been generated, a DoCheckpoint message is posted, which eventually calls the doCheckpoint method. The code is as follows:

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  // Checkpoint only when enabled and the batch time falls on a checkpoint interval
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
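The condition in doCheckpoint is plain modular arithmetic on the time elapsed since the graph's zero time. A minimal sketch with millisecond values in place of Spark's Time and Duration types (CheckpointSketch and shouldWrite are hypothetical names):

```scala
// Hypothetical helper using plain millisecond values instead of Spark's Time and Duration.
object CheckpointSketch {
  def shouldWrite(enabled: Boolean, timeMs: Long, zeroTimeMs: Long, intervalMs: Long): Boolean =
    enabled && (timeMs - zeroTimeMs) % intervalMs == 0
}
```

For example, with a zero time of 1000 ms and a checkpoint interval of 5000 ms, the batch at 11000 ms qualifies (10000 is a multiple of 5000), while the batch at 9000 ms does not.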




