RxJava源码解析

本文中的源码基于 RxJava1
RxJava : io.reactivex:rxjava:1.3.4

以下是各个 Part 主要分析源码的方向

  • Part 1: Observable, Observable.OnSubscribe, Subscriber
  • Part 2: map
  • Part 3: subscribeOn
  • Part 4: observeOn

Part 1

先来看一个最简单的 RxJava Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hi rxjava");
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(String s) {
System.out.println(s);
}
});

那么,我们就从 Observable.create 的角度开始分析 RxJava 内部源码的实现。

1
2
3
4
5
6
7
8
9
10
public class Observable<T> {

@Deprecated
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}

...

}

可以看到,我们上面传入 OnSubscribe 对象 f 被 RxJavaHooks 包装了一下。但是默认情况下的 RxJavaHooks.onCreate 返回的就是 f 本身。

1
2
3
4
5
6
7
8
9
10
11
public final class RxJavaHooks {

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}

}

接着,根据上面 demo 的代码可以看出,创建出来的 Observable 对象又调用了 subscribe 方法。

1
2
3
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}

在 subscribe 内部调用了 subscribe(Subscriber<? super T> subscriber, Observable<T> observable) 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// 一开始是对参数的校验
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/

}

// 调用 subscriber.onStart 方法,默认是空实现
subscriber.onStart();

/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/

// if not already wrapped
// 把 subscriber 包装成 SafeSubscriber
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}

// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// 开始执行 onSubscribe 的 onCall 方法
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
// 如果发生了异常,就调用 subscriber 的 onError 方法
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD
}
}
return Subscriptions.unsubscribed();
}
}

我们详细的来看下 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber) 这句代码。

1
2
3
4
5
6
7
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
return f.call(instance, onSubscribe);
}
return onSubscribe;
}

和上面的 RxJavaHooks 一样,默认的 hook 只是返回了 onSubscribe 对象。所以这句代码就可以“简化”为 onSubscribe.call(subscriber)

也就是执行了 demo 中的 call 方法。

1
2
3
4
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hi rxjava");
}

而在 call 方法中又调用了 subscriber 的 onNext 方法。

还记得上面 subscriber 被包装成 SafeSubscriber 了吗?

所以这里就会调用 SafeSubscriber.onNext 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void onNext(T t) {
try {
if (!done) {
// actual就是我们自己定义的subscriber
actual.onNext(t);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this);
}
}

在 SafeSubscriber.onNext 方法内,会调用真正的 subscriber.onNext 方法。SafeSubscriber 的作用就是是为了防止被调用 onCompleted 之后再重新调用 onNext 。换句话说,SafeSubscriber 就是为了防止重用。

因此,subscriber.onNext 也就被执行了。

1
2
3
4
@Override
public void onNext(String s) {
System.out.println(s);
}

Part 2

在这里,我们把上面简单的 demo 稍微增加一点难度,中间加一个转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hi rxjava");
}
})
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + "hahaha";
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(String s) {
System.out.println(s);
}
});

可以看到,中间加了一层 map 操作符。

所以我们来分析一下 map 中到底干了什么。

1
2
3
4
5
6
7
public class Observable<T> {

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}

}

map 中调用了 unsafeCreate 方法。我们来看看 unsafeCreate 方法内部

1
2
3
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}

这代码多么似曾相识啊,和上面的 Observable.create 比较一下,发现 Observable.unsafeCreate 和 Observable.create 的逻辑是一样的。

所以我们可以知道, map 操作符内部会重新创建一个 Observable ,而这个 Observable 的 OnSubscribe 是一个 OnSubscribeMap 对象。

1
2
3
4
5
6
7
8
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source; // 原来我们自己创建的 Observable
this.transformer = transformer; // Fun1 转换
}

}

所以我们在这里可以小结一下,map 操作符会创建一个新的 Observable 对象,并且它的 OnSubscribe 是一个 OnSubscribeMap 对象,而我们自己的 Observable 会保存在 OnSubscribeMap 里。

再回头看看上面的 demo ,发现 map 创建出来的 Observable 对象调用了 subscribe 方法。在 Part 1 中我们分析过,调用 subscribe 方法内部其实就是会去调用 Observable 中 OnSubscribe 的 call 方法。

所以,我们直接来看 OnSubscribeMap 的 call 方法。

