Rxjava3 全新详解及常用操作符

这篇具有很好参考价值的文章主要介绍了Rxjava3 全新详解及常用操作符。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

RxJava 是一个基于 Java 的响应式编程库,用于处理异步事件流和数据流。它是由 Netflix 开发并开源,现在广泛用于 Android 和 Java 后端开发。RxJava 提供了一种用于组合和处理异步数据的丰富工具集,它的核心思想是将数据流视为一系列事件,以响应事件的方式进行处理。RxJava 提供了丰富的操作符,用于处理和转换数据流。这些操作符可以帮助你执行各种操作,包括过滤、映射、合并、变换等,以便更好地处理异步数据流。

RxJava 原理

  1. Observable 和 Observer:RxJava 的核心是 Observable(可观察对象)和 Observer(观察者)。Observable 表示一个可观察的数据源,它可以发射数据项,而 Observer 用于订阅并监听这些数据项的变化。
  2. 操作符:RxJava 提供了各种操作符,用于对数据流进行转换、过滤、组合和其他处理。操作符可以将一个 Observable 转换成另一个 Observable,从而实现数据的转换和处理。
  3. 流式编程:RxJava 支持链式调用,可以将多个操作符和观察者方法连接在一起,以创建复杂的数据流处理逻辑。
  4. 异步和线程控制:RxJava 允许你轻松处理异步操作,使用 subscribeOnobserveOn 等操作符,可以指定在哪个线程上执行 Observable 和 Observer 的代码。
  5. 背压处理:RxJava 2 引入了背压处理机制,允许 Observable 控制发射的速度,以避免内存溢出和资源泄漏问题。

RxJava 在 Android 中的功能作用:

  1. 简化异步操作:RxJava 简化了异步操作,例如网络请求、数据库访问、文件读写等。你可以使用 Observable 发射异步事件,然后使用 Observer 来处理这些事件。
  2. 响应式UI:RxJava 可以在 Android UI 线程和后台线程之间建立响应式的通信,以便在 UI 上更新数据或执行操作。
  3. 处理多个观察者:RxJava 允许你轻松地将多个观察者订阅到一个 Observable,这对于多个界面元素依赖于相同数据源的情况很有用。
  4. 处理错误和异常:RxJava 提供了各种操作符,用于处理错误和异常,例如 onErrorReturnonErrorResumeNext 等,以确保应用程序能够更健壮地处理异常情况。
  5. 组合和转换数据流:RxJava 提供了丰富的操作符,用于组合和转换数据流。例如,你可以使用 zip 将多个数据流合并,或使用 map 转换数据项的类型。
  6. 事件总线:RxJava 可以用作事件总线,允许不同组件之间进行松散耦合的通信,例如通过 RxBus 发送和接收事件。
  7. 自动管理资源:RxJava 可以帮助你管理资源,例如自动释放订阅,避免内存泄漏。
  8. 测试支持:RxJava 提供了测试支持,可以轻松地测试异步操作,确保应用程序的可靠性。

Rxjava 基本组成

1. 响应式编程: RxJava 的核心思想是响应式编程,它允许你以一种响应事件的方式来处理数据流。你可以订阅一个数据流,然后定义事件处理的方式。当数据项到达时,它们会触发事件,观察者(Observer)会监听这些事件并执行相应的操作。

2. 基本组件: RxJava 的基本组件包括以下几个部分

  • Observable(可观察对象): 表示一个能够发射数据项的数据源。Observable 可以发射零个或多个数据项,以及错误或完成事件。
  • Observer(观察者): 订阅 Observable 并监听发射的事件。Observer 可以处理数据项、错误和完成事件。
  • Flowable (背压可观察对象) :和Observable使用和功能类似,区别是Flowable支持背压,Flowable操作符默认的缓存空间大小128。
  • Subscriber(订阅者): 类似于 Observer,用于订阅 Observable。
  • Operator(操作符): 用于对 Observable 发射的数据流进行变换、过滤、合并等操作。
  • Schedulers(调度器): 用于控制事件的执行线程,例如在主线程或后台线程执行。

3. 操作符: RxJava 提供了大量的操作符,用于操作和转换数据流。这些操作符包括 mapfilterflatMapzipmerge 等,允许你根据需要执行各种操作。

4. 异步和并发: RxJava 简化了异步编程,允许你轻松地处理多线程和并发操作。你可以使用 subscribeOnobserveOn 操作符来指定代码的执行线程,以避免阻塞主线程。

