RxJava线程切换过程是怎样的

45次阅读
没有评论

共计 7836 个字符,预计需要花费 20 分钟才能阅读完成。

今天就跟大家聊聊有关 RxJava 线程切换过程是怎样的,可能很多人都不太了解,为了让大家更加了解,丸趣 TV 小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

线程切换过程

下面我们就来看看它的又一利器,调度器 Scheduler:就像我们所知道的,Scheduler 是给 Observable 数据流添加多线程功能所准备的,一般我们会通过使用 subscribeOn()、observeOn() 方法传入对应的 Scheduler 去指定数据流的每部分操作应该以何种方式运行在何种线程。对于我们而言,最常见的莫过于在非主线程获取并处理数据之后在主线程更新 UI 这样的场景了:

这是我们十分常见的调用方法,一气呵成就把不同线程之间的处理都搞定了,因为是链式所以结构也很清晰,我们现在来看看这其中的线程切换流程。

subscribeOn()

当我们调用 subscribeOn() 的时候:

可以看到这里也是调用了 create() 去生成一个 Observable,而 OperatorSubscribeOn 则是实现了 OnSubscribe 接口,同时将原始的 Observable 和我们需要的 scheduler 传入:

可以看出来,这里对 subscriber 的处理与前文中 OperatorMap 中 call() 对 subscriber 的处理很相似。在这里我们同样会根据传入的 subscriber 构造出新的 Subscribers,不过这一系列的过程大部分都是由 worker 通过 schedule() 去执行的,从后面 setProducer() 中对于线程的判断,再结合 subscribeOn() 方法的目的我们能大概推测出,这个 worker 在一定程度上就相当于一个新线程的代理执行者,schedule() 所实现的与 Thread 类中 run() 应该十分类似。我们现在来看看这个 worker 的执行过程。
首先从 Schedulers.io() 进入:

这个通过 hook 拿到 scheduler 的过程我们先不管,直接进 CachedThreadScheduler,看它的 createWorker() 方法:

这里的 pool 是一个原子变量引用 AtomicReference,所持有的则是 CachedWorkerPool,因而这个 pool 顾名思义就是用来保存 worker 的缓存池啦,我们从缓存池里拿到需要的 worker 并作了一层封装成为 EventLoopWorker:

在这里我们终于发现目标 ThreadWorker,它继承自 NewThreadWorker,之前的 schedule() 方法最终都会到这个 scheduleActual() 方法里:

这里我们看到了 executor 线程池,我们用 Schedulers.io() 最终实现的线程切换的本质就在这里了。现在再结合之前的过程我们从头梳理一下:

在 subscribeOn() 时,我们会新生成一个 Observable,它的成员 onSubscribe 会在目标 Subscriber 订阅时使用传入的 Scheduler 的 worker 作为线程调度执行者,在对应的线程中通知原始 Observable 发送消息给这个过程中临时生成的 Subscriber,这个 Subscriber 又会通知到目标 Subscriber,这样就完成了 subscribeOn() 的过程。

observeOn()

下面我们接着来看看 observeOn():

我们直接看最终调用的部分,可以看到这里又是一个 lift(),在这里传入了 OperatorObserveOn,它与 OperatorSubscribeOn 不同,是一个 Operator(Operator 的功能我们上文中已经讲过就不赘述了),它构造出了新的观察者 ObserveOnSubscriber 并实现了 Action0 接口:

可以看出来,这里 ObserveOnSubscriber 所有的发送给目标 Subscriber child 的消息都被切换到了 recursiveScheduler 的线程作处理,也就达到了将线程切回的目的。

总结 observeOn() 整体流程如下:

对比 subscribeOn() 和 observeOn() 这两个过程,我们不难发现两者的区别:subscribeOn() 将初始 Observable 的订阅事件整体都切换到了另一个线程;而 observeOn() 则是将初始 Observable 发送的消息切换到另一个线程通知到目标 Subscriber。前者把“订阅 + 发送”的切换了一个线程,后者把“发送”切换了一个线程。所以,我们的代码中所实现的功能其实是:

这样就能很容易实现耗时任务在子线程操作,在主线程作更新操作等这些常见场景的功能啦。

4. 其他角色

