Rxjava retryWhen and repeatWhen

这篇具有很好参考价值的文章主要介绍了Rxjava retryWhen and repeatWhen。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

retry

Observable发生错误时接收到onError事件,重新发射数据。可以拦截·Throwable 和 Exception

重载方法如下:

	// 一直错误,一直重试
public final Observable<T> retry() {
    return retry(Long.MAX_VALUE, Functions.alwaysTrue());
}
// 最大重试的次数
public final Observable<T> retry(long times) {
    return retry(times, Functions.alwaysTrue());
}
// 重试条件
public final Observable<T> retry(Predicate<? super Throwable> predicate) {
    return retry(Long.MAX_VALUE, predicate);
}
// 重试次数和条件
public final Observable<T> retry(long times, Predicate<? super Throwable> predicate) {
    if (times < 0) {
        throw new IllegalArgumentException("times >= 0 required but it was " + times);
    }
    ObjectHelper.requireNonNull(predicate, "predicate is null");

    return RxJavaPlugins.onAssembly(new ObservableRetryPredicate<T>(this, times, predicate));
}

public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate) {
    ObjectHelper.requireNonNull(predicate, "predicate is null");

    return RxJavaPlugins.onAssembly(new ObservableRetryBiPredicate<T>(this, predicate));
}

Repeat

无条件地、重复发送 被观察者事件.,具备重载方法,可设置重复创建次数


 public final Observable<T> repeat() {
     return repeat(Long.MAX_VALUE);
 }

public final Observable<T> repeat(long times) {
    if (times < 0) {
        throw new IllegalArgumentException("times >= 0 required but it was " + times);
    }
    if (times == 0) {
        return empty();
    }
    return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, times));
}

RetryWhen

遇到错误时,将发生的错误传递给一个新的被观察者 Observable, 并根据新被观察者发送的事件,决定是否需要重新订阅原始被观察者Observable & 发送事件

分为两种情况

  1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不重新发送事件:该异常错误信息可在观察者中的onError()中获得
  2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件。
Observable.create(new ObservableOnSubscribe<Integer>() {

       @Override
       public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
           emitter.onNext(1);
           emitter.onNext(2);
           emitter.onNext(3);
           emitter.onError(new Exception("error happen."));
           emitter.onNext(4);
       }
   })
   // 上游遇到error时回调
   .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
       @Override
       public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
           // 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
           // 返回Observable<?> = 新的被观察者 Observable(任意类型)
           // throwableObservable 必须被处理,不然只会发送上游发送error事件
           return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
               @Override
               public ObservableSource<?> apply(Throwable throwable) throws Exception {
                  /**
                      * 1. emit error. 不会重新发射数据。异常传递到观察的的onError中
                      * 10:54:26.148 com...mple.test_android  D  接收到了事件1
                      * 10:54:26.148 com...mple.test_android  D  接收到了事件2
                      * 10:54:26.148 com...mple.test_android  D  接收到了事件3
                      * 10:54:26.148 com...mple.test_android  D  对Error事件作出响应java.lang.Throwable: retry stop!
                      */
               		//  return Observable.error(new Throwable("retry stop!"));

                    /**
                     * 2. emit onNext
                     * 原始的Observable则重新发送数据
                     * 10:57:22.759 com...mple.test_android  D  接收到了事件1
                     * 10:57:22.759 com...mple.test_android  D  接收到了事件2
                     * 10:57:22.759 com...mple.test_android  D  接收到了事件3
                     * 10:57:22.759 com...mple.test_android  D  接收到了事件1
                     * 10:57:22.759 com...mple.test_android  D  接收到了事件2
                     * 10:57:22.759 com...mple.test_android  D  接收到了事件3
                     * 10:57:22.759 com...mple.test_android  D  接收到了事件1
                     * 10:57:22.759 com...mple.test_android  D  接收到了事件2
                     * 10:57:22.759 com...mple.test_android  D  接收到了事件3
                     */
                    return Observable.just(true);
               }
           });
       }
   }).subscribe(new Observer<Integer>() {
       @Override
       public void onSubscribe(Disposable d) {

       }

       @Override
       public void onNext(Integer value) {
           Log.d(TAG, "接收到了事件" + value);
       }

       @Override
       public void onError(Throwable e) {
           Log.d(TAG, "对Error事件作出响应" + e.toString());
           // 获取异常错误信息
       }

       @Override
       public void onComplete() {
           Log.d(TAG, "对Complete事件作出响应");
       }
   });

RepeatWhen

有条件地、重复发送 被观察者事件。 将原始 Observable 停止发送事件的标识(Complete() / Error())。转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的 Observable文章来源地址https://www.toymoban.com/news/detail-690171.html

  1. 若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable
  2. 若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 Observable
