Spark Streaming流计算框架如何运行

72次阅读
没有评论

共计 5146 个字符,预计需要花费 13 分钟才能阅读完成。

这篇文章主要讲解了“Spark Streaming 流计算框架如何运行”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“Spark Streaming 流计算框架如何运行”吧!

先贴案例

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
object StreamingWordCountSelfScala { def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster(spark://master:7077).setAppName(StreamingWordCountSelfScala)
 val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) //  每 5 秒收割一次数据
 val lines = ssc.socketTextStream(localhost , 9999) //  监听   本地 9999 socket  端口
 val words = lines.flatMap(_.split(  )).map((_, 1)).reduceByKey(_ + _) // flat map  后  reduce
 words.print() //  打印结果
 ssc.start() //  启动
 ssc.awaitTermination()
 ssc.stop(true)
 }
}

再来回溯下触发过程。

定时器定时触发执行某个方法。这里是  longTime = eventLoop.post(GenerateJobs(new Time(longTime))),将一个  GenerateJobs 类型的事件消息发送到  eventLoop 的   队列中。

// JobGenerator.scala line 58
 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
 longTime =  eventLoop.post(GenerateJobs(new Time(longTime))),  JobGenerator )

另一方便,eventLoop 一直循环取出队列中的事件消息,当取出 GenerateJobs 类型的事件消息时。会调用 onReceive(event)。

// EventLoop.scala line 48
 onReceive(event)

此时的 onReceive(event) 在 JobGenerator 实例化 eventLoop 时已经 override 了。

// JobGenerator.scala line 87
 override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

调用 generatorJobs(time)

// JobGenerator.scala line 181
 case GenerateJobs(time) =  generateJobs(time)

graph.generateJobs

// JobGenerator.scala line 248
graph.generateJobs(time)

通过 outputStream.generateJob  还原出 RDD 的整个依赖,并创建出 Job。这个 outputStream 就是 ForEachDStream。

// DStreamGraph.scala line 115
 val jobOption = outputStream.generateJob(time)
 在本案例中,按照  SocketInputDStream   FlatMappedDStream   MappedDStream   ShuffledDStream   ForEachDStream  的依赖关系
调用 parent.getOrCompute,此 getOrCompute 只在 DStream 中有定义,所有子类都没重写过此方法。在此方法中,会调用当前 DStream 的 compute 方法,而 compute 中又调用了 parent.getOrCompute,同时将当前的 DStream 的 func 加入到串联的 RDD 之后。

一直循环,直到 inputStream,本例中为 SocketInputDStream 的 compute 被执行,实际上执行的是 ReceiverInputDStream.compute,创建出 BlockRDD。

至此整个 RDD 被还原出来。作为参数传入 Job 的构造中。 

至此 Job 创建成功,但是此 Job 为 Spark Core 中的 Job,而且也并没有被提交到 spark 集群中。

获取给定时间对应的输入数据的信息,此时得到的都是元数据,即输入数据的元数据。

再创建成 JobSet,并提交 JobSet

// JobGenerator.scala line 251
 val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
 jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

jobScheduler.submitJobSet

交由 jobExecutor 线程池来处理,这里显然可以推测出,JobHandler 一定是一个 Runnable 或者 Callable 接口的实现。

另外 jobExecutor 默认的线程数量是 1,从并发性考虑,建议与 outputStreams 的数量保持一致:DStreamGraph.outputStreams.size

// JobScheduler.scala line 122
 def submitJobSet(jobSet: JobSet) { if (jobSet.jobs.isEmpty) { logInfo( No jobs added for time   + jobSet.time)
 } else { listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
 jobSets.put(jobSet.time, jobSet)
 jobSet.jobs.foreach(job =  jobExecutor.execute(new JobHandler(job)))
 logInfo(Added jobs for time   + jobSet.time)
 }
 }
JobHandler 中封装的 run 方法 

发送 JobStarted 事件消息,用于监控

job.run,真正的 Job 提交,注意,这里的 Job 提交是指提交 Streaming 的 Job 到 Spark 集群,类似普通 Spark 程序将 RDD 提交给 Spark 集群运行

// JobScheduler.scala line 202
 def run() {
 try {
 val formattedTime = UIUtils.formatBatchTime( job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
 val batchUrl = s /streaming/batch/?id=${job.time.milliseconds} 
 val batchLinkText = s [output operation ${job.outputOpId}, batch time ${formattedTime}] 
 ssc.sc.setJobDescription( s Streaming job from  a href= $batchUrl $batchLinkText /a)
 ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
 ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
 // We need to assign `eventLoop` to a temp variable. Otherwise, because
 // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
 // it s possible that when `post` is called, `eventLoop` happens to null.
 var _eventLoop = eventLoop
 if (_eventLoop != null) { _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
 // Disable checks for existing output directories in jobs launched by the streaming
 // scheduler, since we may need to write output to an existing directory during checkpoint
 // recovery; see SPARK-4835 for more details.
 PairRDDFunctions.disableOutputSpecValidation.withValue(true) { job.run()
 }
 _eventLoop = eventLoop
 if (_eventLoop != null) { _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
 }
 } else {
 // JobScheduler has been stopped.
 }
 } finally { ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
 ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
 }
 }

job.run

// Job.scala line 38
 def run() { _result = Try(func())
 }

执行 func(),而此时的 func 就是在 ForEachDStream 中封装 Job 的第二个参数。

在本例中,即为

() = foreachFunc(new BlockRDD[T](ssc.sc, validBlockIds).map(_.flatMap(t= t.split())).map(_.map[U](t= (t,1))).combineByKey[C](t= t, (t1,t2)= t1+t2, (t1,t2)= t1+t2,partitioner, true),time)

至于如何推导出此 RDD,可参考前文。

读者们,至此,是否有很熟悉的感觉,很明显,上面的代码就是一个函数,函数没有参数,方法体中,执行的代码中,从 new BlockRDD 开始,就是我们普通的 Spark 的程序:新建 RDD,然后一连串 transform,最后将结果交给 foreachFunc 处理。

由此,SparkStreaming 最终是转变为普通的 Spark Application 来提交给 Spark 集群来执行。是否也可以理解 Spark Streaming 其实就是 Spark 的一个应用程序。而已。

感谢各位的阅读,以上就是“Spark Streaming 流计算框架如何运行”的内容了,经过本文的学习后,相信大家对 Spark Streaming 流计算框架如何运行这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计5146字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)