5. 错误处理: RxJava 提供了多种方式来处理错误,包括 onErroronErrorReturnonErrorResumeNext 等,以确保应用程序能够更健壮地处理异常情况。

6. 背压处理: RxJava 2 引入了背压处理机制,用于处理生产者和消费者之间的速率不匹配问题。背压机制允许 Observable 控制数据的发射速率,以避免内存溢出和资源泄漏。

7. 在 Android 中的应用: RxJava 在 Android 开发中广泛应用于处理异步操作,例如网络请求、数据库访问、UI事件响应等。它简化了异步编程,提高了代码的可读性和可维护性,同时提供了更好的性能和响应性。

8. 流式编程: RxJava 支持链式调用,可以将多个操作符和观察者方法连接在一起,以创建复杂的数据流处理逻辑。这使得代码更具表达力和可读性。

Rxjava使用

RxJava 的基本用法: \

  1. 创建一个 Observable,它会发射整数数据。\
  2. 创建一个观察者 Observer,用于订阅这个 Observable
  3. 使用 subscribe 方法将观察者订阅到 Observable 上,观察者会监听 Observable 发射的数据项和事件。
    //创建一个 `Observable`,它会发射整数数据。
    val observable = Observable.create(ObservableOnSubscribe<Int> {
                    for (i in 1..10) {
                        it.onNext(i)
                    }
                    it.onComplete()
                })

    var num: Int;
    var dd : Disposable? = null

    //创建一个观察者 `Observer`,用于订阅这个 `Observable`
    val observer = object:Observer<Int>{
                    override fun onSubscribe(d: Disposable) {//当调用订阅时调用此方法
                        dd = d
                    }

                    override fun onNext(t: Int) {//上游发送数据时调用此方法 即当 Observable 发射数据项时调用
                        num = t
                        if (num == 5){
                            dd!!.dispose()
                        }
                        Log.d(TAG,"接受上游数据:$t")
                    }

                    override fun onError(e: Throwable) {// 当出现错误时调用

                    }

                    override fun onComplete() {// 当 Observable 完成时调用

                    }

                }

    //使用subscribe方法将观察者订阅到 Observable上,观察者会监听Observable发射的数据项和事件。
    observable.subscribe(observer)

也可以使用方法:直接使用函数式编程,把创建后的被观察者通过订阅方法(订阅操作符)把创建观察者作为订阅方法的参数;伪代码:

    Observable.create(ObservableOnSubscribe<Int> {
    				//上游发送数据
                    it.onNext(1)
                    it.onNext(3)
                    it.onNext(5)
                    it.onComplete()
                })
                .subscribe(object :Observer<Int>{
                        override fun onSubscribe(d: Disposable) {

                        }

                        override fun onNext(t: Int) {
    						//下游接受数据
                            Log.i(TAG,"接受到上游数据:$t")
                        }

                        override fun onError(e: Throwable) {

                        }

                        override fun onComplete() {

                        }

                    })

操作符

Rxjava 的使用核心点就是各种操作符的使用;所以以下介绍一些常用的 RxJava 操作符的详细使用示例:

  • 首先操作符的分类:

    创建操作符

    1. just:创建一个发射指定数据项的 Observable。
    2. fromArray:从一个数组或可迭代对象中创建一个 Observable。
    3. create:手动创建一个 Observable。
    4. range:创建一个发射特定整数范围的 Observable。

    转换操作符

    1. map:将数据项转换为另一种类型。
    2. flatMap:将每个数据项映射为一个 Observable,然后将这些 Observables 合并成一个数据流。
    3. concatMap:类似于 flatMap,但保持原始数据项的顺序。
    4. buffer:将数据项分组为列表,并以列表的形式发射。

    过滤操作符

    1. filter:过滤掉不满足条件的数据项。
    2. distinct:过滤掉重复的数据项。
    3. take:仅发射前 N 个数据项。
    4. skip:跳过前 N 个数据项。

    组合操作符

    1. merge:合并多个 Observables 的数据流。
    2. zip:将多个 Observables 的数据项按顺序一对一地合并。
    3. combineLatest:将多个 Observables 最近的数据项合并成一个。

    辅助操作符

    1. subscribe:用于订阅 Observable 并处理数据。
    2. observeOn:指定观察者运行在特定的调度器上。
    3. subscribeOn:指定 Observable 运行在特定的调度器上。
    4. debounce:用于过滤数据流,只保留最新的数据项。
    5. delay:延迟发射数据项。

    错误处理操作符

    1. onErrorReturn:在遇到错误时发射一个默认值。
    2. onErrorResumeNext:在遇到错误时切换到另一个 Observable。
    3. retry:在遇到错误时重试操作。