Observable.just(1, 2, 3, 4)
   .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
       @Override
       // 在Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据
       public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {

           return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
               @Override
               public ObservableSource<?> apply(Object o) throws Exception {
                   /**
                    *  1. 发送onComplete事件,不会重新发送原来的数据 但不会回调观察者的onComplete()
                    * 11:03:43.908 com...mple.test_android  D  开始采用subscribe连接
                    * 11:03:43.908 com...mple.test_android  D  接收到了事件1
                    * 11:03:43.908 com...mple.test_android  D  接收到了事件2
                    * 11:03:43.908 com...mple.test_android  D  接收到了事件3
                    * 11:03:43.908 com...mple.test_android  D  接收到了事件4 
                    */
                   //return Observable.empty();

                   /**
                    * 2. 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。
                    *  * 11:05:38.118 com...mple.test_android  D  开始采用subscribe连接
                    *  * 11:05:38.119 com...mple.test_android  D  接收到了事件1
                    *  * 11:05:38.119 com...mple.test_android  D  接收到了事件2
                    *  * 11:05:38.119 com...mple.test_android  D  接收到了事件3
                    *  * 11:05:38.119 com...mple.test_android  D  接收到了事件4
                    *  * 11:05:38.121 com...mple.test_android  D  对Error事件作出响应:java.lang.Throwable: repeat when stop!
                    */
                   //return Observable.error(new Throwable("repeat when stop!"));

                   /**
                    * 3.若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
                    * 11:07:23.876 com...mple.test_android  D  开始采用subscribe连接
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件1
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件2
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件3
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件4
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件1
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件2
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件3
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件4
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件1
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件2
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件3
                    * 11:07:23.877 com...mple.test_android  D  接收到了事件4
                    */
                   return Observable.just(1);
               }
           });
       }
   }).subscribe(new Observer<Integer>() {
       @Override
       public void onSubscribe(Disposable d) {
           Log.d(TAG, "开始采用subscribe连接");
       }

       @Override
       public void onNext(Integer value) {
           Log.d(TAG, "接收到了事件" + value);
       }

       @Override
       public void onError(Throwable e) {
           Log.d(TAG, "对Error事件作出响应:" + e.toString());
       }

       @Override
       public void onComplete() {
           Log.d(TAG, "对Complete事件作出响应");
       }

   });

RetryWhen 和 RepeatWhen组合完成轮询请求

private int i = 0;
public void repeatAndRetryWhen() {
	Observable.create(new ObservableOnSubscribe<Integer>() {
	
	            @Override
	            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
	                emitter.onNext(1);
	                //emitter.onError(new Throwable("error happened!!")); // error走retryWhen
	                emitter.onNext(3);
	                emitter.onComplete(); // 顺利完成走repeatWhen
	            }
	        }).repeat()
	        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
	            @Override
	            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
	                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
	                    @Override
	                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
	                        // 超出最大请求次数或者这个throwable是结束条件,发送onError传递到下游
	                        if (i > 4) {
	                            return Observable.error(new Throwable("stop retry!"));
	                        }
	                        // 延迟5s后进行重试
	                        return Observable.just(1).delay(5, TimeUnit.SECONDS);
	                    }
	                });
	            }
	        }).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
	            @Override
	            public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
	            	// 正常结束后10s开始轮询
	                return objectObservable.delay(10, TimeUnit.SECONDS);
	            }
	        }).doFinally(new Action() {
	            @Override
	            public void run() throws Exception {
	                Log.d(TAG, "Finally!!");
	            }
	        })
	        .subscribe(new Observer<Integer>() {
	            @Override
	            public void onSubscribe(Disposable d) {
	                Log.d(TAG, "开始采用subscribe连接");
	            }
	
	            @Override
	            public void onNext(Integer value) {
	                i++;
	                Log.d(TAG, "接收到了事件" + value);
	            }
	
	            @Override
	            public void onError(Throwable e) {
	                Log.d(TAG, "对Error事件作出响应:" + e.toString());
	            }
	
	            @Override
	            public void onComplete() {
	                Log.d(TAG, "对Complete事件作出响应");
	            }
	
	        });
	}

12:53:10.226 com...mple.test_android  D  开始采用subscribe连接
12:53:10.238 com...mple.test_android  D  接收到了事件1
12:53:15.242 com...mple.test_android  D  接收到了事件1
12:53:20.245 com...mple.test_android  D  接收到了事件1
12:53:25.248 com...mple.test_android  D  接收到了事件1
12:53:30.253 com...mple.test_android  D  接收到了事件1
12:53:30.281 com...mple.test_android  DError事件作出响应:java.lang.Throwable: stop retry!
12:53:30.281 com...mple.test_android  D  Finally!!