Subject
Subject 在 Rx 系列是一个比较特殊的角色,它继承了 Observable 的同时也实现了 Observer 接口,也就是说它既可作为观察者,也可作为被观察者,他一般被用来作为连接多个不同 Observable、Observer 之间的纽带。可能你会奇怪,我们不是已经有了像 map()、flatMap() 这类的操作符去变化 Observable 数据流了吗,为什么还要引入 Subject 这个东西呢?这是因为 Subject 所承担的工作并非是针对 Observable 数据流内容的转换连接,而是数据流本身在 Observable、Observer 之间的调度。光这么说可能还是很模糊,我们举个《RxJava Essentials》中的例子:

我们通过 create() 创建了一个 PublishSubject,观察者成功订阅了这个 subject,然而这个 subject 却没有任何数据要发送,我们只是知道他未来会发送的会是 String 值而已。之后,当我们调用 subject.onNext() 时,消息才被发送,Observer 的 onNext() 被触发调用,输出了 Hello World。

这里我们注意到,当订阅事件发生时,我们的 subject 是没有产生数据流的,直到它发射了 Hello World,数据流才开始运转,试想我们如果将订阅过程和 subject.onNext() 调换一下位置,那么 Observer 就一定不会接受到 Hello World 了(这不是废话吗 - -|||),因而这也在根本上反映了 Observable 的冷热区别。

一般而言,我们的 Observable 都属于 Cold Observables,就像看视频,每次点开新视频我们都要从头开始播放;而 Subject 则默认属于 Hot Observables,就像看直播,视频数据永远都是新的。
基于这种属性,Subject 自然拥有了对接收到的数据流进行选择调度等的能力了,因此,我们对于 Subject 的使用也就通常基于如下的思路:

在前面的例子里我们用到的是 PublishSubject,它只会把在订阅发生的时间点之后来自原始 Observable 的数据发射给观察者。等一下,这功能听起来是不是有些似曾相识呢?

没错,就是 EventBus 和 Otto。(RxJava 的出现慢慢让 Otto 退出了舞台,现在 Otto 的 Repo 已经是 Deprecated 状态了,而 EventBus 依旧坚挺)基于 RxJava 的观察订阅取消的能力和 PublishSubject 的功能,我们十分容易就能写出实现了最基本功能的简易事件总线框架:

当然 Subject 还有其他如 BehaviorSubject、ReplaySubject、AsyncSubject 等类型,大家可以去看官方文档,写得十分详细,这里就不介绍了。

三. 后记

前面相信最近这段日子里,提到 RxJava,大家就会想到 Google 最近刚刚开源的 Agera。Agera 作为专门为 Android 打造的 Reactive Programming 框架,难免会被拿来与 RxJava 做对比。本文前面 RxJava 的主体流程分析已近尾声,现在我们再来看看 Agera 这东东又是怎么一回事。

首先先上结论:

Agera 最初是为了 Google Play Movies 而开发的一个内部框架,现在开源出来了,它虽然是在 RxJava 之后才出现,但是完全独立于 RxJava,与它没有任何关系(只不过开源的时间十分微妙罢了 233333)。与 RxJava 比起来,Agera 更加专注于 Android 的生命周期,而 RxJava 则更加纯粹地面向 Java 平台而非 Android。

也许你可能会问:“那么 RxAndroid 呢,不是还有它吗?”事实上,RxAndroid 早在 1.0 版本的时候就进行了很大的重构,很多模块被拆分到其他的项目中去了,同时也删除了部分代码,仅存下来的部分多是和 Android 线程相关的部分,比如 AndroidSchedulers、MainThreadSubscription 等。鉴于这种情况,我们暂且不去关注 RxAndroid,先把目光放在 Agera 上。

同样也是基于观察者模式,Agera 和 RxJava 的角色分类大致相似,在 Agera 中,主要角色有两个:Observable(被观察者)、Updatable(观察者)。

是的,相较于 RxJava 中的 Observable,Agera 中的 Observable 只是一个简单的接口,也没有范性的存在,Updatable 亦是如此,这样我们要如何做到消息的传递呢?这就需要另外一个接口了:

终于看到了泛型 T,我们的消息的传递能力就是依赖于此接口了。所以我们将这个接口和基础的 Observable 结合一下:

这里的 Repository T 在一定程度上就是我们想要的 RxJava 中的 Observable T 啦。类似地,Repository 也有两种类型的实现:

Direct – 所包含的数据总是可用的或者是可被同步计算出来的;一个 Direct 的 Repository 总是处于活跃(active)状态下

Deferred – 所包含的数据是异步计算或拉去所得;一个 Deffered 的 Repository 直到有 Updatable 被添加进来之前都会是非活跃(inactive)状态下
是不是感到似曾相识呢?没错,Repository 也是有冷热区分的,不过我们现在暂且不去关注这一点。回到上面接着看,既然现在发数据的角色有了,那么我们要如何接收数据呢?答案就是 Receiver:

相信看到这里,大家应该也隐约感觉到了:在 Agera 的世界里,数据的传输与事件的传递是相互隔离开的,这是目前 Agera 与 Rx 系列的最大本质区别。Agera 所使用的是一种 push event, pull data 的模型,这意味着 event 并不会携带任何 data,Updatable 在需要更新时,它自己会承担起从数据源拉取数据的任务。这样,提供数据的责任就从 Observable 中拆分了出来交给了 Repository,让其自身能够专注于发送一些简单的事件如按钮点击、一次下拉刷新的触发等等。

那么,这样的实现有什么好处呢?

当这两种处理分发逻辑分离开时,Updatable 就不必观察到来自 Repository 的完整数据变化的历史,毕竟在大多数场景下,尤其是更新 UI 的场景下,最新的数据往往才是有用的数据。

但是我就是需要看到变化的历史数据,怎么办?

不用担心,这里我们再请出一个角色 Reservoir:

顾名思义,Reservoir 就是我们用来存储变化中的数据的地方,它继承了 Receiver、Repository,也就相当于同时具有了接收数据,发送数据的能力。通过查看其具体实现我们可以知道它的本质操作都是使用内部的 Queue 实现的:通过 accept() 接收到数据后入列,通过 get() 拿到数据后出列。若一个 Updatable 观察了此 Reservoir,其队列中发生调度变化后即将出列的下一个数据如果是可用的(非空),就会通知该 Updatable,进一步拉取这个数据发送给 Receiver。

现在,我们已经大概了解了这几个角色的功能属性了,接下来我们来看一段官方示例代码:

是不是有些云里雾里的感觉呢?多亏有注释,我们大概能够猜出到底上面都做了什么:使用需要的图片规格作为参数拼接到 url 中,拉取对应的图片并用 ImageView 显示出来。我们结合 API 来看看整个过程:

Repositories.repositoryWithInitialValue(Result.absent())
创建一个可运行(抑或说执行)的 repository。
初始化传入值是 Result,它用来概括一些诸如 apply()、merge() 的操作的结果的不可变对象,并且存在两种状态 succeeded()、failed()。
返回 REventSource

observe()
用于添加新的 Observable 作为更新我们的图片的 Event source,本例中不需要。
返回 RFrequency

onUpdatesPerLoop()
在每一个 Looper Thread loop 中若有来自多个 Event Source 的 update() 处理时,只需开启一个数据处理流。
返回 RFlow

getFrom(new Supplier(…))
忽略输入值,使用来自给定 Supplier 的新获取的数据作为输出值。
返回 RFlow

goTo(executor)
切换到给定的 executor 继续数据处理流。

attemptTransform(function())
使用给定的 function() 变换输入值,若变换失败,则终止数据流;若成功,则取新的变换后的值作为当前流指令的输出。
返回 RTermination

orSkip()
若前面的操作检查为失败,就跳过剩下的数据处理流,并且不会通知所有已添加的 Updatable。

thenTransform(function())
与 attemptTransform(function()) 相似,区别在于当必要时会发出通知。
返回 RConfig

onDeactivation(SEND_INTERRUPT)
用于明确 repository 不再 active 时的行为。
返回 RConfig

compile()
执行这个 repository。
返回 Repository

整体流程乍看起来并没有什么特别的地方,但是真正的玄机其实藏在执行每一步的返回值里:
初始的 REventSource T, T 代表着事件源的开端,它从传入值接收了 T initialValue,这里的中,第一个 T 是当前 repository 的数据的类型,第二个 T 则是数据处理流开端的时候的数据的类型。

