共计 3792 个字符,预计需要花费 10 分钟才能阅读完成。
这篇文章给大家分享的是有关 Spark 结构化流处理机制之容错机制的示例分析的内容。丸趣 TV 小编觉得挺实用的,因此分享给大家做个参考,一起跟随丸趣 TV 小编过来看看吧。
容错机制
端到端的有且仅有一次保证, 是结构化流设计的关键目标之一.
结构化流设计了 Structured Streaming sources,sinks 等等, 来跟踪确切的处理进度, 并让其重启或重运行来处理任何故障
streaming source 是类似 kafka 的偏移量 (offsets) 来跟踪流的读取位置. 执行引擎使用检查点 (checkpoint) 和预写日志 (write ahead logs) 来记录每个执行其的偏移范围值
streaming sinks 是设计用来保证处理的幂等性
这样, 依靠可回放的数据源 (streaming source) 和处理幂等(streaming sinks), 结构流来做到任何故障下的端到端的有且仅有一次保证
val lines = spark.readStream
.format(socket)
.option(host , localhost)
.option(port , 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split( ))
// Generate running word count
val wordCounts = words.groupBy(value).count()
其中,spark 是 SparkSession,lines 是 DataFrame,DataFrame 就是 Dataset[Row]。
DataSet
看看 Dataset 的触发因子的代码实现,比如 foreach 操作:
def foreach(f: T = Unit): Unit = withNewRDDExecutionId { rdd.foreach(f)
}
private def withNewRDDExecutionId[U](body: = U): U = { SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
rddQueryExecution.executedPlan.foreach { plan =
plan.resetMetrics()
}
body
}
}
接着看:
def withNewExecutionId[T](
sparkSession: SparkSession,
queryExecution: QueryExecution,
name: Option[String] = None)(body: = T): T = {
val sc = sparkSession.sparkContext
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
val executionId = SQLExecution.nextExecutionId
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
executionIdToQueryExecution.put(executionId, queryExecution)
try {
withSQLConfPropagated(sparkSession) {
try {
body
} catch {
} finally {
}
}
} finally { executionIdToQueryExecution.remove(executionId)
sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
}
}
执行的真正代码就是 queryExecution: QueryExecution。
@transient private lazy val rddQueryExecution: QueryExecution = { val deserialized = CatalystSerde.deserialize[T](logicalPlan)
sparkSession.sessionState.executePlan(deserialized)
}
看到了看到了,是 sessionState.executePlan 执行 logicalPlan 而得到了 QueryExecution
这里的 sessionState.executePlan 其实就是创建了一个 QueryExecution 对象。然后执行 QueryExecution 的 executedPlan 方法得到 SparkPlan 这个物理计划。怎么生成的呢?
lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) { SparkSession.setActiveSession(sparkSession)
planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
}
通过 planner.plan 方法生成。
planner 是 SparkPlanner。在 BaseSessionStateBuilder 类中定义。
protected def planner: SparkPlanner = { new SparkPlanner(session.sparkContext, conf, experimentalMethods) { override def extraPlanningStrategies: Seq[Strategy] =
super.extraPlanningStrategies ++ customPlanningStrategies
}
}
SparkPlanner 类
SparkPlanner 对 LogicalPlan 执行各种策略,返回对应的 SparkPlan。比如对于流应用来说,有这样的策略:DataSourceV2Strategy。
典型的几个逻辑计划到物理计划的映射关系如下:
StreamingDataSourceV2Relation-》ContinuousScanExec
StreamingDataSourceV2Relation-》MicroBatchScanExec
前一种对应与 Offset 没有 endOffset 的情况,后一种对应于有 endOffset 的情况。前一种是没有结束的连续流,后一种是有区间的微批处理流。
前一种的时延可以达到 1ms,后一种的时延只能达到 100ms。
【代码】:
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined r.endOffset.isDefined =
val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
val scanExec = MicroBatchScanExec( r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
val withProjection = if (scanExec.supportsColumnar) {
scanExec
} else {
// Add a Project here to make sure we produce unsafe rows.
ProjectExec(r.output, scanExec)
}
withProjection :: Nil
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined r.endOffset.isEmpty =
val continuousStream = r.stream.asInstanceOf[ContinuousStream]
val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
val withProjection = if (scanExec.supportsColumnar) {
scanExec
} else {
// Add a Project here to make sure we produce unsafe rows.
ProjectExec(r.output, scanExec)
}
withProjection :: Nil
感谢各位的阅读!关于“Spark 结构化流处理机制之容错机制的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!