1
2
3
4
5
6
7
8
@Override
public void call(final Subscriber<? super R> o) { // o 就是我们自定义的 Subscriber
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
// source 就是我们自己创建的 Observable
// 和 subscribe 相比,unsafeSubscribe 内部不会对参数校验,subscriber 不会包装成 SafeSubscriber
source.unsafeSubscribe(parent);
}

call 方法内部创建了一个新的 MapSubscriber 对象,

1
2
3
4
5
6
7
static final class MapSubscriber<T, R> extends Subscriber<T> {

public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual; // actual 就是我们自定义的 Subscriber
this.mapper = mapper; // mapper 就是 Func1 转换
}
}

然后让我们自己的 Observable 去 subscribe 这个 MapSubscriber 对象。

那么接着代码就会执行到 MapSubscriber.call 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void onNext(T t) {
R result;

try {
// 执行 Func1 转换器 实现从 T 到 R 的转换
// 在 demo 中就是在 hi rxjava 后面追加 hahaha
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
// 执行我们自定义的 Subscriber 的 onNext 方法
actual.onNext(result);
}

到这里,整个 map 操作符的流程就讲完了。不明白的同学可以对照着源码多读几遍,相信你会明白的。

献上官方对 map 操作符的示意图

map

Part 3

这一小节来看看 subscribeOn 操作,先来看 demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hi rxjava");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(String s) {
System.out.println(s);
}
});

先来看看 Schedulers.io() 中到底干了什么。

1
2
3
public static Scheduler io() {
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

其实就是获取 Schedulers 单例中的 ioScheduler 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private static Schedulers getInstance() {
for (;;) {
Schedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new Schedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
} else {
current.shutdownInstance();
}
}
}

private Schedulers() {
@SuppressWarnings("deprecation")
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();

Scheduler c = hook.getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
}

Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}

Scheduler nt = hook.getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
}
}

可以看到,ioScheduler 默认实现是 RxJavaSchedulersHook.createIoScheduler()

1
2
3
4
5
6
7
8
9
10
public static Scheduler createIoScheduler() {
return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}

public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
return new CachedThreadScheduler(threadFactory);
}

默认创建一个新的 CachedThreadScheduler 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public CachedThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}

@Override
public void start() {
CachedWorkerPool update =
new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}

在 CachedWorkerPool 构造方法内部会去创建线程池。

然后回过头来看 subscribeOn 方法的内部代码:

1
2
3
4
5
6
7
8
9
10
public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}

public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}

发现逻辑都是类似的,也是创建了一个新的 Observable , 而对应的 OnSubscribe 是一个 OperatorSubscribeOn 对象。

1
2
3
4
5
6
7
8
9
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source; // 真正的 Observable
this.requestOn = requestOn;
}

}

按照以往的惯例,最后 subscribe 的时候肯定会调用 OperatorSubscribeOn 的 call 方法,所以我们直接去看 call 方法。

1
2
3
4
5
6
7
8
9
10
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
// 创建时传入真正的 subscriber
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);

inner.schedule(parent);
}

scheduler.createWorker() 是可以理解为在新的工作线程中去做某一个动作(Action0)。前面说过,这里的 scheduler 是 CachedThreadScheduler 类型,所以 createWorker 就是创建了一个 EventLoopWorker 对象。

1
2
3
4
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}

然后调用 inner.schedule 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
// threadWorker.scheduleActual 就是调用了线程池去执行这个 action0
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
// 这里的 action 就是上面的 SubscribeOnSubscriber 对象
// 因此 SubscribeOnSubscriber 的 call 方法就在工作线程中被调用了
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}

接下来就到 SubscribeOnSubscriber 的 call 方法中看看。

1
2
3
4
5
6
7
8
@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
// 把真正的 Observable subscribe 到 SubscribeOnSubscriber 中
src.unsafeSubscribe(this);
}

最后,真正的 Observable 调用 call 方法时,会调用 SubscribeOnSubscriber 的 onNext 方法。

1
2
3
4
5
@Override
public void onNext(T t) {
// actual 是真正我们自定义的 Subscriber
actual.onNext(t);
}

SubscribeOnSubscriber 中的 onNext 方法再把参数传给真正的 Subscriber 。

到这里,就把 subscribeOn 切换线程的原理讲完了。

Part 4

讲完了 subscribeOn ,再来看 observeOn 会简单很多。还是先来个 demo 吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hi rxjava");
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(String s) {
System.out.println(s);
}
});

先分析 AndroidSchedulers.mainThread() 。

