Spark Streaming怎么使用

51次阅读
没有评论

共计 3165 个字符,预计需要花费 8 分钟才能阅读完成。

这篇文章主要介绍“Spark Streaming 怎么使用”,在日常操作中,相信很多人在 Spark Streaming 怎么使用问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark Streaming 怎么使用”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

DStream 是逻辑级别的,而 RDD 是物理级别的。DStream 是随着时间的流动内部将集合封装 RDD。对 DStream 的操作,转过来对其内部的 RDD 操作。

纵轴为空间维度:代表的是 RDD 的依赖关系构成的具体的处理逻辑的步骤,是用 DStream 来表示的。

横轴为时间维度:按照特定的时间间隔不断地生成 job 对象,并在集群上运行。

随着时间的推移,基于 DStream Graph 不断生成 RDD Graph , 也即 DAG 的方式生成 job, 并通过 Job Scheduler 的线程池的方式提交给 spark cluster 不断的执行。

由上可知,RDD     与  DStream 的关系如下

RDD 是物理级别的,而 DStream 是逻辑级别的

DStream 是 RDD 的封装类,是 RDD 进一步的抽象

DStream 是 RDD 的模板。DStream 要依赖 RDD 进行具体的数据计算

注意:纵轴维度需要 RDD,DAG 的生成模板,需要 TimeLine 的 job 控制器

横轴维度(时间维度)包含 batch interval, 窗口长度,窗口滑动时间等。

3,Spark Streaming 源码解析

StreamingContext 方法中调用 JobScheduler 的 start 方法

JobGenerator 的 start 方法中,调用 startFirstTime 方法,来开启定时生成 Job 的定时器

startFirstTime 方法,首先调用 DStreamGraph 的 start 方法,然后再调用 RecurringTimer 的 start 方法。

timer 对象为一个定时器,根据 batchInterval 时间间隔定期向 EventLoop 发送 GenerateJobs 的消息。

接收到 GenerateJobs 消息后,会回调 generateJobs 方法。

generateJobs 方法再调用 DStreamGraph 的 generateJobs 方法生成 Job

DStreamGraph 的 generateJobs 方法

DStreamGraph 的实例化是在 StreamingContext 中的

DStreamGraph 类中保存了输入流和输出流信息

Spark Streaming 怎么使用

Spark Streaming 怎么使用

回到 JobGenerator 的 start 方法中 receiverTracker.start()

Spark Streaming 怎么使用

Spark Streaming 怎么使用

其中 ReceiverTrackerEndpoint 对象为一个消息循环体

Spark Streaming 怎么使用

launchReceivers 方法中发送 StartAllReceivers 消息

Spark Streaming 怎么使用

接收到 StartAllReceivers 消息后,进行如下处理

Spark Streaming 怎么使用

Spark Streaming 怎么使用

StartReceiverFunc 方法如下,实例化 Receiver 监控者,开启并等待退出

Spark Streaming 怎么使用

supervisor 的 start 方法中调用 startReceiver 方法

Spark Streaming 怎么使用

Spark Streaming 怎么使用

我们以 socketTextStream 为例,其启动的是 SocketReceiver,内部开启一个线程,来接收数据。

Spark Streaming 怎么使用

Spark Streaming 怎么使用

内部调用 supervisor 的 pushSingle 方法,将数据聚集后存放在内存中

Spark Streaming 怎么使用

supervisor 的 pushSingle 方法如下,将数据放入到 defaultBlockGenerator 中,defaultBlockGenerator 为 BlockGenerator,保存 Socket 接收到的数据

Spark Streaming 怎么使用

Spark Streaming 怎么使用

BlockGenerator 对象中有一个定时器,来更新当前的 Buffer

Spark Streaming 怎么使用

Spark Streaming 怎么使用

BlockGenerator 对象中有一个线程,来从阻塞队列中取出数据

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

调用 ReceiverSupervisorImpl 类中的继承 BlockGeneratorListener 的匿名类中的 onPushBlock 方法。

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

receivedBlockHandler 对象如下

Spark Streaming 怎么使用

这里我们讲解 BlockManagerBasedBlockHandler 的方式

Spark Streaming 怎么使用

trackerEndpoint 如下

Spark Streaming 怎么使用

Spark Streaming 怎么使用

其实是发送给 ReceiverTrackerEndpoint 类,

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

InputInfoTracker 类的 reportInfo 方法只是对数据进行记录统计

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

其 generateJob 方法是被 DStreamGraph 调用

Spark Streaming 怎么使用

DStreamGraph 的 generateJobs 方法是被 JobGenerator 类的 generateJobs 方法调用。

Spark Streaming 怎么使用

JobGenerator 类中有一个定时器,batchInterval 发送 GenerateJobs 消息

Spark Streaming 怎么使用

总结:

1,当调用 StreamingContext 的 start 方法时,启动了 JobScheduler

2,当 JobScheduler 启动后会先后启动 ReceiverTracker 和 JobGenerator

3,ReceiverTracker 启动后会创建 ReceiverTrackerEndpoint 这个消息循环体,来接收运行在 Executor 上的 Receiver 发送过来的消息

4,ReceiverTracker 在启动时会给自己发送 StartAllReceivers 消息,自己接收到消息后,向 Spark 提交 startReceiverFunc 的 Job

5,startReceiverFunc 方法中在 Executor 上启动 Receiver,并实例化 ReceiverSupervisorImpl 对象,来监控 Receiver 的运行

6,ReceiverSupervisorImpl 对象会调用 Receiver 的 onStart 方法,我们以 SocketReceiver 为例,启动一个线程,连接 Server,读取网络数据先调用 ReceiverSupervisorImpl 的 pushSingle 方法,

保存在 BlockGenerator 对象中,该对象内部有个定时器,放到阻塞队列 blocksForPushing,等待内部线程取出数据放到 BlockManager 中,并发 AddBlock 消息给 ReceiverTrackerEndpoint。

ReceiverTrackerEndpoint 为 ReceiverTracker 的内部类,在接收到 addBlock 消息后将 streamId 对应的数据阻塞队列 streamIdToUnallocatedBlockQueues 中

7,JobGenerator 启动后会启动以 batchInterval 时间间隔发送 GenerateJobs 消息的定时器

8,接收到 GenerateJobs 消息会先后触发 ReceiverTracker 的 allocateBlocksToBatch 方法和 DStreamGraph 的 generateJobs 方法

9,ReceiverTracker 的 allocateBlocksToBatch 方法会调用 getReceivedBlockQueue 方法从阻塞队列 streamIdToUnallocatedBlockQueues 中根据 streamId 获取数据

10,DStreamGraph 的 generateJobs 方法,继而调用变量名为 outputStreams 的 DStream 集合的 generateJob 方法

11,继而调用 DStream 的 getOrCompute 来调用具体的 DStream 的 compute 方法,我们以 ReceiverInputDStream 为例,compute 方法是从 ReceiverTracker 中获取数据

到此,关于“Spark Streaming 怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计3165字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)