到了这里,关于Rxjava retryWhen and repeatWhen的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Android笔记(十八):面向Compose组件结合Retrofit2和Rxjava3实现网络访问

    Square公司推出的Retrofit2库(https://square.github.io/retrofit/),改变了网络访问的方式。它实现了网络请求的封装。Retrofit库采用回调处理方式,使得通过接口提交请求和相应的参数的配置,就可以获得对应的响应,并可以将响应获得的数据解析成特定的数据格式,例如将JSON数据解

    2024年02月02日
    浏览(38)
  • 【RxJava】RxJava 简介 ( RxJava 概念 | RxJava 组成 - 被观察者 / 观察者 / 订阅 / 操作符 | RxJava 适用场景 )

    RxJava 框架 是一个 异步编程函数库 , 包含了如下要素 : 观察者模式 迭代器模式 函数式编程 RxJava 框架应用场景 : 异步操作 事件流 RxJava 组成要素 : Observable(被观察者): Observable 是一个 可以发送消息的数据源 , 可以同时发送若干消息 , 消息的格式可以通过泛型进行定义 ; 消

    2024年02月06日
    浏览(44)
  • RxJava的前世【RxJava系列之设计模式】

    学习RxJava,少不了介绍它的设计模式。但我看大部分文章,都是先将其用法介绍一通,然后再结合其用法,讲解其设计模式。这样当然有很多好处,但我个人觉得,这种介绍方式,对于没有接触过RxJava的朋友来说,是不太友好的。 而我,更倾向于,先把对设计模式的认知,拉

    2024年02月13日
    浏览(35)
  • 每月一书(202304)《RxJava2.x实战》

    @ [TOC] 又到了每月一书的时间,本月阅读的是技术相关书籍《 RxJava2.x实战 》,下面分享一下我阅读完后的体会。 本书主要介绍了 RxJava 这个框架,框架版本是2.x。主要内容包含三大部分: 框架的原理和使用方法 框架中各类操作符的使用方法 和其它框架配合使用的方法,这块

    2024年02月01日
    浏览(46)
  • Rxjava3 全新详解及常用操作符

    简介 RxJava 是一个基于 Java 的响应式编程库,用于处理异步事件流和数据流。它是由 Netflix 开发并开源,现在广泛用于 Android 和 Java 后端开发。RxJava 提供了一种用于组合和处理异步数据的丰富工具集,它的核心思想是将数据流视为一系列事件,以响应事件的方式进行处理。

    2024年01月21日
    浏览(79)
  • RxJava 响应式获取List 指定名称对象的值

    可以通过 Java 8 的 Stream API 来实现。假设你有一个名为 list 的 List 对象,其中包含多个对象,每个对象都有一个名为 name 的属性和一个名为 value 的属性,你可以使用以下代码来获取名为 targetName 的对象的 value 值:

    2024年02月08日
    浏览(43)
  • 【RxJava】map过程中各个Observable生命周期分析

    首先说下map和flatMap的区别,防止有对RxJava还不够熟悉的小伙伴 map的Function指定如何将A转为B flatMap的Function则指定如何将ObservableA转为ObservableB map和flatMap最终的转换结果都是ObservableB flatMap由于可以自己创建Observable,因此更为强大灵活,map比较简单 下面,我们创建一个Observab

    2024年02月05日
    浏览(39)
  • 将 RxJava 的 Observable 转换为 Reactor 的 Flux流

    如果您想将 RxJava 的 Observable 转换为 Reactor 的 Flux,可以使用 Reactor Adapter 库来实现这一转换。这个库提供了一些静态方法来将 RxJava 类型转换为 Reactor 类型。 以下是一个示例代码,演示了如何将 RxJava 的 Observable 转换为 Reactor 的 Flux:

    2024年02月01日
    浏览(34)
  • Rxjava源码分析&实践(六)【实践环节:map操作符功能实现】

    Rxjava源码分析实践系列文章目录 Rxjava源码分析实践(一)【RxJava的基本使用】 Rxjava源码分析实践(二)【RxJava基本原理分析之构建流】 Rxjava源码分析实践(三)【RxJava基本原理分析之订阅流】 Rxjava源码分析实践(四)【RxJava基本原理分析之触发流】 Rxjava源码分析实践(五

    2024年02月02日
    浏览(42)
  • Termux 安装配置 Android 5 and 7 下的0.118版本

    Termux wiki: https://wiki.termux.com/wiki/Main_Page Termux download page: https://f-droid.org/packages/com.termux/ 下载就不用多说了吧,直接下载安装就可以了。 在Termux 官方github中有列出国内外的源,其中就拿清华源举例。 Termux mirror list: https://github.com/termux/termux-packages/wiki/Mirrors 清华大学镜像:ht

    2023年04月19日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包