之后,当 observe() 调用后,我们传入事件源给 REventSource,相当于设定好了需要的事件源和对应的开端,这里返回的是 RFrequency T, T,它继承自 REventSource,为其添加了事件源的发送频率的属性。

之后,我们来到了 onUpdatesPerLoop(),这里明确了所开启的数据流的个数(也就是前面所讲的频率)后,返回了 RFlow,这里也就意味着我们的数据流正式生成了。同时,这里也是流式调用的起点。

拿到我们的 RFlow 之后,我们就可以为其提供数据源了,也就是前面说的 Supplier,于是调用 getFrom(),这样我们的数据流也就真正意义拥有了数据“干货”。

有了数据之后我们就可以按具体需要进行数据转换了,这里我们可以直接使用 transform(),返回 RFlow,以便进一步进行流式调用;也可以调用 attemptTransform() 来对可能出现的异常进行处理,比如 orSkip()、orEnd() 之后继续进行流式调用。

经过一系列的流式调用之后,我们终于对数据处理完成啦,现在我们可以选择先对成型的数据在做一次最后的包装 thenTransform(),或是与另一个 Supplier 合并 thenMergeIn() 等。这些处理之后,我们的返回值也就转为了 RConfig,进入了最终配置和 repository 声明结束的状态。
在最终的这个配置过程中,我们调用了 onDeactivation(),为这个 repository 明确了最终进入非活跃状态时的行为,如果不需要其他多余的配置的话,我们就可以进入最终的 compile() 方法了。当我们调用 compile() 时,就会按照前面所走过的所有流程与配置去执行并生成这个 repository。到此,我们的 repository 才真正被创建了出来。

以上就是 repository 从无到有的全过程。当 repository 诞生后,我们也就可以传输需要的数据啦。再回到上面的示例代码:

我们在 onResume()、onPause() 这两个生命周期下分别添加、移除了 Updatable。相较于 RxJava 中通过 Subscription 去取消订阅的做法,Agera 的这种写法显然更为清晰也更为整洁。我们的 Activity 实现了 Updatable 和 Receiver 接口,直接看其实现方法:

可以看到这里 repository 将数据发送给了 receiver,也就是自己,在对应的 accept() 方法中接收到我们想要的 bitmap 后,这张图片也就显示出来了,示例代码中的完整流程也就结束了。

总结一下上述过程:

首先 Repositories.repositoryWithInitialValue() 生成原点 REventSource。

配置完 Observable 之后进入 RFrequency 状态,接着配置数据流的流数。

前面配置完成后,数据流 RFlow 生成,之后通过 getFrom()、mergeIn()、transform() 等方法可进一步进行流式调用;也可以使用 attemptXXX() 方法代替原方法,后面接着调用 orSkip()、orEnd() 进行 error handling 处理。当使用 attemptXXX() 方法时,数据流状态会变为 RTermination,它代表此时的状态已具有终结数据流的能力,是否终结数据流要根据 failed check 触发,结合后面跟着调用的 orSkip()、orEnd(),我们的数据流会从 RTermination 再次切换为 RFlow,以便进行后面的流式调用。

经过前面一系列的流式处理,我们需要结束数据流时,可以选择调用 thenXXX() 方法,对数据流进行最终的处理,处理之后,数据流状态会变为 RConfig;也可以为此行为添加 error handling 处理,选择 thenAttemptXXX() 方法,后面同样接上 orSkip()、orEnd() 即可,最终数据流也会转为 Rconfig 状态。

此时,我们可以在结束前按需要选择对数据流进行最后的配置,例如:调用 onDeactivation() 配置从“订阅”到“取消订阅”的过程是否需要继续执行数据流等等。

一切都部署完毕后,我们 compile() 这个 RConfig,得到最终的成型的 Repository,它具有添加 Updatable、发送数据通知 Receiver 的能力。

我们根据需要添加 Updatable,repository 在数据流处理完成后会通过 update() 发送 event 通知 Updatable。

Updatable 收到通知后则会拉取 repository 的成果数据,并将数据通过 accept() 发送给 Receiver。完成 Push event,pull data 的流程。

看完上述内容,你们对 RxJava 线程切换过程是怎样的有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注丸趣 TV 行业资讯频道,感谢大家的支持。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计7836字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)