从今天起关闭烦恼,开启赚钱模式,别去想那些乱七八糟的破事了,满脑子都是钱不好吗,肤浅又快乐!
目录
前言
一,IO线程调度
二,安卓主线程调度
前言
学习线程调度的源码之前,我们需要先分析RxJava的源码,关于RxJava的源码,请移步文章android--RxJava源码详解-CSDN博客
如果没看上面的文章,直接看这篇文章可能会很难理解。
一,IO线程调度
关于RxJava的线程调度,有很多种,这里我们只分析IO线程,因为其他的原理是一样的
首先我们看下怎么使用RxJava切换IO线程:
Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("袁震");
Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
}
})
.subscribeOn(
Schedulers.io()
)
.subscribe(
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Disposable disposable = d;
Log.d(TAG, "onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
这是一个简单的线程切换的使用,首先我们先分析下代码
Schedulers.io()
看一下它的源码:
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
首先,它返回了一个Scheduler
然后我们看一下这句代码:
return RxJavaPlugins.onIoScheduler(IO);
通过前面文章的分析,我们知道这句是RxJava的一个hook点,一般我们不手动设置,它是不会执行hook代码的,所以这里就是返回了一个名为IO的Scheduler,然后我们看下这个IO:
@NonNull
static final Scheduler IO;
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
这个IO是一个Scheduler,然后它的创建是通过:
IO = RxJavaPlugins.initIoScheduler(new IOTask());
我们看下IOTask是什么:
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
这里的IOTask是一个Callable的实现类,Callable是什么,它是一个带有返回值的Runnable,不了解的可以看下文章Android 多线程并发详解_android 多线程并发处理for循环-CSDN博客
它的返回值是Scheduler,再看看它的返回值IoHolder.DEFAULT:
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
那到这里我们应该大致清楚了,上面那一些逻辑其实就是为了返回一个IoScheduler,下面我们看下IoScheduler:
public final class IoScheduler extends Scheduler
首先,它是Scheduler的子类。
由此可以推断出其他的策略也都有自己的子类。
IoScheduler里面封装了一系列线程池相关的代码,关于线程池的分析,就不在这里具体分析了
然后我们再回到使用的代码中,通过上篇文章和上面代码的分析,我们可以简单的将使用代码这样理解:
ObservbaleCreate.subscribeOn(IoScheduler).subscribe(观察者)
首先我们先看一下ObservbaleCreate.subscribeOn(IoScheduler):
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
通过这里可以知道,它返回的是ObservableSubscribeOn这个对象,所以使用代码可以进一步理解为:
ObservableSubscribeOn.subscribe(观察者)
然后我们分析下subscribe这句代码的源码:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
还是找到关键代码:
subscribeActual(observer);
它的源码:
protected abstract void subscribeActual(Observer<? super T> observer);
是一个抽象方法,我们可以推断出,ObservableSubscribeOn这个类里面有它的具体实现:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
果然有,在这个方法里,首先将观察者进行了一层包装SubscribeOnObserver,然后调用了onSubscribe方法,也就是我们自定义的观察者的onSubscribe方法,此时还在主线程中,并没有进行线程的切换。
然后我们看最关键的一句代码:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
从最里面往外看,首先看一下SubscribeTask这个类:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
它是一个Runnable,持有一个包装了观察者的SubscribeOnObserver的对象,在run方法中调用了
source.subscribe(parent);
首先我们要知道source是什么,看过上篇文章的都应知道,source就是我们自己创建的被观察者的参数ObservableOnSubscribe。subscribe方法其实就是:
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("袁震");
Log.d(TAG, "自定义source: " + Thread.currentThread().getName());
}
这里就清楚了,其实就是创建了一个Runnable,并在Run方法里面执行上面的方法。
然后再往外看:
scheduler.scheduleDirect(new SubscribeTask(parent))
这个scheduler,毫无疑问就是我们创建的IoScheduler,然后我们在它的父类Scheduler里面找到了方法scheduleDirect:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
第一句createWorker:
@NonNull
public abstract Worker createWorker();
是一个抽象方法,我们去它的子类IoScheduler里面找到具体实现:
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
再看下面两句代码:
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
是将我们的Runnable又进行了封装:
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
增加了disposed控制,方便管理生命周期。
然后再来看 w.schedule(task, delay, unit)的源码:
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
其实就是将Runnable放到了线程池中去执行。到此,就切换到了IO线程。
二,安卓主线程调度
接下来我们使用中还会在执行完了subscribe方法之后,切换到主线程中:
Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("袁震");
Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
}
})
.subscribeOn(
Schedulers.io()
)
.observeOn(
AndroidSchedulers.mainThread()
)
.subscribe(
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Disposable disposable = d;
Log.d(TAG, "onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
AmdroidSchedulers.mainThread()需要添加引用:
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
其原理和上面IO的切换原理相似,不同之处在于它是通过Handler将线程切换到了主线程。
我们看一下AndroidSchedules的源码:
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
它的里面只有一种策略,那就是MAIN_THREAD
可以看到它是创建了一个Handler,并将Handler传递给了HandlerScheduler,然后我们看下HandlerScheduler的源码:
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
首先,它也是继承了Scheduler,然后,它里面的线程切换方法:文章来源:https://www.toymoban.com/news/detail-830540.html
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
是通过handler来实现的,这是它和其他策略的不同之处,其他策略都是通过线程池来完成的线程切换。这也是android的特有属性。文章来源地址https://www.toymoban.com/news/detail-830540.html
到了这里,关于android--RxJava线程调度源码详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!