共计 2977 个字符,预计需要花费 8 分钟才能阅读完成。
本篇文章为大家展示了 RxJava 的响应流程分析基本调用流程是怎样的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
一. 用法
首先来看一个简单的例子:
运行结果为:
从结果中我们不难看出整体的调用流程:
首先通过调用 Observable.create() 方法生成一个被观察者,紧接着在这里我们又调用了 map() 方法对原被观察者进行数据流的变换操作,生成一个新的被观察者(为何是新的被观察者后文会讲),最后调用 subscribe() 方法,传入我们的观察者,这里观察者订阅的则是调用 map() 之后生成的新被观察者。
在整个过程中我们会注意到三个主角:Observable、OnSubscribe、Subscriber,所有的操作都是围绕它们进行的。不难看出这里三个角色的分工:
Observable:被观察者的来源,亦或说是被观察者本身
OnSubscribe:用来通知观察者的不同行为
Subscriber:观察者,通过实现对应方法来产生具体的处理。
所以接下来我们以这三个角色为中心来分析具体的流程。
二. 分析 1. 订阅过程
首先我们进入 Observable.create() 看看:
这里调用构造函数生成了一个 Observable 对象并将传入的 OnSubscribe 赋给自己的成员变量 onsubscribe,等等,这个 hook 是从哪里冒出来的?我们向上找:
RxJavaObservableExecutionHook 这个抽象 Proxy 类默认对 OnSubscribe 对象不做任何处理,不过通过继承该类并重写 onCreate() 等方法我们可以对这些方法对应的时机做一些额外处理比如打 Log 或者一些数据收集方面的工作。
到目前最初始的被观察者已经生成了,我们再来看看观察者这边。我们知道通过调用 observable.subscribe() 方法传入一个观察者即构成了观察者与被观察者之间的订阅关系,那么这内部又是如何实现的呢?看代码:
这里我们略去部分无关代码看主要部分,subscribe.onStart() 默认空实现我们暂且不用管它,对于传进来的 subscriber 要包装成 SafeSubscriber,这个 SafeSubscriber 对原来的 subscriber 的一系列方法做了更完善的处理,包括:onError() 与 onCompleted() 只会有一个被执行;保证一旦 onError() 或者 onCompleted() 被执行,将不再能再执 onNext() 等情况。这里封装为 SafeSubscriber 之后,调用 onSubscribe.call(),并将 subscriber 传入,这样就完成了一次订阅。
显而易见,Subscriber 作为观察者,在订阅行为完成后,其具体行为在整个链式调用中起着至关重要的作用,我们来看看它内部的构成的主要部分:
每个 Subscriber 都持有一个 SubscriptionList,这个 list 保存的是所有该观察者的订阅事件,同时 Subscriber 也对应实现了 Subscription 接口,当这个 Subscriber 取消订阅的时候会将持有事件列表中的所有 Subscription 取消订阅,并且从此不再接受任何订阅事件。同时,通过 Producer 可以去限定该 Subscriber 所接收的数据流的总量,这个限制量其实是加在 Subscriber.onNext() 方法上的,onComplete()、onError() 则不会受到其影响。因为是底层抽象类,onNext()、onComplete()、onError() 统一不在这里处理。
2. 变换过程
在收到 Observable 的消息之前我们有可能会对数据流进行处理,例如 map()、flatMap()、deBounce()、buffer() 等方法,本例中我们用了 map() 方法,它接收了原被观察者发射的数据并将通过该方法返回的结果作为新的数据发射出去,相当于做了一层中间转化:
我们接着看这个转化过程:
这里是通过一个 lift() 方法实现的,再查看其他的转化方法发现内部也都使用 lift() 实现的,看来这个 lift() 就是关键所在了,不过不急,我们先来看看这个 OperationMap 是什么:
OperationMap 实现了 Operator 接口的 call() 方法,该方法接受外部传入的观察者,并将其作为参数构造出了一个新的观察者,我们不难发现 o.onNext(transformer.call(t)); 这一句起了至关重要的作用,这里的接口 transformer 将泛型 T 转化为泛型 R:
这样之后,再将转换后的数据传回至原观察者的 onNext() 方法,就完成了观察数据流的转化,但是你应该也注意到了,我们用来做转换的这个新的观察者并没有实现订阅被观察者的操作,这个订阅操作又是在哪里实现的呢?答案就是接下来的 lift():
在这里我们新生成了一个 Observable 对象,在这个新对象的 onSubscribe 成员的 call() 方法中我们通过 operator.call() 拿到之前生成的未产生订阅的观察者 st,之后将它作为参数传入一开始的 onSubscribe.call() 中,即完成了这个中间订阅的过程。
现在我们将整个流程梳理一下:
一次 map() 变换
根据 Operator 实例生成新的 Subscriber
通过 lift() 生成新的 Observable
原 Subscriber 订阅新的 Observavble
新的 Observable 中 onSubscribe 通知新 Subscriber 订阅原 Observable
新 Subscriber 将消息传给原 Subscriber。
为了便于理解,这里借用一下扔物线的图:
以上就是一次 map() 变换的流程,事实上多次 map() 也是同样道理:最外层的目标 Subscriber 发生订阅行为后,onSubscribe.onNext() 会逐层嵌套调用,直至初始 Observable 被最底层的 Subscriber 订阅,通过 Operator 的一层层变化将消息传到目标 Subscriber。再次祭出扔物线的图:
至于其他的多种变化的实现流程也都很类似,借助于 Operator 的不同实现来达到变换数据流的目的。例如其中的 flatMap(),它需要进行两次 lift(),其中第二次是 OperationMerge,将转换成的每一个 Observable 数据流通过 InnerSubscriber 这个纽带订阅后,在 InnerSubscriber 的 onNext() 中拿到 R,再通过传入的 parent(也就是原 MergeSubscriber)将它们全部发射(emit)出去,由最外层我们传入的 Subscriber 统一接收,这样就完成了 T = Observable R = R 的转化。
除此之外,还有许多各式各样的操作符,如果它们还不能满足你的需要,你也可以通过实现 Operator 接口定制新的操作符。灵活运用它们往往能达到事半功倍的效果,比如通过使用 sample()、debounce() 等操作符有效避免 backpressure 的需要等等,这里就不一一介绍了。
上述内容就是 RxJava 的响应流程分析基本调用流程是怎样的,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注丸趣 TV 行业资讯频道。