1
2
3
public static Scheduler mainThread() {
return getInstance().mainThreadScheduler;
}

和 Schedulers.io 类似,也是去获取 AndroidSchedulers.mainThreadScheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static AndroidSchedulers getInstance() {
for (;;) {
AndroidSchedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new AndroidSchedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
}
}
}

private AndroidSchedulers() {
RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();

Scheduler main = hook.getMainThreadScheduler();
if (main != null) {
mainThreadScheduler = main;
} else {
// 传入主线程的 looper
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
}
}

LooperScheduler 的构造方法

1
2
3
4
5
6
7
8
9
10
11
12
class LooperScheduler extends Scheduler {
private final Handler handler;

LooperScheduler(Looper looper) {
handler = new Handler(looper);
}

LooperScheduler(Handler handler) {
this.handler = handler;
}

}

内部就是创建了主线程的 Handler 。然后利用 Handler 去发送消息就行了。

那么我们就来看看 observeOn 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

可以看到 observeOn 内部利用了 lift ,那么什么是 lift 呢?

1
2
3
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

原来 lift 内部也是去创建一个新的 Observable ,而且 Observable.OnSubscribe 是一个 OnSubscribeLift 对象。套路都是相似的,不一样的就是需要额外传入一个 Operator 对象。从上面可知, Operator 就是一个 OperatorObserveOn 对象。

接着就去 OnSubscribeLift 的 call 方法中看看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void call(Subscriber<? super R> o) {
try {
// 执行 operator.call 方法,返回一个 Subscriber 对象
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
// 这里的 parent 就是我们真正的 onSubscribe
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}

关键代码 RxJavaHooks.onObservableLift(operator).call(o); ,可以猜到默认hook就是返回 operator 本身。那么我们到 operator.call 中看看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}

从前面的代码可以知道,这里是 scheduler 是 AndroidScheduler 。所以这里会返回一个 ObserveOnSubscriber 对象。从上面的代码可知,返回了 ObserveOnSubscriber 对象之后,会调用 parent.call(st) 。这里的 parent 就是最原始,也就是我们自定义的 Observable 。所以最后代码就走到了 ObserveOnSubscriber 的 onNext 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {


@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}

protected void schedule() {
if (counter.getAndIncrement() == 0) {
// 这里的 recursiveScheduler 就是 LooperScheduler
// 注意这里 schedule 的参数是 this !!!
recursiveScheduler.schedule(this);
}
}

}

可以看到,这里会调用 LooperScheduler 来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public Subscription schedule(final Action0 action) {
// 这里的 action 参数就是 ObserveOnSubscriber 对象
return schedule(action, 0, TimeUnit.MILLISECONDS);
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}

action = hook.onSchedule(action);
// ScheduledAction 实现了 Runnable 接口
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
// 之后该 message 会在主线程中取出,然后执行 ScheduledAction
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.

handler.sendMessageDelayed(message, unit.toMillis(delayTime));

if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}

return scheduledAction;
}

LooperScheduler.schedule 主要做的就是构造出 message ,然后利用 Handler 把 message 发送到主线程中去执行。所以接着代码就到了 ScheduledAction.run 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final class ScheduledAction implements Runnable, Subscription {

@Override public void run() {
try {
// 这里的 action 参数就是 ObserveOnSubscriber 对象
// 所以会调用 ObserveOnSubscriber.call
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
IllegalStateException ie;
if (e instanceof OnErrorNotImplementedException) {
ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}

}

ScheduledAction.run 是在主线程中运行的,而 run 方法中调用了 action.call(); 。action 其实就是原来那个 ObserveOnSubscriber 对象。

所以代码再次跳转到 ObserveOnSubscriber.call 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;

// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
// 这里的 child 就是真正的我们自定义的 Subscriber
final Subscriber<? super T> localChild = this.child;

// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)

for (;;) {
long requestAmount = requested.get();

while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;

if (checkTerminated(done, empty, localChild, q)) {
return;
}

if (empty) {
break;
}
// 在主线程中调用了真正的 Subscriber 的 onNext 方法
localChild.onNext(NotificationLite.<T>getValue(v));

currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}

if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}

emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}

到这里,observeOn 实现切换线程的原理就讲完了。基本的 RxJava 操作中的源码也都讲了一遍。至于其他的操作符后面有空再讲吧。

俞其荣 wechat
欢迎订阅我的微信公众号来获取我的动态!
坚持原创技术分享,您的支持将鼓励我继续创作!