共计 3165 个字符,预计需要花费 8 分钟才能阅读完成。
这篇文章主要介绍“Spark Streaming 怎么使用”,在日常操作中,相信很多人在 Spark Streaming 怎么使用问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark Streaming 怎么使用”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!
DStream 是逻辑级别的,而 RDD 是物理级别的。DStream 是随着时间的流动内部将集合封装 RDD。对 DStream 的操作,转过来对其内部的 RDD 操作。
纵轴为空间维度:代表的是 RDD 的依赖关系构成的具体的处理逻辑的步骤,是用 DStream 来表示的。
横轴为时间维度:按照特定的时间间隔不断地生成 job 对象,并在集群上运行。
随着时间的推移,基于 DStream Graph 不断生成 RDD Graph , 也即 DAG 的方式生成 job, 并通过 Job Scheduler 的线程池的方式提交给 spark cluster 不断的执行。
由上可知,RDD 与 DStream 的关系如下
RDD 是物理级别的,而 DStream 是逻辑级别的
DStream 是 RDD 的封装类,是 RDD 进一步的抽象
DStream 是 RDD 的模板。DStream 要依赖 RDD 进行具体的数据计算
注意:纵轴维度需要 RDD,DAG 的生成模板,需要 TimeLine 的 job 控制器
横轴维度(时间维度)包含 batch interval, 窗口长度,窗口滑动时间等。
3,Spark Streaming 源码解析
StreamingContext 方法中调用 JobScheduler 的 start 方法
JobGenerator 的 start 方法中,调用 startFirstTime 方法,来开启定时生成 Job 的定时器
startFirstTime 方法,首先调用 DStreamGraph 的 start 方法,然后再调用 RecurringTimer 的 start 方法。
timer 对象为一个定时器,根据 batchInterval 时间间隔定期向 EventLoop 发送 GenerateJobs 的消息。
接收到 GenerateJobs 消息后,会回调 generateJobs 方法。
generateJobs 方法再调用 DStreamGraph 的 generateJobs 方法生成 Job
DStreamGraph 的 generateJobs 方法
DStreamGraph 的实例化是在 StreamingContext 中的
DStreamGraph 类中保存了输入流和输出流信息
回到 JobGenerator 的 start 方法中 receiverTracker.start()
其中 ReceiverTrackerEndpoint 对象为一个消息循环体
launchReceivers 方法中发送 StartAllReceivers 消息
接收到 StartAllReceivers 消息后,进行如下处理
StartReceiverFunc 方法如下,实例化 Receiver 监控者,开启并等待退出
supervisor 的 start 方法中调用 startReceiver 方法
我们以 socketTextStream 为例,其启动的是 SocketReceiver,内部开启一个线程,来接收数据。
内部调用 supervisor 的 pushSingle 方法,将数据聚集后存放在内存中
supervisor 的 pushSingle 方法如下,将数据放入到 defaultBlockGenerator 中,defaultBlockGenerator 为 BlockGenerator,保存 Socket 接收到的数据
BlockGenerator 对象中有一个定时器,来更新当前的 Buffer
BlockGenerator 对象中有一个线程,来从阻塞队列中取出数据
调用 ReceiverSupervisorImpl 类中的继承 BlockGeneratorListener 的匿名类中的 onPushBlock 方法。
receivedBlockHandler 对象如下
这里我们讲解 BlockManagerBasedBlockHandler 的方式
trackerEndpoint 如下
其实是发送给 ReceiverTrackerEndpoint 类,
InputInfoTracker 类的 reportInfo 方法只是对数据进行记录统计
其 generateJob 方法是被 DStreamGraph 调用
DStreamGraph 的 generateJobs 方法是被 JobGenerator 类的 generateJobs 方法调用。
JobGenerator 类中有一个定时器,batchInterval 发送 GenerateJobs 消息
总结:
1,当调用 StreamingContext 的 start 方法时,启动了 JobScheduler
2,当 JobScheduler 启动后会先后启动 ReceiverTracker 和 JobGenerator
3,ReceiverTracker 启动后会创建 ReceiverTrackerEndpoint 这个消息循环体,来接收运行在 Executor 上的 Receiver 发送过来的消息
4,ReceiverTracker 在启动时会给自己发送 StartAllReceivers 消息,自己接收到消息后,向 Spark 提交 startReceiverFunc 的 Job
5,startReceiverFunc 方法中在 Executor 上启动 Receiver,并实例化 ReceiverSupervisorImpl 对象,来监控 Receiver 的运行
6,ReceiverSupervisorImpl 对象会调用 Receiver 的 onStart 方法,我们以 SocketReceiver 为例,启动一个线程,连接 Server,读取网络数据先调用 ReceiverSupervisorImpl 的 pushSingle 方法,
保存在 BlockGenerator 对象中,该对象内部有个定时器,放到阻塞队列 blocksForPushing,等待内部线程取出数据放到 BlockManager 中,并发 AddBlock 消息给 ReceiverTrackerEndpoint。
ReceiverTrackerEndpoint 为 ReceiverTracker 的内部类,在接收到 addBlock 消息后将 streamId 对应的数据阻塞队列 streamIdToUnallocatedBlockQueues 中
7,JobGenerator 启动后会启动以 batchInterval 时间间隔发送 GenerateJobs 消息的定时器
8,接收到 GenerateJobs 消息会先后触发 ReceiverTracker 的 allocateBlocksToBatch 方法和 DStreamGraph 的 generateJobs 方法
9,ReceiverTracker 的 allocateBlocksToBatch 方法会调用 getReceivedBlockQueue 方法从阻塞队列 streamIdToUnallocatedBlockQueues 中根据 streamId 获取数据
10,DStreamGraph 的 generateJobs 方法,继而调用变量名为 outputStreams 的 DStream 集合的 generateJob 方法
11,继而调用 DStream 的 getOrCompute 来调用具体的 DStream 的 compute 方法,我们以 ReceiverInputDStream 为例,compute 方法是从 ReceiverTracker 中获取数据
到此,关于“Spark Streaming 怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!