This article explains how fault tolerance on the Driver side is implemented in Spark Streaming, walking through the relevant source code step by step.
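· First, the fault tolerance of ReceiverTracker: the metadata of every received block is written to the WAL (write-ahead log). The addBlock method, implemented in ReceivedBlockTracker (to which ReceiverTracker delegates), is as follows: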
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
The writeToLog method performs the actual WAL write. Its code is:
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}
It first checks whether the WAL is enabled, based on the value of isWriteAheadLogEnabled:
private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty
Next, look at writeAheadLogOption:
private val writeAheadLogOption = createWriteAheadLog()
Then look at the createWriteAheadLog() method:
private def createWriteAheadLog(): Option[WriteAheadLog] = {
  checkpointDirOption.map { checkpointDir =>
    val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
    WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
  }
}
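The checkpoint directory comes from the checkpoint configuration. Because checkpointDirOption is an Option, the WAL is created only when a checkpoint directory has been set, and the log directory is derived from that directory by checkpointDirToLogDir.
Only after the WAL write succeeds is receivedBlockInfo added to the in-memory queue returned by getReceivedBlockQueue.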
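The ordering described above (log first, enqueue in memory only on success, and treat a missing log as "always succeeds") can be illustrated with a minimal, self-contained sketch. It does not use Spark's API: SimpleLog, InMemoryLog, FailingLog and Tracker are hypothetical names introduced only for this example, and records are plain strings rather than serialized events.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object WalBeforeMemorySketch {
  // Hypothetical stand-in for a write-ahead log; not Spark's WriteAheadLog API.
  trait SimpleLog { def write(record: String): Unit }

  // A log that keeps records in memory so the example is easy to inspect.
  final class InMemoryLog extends SimpleLog {
    val records = mutable.ArrayBuffer.empty[String]
    def write(record: String): Unit = records += record
  }

  // A log whose writes always fail, to exercise the error path.
  final class FailingLog extends SimpleLog {
    def write(record: String): Unit = throw new RuntimeException("log unavailable")
  }

  // logOption = None models "no checkpoint directory configured".
  final class Tracker(logOption: Option[SimpleLog]) {
    private val queue = mutable.Queue.empty[String]

    private def writeToLog(record: String): Boolean = logOption match {
      case Some(log) =>
        try { log.write(record); true }
        catch { case NonFatal(_) => false }
      case None => true // WAL disabled: nothing to persist, report success
    }

    // Enqueue the block in memory only after the log write succeeded.
    def addBlock(blockId: String): Boolean = {
      val written = writeToLog(s"add:$blockId")
      if (written) synchronized { queue += blockId }
      written
    }

    def queued: Seq[String] = synchronized { queue.toList }
  }
}
```

With a FailingLog the block never reaches the queue, so memory never holds metadata that the log does not also hold.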
· Second, the allocateBlocksToBatch method of ReceivedBlockTracker:
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
First, the ReceivedBlockQueue of each receiver is obtained through getReceivedBlockQueue and drained; the result is assigned to streamIdToBlocks, which is then wrapped:
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
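allocatedBlocks is the batch of metadata selected for a given time. It is handed to the job of the corresponding batchDuration, which uses it when it runs. Before it is used, the allocation is written to the WAL, so that after a failure and recovery it is known how far the computation had progressed.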
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
  timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
  lastAllocatedBatchTime = batchTime
} else {
  logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
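To make the allocation step concrete, here is a simplified sketch of draining per-stream queues into a batch and skipping a batch time that is not newer than the last allocated one. It is an illustration under assumptions, not the Spark implementation: Allocator, addBlock, allocate and blocksOf are hypothetical names, batch times are plain Long milliseconds, block ids are strings, and the "log" is an in-memory buffer whose writes always succeed.

```scala
import scala.collection.mutable

object AllocateSketch {
  final class Allocator(streamIds: Seq[Int]) {
    private val queues = mutable.HashMap.empty[Int, mutable.Queue[String]]
    private val allocated = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]
    private var lastAllocated: Option[Long] = None

    // Stands in for the WAL; a real log write could fail, this one cannot.
    val log = mutable.ArrayBuffer.empty[String]

    private def queueFor(id: Int): mutable.Queue[String] =
      queues.getOrElseUpdate(id, mutable.Queue.empty[String])

    def addBlock(streamId: Int, blockId: String): Unit = synchronized {
      queueFor(streamId) += blockId
    }

    def allocate(batchTime: Long): Unit = synchronized {
      // Allocate only if this batch time is newer than the last allocated one.
      if (lastAllocated.forall(batchTime > _)) {
        // Drain every stream's queue; the drained blocks now belong to this batch.
        val blocks = streamIds.map { id =>
          id -> queueFor(id).dequeueAll(_ => true).toList
        }.toMap
        log += s"allocate:$batchTime" // record the allocation before exposing it
        allocated.put(batchTime, blocks)
        lastAllocated = Some(batchTime)
      }
      // Otherwise the batch time was already covered and nothing is re-allocated.
    }

    def blocksOf(batchTime: Long, streamId: Int): Seq[String] = synchronized {
      allocated.get(batchTime).flatMap(_.get(streamId)).getOrElse(Seq.empty)
    }
  }
}
```

Calling allocate twice with the same batch time leaves the first allocation untouched, which mirrors the else branch that only logs during recovery.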
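· Third, the cleanupOldBatches method. It removes the metadata of batches that are no longer needed from memory and then deletes the corresponding WAL data. Before deleting anything, it first writes the list of batches to be removed to the WAL as well.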
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
    timeToAllocatedBlocks --= timesToCleanup
    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}
· To summarize, the three WAL writes above correspond to the three events below. Together they make up the fault tolerance of ReceiverTracker:
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchCleanupEvent(times: Seq[Time]) extends ReceivedBlockTrackerLogEvent
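The article does not show the recovery code, so the following is only a sketch of how a log made of three such event types could be replayed to rebuild in-memory state after a restart. Event, BlockAdded, BatchAllocated, BatchCleanup, State and replay are hypothetical simplifications (string block ids, Long times). One assumption to note: replaying an allocation clears all pending blocks.

```scala
object ReplaySketch {
  // Simplified counterparts of the three logged event types.
  sealed trait Event
  final case class BlockAdded(streamId: Int, blockId: String) extends Event
  final case class BatchAllocated(time: Long, blocks: Map[Int, Seq[String]]) extends Event
  final case class BatchCleanup(times: Seq[Long]) extends Event

  // State rebuilt from the log: blocks not yet allocated, and allocated batches.
  final case class State(
      pending: Map[Int, Vector[String]] = Map.empty,
      allocated: Map[Long, Map[Int, Seq[String]]] = Map.empty)

  // Apply the events in log order, starting from an empty state.
  def replay(events: Seq[Event]): State = events.foldLeft(State()) {
    case (s, BlockAdded(id, block)) =>
      // A logged block goes back into the pending queue of its stream.
      s.copy(pending = s.pending.updated(id, s.pending.getOrElse(id, Vector.empty) :+ block))
    case (s, BatchAllocated(time, blocks)) =>
      // Assumption: an allocation consumed everything that was pending.
      State(pending = Map.empty, allocated = s.allocated.updated(time, blocks))
    case (s, BatchCleanup(times)) =>
      // Cleaned-up batches are dropped from the allocated map.
      s.copy(allocated = s.allocated -- times)
  }
}
```

Because the trait is sealed, the compiler can check that replay handles every event type, which is one reason to model log records this way.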
· Next, the fault tolerance of DStreamGraph and JobGenerator, starting from the generateJobs method of JobGenerator:
private def generateJobs(time: Time) {
  // ... SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // allocate received blocks to batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // generate jobs using allocated block
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      // get the input metadata for this batch
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // submit the JobSet
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
Once the jobs have been generated, a DoCheckpoint message is posted, which eventually calls the doCheckpoint method:
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
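The condition in doCheckpoint amounts to simple arithmetic: a checkpoint is written only when checkpointing is enabled and the time elapsed since the graph's zero time is a whole multiple of the checkpoint interval. A minimal sketch follows, using plain Long milliseconds instead of Time and Duration; shouldWriteCheckpoint and its parameter names are hypothetical.

```scala
object CheckpointTimingSketch {
  // All values are milliseconds; the names are stand-ins for Time and Duration.
  def shouldWriteCheckpoint(
      enabled: Boolean,
      timeMs: Long,
      zeroTimeMs: Long,
      checkpointIntervalMs: Long): Boolean =
    enabled && (timeMs - zeroTimeMs) % checkpointIntervalMs == 0
}
```

For example, with a zero time of 1000 ms and an interval of 2000 ms, a batch at 5000 ms qualifies (4000 is a multiple of 2000), while a batch at 4000 ms does not.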