共计 2688 个字符,预计需要花费 7 分钟才能阅读完成。
本篇内容主要讲解“Driver 容错安全性是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“Driver 容错安全性是什么”吧!
从数据层面,ReceivedBlockTracker 为整个 Spark Streaming 应用程序记录元数据信息。
从调度层面,DStreamGraph 和 JobGenerator 是 Spark Streaming 调度的核心,记录当前调度到哪一进度,和业务有关。
ReceivedBlockTracker 在接收到元数据信息后调用 addBlock 方法,先写入磁盘中,然后在写入内存中。
根据 batchTime 分配属于当前 BatchDuration 要处理的数据到 timToAllocatedBlocks 数据结构中。
Time 类的是一个 case class,记录时间,重载了操作符,隐式转换,值得借鉴。
case class Time(private val millis: Long) {
def milliseconds: Long = millis
def (that: Time): Boolean = (this.millis that.millis)
def = (that: Time): Boolean = (this.millis = that.millis)
def (that: Time): Boolean = (this.millis that.millis)
def = (that: Time): Boolean = (this.millis = that.millis)
def + (that: Duration): Time = new Time(millis + that.milliseconds)
def – (that: Time): Duration = new Duration(millis – that.millis)
def – (that: Duration): Time = new Time(millis – that.milliseconds)
// Java-friendlier versions of the above.
def less(that: Time): Boolean = this that
def lessEq(that: Time): Boolean = this = that
def greater(that: Time): Boolean = this that
def greaterEq(that: Time): Boolean = this = that
def plus(that: Duration): Time = this + that
def minus(that: Time): Duration = this – that
def minus(that: Duration): Time = this – that
def floor(that: Duration): Time = {
val t = that.milliseconds
new Time((this.millis / t) * t)
def floor(that: Duration, zeroTime: Time): Time = {
val t = that.milliseconds
new Time(((this.millis – zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
def isMultipleOf(that: Duration): Boolean =
(this.millis % that.milliseconds == 0)
def min(that: Time): Time = if (this that) this else that
def max(that: Time): Time = if (this that) this else that
def until(that: Time, interval: Duration): Seq[Time] = {
(this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
def to(that: Time, interval: Duration): Seq[Time] = {
(this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
override def toString: String = (millis.toString + ms)
object Time {
implicit val ordering = Ordering.by((time: Time) = time.millis)
跟踪 Time 对象,ReceiverTracker 的 allocateBlocksToBatch 方法中的入参 batchTime 是被 JobGenerator 的 generateJobs 方法调用的。
JobGenerator 的 generateJobs 方法是被定时器发送 GenerateJobs 消息调用的。
GenerateJobs 中的时间参数就是 nextTime,而 nextTime+=period,这个 period 就是 ssc.graph.batchDuration.milliseconds。
nextTime 的初始值是在 start 方法中传入的 startTime 赋值的,即 RecurringTimer 的 getStartTime 方法的返回值,是当前时间 period 的 (整数倍 +1)。
Period 这个值是我们调用 new StreamingContext 来构造 StreamingContext 时传入的 Duration 值。
ReceivedBlockTracker 会清除过期的元数据信息,从 HashMap 中移除,也是先写入磁盘,然后在写入内存。
元数据的生成,消费和销毁都有 WAL,所以失败时就可以从日志中恢复。从源码分析中得出只有设置了 checkpoint 目录,才进行 WAL 机制。
对传入的 checkpoint 目录来创建日志目录进行 WAL。
这里是在 checkpoint 目录下创建文件夹名为 receivedBlockMetadata 的文件夹来保存 WAL 记录的数据。
把当前的 DStream 和 JobGenerator 的状态进行 checkpoint,该方法是在 generateJobs 方法最后通过发送 DoCheckpoint 消息,来调用的。
到此,相信大家对“Driver 容错安全性是什么”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!