rxjava: Puis-je utiliser retry () mais avec un délai?

J’utilise rxjava dans mon application Android pour gérer les demandes réseau de manière asynchrone. Maintenant, je voudrais réessayer une demande réseau en échec seulement après un certain temps.

Existe-t-il un moyen d’utiliser retry () sur un observable mais de ne réessayer qu’après un certain délai?

Y a-t-il un moyen de faire savoir à l’observable qu’il est actuellement en train de réessayer (plutôt que d’essayer pour la première fois)?

J’ai jeté un coup d’oeil à debounce () / throttleWithTimeout () mais ils semblent faire quelque chose de différent.

Modifier:

Je pense que j’ai trouvé un moyen de le faire, mais je serais intéressé soit par la confirmation que c’est la bonne façon de le faire, soit par d’autres moyens plus efficaces.

Voici ce que je fais: Dans la méthode call () de mon Observable.OnSubscribe, avant d’appeler la méthode Subscribers onError (), je laisse simplement le thread dormir pendant la durée souhaitée. Donc, pour réessayer toutes les 1000 millisecondes, je fais quelque chose comme ceci:

@Override public void call(Subscriber<? super List> subscriber) { try { Log.d(TAG, "trying to load all products with pid: " + pid); subscriber.onNext(productClient.getProductNodesForParentId(pid)); subscriber.onCompleted(); } catch (Exception e) { try { Thread.sleep(1000); } catch (InterruptedException e1) { e.printStackTrace(); } subscriber.onError(e); } } 

Comme cette méthode est en cours d’exécution sur un thread IO, elle ne bloque pas l’interface utilisateur. Le seul problème que je peux voir est que même la première erreur est signalée avec un délai, le délai est donc présent même s’il n’y a pas de nouvelle tentative (). Je préférerais que le délai ne soit pas appliqué après une erreur mais plutôt avant une nouvelle tentative (mais pas avant le premier essai, évidemment).

Vous pouvez utiliser l’opérateur retryWhen() pour append une logique de nouvelle tentative à n’importe quelle observable.

La classe suivante contient la logique de nouvelle tentative:

RxJava 2.x

 public class RetryWithDelay implements Function, Observable< ?>> { private final int maxResortinges; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxResortinges, final int retryDelayMillis) { this.maxResortinges = maxResortinges; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable< ?> apply(final Observable< ? extends Throwable> attempts) { return attempts .flatMap(new Function>() { @Override public Observable< ?> apply(final Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } } 

RxJava 1.x

 public class RetryWithDelay implements Func1, Observable< ?>> { private final int maxResortinges; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxResortinges, final int retryDelayMillis) { this.maxResortinges = maxResortinges; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable< ?> call(Observable< ? extends Throwable> attempts) { return attempts .flatMap(new Func1>() { @Override public Observable< ?> call(Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } } 

Usage:

 // Add retry logic to existing observable. // Retry max of 3 times with a delay of 2 seconds. observable .retryWhen(new RetryWithDelay(3, 2000)); 

Ceci est une solution basée sur les extraits de Ben Christensen que j’ai vus, RetryWhen Example et RetryWhenTestsConditional (j’ai dû changer n.getThrowable() en n pour que cela fonctionne). J’ai utilisé evant / gradle-retrolambda pour faire fonctionner la notation lambda sur Android, mais vous n’avez pas à utiliser lambdas (bien que cela soit fortement recommandé). Pour le retard, j’ai mis en place une sauvegarde exponentielle, mais vous pouvez twigr la logique de sauvegarde que vous souhaitez. Pour être complet, j’ai ajouté les opérateurs subscribeOn et observeOn . J’utilise ReactiveX / RxAndroid pour AndroidSchedulers.mainThread() .

 int ATTEMPT_COUNT = 10; public class Tuple { public final X x; public final Y y; public Tuple(X x, Y y) { this.x = x; this.y = y; } } observable .subscribeOn(Schedulers.io()) .retryWhen( attempts -> { return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple(n, i)) .flatMap( ni -> { if (ni.y > ATTEMPT_COUNT) return Observable.error(ni.x); return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS); }); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 

au lieu d’utiliser MyRequestObservable.retry, j’utilise une fonction wrapper retryObservable (MyRequestObservable, retrycount, seconds) qui retourne une nouvelle observable qui gère l’indirection du délai pour que je puisse le faire

 retryObservable(restApi.getObservableStuff(), 3, 30) .subscribe(new Action1(){ @Override public void call(BonusIndividualList arg0) { //success! } }, new Action1(){ @Override public void call(Throwable arg0) { // failed after the 3 resortinges ! }}); // wrapper code private static  Observable retryObservable( final Observable requestObservable, final int nbRetry, final long seconds) { return Observable.create(new Observable.OnSubscribe() { @Override public void call(final Subscriber< ? super T> subscriber) { requestObservable.subscribe(new Action1() { @Override public void call(T arg0) { subscriber.onNext(arg0); subscriber.onCompleted(); } }, new Action1() { @Override public void call(Throwable error) { if (nbRetry > 0) { Observable.just(requestObservable) .delay(seconds, TimeUnit.SECONDS) .observeOn(mainThread()) .subscribe(new Action1>(){ @Override public void call(Observable observable){ retryObservable(observable, nbRetry - 1, seconds) .subscribe(subscriber); } }); } else { // still fail after resortinges subscriber.onError(error); } } }); } }); } 

Inspiré par la réponse de Paul , et si vous ne vous retryWhen pas de retryWhen problèmes déclarés par Abhijit Sarkar , le moyen le plus simple de retarder la réabonnement avec rxJava2 unconditionnaly est:

 source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS)) 

Vous souhaiterez peut-être voir plus d’échantillons et d’explications sur retryWhen et repeatWhen .

Maintenant, avec la version 1.0+ de RxJava, vous pouvez utiliser zipWith pour réessayer avec un délai.

Ajout de modifications aux kjones répondent.

Modifié

 public class RetryWithDelay implements Func1, Observable< ?>> { private final int MAX_RETRIES; private final int DELAY_DURATION; private final int START_RETRY; /** * Provide number of resortinges and seconds to be delayed between retry. * * @param maxResortinges Number of resortinges. * @param delayDurationInSeconds Seconds to be delays in each retry. */ public RetryWithDelay(int maxResortinges, int delayDurationInSeconds) { MAX_RETRIES = maxResortinges; DELAY_DURATION = delayDurationInSeconds; START_RETRY = 1; } @Override public Observable< ?> call(Observable< ? extends Throwable> observable) { return observable .delay(DELAY_DURATION, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), new Func2() { @Override public Integer call(Throwable throwable, Integer attempt) { return attempt; } }); } } 

Vous pouvez append un délai dans l’observable retourné dans l’opérateur retryWhen

  /** * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated */ @Test public void observableOnErrorResumeNext() { Subscription subscription = Observable.just(null) .map(Object::toSsortingng) .doOnError(failure -> System.out.println("Error:" + failure.getCause())) .retryWhen(errors -> errors.doOnNext(o -> count++) .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), Schedulers.newThread()) .onErrorResumeNext(t -> { System.out.println("Error after all resortinges:" + t.getCause()); return Observable.just("I save the world for extinction!"); }) .subscribe(s -> System.out.println(s)); new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); } 

Vous pouvez voir plus d’exemples ici. https://github.com/politrons/reactive

retryWhen est un opérateur compliqué, peut-être même retryWhen . La doc officielle et au moins une réponse ici utilisent un opérateur de range , qui échouera s’il n’y a pas de nouvelles tentatives. Voir ma discussion avec David Karnok, membre de ReactiveX.

J’ai amélioré la réponse de flatMap transformant flatMap en concatMap et en ajoutant une classe RetryDelayStrategy . flatMap ne préserve pas l’ordre d’émission pendant que concatMap fait, ce qui est important pour les retards avec retour en arrière. RetryDelayStrategy , comme son nom l’indique, permet à l’utilisateur de choisir parmi différents modes de génération de retards, y compris les retards. Le code est disponible sur mon GitHub avec les cas de test suivants:

  1. Réussite au 1er essai (pas de relance)
  2. Échoue après 1 tentative
  3. Essaie de réessayer 3 fois, mais réussit la 2ème fois et ne réessaye pas la 3ème fois
  4. Réussite au 3ème essai

Voir la méthode setRandomJokes .

Même réponse que pour les kjones mais mise à jour vers la dernière version pour la version RxJava 2.x : (‘io.reactivex.rxjava2: rxjava: 2.1.3’)

 public class RetryWithDelay implements Function, Publisher< ?>> { private final int maxResortinges; private final long retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxResortinges, final int retryDelayMillis) { this.maxResortinges = maxResortinges; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Publisher< ?> apply(Flowable throwableFlowable) throws Exception { return throwableFlowable.flatMap(new Function>() { @Override public Publisher< ?> apply(Throwable throwable) throws Exception { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Flowable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Flowable.error(throwable); } }); } 

}

Usage:

// Ajouter une logique de nouvelle tentative à une observable existante. // Réessayez 3 fois maximum avec un délai de 2 secondes.

 observable .retryWhen(new RetryWithDelay(3, 2000)); 

Pour la version Kotlin & RxJava1

 class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long) : Function1, Observable< *>> { private val START_RETRY: Int = 1 override fun invoke(observable: Observable): Observable< *> { return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), object : Function2 { override fun invoke(throwable: Throwable, attempt: Int): Int { return attempt } }) } } 

(Kotlin) J’ai un peu amélioré le code avec des retombées exponentielles et une émission de défense appliquée de Observable.range ():

  fun testOnRetryWithDelayExponentialBackoff() { val interval = 1 val maxCount = 3 val ai = AtomicInteger(1); val source = Observable.create { emitter -> val attempt = ai.getAndIncrement() println("Subscribe ${attempt}") if (attempt >= maxCount) { emitter.onNext(Unit) emitter.onComplete() } emitter.onError(RuntimeException("Test $attempt")) } // Below implementation of "retryWhen" function, remove all "println()" for real code. val sourceWithRetry: Observable = source.retryWhen { throwableRx -> throwableRx.doOnNext({ println("Error: $it") }) .zipWith(Observable.range(1, maxCount) .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) }, BiFunction { t1: Throwable, t2: Int -> t1 to t2 } ) .flatMap { pair -> if (pair.second >= maxCount) { Observable.error(pair.first) } else { val delay = interval * 2F.pow(pair.second) println("retry delay: $delay") Observable.timer(delay.toLong(), TimeUnit.SECONDS) } } } //Code to print the result in terminal. sourceWithRetry .doOnComplete { println("Complete") } .doOnError({ println("Final Error: $it") }) .blockingForEach { println("$it") } } 

Faites-le simplement comme ceci:

  Observable.just("") .delay(2, TimeUnit.SECONDS) //delay .flatMap(new Func1>() { @Override public Observable call(Ssortingng s) { L.from(TAG).d("postAvatar="); File file = PhotoPickUtil.getTempFile(); if (file.length() < = 0) { throw new NullPointerException(); } return Observable.just(file); } }) .retry(6) .subscribe(new Action1() { @Override public void call(File file) { postAvatar(file); } }, new Action1() { @Override public void call(Throwable throwable) { } });