retry
当Observable
发生错误时接收到onError
事件,重新发射数据。可以拦截·Throwable 和 Exception
。
重载方法如下:
// 一直错误,一直重试
public final Observable<T> retry() {
return retry(Long.MAX_VALUE, Functions.alwaysTrue());
}
// 最大重试的次数
public final Observable<T> retry(long times) {
return retry(times, Functions.alwaysTrue());
}
// 重试条件
public final Observable<T> retry(Predicate<? super Throwable> predicate) {
return retry(Long.MAX_VALUE, predicate);
}
// 重试次数和条件
public final Observable<T> retry(long times, Predicate<? super Throwable> predicate) {
if (times < 0) {
throw new IllegalArgumentException("times >= 0 required but it was " + times);
}
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableRetryPredicate<T>(this, times, predicate));
}
public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableRetryBiPredicate<T>(this, predicate));
}
Repeat
无条件地、重复发送 被观察者事件.,具备重载方法,可设置重复创建次数
public final Observable<T> repeat() {
return repeat(Long.MAX_VALUE);
}
public final Observable<T> repeat(long times) {
if (times < 0) {
throw new IllegalArgumentException("times >= 0 required but it was " + times);
}
if (times == 0) {
return empty();
}
return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, times));
}
RetryWhen
遇到错误时,将发生的错误传递给一个新的被观察者 Observable
, 并根据新被观察者发送的事件,决定是否需要重新订阅原始被观察者Observable
& 发送事件
分为两种情况文章来源:https://www.toymoban.com/news/detail-690171.html
- 若 新的被观察者
Observable
发送的事件= Error
事件,那么 原始Observable
则不重新发送事件:该异常错误信息可在观察者中的onError()
中获得 - 若 新的被观察者
Observable
发送的事件= Nex
t事件 ,那么原始的Observable
则重新发送事件。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Exception("error happen."));
emitter.onNext(4);
}
})
// 上游遇到error时回调
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
// 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
// 返回Observable<?> = 新的被观察者 Observable(任意类型)
// throwableObservable 必须被处理,不然只会发送上游发送error事件
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
/**
* 1. emit error. 不会重新发射数据。异常传递到观察的的onError中
* 10:54:26.148 com...mple.test_android D 接收到了事件1
* 10:54:26.148 com...mple.test_android D 接收到了事件2
* 10:54:26.148 com...mple.test_android D 接收到了事件3
* 10:54:26.148 com...mple.test_android D 对Error事件作出响应java.lang.Throwable: retry stop!
*/
// return Observable.error(new Throwable("retry stop!"));
/**
* 2. emit onNext
* 原始的Observable则重新发送数据
* 10:57:22.759 com...mple.test_android D 接收到了事件1
* 10:57:22.759 com...mple.test_android D 接收到了事件2
* 10:57:22.759 com...mple.test_android D 接收到了事件3
* 10:57:22.759 com...mple.test_android D 接收到了事件1
* 10:57:22.759 com...mple.test_android D 接收到了事件2
* 10:57:22.759 com...mple.test_android D 接收到了事件3
* 10:57:22.759 com...mple.test_android D 接收到了事件1
* 10:57:22.759 com...mple.test_android D 接收到了事件2
* 10:57:22.759 com...mple.test_android D 接收到了事件3
*/
return Observable.just(true);
}
});
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应" + e.toString());
// 获取异常错误信息
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
RepeatWhen
有条件地、重复发送 被观察者事件。 将原始 Observable
停止发送事件的标识(Complete() / Error())
。转换成1个 Object 类型数据传递给1个新被观察者(Observable
),以此决定是否重新订阅 & 发送原来的 Observable
。文章来源地址https://www.toymoban.com/news/detail-690171.html
- 若新被观察者(
Observable
)返回1个Complete / Error
事件,则不重新订阅 & 发送原来的Observable
- 若新被观察者(
Observable
)返回其余事件时,则重新订阅 & 发送原来的Observable
Observable.just(1, 2, 3, 4)
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
// 在Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
/**
* 1. 发送onComplete事件,不会重新发送原来的数据 但不会回调观察者的onComplete()
* 11:03:43.908 com...mple.test_android D 开始采用subscribe连接
* 11:03:43.908 com...mple.test_android D 接收到了事件1
* 11:03:43.908 com...mple.test_android D 接收到了事件2
* 11:03:43.908 com...mple.test_android D 接收到了事件3
* 11:03:43.908 com...mple.test_android D 接收到了事件4
*/
//return Observable.empty();
/**
* 2. 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。
* * 11:05:38.118 com...mple.test_android D 开始采用subscribe连接
* * 11:05:38.119 com...mple.test_android D 接收到了事件1
* * 11:05:38.119 com...mple.test_android D 接收到了事件2
* * 11:05:38.119 com...mple.test_android D 接收到了事件3
* * 11:05:38.119 com...mple.test_android D 接收到了事件4
* * 11:05:38.121 com...mple.test_android D 对Error事件作出响应:java.lang.Throwable: repeat when stop!
*/
//return Observable.error(new Throwable("repeat when stop!"));
/**
* 3.若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
* 11:07:23.876 com...mple.test_android D 开始采用subscribe连接
* 11:07:23.877 com...mple.test_android D 接收到了事件1
* 11:07:23.877 com...mple.test_android D 接收到了事件2
* 11:07:23.877 com...mple.test_android D 接收到了事件3
* 11:07:23.877 com...mple.test_android D 接收到了事件4
* 11:07:23.877 com...mple.test_android D 接收到了事件1
* 11:07:23.877 com...mple.test_android D 接收到了事件2
* 11:07:23.877 com...mple.test_android D 接收到了事件3
* 11:07:23.877 com...mple.test_android D 接收到了事件4
* 11:07:23.877 com...mple.test_android D 接收到了事件1
* 11:07:23.877 com...mple.test_android D 接收到了事件2
* 11:07:23.877 com...mple.test_android D 接收到了事件3
* 11:07:23.877 com...mple.test_android D 接收到了事件4
*/
return Observable.just(1);
}
});
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应:" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
RetryWhen 和 RepeatWhen组合完成轮询请求
private int i = 0;
public void repeatAndRetryWhen() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
//emitter.onError(new Throwable("error happened!!")); // error走retryWhen
emitter.onNext(3);
emitter.onComplete(); // 顺利完成走repeatWhen
}
}).repeat()
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
// 超出最大请求次数或者这个throwable是结束条件,发送onError传递到下游
if (i > 4) {
return Observable.error(new Throwable("stop retry!"));
}
// 延迟5s后进行重试
return Observable.just(1).delay(5, TimeUnit.SECONDS);
}
});
}
}).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
// 正常结束后10s开始轮询
return objectObservable.delay(10, TimeUnit.SECONDS);
}
}).doFinally(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Finally!!");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
i++;
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应:" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
12:53:10.226 com...mple.test_android D 开始采用subscribe连接
12:53:10.238 com...mple.test_android D 接收到了事件1
12:53:15.242 com...mple.test_android D 接收到了事件1
12:53:20.245 com...mple.test_android D 接收到了事件1
12:53:25.248 com...mple.test_android D 接收到了事件1
12:53:30.253 com...mple.test_android D 接收到了事件1
12:53:30.281 com...mple.test_android D 对Error事件作出响应:java.lang.Throwable: stop retry!
12:53:30.281 com...mple.test_android D Finally!!
到了这里,关于Rxjava retryWhen and repeatWhen的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!