创建型操作符

创建型用于创建 Observable(被观察者)实例,它们是构建数据流的起点(上游)。

1. Observable.create 这是最通用的创建型操作符。它允许你手动创建一个 Observable,你需要在其中定义数据项的发射逻辑。这个操作符通常用于创建自定义的 Observable。

val observable = Observable.create<String> { emitter ->
    emitter.onNext(1) //手动调用发射逻辑
    emitter.onNext(2)
    emitter.onComplete()//手动调用发射完成的逻辑
}

.subscribe(object :Observer<String>{
            override fun onSubscribe(d: Disposable) {

            }

            override fun onNext(t: String) {
				Log.d("Rxjava ","下游接受到上游的数据:" + t)
            }

            override fun onError(e: Throwable) {

            }

            override fun onComplete() {

            }

        })

或者使用Observer的简化版Consumer
.subscribe(object :Consumer<String>{
                override fun accept(t: String?) {
                    Log.d("Rxjava","下游接受到上游的数据:$t")
                }
            })

2. Observable.just 用于创建一个发射指定数据项的 Observable。它可以发射多个数据项,然后自动完成发射数据项。

val observable = Observable.just(1, 3, 5,7)//just方法参数为可变参数,参数类型为泛型
.subscribe(object :Consumer<String>{
                override fun accept(t: Int) {
                    Log.d("Rxjava","下游接受到上游的数据:$t")
                }
            })

3. Observable.fromArray 从一个数组、可迭代对象或可变参数列表中创建一个 Observable。然后自动完成发射数据项。

val numbers = arrayOf(1, 2, 3)
val observable = Observable.fromArray(numbers)
.subscribe(object :Consumer<Array<Int>>{
                override fun accept(t: Array<Int>?) {

                }
            })

4. Observable.fromIterable: 从一个 Iterable 对象(如 List 或 Set)中创建一个 Observable。

 val list = listOf("A", "B", "C")
 val observable = Observable.fromIterable(list)
            .subscribe(object :Consumer<String>{
                override fun accept(t: String?) {
                    
                }
            })     

5. Observable.interval 创建一个 Observable,定期发射一个递增的长整型数值。

Observable.interval(1, TimeUnit.SECONDS)//重载方法很多  
            .subscribe(object :Consumer<Long>{  
                override fun accept(t: Long?) {

                }
            })

6. Observable.range: 创建一个 Observable,发射一个指定范围内的整数序列。

Observable.range(1,6)
            .subscribe(object :Consumer<Int>{
                override fun accept(t: Int?) {

                }
            })

7. Observable.timer 创建一个 Observable,在指定延迟后发射一个数据项。

Observable.timer(2, TimeUnit.SECONDS)
            .subscribe(object :Consumer<Long>{
                override fun accept(t: Long?) {

                }
            })

这些创建型操作符用于生成不同类型的 Observable,根据需求选择合适的操作符。它们是构建数据流的起点,后续可以使用各种操作符对数据流进行变换、过滤、合并等操作,以满足具体的需求。在实际应用中,你通常会根据场景选择适当的创建型操作符来构建 Observable。

转换操作符

RxJava 的转换操作符用于对 Observable 发射的数据流进行变换、映射和操作。它们允许你以不同的方式处理数据项,以满足特定需求。

map 操作符: map 用于将 Observable 发射的每个数据项转换为另一种数据类型。它的参数是一个函数,该函数将原始数据项转换为新的数据项。

Observable.just(1, 2, 3)
            .map(object :Function<Int,String>{
                override fun apply(t: Int?): String {
                    val str = t.toString()
                    return str
                }
            })
            .subscribe(object:Consumer<String>{
                override fun accept(t: String?) {
                    Log.d(TAG, "Transformed: $t")
                }
            })
    /*subscribe {
                    value ->
                
                    Log.d(TAG, "Transformed: $value")
            }*/

flatMap 操作符: flatMap 用于将每个数据项映射为一个 Observable,然后将这些 Observables 合并成一个单一的数据流。这允许并发处理数据项。

Observable.just(1, 2, 3)
            .flatMap(object:Function<Int ,Observable<String>>{
                override fun apply(t: Int?): Observable<String> {
                    return Observable.just(t.toString())
                }
            })
