android--RxJava线程调度源码详解

这篇具有很好参考价值的文章主要介绍了android--RxJava线程调度源码详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 从今天起关闭烦恼,开启赚钱模式,别去想那些乱七八糟的破事了,满脑子都是钱不好吗,肤浅又快乐!

目录

前言

 一,IO线程调度

二,安卓主线程调度

 

前言

学习线程调度的源码之前,我们需要先分析RxJava的源码,关于RxJava的源码,请移步文章android--RxJava源码详解-CSDN博客

如果没看上面的文章,直接看这篇文章可能会很难理解。 

 一,IO线程调度

关于RxJava的线程调度,有很多种,这里我们只分析IO线程,因为其他的原理是一样的

首先我们看下怎么使用RxJava切换IO线程:

Observable.create(
        
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("袁震");
                Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
            }
})
.subscribeOn(        
        Schedulers.io() 
)
.subscribe(
       
        new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Disposable disposable = d;
                Log.d(TAG, "onSubscribe: " +  Thread.currentThread().getName());
            }
            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
});

这是一个简单的线程切换的使用,首先我们先分析下代码

Schedulers.io()

看一下它的源码:

@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

首先,它返回了一个Scheduler

然后我们看一下这句代码:

return RxJavaPlugins.onIoScheduler(IO);

通过前面文章的分析,我们知道这句是RxJava的一个hook点,一般我们不手动设置,它是不会执行hook代码的,所以这里就是返回了一个名为IO的Scheduler,然后我们看下这个IO:

@NonNull
static final Scheduler IO;



static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    TRAMPOLINE = TrampolineScheduler.instance();
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

这个IO是一个Scheduler,然后它的创建是通过:

IO = RxJavaPlugins.initIoScheduler(new IOTask());

我们看下IOTask是什么:

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}

这里的IOTask是一个Callable的实现类,Callable是什么,它是一个带有返回值的Runnable,不了解的可以看下文章Android 多线程并发详解_android 多线程并发处理for循环-CSDN博客

 它的返回值是Scheduler,再看看它的返回值IoHolder.DEFAULT:

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

那到这里我们应该大致清楚了,上面那一些逻辑其实就是为了返回一个IoScheduler,下面我们看下IoScheduler:

public final class IoScheduler extends Scheduler 

首先,它是Scheduler的子类。

由此可以推断出其他的策略也都有自己的子类。

IoScheduler里面封装了一系列线程池相关的代码,关于线程池的分析,就不在这里具体分析了

然后我们再回到使用的代码中,通过上篇文章和上面代码的分析,我们可以简单的将使用代码这样理解:

ObservbaleCreate.subscribeOn(IoScheduler).subscribe(观察者)

首先我们先看一下ObservbaleCreate.subscribeOn(IoScheduler):

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

通过这里可以知道,它返回的是ObservableSubscribeOn这个对象,所以使用代码可以进一步理解为:

ObservableSubscribeOn.subscribe(观察者)

然后我们分析下subscribe这句代码的源码:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

还是找到关键代码:

subscribeActual(observer);

它的源码:

protected abstract void subscribeActual(Observer<? super T> observer);

是一个抽象方法,我们可以推断出,ObservableSubscribeOn这个类里面有它的具体实现:

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

果然有,在这个方法里,首先将观察者进行了一层包装SubscribeOnObserver,然后调用了onSubscribe方法,也就是我们自定义的观察者的onSubscribe方法,此时还在主线程中,并没有进行线程的切换。

然后我们看最关键的一句代码:

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

从最里面往外看,首先看一下SubscribeTask这个类:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;
    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }
    @Override
    public void run() {
        source.subscribe(parent);
    }
}

它是一个Runnable,持有一个包装了观察者的SubscribeOnObserver的对象,在run方法中调用了

source.subscribe(parent);

 首先我们要知道source是什么,看过上篇文章的都应知道,source就是我们自己创建的被观察者的参数ObservableOnSubscribe。subscribe方法其实就是:

new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("袁震");
        Log.d(TAG, "自定义source: " + Thread.currentThread().getName());
    }

这里就清楚了,其实就是创建了一个Runnable,并在Run方法里面执行上面的方法。

然后再往外看:

scheduler.scheduleDirect(new SubscribeTask(parent))

这个scheduler,毫无疑问就是我们创建的IoScheduler,然后我们在它的父类Scheduler里面找到了方法scheduleDirect:

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

第一句createWorker:

@NonNull
public abstract Worker createWorker();

是一个抽象方法,我们去它的子类IoScheduler里面找到具体实现:

@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

再看下面两句代码:

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);

是将我们的Runnable又进行了封装:

static final class DisposeTask implements Runnable, Disposable {
    final Runnable decoratedRun;
    final Worker w;
    Thread runner;
    DisposeTask(Runnable decoratedRun, Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }
    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            decoratedRun.run();
        } finally {
            dispose();
            runner = null;
        }
    }
    @Override
    public void dispose() {
        if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
            ((NewThreadWorker)w).shutdown();
        } else {
            w.dispose();
        }
    }
    @Override
    public boolean isDisposed() {
        return w.isDisposed();
    }
}

增加了disposed控制,方便管理生命周期。

然后再来看 w.schedule(task, delay, unit)的源码:

@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

其实就是将Runnable放到了线程池中去执行。到此,就切换到了IO线程。

二,安卓主线程调度

接下来我们使用中还会在执行完了subscribe方法之后,切换到主线程中:

Observable.create(
        
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("袁震");
                Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
            }
})
.subscribeOn(        
        Schedulers.io() 
)
.observeOn(
        
        AndroidSchedulers.mainThread()
)
.subscribe(
       
        new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Disposable disposable = d;
                Log.d(TAG, "onSubscribe: " +  Thread.currentThread().getName());
            }
            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
});

