共计 7836 个字符,预计需要花费 20 分钟才能阅读完成。
今天就跟大家聊聊有关 RxJava 线程切换过程是怎样的,可能很多人都不太了解,为了让大家更加了解,丸趣 TV 小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
线程切换过程
下面我们就来看看它的又一利器,调度器 Scheduler:就像我们所知道的,Scheduler 是给 Observable 数据流添加多线程功能所准备的,一般我们会通过使用 subscribeOn()、observeOn() 方法传入对应的 Scheduler 去指定数据流的每部分操作应该以何种方式运行在何种线程。对于我们而言,最常见的莫过于在非主线程获取并处理数据之后在主线程更新 UI 这样的场景了:
这是我们十分常见的调用方法,一气呵成就把不同线程之间的处理都搞定了,因为是链式所以结构也很清晰,我们现在来看看这其中的线程切换流程。
subscribeOn()
当我们调用 subscribeOn() 的时候:
可以看到这里也是调用了 create() 去生成一个 Observable,而 OperatorSubscribeOn 则是实现了 OnSubscribe 接口,同时将原始的 Observable 和我们需要的 scheduler 传入:
可以看出来,这里对 subscriber 的处理与前文中 OperatorMap 中 call() 对 subscriber 的处理很相似。在这里我们同样会根据传入的 subscriber 构造出新的 Subscribers,不过这一系列的过程大部分都是由 worker 通过 schedule() 去执行的,从后面 setProducer() 中对于线程的判断,再结合 subscribeOn() 方法的目的我们能大概推测出,这个 worker 在一定程度上就相当于一个新线程的代理执行者,schedule() 所实现的与 Thread 类中 run() 应该十分类似。我们现在来看看这个 worker 的执行过程。
首先从 Schedulers.io() 进入:
这个通过 hook 拿到 scheduler 的过程我们先不管,直接进 CachedThreadScheduler,看它的 createWorker() 方法:
这里的 pool 是一个原子变量引用 AtomicReference,所持有的则是 CachedWorkerPool,因而这个 pool 顾名思义就是用来保存 worker 的缓存池啦,我们从缓存池里拿到需要的 worker 并作了一层封装成为 EventLoopWorker:
在这里我们终于发现目标 ThreadWorker,它继承自 NewThreadWorker,之前的 schedule() 方法最终都会到这个 scheduleActual() 方法里:
这里我们看到了 executor 线程池,我们用 Schedulers.io() 最终实现的线程切换的本质就在这里了。现在再结合之前的过程我们从头梳理一下:
在 subscribeOn() 时,我们会新生成一个 Observable,它的成员 onSubscribe 会在目标 Subscriber 订阅时使用传入的 Scheduler 的 worker 作为线程调度执行者,在对应的线程中通知原始 Observable 发送消息给这个过程中临时生成的 Subscriber,这个 Subscriber 又会通知到目标 Subscriber,这样就完成了 subscribeOn() 的过程。
observeOn()
下面我们接着来看看 observeOn():
我们直接看最终调用的部分,可以看到这里又是一个 lift(),在这里传入了 OperatorObserveOn,它与 OperatorSubscribeOn 不同,是一个 Operator(Operator 的功能我们上文中已经讲过就不赘述了),它构造出了新的观察者 ObserveOnSubscriber 并实现了 Action0 接口:
可以看出来,这里 ObserveOnSubscriber 所有的发送给目标 Subscriber child 的消息都被切换到了 recursiveScheduler 的线程作处理,也就达到了将线程切回的目的。
总结 observeOn() 整体流程如下:
对比 subscribeOn() 和 observeOn() 这两个过程,我们不难发现两者的区别:subscribeOn() 将初始 Observable 的订阅事件整体都切换到了另一个线程;而 observeOn() 则是将初始 Observable 发送的消息切换到另一个线程通知到目标 Subscriber。前者把“订阅 + 发送”的切换了一个线程,后者把“发送”切换了一个线程。所以,我们的代码中所实现的功能其实是:
这样就能很容易实现耗时任务在子线程操作,在主线程作更新操作等这些常见场景的功能啦。
4. 其他角色
Subject
Subject 在 Rx 系列是一个比较特殊的角色,它继承了 Observable 的同时也实现了 Observer 接口,也就是说它既可作为观察者,也可作为被观察者,他一般被用来作为连接多个不同 Observable、Observer 之间的纽带。可能你会奇怪,我们不是已经有了像 map()、flatMap() 这类的操作符去变化 Observable 数据流了吗,为什么还要引入 Subject 这个东西呢?这是因为 Subject 所承担的工作并非是针对 Observable 数据流内容的转换连接,而是数据流本身在 Observable、Observer 之间的调度。光这么说可能还是很模糊,我们举个《RxJava Essentials》中的例子:
我们通过 create() 创建了一个 PublishSubject,观察者成功订阅了这个 subject,然而这个 subject 却没有任何数据要发送,我们只是知道他未来会发送的会是 String 值而已。之后,当我们调用 subject.onNext() 时,消息才被发送,Observer 的 onNext() 被触发调用,输出了 Hello World。
这里我们注意到,当订阅事件发生时,我们的 subject 是没有产生数据流的,直到它发射了 Hello World,数据流才开始运转,试想我们如果将订阅过程和 subject.onNext() 调换一下位置,那么 Observer 就一定不会接受到 Hello World 了(这不是废话吗 - -|||),因而这也在根本上反映了 Observable 的冷热区别。
一般而言,我们的 Observable 都属于 Cold Observables,就像看视频,每次点开新视频我们都要从头开始播放;而 Subject 则默认属于 Hot Observables,就像看直播,视频数据永远都是新的。
基于这种属性,Subject 自然拥有了对接收到的数据流进行选择调度等的能力了,因此,我们对于 Subject 的使用也就通常基于如下的思路:
在前面的例子里我们用到的是 PublishSubject,它只会把在订阅发生的时间点之后来自原始 Observable 的数据发射给观察者。等一下,这功能听起来是不是有些似曾相识呢?
没错,就是 EventBus 和 Otto。(RxJava 的出现慢慢让 Otto 退出了舞台,现在 Otto 的 Repo 已经是 Deprecated 状态了,而 EventBus 依旧坚挺)基于 RxJava 的观察订阅取消的能力和 PublishSubject 的功能,我们十分容易就能写出实现了最基本功能的简易事件总线框架:
当然 Subject 还有其他如 BehaviorSubject、ReplaySubject、AsyncSubject 等类型,大家可以去看官方文档,写得十分详细,这里就不介绍了。
三. 后记
前面相信最近这段日子里,提到 RxJava,大家就会想到 Google 最近刚刚开源的 Agera。Agera 作为专门为 Android 打造的 Reactive Programming 框架,难免会被拿来与 RxJava 做对比。本文前面 RxJava 的主体流程分析已近尾声,现在我们再来看看 Agera 这东东又是怎么一回事。
首先先上结论:
Agera 最初是为了 Google Play Movies 而开发的一个内部框架,现在开源出来了,它虽然是在 RxJava 之后才出现,但是完全独立于 RxJava,与它没有任何关系(只不过开源的时间十分微妙罢了 233333)。与 RxJava 比起来,Agera 更加专注于 Android 的生命周期,而 RxJava 则更加纯粹地面向 Java 平台而非 Android。
也许你可能会问:“那么 RxAndroid 呢,不是还有它吗?”事实上,RxAndroid 早在 1.0 版本的时候就进行了很大的重构,很多模块被拆分到其他的项目中去了,同时也删除了部分代码,仅存下来的部分多是和 Android 线程相关的部分,比如 AndroidSchedulers、MainThreadSubscription 等。鉴于这种情况,我们暂且不去关注 RxAndroid,先把目光放在 Agera 上。
同样也是基于观察者模式,Agera 和 RxJava 的角色分类大致相似,在 Agera 中,主要角色有两个:Observable(被观察者)、Updatable(观察者)。
是的,相较于 RxJava 中的 Observable,Agera 中的 Observable 只是一个简单的接口,也没有范性的存在,Updatable 亦是如此,这样我们要如何做到消息的传递呢?这就需要另外一个接口了:
终于看到了泛型 T,我们的消息的传递能力就是依赖于此接口了。所以我们将这个接口和基础的 Observable 结合一下:
这里的 Repository T 在一定程度上就是我们想要的 RxJava 中的 Observable T 啦。类似地,Repository 也有两种类型的实现:
Direct – 所包含的数据总是可用的或者是可被同步计算出来的;一个 Direct 的 Repository 总是处于活跃(active)状态下
Deferred – 所包含的数据是异步计算或拉去所得;一个 Deffered 的 Repository 直到有 Updatable 被添加进来之前都会是非活跃(inactive)状态下
是不是感到似曾相识呢?没错,Repository 也是有冷热区分的,不过我们现在暂且不去关注这一点。回到上面接着看,既然现在发数据的角色有了,那么我们要如何接收数据呢?答案就是 Receiver:
相信看到这里,大家应该也隐约感觉到了:在 Agera 的世界里,数据的传输与事件的传递是相互隔离开的,这是目前 Agera 与 Rx 系列的最大本质区别。Agera 所使用的是一种 push event, pull data 的模型,这意味着 event 并不会携带任何 data,Updatable 在需要更新时,它自己会承担起从数据源拉取数据的任务。这样,提供数据的责任就从 Observable 中拆分了出来交给了 Repository,让其自身能够专注于发送一些简单的事件如按钮点击、一次下拉刷新的触发等等。
那么,这样的实现有什么好处呢?
当这两种处理分发逻辑分离开时,Updatable 就不必观察到来自 Repository 的完整数据变化的历史,毕竟在大多数场景下,尤其是更新 UI 的场景下,最新的数据往往才是有用的数据。
但是我就是需要看到变化的历史数据,怎么办?
不用担心,这里我们再请出一个角色 Reservoir:
顾名思义,Reservoir 就是我们用来存储变化中的数据的地方,它继承了 Receiver、Repository,也就相当于同时具有了接收数据,发送数据的能力。通过查看其具体实现我们可以知道它的本质操作都是使用内部的 Queue 实现的:通过 accept() 接收到数据后入列,通过 get() 拿到数据后出列。若一个 Updatable 观察了此 Reservoir,其队列中发生调度变化后即将出列的下一个数据如果是可用的(非空),就会通知该 Updatable,进一步拉取这个数据发送给 Receiver。
现在,我们已经大概了解了这几个角色的功能属性了,接下来我们来看一段官方示例代码:
是不是有些云里雾里的感觉呢?多亏有注释,我们大概能够猜出到底上面都做了什么:使用需要的图片规格作为参数拼接到 url 中,拉取对应的图片并用 ImageView 显示出来。我们结合 API 来看看整个过程:
Repositories.repositoryWithInitialValue(Result.absent())
创建一个可运行(抑或说执行)的 repository。
初始化传入值是 Result,它用来概括一些诸如 apply()、merge() 的操作的结果的不可变对象,并且存在两种状态 succeeded()、failed()。
返回 REventSource
observe()
用于添加新的 Observable 作为更新我们的图片的 Event source,本例中不需要。
返回 RFrequency
onUpdatesPerLoop()
在每一个 Looper Thread loop 中若有来自多个 Event Source 的 update() 处理时,只需开启一个数据处理流。
返回 RFlow
getFrom(new Supplier(…))
忽略输入值,使用来自给定 Supplier 的新获取的数据作为输出值。
返回 RFlow
goTo(executor)
切换到给定的 executor 继续数据处理流。
attemptTransform(function())
使用给定的 function() 变换输入值,若变换失败,则终止数据流;若成功,则取新的变换后的值作为当前流指令的输出。
返回 RTermination
orSkip()
若前面的操作检查为失败,就跳过剩下的数据处理流,并且不会通知所有已添加的 Updatable。
thenTransform(function())
与 attemptTransform(function()) 相似,区别在于当必要时会发出通知。
返回 RConfig
onDeactivation(SEND_INTERRUPT)
用于明确 repository 不再 active 时的行为。
返回 RConfig
compile()
执行这个 repository。
返回 Repository
整体流程乍看起来并没有什么特别的地方,但是真正的玄机其实藏在执行每一步的返回值里:
初始的 REventSource T, T 代表着事件源的开端,它从传入值接收了 T initialValue,这里的中,第一个 T 是当前 repository 的数据的类型,第二个 T 则是数据处理流开端的时候的数据的类型。
之后,当 observe() 调用后,我们传入事件源给 REventSource,相当于设定好了需要的事件源和对应的开端,这里返回的是 RFrequency T, T,它继承自 REventSource,为其添加了事件源的发送频率的属性。
之后,我们来到了 onUpdatesPerLoop(),这里明确了所开启的数据流的个数(也就是前面所讲的频率)后,返回了 RFlow,这里也就意味着我们的数据流正式生成了。同时,这里也是流式调用的起点。
拿到我们的 RFlow 之后,我们就可以为其提供数据源了,也就是前面说的 Supplier,于是调用 getFrom(),这样我们的数据流也就真正意义拥有了数据“干货”。
有了数据之后我们就可以按具体需要进行数据转换了,这里我们可以直接使用 transform(),返回 RFlow,以便进一步进行流式调用;也可以调用 attemptTransform() 来对可能出现的异常进行处理,比如 orSkip()、orEnd() 之后继续进行流式调用。
经过一系列的流式调用之后,我们终于对数据处理完成啦,现在我们可以选择先对成型的数据在做一次最后的包装 thenTransform(),或是与另一个 Supplier 合并 thenMergeIn() 等。这些处理之后,我们的返回值也就转为了 RConfig,进入了最终配置和 repository 声明结束的状态。
在最终的这个配置过程中,我们调用了 onDeactivation(),为这个 repository 明确了最终进入非活跃状态时的行为,如果不需要其他多余的配置的话,我们就可以进入最终的 compile() 方法了。当我们调用 compile() 时,就会按照前面所走过的所有流程与配置去执行并生成这个 repository。到此,我们的 repository 才真正被创建了出来。
以上就是 repository 从无到有的全过程。当 repository 诞生后,我们也就可以传输需要的数据啦。再回到上面的示例代码:
我们在 onResume()、onPause() 这两个生命周期下分别添加、移除了 Updatable。相较于 RxJava 中通过 Subscription 去取消订阅的做法,Agera 的这种写法显然更为清晰也更为整洁。我们的 Activity 实现了 Updatable 和 Receiver 接口,直接看其实现方法:
可以看到这里 repository 将数据发送给了 receiver,也就是自己,在对应的 accept() 方法中接收到我们想要的 bitmap 后,这张图片也就显示出来了,示例代码中的完整流程也就结束了。
总结一下上述过程:
首先 Repositories.repositoryWithInitialValue() 生成原点 REventSource。
配置完 Observable 之后进入 RFrequency 状态,接着配置数据流的流数。
前面配置完成后,数据流 RFlow 生成,之后通过 getFrom()、mergeIn()、transform() 等方法可进一步进行流式调用;也可以使用 attemptXXX() 方法代替原方法,后面接着调用 orSkip()、orEnd() 进行 error handling 处理。当使用 attemptXXX() 方法时,数据流状态会变为 RTermination,它代表此时的状态已具有终结数据流的能力,是否终结数据流要根据 failed check 触发,结合后面跟着调用的 orSkip()、orEnd(),我们的数据流会从 RTermination 再次切换为 RFlow,以便进行后面的流式调用。
经过前面一系列的流式处理,我们需要结束数据流时,可以选择调用 thenXXX() 方法,对数据流进行最终的处理,处理之后,数据流状态会变为 RConfig;也可以为此行为添加 error handling 处理,选择 thenAttemptXXX() 方法,后面同样接上 orSkip()、orEnd() 即可,最终数据流也会转为 Rconfig 状态。
此时,我们可以在结束前按需要选择对数据流进行最后的配置,例如:调用 onDeactivation() 配置从“订阅”到“取消订阅”的过程是否需要继续执行数据流等等。
一切都部署完毕后,我们 compile() 这个 RConfig,得到最终的成型的 Repository,它具有添加 Updatable、发送数据通知 Receiver 的能力。
我们根据需要添加 Updatable,repository 在数据流处理完成后会通过 update() 发送 event 通知 Updatable。
Updatable 收到通知后则会拉取 repository 的成果数据,并将数据通过 accept() 发送给 Receiver。完成 Push event,pull data 的流程。
看完上述内容,你们对 RxJava 线程切换过程是怎样的有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注丸趣 TV 行业资讯频道,感谢大家的支持。