//            flatMap { number ->
//                Observable.just(number, number * 2)
//            }
            .subscribe { value -> Log.d(TAG, "FlatMapped: $value") }

concatMap 操作符: concatMap 类似于 flatMap,但它保持原始数据项的顺序。它等待前一个 Observable 完成后才处理下一个。

Observable.just(1, 2, 3)
            .concatMap(object:Function<Int ,Observable<String>>{
                override fun apply(t: Int?): Observable<String> {
                    return Observable.just(t.toString())
                }
            })
            /*.concatMap { number ->
                Observable.just(number, number * 2)
            }*/
            .subscribe { value -> 
				Log.d(TAG, "ConcatMapped: $value") 
			}

buffer 操作符: buffer 用于将数据项分组为列表,并以列表的形式发射。你可以指定每个列表中的数据项数量。

Observable.just(1, 2, 3, 4, 5, 6)
            .buffer(2)
            .subscribe(object:Consumer<List<Int>>{
                override fun accept(t: List<Int>?) {
                    Log.d("Rxjava","Buffered: $t")
                }

            })
            /*.subscribe { buffer -> println("Buffered: $buffer") }*/

groupBy 操作符: groupBy 允许你将 Observable 数据项按某个标准进行分组,然后发射多个子 Observable,每个子 Observable 包含一组具有相同标准的数据项。

 Observable.just(1, 2, 3, 4, 5, 6)
                .groupBy(object : Function<Int, String> {
                    override fun apply(t: Int): String {
                        if (t!! % 2 == 0) {
                            return "偶数"
                        }
                        return "奇数"
                    }
                })
                .subscribe(object : Consumer<GroupedObservable<String, Int>> { //注意GroupedObservable<String, Int>参数的类型和Function<Int, String>反过来
                    override fun accept(t: GroupedObservable<String, Int>) { //GroupedObservable被观察者
                        t.subscribe(object : Consumer<Int> {
                            override fun accept(value: Int?) {
                                Log.d("Rxjava", "Group ${t.key}: ${value}")
                            }
                        })
                    }
                })
            
 //lambda表达式
 Observable.just(1, 2, 3, 4, 5, 6)
                .groupBy { it % 2 == 0 }
                .subscribe { group ->
                    group.subscribe { value ->
                        Log.d("Rxjava", "Group ${group.key}: $value")
                    }
                }

scan 操作符: scan 用于将数据项累积成一个中间结果,然后发射这个中间结果。

Observable.just(1, 2, 3, 4, 5)

                .scan(object :BiFunction<Int,Int,Int>{
                    override fun apply(t1: Int, t2: Int): Int {
                        return t1 + t2
                    }
                })
                .subscribe(object:Consumer<Int>{
                    override fun accept(t: Int?) {
                        Log.d("Rxjava","Scanned: $t")
                    }
                })
//                .scan { acc, value -> acc + value }
//                .subscribe { result -> Log.d("Rxjava","Scanned: $result") }

打印结果:
Scanned: 1
Scanned: 3
Scanned: 6
Scanned: 10
Scanned: 15

这些转换操作符可以帮助你根据需要对数据流进行变换、映射和操作。你可以选择合适的操作符来满足具体的业务需求,以便更有效地处理异步数据流。 RxJava 提供了众多其他转换操作符,可以根据实际需求查阅文档来使用。

过滤操作符

RxJava 的过滤操作符用于从 Observable 中过滤、筛选和筛除数据项,以便只保留满足特定条件的数据项。

filter 操作符: filter 用于过滤掉不满足条件的数据项,只保留满足条件的数据项。条件由一个函数决定。

Observable.just(1, 2, 3, 4, 5, 6)
    .filter(object:Predicate<Int>{
        override fun test(t: Int): Boolean {
            return t%2 ==0;
        }
    })
    .subscribe(object :Consumer<Int>{
        override fun accept(value: Int?) {
            Log.d("Rxjava","Filtered: $value")
        }
    })
   /* .filter { it % 2 == 0 }
    .subscribe { value -> Log.d("Rxjava","Filtered: $value") }*/

distinct 操作符: distinct 用于过滤掉重复的数据项,只保留第一次出现的数据项。

//lambda写法
Observable.just(1, 2, 2, 3, 4, 4, 5)
    .distinct()
    .subscribe { value -> println("Distinct: $value") }

distinctUntilChanged 操作符: distinctUntilChanged 用于过滤掉连续重复的数据项,只保留第一次出现的数据项。