AmdroidSchedulers.mainThread()需要添加引用:

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

其原理和上面IO的切换原理相似,不同之处在于它是通过Handler将线程切换到了主线程。

我们看一下AndroidSchedules的源码:

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

它的里面只有一种策略,那就是MAIN_THREAD

可以看到它是创建了一个Handler,并将Handler传递给了HandlerScheduler,然后我们看下HandlerScheduler的源码:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}

首先,它也是继承了Scheduler,然后,它里面的线程切换方法:

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");
    if (disposed) {
        return Disposables.disposed();
    }
    run = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }
    return scheduled;
}

是通过handler来实现的,这是它和其他策略的不同之处,其他策略都是通过线程池来完成的线程切换。这也是android的特有属性。文章来源地址https://www.toymoban.com/news/detail-830540.html

到了这里,关于android--RxJava线程调度源码详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Rxjava源码分析&实践(六)【实践环节:map操作符功能实现】

    Rxjava源码分析实践系列文章目录 Rxjava源码分析实践(一)【RxJava的基本使用】 Rxjava源码分析实践(二)【RxJava基本原理分析之构建流】 Rxjava源码分析实践(三)【RxJava基本原理分析之订阅流】 Rxjava源码分析实践(四)【RxJava基本原理分析之触发流】 Rxjava源码分析实践(五

    2024年02月02日
    浏览(34)
  • Android笔记(十八):面向Compose组件结合Retrofit2和Rxjava3实现网络访问

    Square公司推出的Retrofit2库(https://square.github.io/retrofit/),改变了网络访问的方式。它实现了网络请求的封装。Retrofit库采用回调处理方式,使得通过接口提交请求和相应的参数的配置,就可以获得对应的响应,并可以将响应获得的数据解析成特定的数据格式,例如将JSON数据解

    2024年02月02日
    浏览(32)
  • 【RxJava】RxJava 简介 ( RxJava 概念 | RxJava 组成 - 被观察者 / 观察者 / 订阅 / 操作符 | RxJava 适用场景 )

    RxJava 框架 是一个 异步编程函数库 , 包含了如下要素 : 观察者模式 迭代器模式 函数式编程 RxJava 框架应用场景 : 异步操作 事件流 RxJava 组成要素 : Observable(被观察者): Observable 是一个 可以发送消息的数据源 , 可以同时发送若干消息 , 消息的格式可以通过泛型进行定义 ; 消

    2024年02月06日
    浏览(34)
  • RxJava的前世【RxJava系列之设计模式】

    学习RxJava,少不了介绍它的设计模式。但我看大部分文章,都是先将其用法介绍一通,然后再结合其用法,讲解其设计模式。这样当然有很多好处,但我个人觉得,这种介绍方式,对于没有接触过RxJava的朋友来说,是不太友好的。 而我,更倾向于,先把对设计模式的认知,拉

    2024年02月13日
    浏览(31)
  • Rxjava retryWhen and repeatWhen

    当 Observable 发生错误时接收到 onError 事件,重新发射数据。可以拦截· Throwable 和 Exception 。 重载方法如下: 无条件地、重复发送 被观察者事件.,具备重载方法,可设置重复创建次数 遇到错误时,将发生的错误传递给一个新的被观察者 Observable , 并根据新被观察者发送的事件

    2024年02月10日
    浏览(30)
  • 每月一书(202304)《RxJava2.x实战》

    @ [TOC] 又到了每月一书的时间,本月阅读的是技术相关书籍《 RxJava2.x实战 》,下面分享一下我阅读完后的体会。 本书主要介绍了 RxJava 这个框架,框架版本是2.x。主要内容包含三大部分: 框架的原理和使用方法 框架中各类操作符的使用方法 和其它框架配合使用的方法,这块

    2024年02月01日
    浏览(34)
  • RxJava 响应式获取List 指定名称对象的值

    可以通过 Java 8 的 Stream API 来实现。假设你有一个名为 list 的 List 对象,其中包含多个对象,每个对象都有一个名为 name 的属性和一个名为 value 的属性,你可以使用以下代码来获取名为 targetName 的对象的 value 值:

    2024年02月08日
    浏览(34)
  • 【RxJava】map过程中各个Observable生命周期分析

    首先说下map和flatMap的区别,防止有对RxJava还不够熟悉的小伙伴 map的Function指定如何将A转为B flatMap的Function则指定如何将ObservableA转为ObservableB map和flatMap最终的转换结果都是ObservableB flatMap由于可以自己创建Observable,因此更为强大灵活,map比较简单 下面,我们创建一个Observab

    2024年02月05日
    浏览(30)
  • 将 RxJava 的 Observable 转换为 Reactor 的 Flux流

    如果您想将 RxJava 的 Observable 转换为 Reactor 的 Flux,可以使用 Reactor Adapter 库来实现这一转换。这个库提供了一些静态方法来将 RxJava 类型转换为 Reactor 类型。 以下是一个示例代码,演示了如何将 RxJava 的 Observable 转换为 Reactor 的 Flux:

    2024年02月01日
    浏览(28)
  • Android Pdf第三方框架

    导入AndroidPdfViewer 在 repositories 里面添加库 布局文件引用PDFView PdfViewer 为我们提供了以下几种读取文件的方法: Configurator 主要方法 PDFView 其他方法 github地址: https://github.com/barteksc/AndroidPdfViewer 导入mupdf 在 repositories 里面添加库 mupdf提供了解析代码, Document 读取pdf文件, Androi

    2024年02月13日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包