[rx-java] rxjava : retry ()를 사용할 수 있지만 지연이 있습니까?

내 Android 앱에서 rxjava를 사용하여 네트워크 요청을 비동기 적으로 처리하고 있습니다. 이제 특정 시간이 지난 후에 만 ​​실패한 네트워크 요청을 다시 시도하고 싶습니다.

Observable에서 retry ()를 사용하지만 특정 지연 후에 만 ​​재 시도하는 방법이 있습니까?

Observable에게 현재 재시도 중임을 알리는 방법이 있습니까 (처음 시도하는 것과 반대)?

debounce () / throttleWithTimeout ()을 보았지만 다른 작업을 수행하는 것 같습니다.

편집하다:

나는 한 가지 방법을 찾았다 고 생각하지만 이것이 올바른 방법인지 또는 다른 더 나은 방법인지 확인하는 데 관심이 있습니다.

내가하는 일은 다음과 같다. 내 Observable.OnSubscribe의 call () 메서드에서 Subscribers onError () 메서드를 호출하기 전에 원하는 시간 동안 스레드를 잠자기 만하면됩니다. 따라서 1000 밀리 초마다 다시 시도하려면 다음과 같이합니다.

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

이 메서드는 어쨌든 IO 스레드에서 실행되기 때문에 UI를 차단하지 않습니다. 내가 볼 수있는 유일한 문제는 첫 번째 오류조차도 지연으로보고되므로 retry ()가 없어도 지연이 있다는 것입니다. 지연이 적용되지 않은 경우 내가 더 잘하고 싶은 한 후 오류 대신 (분명하지만 첫 번째 시도하기 전에) 재 시도.



답변

retryWhen()연산자를 사용하여 Observable에 재시도 논리를 추가 할 수 있습니다 .

다음 클래스에는 재시도 논리가 포함되어 있습니다.

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

용법:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));


답변

Paul의 답변 에서 영감을 얻었으며 Abhijit SarkarretryWhen 가 언급 한 문제에 관심 이 없다면 rxJava2로 재가입을 무조건 지연하는 가장 간단한 방법은 다음과 같습니다.

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

retryWhen 및 repeatWhen 에 대한 더 많은 샘플과 설명을 볼 수 있습니다 .


답변

이 예제는 jxjava 2.2.2에서 작동합니다.

지체없이 재시도 :

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status);

지연 후 재시도 :

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status)
   .doOnError((Throwable error)
                -> log.error("I tried five times with a 300ms break"
                             + " delay in between. But it was in vain."));

someConnection.send ()가 실패하면 소스 싱글이 실패합니다. 이런 일이 발생하면 retryWhen 내부에서 관찰 가능한 오류가 오류를 내 보냅니다. 이 방출을 300ms 지연시키고 재시도 신호를 보내기 위해 다시 보냅니다. take (5)는 5 개의 오류를 수신 한 후 observable 신호가 종료되도록 보장합니다. 재시도 종료를 확인하고 다섯 번째 실패 후 재 시도하지 않습니다.


답변

이것은 내가 본 Ben Christensen의 스 니펫, RetryWhen ExampleRetryWhenTestsConditional을 기반으로 한 솔루션입니다. ( 작동 하려면 n.getThrowable()로 변경 해야 n했습니다). 내가 사용 evant / Gradle을-retrolambda을 안드로이드에 람다 표기법 작업을하는 것이 아니라 (그것은 매우 권장하지만) 당신은 람다를 사용할 필요가 없습니다. 지연을 위해 지수 백 오프를 구현했지만 원하는 백 오프 논리를 연결할 수 있습니다. 완전성을 위해 subscribeOnobserveOn연산자를 추가했습니다 . 내가 사용하고 ReactiveX / RxAndroid을 위해 AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);


답변

MyRequestObservable.retry를 사용하는 대신 지연에 대한 간접 처리를 처리하는 새 Observable을 반환하는 래퍼 함수 retryObservable (MyRequestObservable, retrycount, seconds)을 사용합니다.

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0)
        {
            //success!
        }
    },
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) {
           // failed after the 3 retries !
        }});


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}


답변

retryWhen복잡하고 버그가있는 연산자입니다. 공식 문서와 여기에서 적어도 하나의 답변은 range연산자를 사용 하며 재 시도가 없으면 실패합니다. 내ReactiveX 회원 David Karnok와의 토론 을 .

나는 변경하여 kjones ‘대답에 개선 flatMapconcatMap와 추가하여 RetryDelayStrategy클래스를. flatMap방출 순서를 보존하지 않습니다 concatMap. 이는 백 오프 지연에 중요합니다. 는 RetryDelayStrategy이름에서 알 수 있듯이,하자 사용자가 백 오프를 포함하여 생성 재시도 지연의 다양한 모드를 선택할 수 있습니다. 이 코드는 다음 테스트 케이스와 함께 내 GitHub에서 사용할 수 있습니다 .

  1. 첫 번째 시도에서 성공 (재시도 없음)
  2. 1 회 재시도 후 실패
  3. 3 회 재 시도를 시도하지만 2 회에 성공하므로 3 회 재 시도하지 않습니다.
  4. 세 번째 재시도 성공

setRandomJokes방법을 참조하십시오 .


답변

여기 에 kjones 답변을 기반으로 RxJava 2.x의 Kotlin 버전이 확장으로 지연되어 재 시도됩니다. Observable동일한 확장을 만들기 위해 바꾸기Flowable .

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

그런 다음 Observable에 사용하십시오. observable.retryWithDelay(3, 1000)