//lambda写法
Observable.just(1, 1, 2, 2, 3, 4, 4, 5)
    .distinctUntilChanged()
    .subscribe { value -> println("DistinctUntilChanged: $value") }

take 操作符: take 用于仅发射前 N 个数据项,忽略其余的数据项。

Observable.just(1, 2, 3, 4, 5)
    .take(3)
    .subscribe { value -> println("Taken: $value") }

skip 操作符: skip 用于跳过前 N 个数据项,只发射后续的数据项。

Observable.just(1, 2, 3, 4, 5)
    .skip(2)
    .subscribe { value -> println("Skipped: $value") }

elementAt 操作符: elementAt 用于发射指定索引位置的数据项,忽略其他数据项。

Observable.just(1, 2, 3, 4, 5)
    .elementAt(2)
    .subscribe { value -> println("ElementAt: $value") }

takeLast 操作符: takeLast 用于仅发射最后 N 个数据项,忽略前面的数据项。

Observable.just(1, 2, 3, 4, 5)
    .takeLast(3)
    .subscribe { value -> println("TakeLast: $value") }

这些过滤操作符可以帮助你根据特定条件来过滤和筛选数据流,以满足具体的需求。你可以选择合适的操作符来处理数据流,从而仅保留需要的数据项,而忽略其他数据项。在实际应用中,过滤操作符常用于数据筛选、去重、限制数量等场景,以帮助你更有效地处理异步数据流。 RxJava 还提供了其他过滤操作符,可以根据实际需求查阅文档来使用。

组合操作符

RxJava 的组合操作符用于将多个 Observable 合并、组合或操作,以生成新的 Observable 或数据流。

  1. merge 操作符: merge 用于合并多个 Observables 的数据流,以按照发射顺序合并它们的数据项。这意味着数据项将按照它们发射的顺序合并,不考虑来源 Observable。

    kotlinCopy codeval observable1 = Observable.just(1, 2, 3)
    val observable2 = Observable.just(4, 5, 6)
    
    Observable.merge(observable1, observable2)
        .subscribe { value -> println("Merged: $value") }
    
    
  2. concat 操作符: concat 用于合并多个 Observables 的数据流,但它保持原始 Observables 的顺序,先合并第一个 Observable 的数据,再合并第二个 Observable 的数据,以此类推。

    kotlinCopy codeval observable1 = Observable.just(1, 2, 3)
    val observable2 = Observable.just(4, 5, 6)
    
    Observable.concat(observable1, observable2)
        .subscribe { value -> println("Concatenated: $value") }
    
    
  3. zip 操作符: zip 用于将多个 Observables 的数据项一对一地合并,生成一个新的 Observable。它会按顺序将每个 Observable 的相同索引位置的数据项合并在一起。

    kotlinCopy codeval observable1 = Observable.just("A", "B", "C")
    val observable2 = Observable.just(1, 2, 3)
    
    Observable.zip(observable1, observable2) { str, num -> "$str$num" }
        .subscribe { value -> println("Zipped: $value") }
    
    
  4. combineLatest 操作符: combineLatest 用于将多个 Observables 最近的数据项合并成一个新的 Observable。每当任何一个源 Observable 发射新数据,将使用最近发射的数据项来组合生成新的数据项。

    kotlinCopy codeval observable1 = Observable.interval(300, TimeUnit.MILLISECONDS).map { "A$it" }
    val observable2 = Observable.interval(200, TimeUnit.MILLISECONDS).map { "B$it" }
    
    Observable.combineLatest(observable1, observable2) { a, b -> "$a-$b" }
        .take(5)
        .subscribe { value -> println("Combined: $value") }
    
    
  5. switchOnNext 操作符: switchOnNext 用于在一个 Observable 发射多个 Observables 时,切换到最新的 Observable 并发射它的数据。

    kotlinCopy codeval observables = listOf(
        Observable.just(1, 2, 3),
        Observable.just(4, 5, 6),
        Observable.just(7, 8, 9)
    )
    
    Observable.fromIterable(observables)
        .switchMap { it }
        .subscribe { value -> println("Switched: $value") }
    
    

这些组合操作符允许你将多个 Observables 合并、组合或操作,以满足不同的数据处理需求。你可以根据具体的场景选择合适的操作符,以便有效地处理异步数据流。在实际应用中,组合操作符常用于合并多个数据源,进行数据计算和处理,以及管理多个数据流的交互。 RxJava 还提供了其他组合操作符,可以根据实际需求查阅文档来使用。

错误操作符

