Driver容错安全性怎么实现

41次阅读
没有评论

共计 5569 个字符,预计需要花费 14 分钟才能阅读完成。

这篇文章主要介绍“Driver 容错安全性怎么实现”,在日常操作中,相信很多人在 Driver 容错安全性怎么实现问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Driver 容错安全性怎么实现”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

·  第一、看 ReceiverTracker 的容错,主要是 ReceiverTracker 接收元数据的进入 WAL, 看 ReceiverTracker 的 addBlock 方法,代码如下

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {

 try {

  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))

  if (writeResult) {

   synchronized {

  getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo

  }

  logDebug(s Stream ${receivedBlockInfo.streamId} received +

  s block ${receivedBlockInfo.blockStoreResult.blockId} )

  } else {

  logDebug(s Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving +

  s block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log. )

  }

  writeResult

 } catch {

  case NonFatal(e) =

  logError(s Error adding block $receivedBlockInfo , e)

  false

 }

}

writeToLog 方法就是进行 WAL 的操作,看 writeToLog 的代码

private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {

 if (isWriteAheadLogEnabled) {

  logTrace(s Writing record: $record)

  try {

  writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),

  clock.getTimeMillis())

  true

  } catch {

  case NonFatal(e) =

  logWarning(s Exception thrown while writing record: $record to the WriteAheadLog. , e)

  false

  }

 } else {

  true

 }

}

首先判断是否开启了 WAL,根据一下 isWriteAheadLogEnabled 值

private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty

接着看 writeAheadLogOption

private val writeAheadLogOption = createWriteAheadLog()

再看 createWriteAheadLog() 方法

private def createWriteAheadLog(): Option[WriteAheadLog] = {

 checkpointDirOption.map {checkpointDir =

  val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)

  WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)

 }

}

根据 checkpoint 的配置,获取 checkpoint 的目录,这里可以看出,checkpoint 可以有多个目录。
写完 WAL 才将 receivedBlockInfo 放到内存队列 getReceivedBlockQueue 中

·  第二、看 ReceivedBlockTracker 的 allocateBlocksToBatch 方法,代码如下

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {

 if (lastAllocatedBatchTime == null || batchTime lastAllocatedBatchTime) {

  val streamIdToBlocks = streamIds.map {streamId =

  (streamId, getReceivedBlockQueue(streamId).dequeueAll(x = true))

  }.toMap

  val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

  if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {

  timeToAllocatedBlocks.put(batchTime, allocatedBlocks)

  lastAllocatedBatchTime = batchTime

  } else {

  logInfo(s Possibly processed batch $batchTime need to be processed again in WAL recovery)

  }

 } else {

  // This situation occurs when:

  // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,

  // possibly processed batch job or half-processed batch job need to be processed again,

  // so the batchTime will be equal to lastAllocatedBatchTime.

  // 2. Slow checkpointing makes recovered batch time older than WAL recovered

  // lastAllocatedBatchTime.

  // This situation will only occurs in recovery time.

  logInfo(s Possibly processed batch $batchTime need to be processed again in WAL recovery)

 }

}

首先从 getReceivedBlockQueue 中获取每一个 receiver 的 ReceivedBlockQueue 队列赋值给 streamIdToBlocks,然后包装一下

val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

allocatedBlocks 就是根据时间获取的一批元数据,交给对应 batchDuration 的 job,job 在执行的时候就可以使用,在使用前先进行 WAL,如果 job 出错恢复后,可以知道数据计算到什么位置

val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

  if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {

  timeToAllocatedBlocks.put(batchTime, allocatedBlocks)

  lastAllocatedBatchTime = batchTime

  } else {

  logInfo(s Possibly processed batch $batchTime need to be processed again in WAL recovery)

}

·  第三、看 cleanupOldBatches 方法,cleanupOldBatches 的功能是从内存中清楚不用的 batches 元数据,再删除 WAL 的数据,再删除之前把要删除的 batches 信息也进行 WAL

def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {

 require(cleanupThreshTime.milliseconds clock.getTimeMillis())

 val timesToCleanup = timeToAllocatedBlocks.keys.filter {_ cleanupThreshTime}.toSeq

 logInfo(Deleting batches + timesToCleanup)

 if (writeToLog(BatchCleanupEvent(timesToCleanup))) {

  timeToAllocatedBlocks –= timesToCleanup

  writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))

 } else {

  logWarning(Failed to acknowledge batch clean up in the Write Ahead Log.)

 }

}

·  总结一下上面的三种 WAL, 对应下面的三种事件,这就是 ReceiverTracker 的容错

/** Trait representing any event in the ReceivedBlockTracker that updates its state. */

private[streaming] sealed trait ReceivedBlockTrackerLogEvent

private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)

extends ReceivedBlockTrackerLogEvent

private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)

extends ReceivedBlockTrackerLogEvent

private[streaming] case class BatchCleanupEvent(times: Seq[Time])  extends ReceivedBlockTrackerLogEvent

·  看一下 Dstream.graph 和 JobGenerator 的容错,从开始

private def generateJobs(time: Time) {

SparkEnv has been removed.

 SparkEnv.set(ssc.env)

 Try {

 

  // allocate received blocks to batch

  // 分配接收到的数据给 batch

  jobScheduler.receiverTracker.allocateBlocksToBatch(time)

  // 使用分配的块生成 jobs

  graph.generateJobs(time) // generate jobs using allocated block

 } match {

  case Success(jobs) =

  // 获取元数据信息

  val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

  // 提交 jobSet

  jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

  case Failure(e) =

  jobScheduler.reportError(Error generating jobs for time + time, e)

 }

 eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

}

jobs 生成完成后发送 DoCheckpoint 消息,最终调用 doCheckpoint 方法, 代码如下

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {

 if (shouldCheckpoint (time – graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {

  logInfo(Checkpointing graph for time + time)

  ssc.graph.updateCheckpointData(time)

  checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)

 }

}

 

到此,关于“Driver 容错安全性怎么实现”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计5569字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)