本文中的源码基于 RxJava1
RxJava : io.reactivex:rxjava:1.3.4
以下是各个 Part 主要分析源码的方向
- Part 1: Observable, Observable.OnSubscribe, Subscriber
- Part 2: map
- Part 3: subscribeOn
- Part 4: observeOn
Part 1
先来看一个最简单的 RxJava Demo
1 | Observable.create(new Observable.OnSubscribe<String>() { |
那么,我们就从 Observable.create 的角度开始分析 RxJava 内部源码的实现。
1 | public class Observable<T> { |
可以看到,我们上面传入 OnSubscribe 对象 f 被 RxJavaHooks 包装了一下。但是默认情况下的 RxJavaHooks.onCreate 返回的就是 f 本身。
1 | public final class RxJavaHooks { |
接着,根据上面 demo 的代码可以看出,创建出来的 Observable 对象又调用了 subscribe 方法。
1 | public final Subscription subscribe(Subscriber<? super T> subscriber) { |
在 subscribe 内部调用了 subscribe(Subscriber<? super T> subscriber, Observable<T> observable)
方法
1 | static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { |
我们详细的来看下 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)
这句代码。
1 | public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) { |
和上面的 RxJavaHooks 一样,默认的 hook 只是返回了 onSubscribe 对象。所以这句代码就可以“简化”为 onSubscribe.call(subscriber)
。
也就是执行了 demo 中的 call 方法。
1 | @Override |
而在 call 方法中又调用了 subscriber 的 onNext 方法。
还记得上面 subscriber 被包装成 SafeSubscriber 了吗?
所以这里就会调用 SafeSubscriber.onNext 方法。
1 | @Override |
在 SafeSubscriber.onNext 方法内,会调用真正的 subscriber.onNext 方法。SafeSubscriber 的作用就是是为了防止被调用 onCompleted 之后再重新调用 onNext 。换句话说,SafeSubscriber 就是为了防止重用。
因此,subscriber.onNext 也就被执行了。
1 | @Override |
Part 2
在这里,我们把上面简单的 demo 稍微增加一点难度,中间加一个转换:
1 | Observable.create(new Observable.OnSubscribe<String>() { |
可以看到,中间加了一层 map 操作符。
所以我们来分析一下 map 中到底干了什么。
1 | public class Observable<T> { |
map 中调用了 unsafeCreate 方法。我们来看看 unsafeCreate 方法内部
1 | public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) { |
这代码多么似曾相识啊,和上面的 Observable.create 比较一下,发现 Observable.unsafeCreate 和 Observable.create 的逻辑是一样的。
所以我们可以知道, map 操作符内部会重新创建一个 Observable ,而这个 Observable 的 OnSubscribe 是一个 OnSubscribeMap 对象。
1 | public final class OnSubscribeMap<T, R> implements OnSubscribe<R> { |
所以我们在这里可以小结一下,map 操作符会创建一个新的 Observable 对象,并且它的 OnSubscribe 是一个 OnSubscribeMap 对象,而我们自己的 Observable 会保存在 OnSubscribeMap 里。
再回头看看上面的 demo ,发现 map 创建出来的 Observable 对象调用了 subscribe 方法。在 Part 1 中我们分析过,调用 subscribe 方法内部其实就是会去调用 Observable 中 OnSubscribe 的 call 方法。
所以,我们直接来看 OnSubscribeMap 的 call 方法。
1 | @Override |
call 方法内部创建了一个新的 MapSubscriber 对象,
1 | static final class MapSubscriber<T, R> extends Subscriber<T> { |
然后让我们自己的 Observable 去 subscribe 这个 MapSubscriber 对象。
那么接着代码就会执行到 MapSubscriber.call 方法。
1 | @Override |
到这里,整个 map 操作符的流程就讲完了。不明白的同学可以对照着源码多读几遍,相信你会明白的。
献上官方对 map 操作符的示意图
Part 3
这一小节来看看 subscribeOn 操作,先来看 demo
1 | Observable.create(new Observable.OnSubscribe<String>() { |
先来看看 Schedulers.io() 中到底干了什么。
1 | public static Scheduler io() { |
其实就是获取 Schedulers 单例中的 ioScheduler 。
1 | private static Schedulers getInstance() { |
可以看到,ioScheduler 默认实现是 RxJavaSchedulersHook.createIoScheduler()
1 | public static Scheduler createIoScheduler() { |
默认创建一个新的 CachedThreadScheduler 。
1 | public CachedThreadScheduler(ThreadFactory threadFactory) { |
在 CachedWorkerPool 构造方法内部会去创建线程池。
然后回过头来看 subscribeOn 方法的内部代码:
1 | public final Observable<T> subscribeOn(Scheduler scheduler) { |
发现逻辑都是类似的,也是创建了一个新的 Observable , 而对应的 OnSubscribe 是一个 OperatorSubscribeOn 对象。
1 | public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { |
按照以往的惯例,最后 subscribe 的时候肯定会调用 OperatorSubscribeOn 的 call 方法,所以我们直接去看 call 方法。
1 | @Override |
scheduler.createWorker() 是可以理解为在新的工作线程中去做某一个动作(Action0)。前面说过,这里的 scheduler 是 CachedThreadScheduler 类型,所以 createWorker 就是创建了一个 EventLoopWorker 对象。
1 | @Override |
然后调用 inner.schedule 。
1 | @Override |
接下来就到 SubscribeOnSubscriber 的 call 方法中看看。
1 | @Override |
最后,真正的 Observable 调用 call 方法时,会调用 SubscribeOnSubscriber 的 onNext 方法。
1 | @Override |
SubscribeOnSubscriber 中的 onNext 方法再把参数传给真正的 Subscriber 。
到这里,就把 subscribeOn 切换线程的原理讲完了。
Part 4
讲完了 subscribeOn ,再来看 observeOn 会简单很多。还是先来个 demo 吧
1 | Observable.create(new Observable.OnSubscribe<String>() { |
先分析 AndroidSchedulers.mainThread() 。
1 | public static Scheduler mainThread() { |
和 Schedulers.io 类似,也是去获取 AndroidSchedulers.mainThreadScheduler
1 | private static AndroidSchedulers getInstance() { |
LooperScheduler 的构造方法
1 | class LooperScheduler extends Scheduler { |
内部就是创建了主线程的 Handler 。然后利用 Handler 去发送消息就行了。
那么我们就来看看 observeOn 方法。
1 | public final Observable<T> observeOn(Scheduler scheduler) { |
可以看到 observeOn 内部利用了 lift ,那么什么是 lift 呢?
1 | public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { |
原来 lift 内部也是去创建一个新的 Observable ,而且 Observable.OnSubscribe 是一个 OnSubscribeLift 对象。套路都是相似的,不一样的就是需要额外传入一个 Operator 对象。从上面可知, Operator 就是一个 OperatorObserveOn 对象。
接着就去 OnSubscribeLift 的 call 方法中看看。
1 | @Override |
关键代码 RxJavaHooks.onObservableLift(operator).call(o);
,可以猜到默认hook就是返回 operator 本身。那么我们到 operator.call 中看看。
1 | @Override |
从前面的代码可以知道,这里是 scheduler 是 AndroidScheduler 。所以这里会返回一个 ObserveOnSubscriber 对象。从上面的代码可知,返回了 ObserveOnSubscriber 对象之后,会调用 parent.call(st)
。这里的 parent 就是最原始,也就是我们自定义的 Observable 。所以最后代码就走到了 ObserveOnSubscriber 的 onNext 方法中。
1 | static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { |
可以看到,这里会调用 LooperScheduler 来处理。
1 | @Override |
LooperScheduler.schedule 主要做的就是构造出 message ,然后利用 Handler 把 message 发送到主线程中去执行。所以接着代码就到了 ScheduledAction.run 中。
1 | static final class ScheduledAction implements Runnable, Subscription { |
ScheduledAction.run 是在主线程中运行的,而 run 方法中调用了 action.call();
。action 其实就是原来那个 ObserveOnSubscriber 对象。
所以代码再次跳转到 ObserveOnSubscriber.call 方法中。
1 | // only execute this from schedule() |
到这里,observeOn 实现切换线程的原理就讲完了。基本的 RxJava 操作中的源码也都讲了一遍。至于其他的操作符后面有空再讲吧。