RxJava 的错误处理操作符用于处理 Observable 中可能出现的错误和异常情况,以确保应用程序能够更健壮地处理这些问题。

  1. onErrorReturn 操作符: onErrorReturn 用于在 Observable 遇到错误时发射一个默认值,并继续正常的数据流。

    kotlinCopy codeObservable.create<Int> { emitter ->
        emitter.onNext(1)
        emitter.onError(Exception("An error occurred"))
    }
    .onErrorReturn { error -> 0 }
    .subscribe(
        { value -> println("Received: $value") },
        { error -> println("Error: ${error.message}") }
    )
    
    
  2. onErrorResumeNext 操作符: onErrorResumeNext 用于在 Observable 遇到错误时切换到另一个 Observable,并继续发射数据。

    kotlinCopy codeval sourceObservable = Observable.create<Int> { emitter ->
        emitter.onNext(1)
        emitter.onError(Exception("An error occurred"))
    }
    
    val fallbackObservable = Observable.just(2, 3, 4)
    
    sourceObservable
        .onErrorResumeNext(fallbackObservable)
        .subscribe(
            { value -> println("Received: $value") },
            { error -> println("Error: ${error.message}") }
        )
    
    
  3. retry 操作符: retry 用于在 Observable 遇到错误时重试操作,指定重试次数。如果重试次数用尽仍有错误,错误会传递给观察者。

    kotlinCopy codevar attempts = 0
    
    Observable.create<Int> { emitter ->
        if (attempts < 3) {
            attempts++
            emitter.onError(Exception("An error occurred"))
        } else {
            emitter.onNext(1)
            emitter.onComplete()
        }
    }
    .retry(3)
    .subscribe(
        { value -> println("Received: $value") },
        { error -> println("Error: ${error.message}") }
    )
    
    
  4. retryWhen 操作符: retryWhen 允许你自定义错误重试策略。你可以在 retryWhen 中返回一个 Observable,用于控制重试次数和时机。

    kotlinCopy codevar attempts = 0
    
    Observable.create<Int> { emitter ->
        if (attempts < 3) {
            attempts++
            emitter.onError(Exception("An error occurred"))
        } else {
            emitter.onNext(1)
            emitter.onComplete()
        }
    }
    .retryWhen { errors ->
        errors.flatMap { error ->
            if (attempts < 3) {
                Observable.timer(1, TimeUnit.SECONDS)
            } else {
                Observable.error(error)
            }
        }
    }
    .subscribe(
        { value -> println("Received: $value") },
        { error -> println("Error: ${error.message}") }
    )
    
    
  5. onErrorComplete 操作符: onErrorComplete 用于在 Observable 遇到错误时忽略错误,不传递给观察者,直接完成 Observable。

    kotlinCopy codeObservable.create<Int> { emitter ->
        emitter.onNext(1)
        emitter.onError(Exception("An error occurred"))
    }
    .onErrorComplete()
    .subscribe(
        { value -> println("Received: $value") },
        { error -> println("Error: ${error.message}") }
    )
    

这些错误处理操作符允许你在 Observable 遇到错误和异常情况时采取不同的处理策略,以确保你的应用程序能够更好地应对异常情况。你可以根据具体的需求选择合适的操作符,以提高应用程序的可靠性和健壮性。 RxJava 还提供了其他错误处理操作符,可以根据实际需求查阅文档来使用。

总结:以上都是同步操作,即被观察者和观察者都是在默认线程中执行(Android的主线程)

其他常用操作符

  1. subscribeOn 操作符: subscribeOn 用于指定 Observable 的创建和订阅操作运行在指定的线程。通常用于将耗时的任务移到后台线程执行,以避免阻塞主线程。

    kotlinCopy codeObservable.create<Int> { emitter ->
        // 在 IO 线程执行任务
        emitter.onNext(1)
        emitter.onComplete()
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { value -> println("Received on main thread: $value") }
    
    
  2. observeOn 操作符: observeOn 用于指定观察者(Observer)接收数据的线程。它允许你将数据切换到主线程,以更新 UI,或切换到其他线程执行特定操作。

    kotlinCopy codeObservable.just(1, 2, 3)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { value -> println("Received on main thread: $value") }
    
    
  3. observeOn(io.reactivex.rxjava3.schedulers.Scheduler) 除了 Android 主线程调度器 (AndroidSchedulers.mainThread()),RxJava 还提供了其他内置的调度器,如 Schedulers.io()Schedulers.computation() 等,用于切换到不同的线程池执行任务。

    kotlinCopy codeObservable.just(1, 2, 3)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .subscribe { value -> println("Received on computation thread: $value") }
    
    
  4. subscribeOnobserveOn 组合: 通常,你需要结合使用 subscribeOnobserveOn 来控制 Observable 的创建和订阅线程以及观察者接收数据的线程。

    kotlinCopy codeObservable.create<Int> { emitter ->
        // 在 IO 线程执行任务
        emitter.onNext(1)
        emitter.onComplete()
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { value -> println("Received on main thread: $value") }
    
    
  5. doOnSubscribedoOnNext 操作符: 这些操作符用于在特定事件发生时执行操作,例如,使用 doOnSubscribe 可以在订阅发生时切换线程。

    kotlinCopy codeObservable.just(1, 2, 3)
        .doOnSubscribe { println("Subscribed on ${Thread.currentThread().name}") }
        .subscribeOn(Schedulers.io())
        .doOnNext { value -> println("Processing on ${Thread.currentThread().name}: $value") }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { value -> println("Received on main thread: $value") }
    
    

这些线程切换操作符允许你在 RxJava 中灵活地控制数据流的线程调度,以满足不同场景下的性能和响应需求。你可以根据具体的需求和场景来选择合适的线程切换操作符,以优化应用程序的性能和用户体验。 RxJava 还提供其他线程切换操作符和自定义调度器的功能,可以根据实际需求查阅文档来使用。

Flowable背压模式

Flowable是 RxJava 中用于处理背压的一种特殊类型的 Observable。它被设计用来处理生产者产生数据速度快于消费者处理数据速度的情况,从而防止数据积累和内存溢出。以下是关于Flowable` 的详细说明:

  1. 支持背压: Flowable 是 RxJava 2 中引入的一种 Observable 类型,它支持背压。这意味着它内置了背压处理机制,可以确保在数据流速度不同步的情况下能够安全地传递数据。
  2. 背压策略: Flowable 提供了多种背压策略,可以通过 onBackpressureXXX() 操作符来指定。一些常见的背压策略包括:
    • onBackpressureBuffer(): 缓冲数据并等待消费者处理。
    • onBackpressureDrop(): 丢弃过多的数据,只保留最新的数据。
    • onBackpressureLatest(): 保留最新的数据项,丢弃其余的数据。
    • onBackpressureError(): 当数据流速度太快时,抛出异常以通知消费者处理背压问题。
  3. 创建 Flowable 你可以使用 Flowable.create 或其他创建方法来创建 Flowable。通常,在创建 Flowable 时需要使用 BackpressureStrategy 来指定背压策略。
Flowable.create<Int>({ emitter ->
        for (i in 1..10000) {
            emitter.onNext(i)
        }
    }, BackpressureStrategy.BUFFER)
    .observeOn(Schedulers.io())
    .subscribe { value -> println("Received: $value") }
  1. 使用 Flowable 的消费者: 消费 Flowable 的观察者(Observer)通常需要使用 onBackpressureXXX 操作符来处理背压问题。消费者也可以通过调用 request 方法来请求数据。

    Flowable.create<Int>({ emitter ->
        for (i in 1..10000) {
            emitter.onNext(i)
        }
    }, BackpressureStrategy.BUFFER)
        .onBackpressureBuffer()
        .observeOn(Schedulers.io())
        .subscribe(object : FlowableSubscriber<Int> {
            override fun onSubscribe(subscription: Subscription) {
                subscription.request(10) // 请求处理的数据项数量
            }
    
            override fun onNext(value: Int) {
                println("Received: $value")
            }
    
            override fun onError(throwable: Throwable) {
                println("Error: ${throwable.message}")
            }
    
            override fun onComplete() {
                println("Completed")
            }
        })
    
    
  2. 注意事项: 使用 Flowable 需要注意,消费者需要明确请求数据,否则数据不会被传递。你可以通过调用 request 方法来请求数据,或者在订阅时使用 onBackpressureXXX 操作符来自动请求数据。

Flowable 是处理背压问题的有效工具,适用于处理大量数据或高频率事件时,以确保数据流能够稳定和高效地传递。选择合适的背压策略和请求数据的数量是使用 Flowable 的关键因素,以满足应用程序的需求。

注意使用Flowable (被观察者)时观察者使用subscriber

rxjava3,开源库,javascript,android,kotlin文章来源地址https://www.toymoban.com/news/detail-812776.html

到了这里,关于Rxjava3 全新详解及常用操作符的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • C语言位操作符常用

    1, 交换两个变量的值 2, 求二进制中1的个数: 3, 求二进制中0的个数 4,求一个数的绝对值 5, 求一个数的相反数 6, 判断一个数的奇偶性 7,求两个数的平均数 8,从无符号类型x的第p位开始, 取n位数

    2024年02月09日
    浏览(33)
  • 【c语言操作符系列1】^(异或操作符)讲解和多种例题详解

    目录 一、^ 是什么(^称为异或) 二、^的规律(特点) 三、可利用^秒杀的常见例题(重点) 1、消失的数字  2、不一样的人生密码 3、交换两个数(不能创建中间变量) 4、找出只出现一个的两个数字 是一种操作符, 针对二进制异或而言的 ,两个数对应的二进制位相同,异或

    2024年02月16日
    浏览(69)
  • 【C】操作符详解

    今天给大家带来一篇关于C语言操作符的详细介绍,在C语言中操作符主要有以下几种: 1.算术操作符 2.移位操作符 3.位操作符 4.赋值操作符 5.单目操作符 6.关系操作符 7.逻辑操作符 8.逗号表达式 9.条件操作符 9.下标引用,函数调用和结构成员 接下来给大家详细介绍! + - * /

    2024年02月12日
    浏览(37)
  • 操作符(超详解)

    算数操作符 移位操作符 位操作符 赋值操作符 单目操作符 关系操作符 逻辑操作符 条件操作符 逗号操作符 除了 % 操作符之外 , 其他的几个操作符 可以作用于 整数 和 浮点数 。 对于 / 操作符如果两个操作数都为 整数 ,执行 整数除法 。而只要有 浮点数 执行的就是 浮点数

    2024年02月15日
    浏览(41)
  • 操作符详解(1)

    算术操作符 移位操作符 位操作符 赋值操作符 单目操作符 关系操作符 逻辑操作符 条件操作符 逗号表达式 下标引用、函数调用和结构成员 表达式求值 算术操作符分为:+、-、*、/、% 除了 % 操作符之外,其他的几个操作符可以作用于整数和浮点数。 对于 / 操作符如果两个操

    2024年02月09日
    浏览(42)
  • 【C语言】操作符----详解

    💓博客主页:江池俊的博客 ⏩收录专栏:C语言初阶之路 👉其他专栏:数据结构探索 💻代码仓库:江池俊的代码仓库 🍁 如果觉得博主的文章还不错的话,请点赞👍收藏🌟 三连支持一下博主💞 目录 操作符分类:  💨 算术操作符  💨 移位操作符  原码、反码、补码

    2024年02月14日
    浏览(41)
  • c语言---操作符(详解)

    算术操作符: + 、- 、*、/ 、% 移位操作符: 位操作符: | ^ ` 赋值操作符: = 、+= 、 -= 、 = 、 /= 、%= 、= 、= 、= 、|= 、^= 单⽬操作符: !、++、–、、 、+、-、~ 、sizeof、(类型) 关系操作符: 、= 、 、= 、 == 、 != 逻辑操作符: 、|| 条件操作符: ? : 逗号表达式: , 下标引⽤: [

    2024年02月22日
    浏览(47)
  • C语言:“~”操作符详解

    ~:含义及用法举例 文章目录 前言 一、“~”是什么? 二、原码、反码及补码 1.简介 2.用法举例 ①正整数 ②负整数 ③零 三、“~”操作符 1.简介 2.用法举例 ①正整数 ②负整数 ③零 本文在简单介绍原码反码补码的基础上,介绍“~”操作符的用法。 ~:一种单目操作符,即对

    2024年02月10日
    浏览(53)
  • C语言---操作符详解

    算术操作符 移位操作符 位操作符 赋值操作符 单目操作符 关系操作符 逻辑操作符 条件操作符 逗号操作符 下标引用、函数调用和结构成员。 移位操作符,移动的是二进制位。 警告:对于移位运算符,不要移动负位数,这个是标准定义的。 3.1.1、原码、反码、补码介绍 整数

    2024年02月07日
    浏览(41)
  • 【C语言】操作符详解

    👦个人主页:Weraphael ✍🏻作者简介:目前正在回炉重造C语言(2023暑假) ✈️专栏:【C语言航路】 🐋 希望大家多多支持,咱一起进步!😁 如果文章对你有帮助的话 欢迎 评论💬 点赞👍🏻 收藏 📂 加关注😍 算术操作符 位移操作符 位操作符 赋值操作符 单目操作符 关

    2024年02月16日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包