Envelopper un calcul asynchrone dans un calcul synchrone (bloquant)

questions similaires:

  • Motif pour encapsuler une fonction JavaScript asynchrone afin de la rendre synchrone
  • Envelopper une méthode asynchrone de manière synchrone en C #

J’ai un object avec une méthode que je voudrais exposer aux clients de la bibliothèque (en particulier les clients de script) comme quelque chose comme:

interface MyNiceInterface { public Baz doSomethingAndBlock(Foo fooArg, Bar barArg); public Future doSomething(Foo fooArg, Bar barArg); // doSomethingAndBlock is the straightforward way; // doSomething has more control but deals with // a Future and that might be too much hassle for // scripting clients } 

mais le “truc” primitif dont je dispose est un ensemble de classes axées sur les événements:

 interface BazComputationSink { public void onBazResult(Baz result); } class ImplementingThing { public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink); } 

Où ImplementingThing prend des entrées, fait des trucs obscurs comme mettre en queue des choses dans une queue de tâches, puis un résultat survient, sink.onBazResult() est appelé sur un thread qui peut ou non être le même thread que ImplementingThing.doSomethingAsync () a été appelé.

Existe-t-il un moyen d’utiliser les fonctions pilotées par les événements que j’ai, ainsi que les primitives d’access simultané, pour implémenter MyNiceInterface afin que les clients de script puissent attendre sur un thread bloquant?

edit: puis-je utiliser FutureTask pour cela?

En utilisant votre propre implémentation future:

 public class BazComputationFuture implements Future, BazComputationSink { private volatile Baz result = null; private volatile boolean cancelled = false; private final CountDownLatch countDownLatch; public BazComputationFuture() { countDownLatch = new CountDownLatch(1); } @Override public boolean cancel(final boolean mayInterruptIfRunning) { if (isDone()) { return false; } else { countDownLatch.countDown(); cancelled = true; return !isDone(); } } @Override public Baz get() throws InterruptedException, ExecutionException { countDownLatch.await(); return result; } @Override public Baz get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { countDownLatch.await(timeout, unit); return result; } @Override public boolean isCancelled() { return cancelled; } @Override public boolean isDone() { return countDownLatch.getCount() == 0; } public void onBazResult(final Baz result) { this.result = result; countDownLatch.countDown(); } } public Future doSomething(Foo fooArg, Bar barArg) { BazComputationFuture future = new BazComputationFuture(); doSomethingAsync(fooArg, barArg, future); return future; } public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { return doSomething(fooArg, barArg).get(); } 

La solution crée un CountDownLatch en interne qui est effacé une fois le rappel reçu. Si les appels de l’utilisateur obtiennent, le CountDownLatch est utilisé pour bloquer le thread d’appel jusqu’à ce que le calcul se termine et appeler le rappel onBazResult. CountDownLatch assurera que si le rappel a lieu avant que get () soit appelé, la méthode get () retournera immédiatement avec un résultat.

Eh bien, il y a la solution simple de faire quelque chose comme:

 public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { final AtomicReference notifier = new AtomicReference(); doSomethingAsync(fooArg, barArg, new BazComputationSink() { public void onBazResult(Baz result) { synchronized (notifier) { notifier.set(result); notifier.notify(); } } }); synchronized (notifier) { while (notifier.get() == null) notifier.wait(); } return notifier.get(); } 

Bien sûr, cela suppose que votre résultat Baz ne sera jamais nul…

La librairie google guava a un facile à utiliser SettableFuture qui rend ce problème très simple (environ 10 lignes de code).

 public class ImplementingThing { public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { try { return doSomething(fooArg, barArg).get(); } catch (Exception e) { throw new RuntimeException("Oh dear"); } }; public Future doSomething(Foo fooArg, Bar barArg) { final SettableFuture future = new SettableFuture(); doSomethingAsync(fooArg, barArg, new BazComputationSink() { @Override public void onBazResult(Baz result) { future.set(result); } }); return future; }; // Everything below here is just mock stuff to make the example work, // so you can copy it into your IDE and see it run. public static class Baz {} public static class Foo {} public static class Bar {} public static interface BazComputationSink { public void onBazResult(Baz result); } public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) { new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } Baz baz = new Baz(); sink.onBazResult(baz); } }).start(); }; public static void main(Ssortingng[] args) { System.err.println("Starting Main"); System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null)); System.err.println("Ending Main"); } 

Un exemple très simple, juste pour comprendre CountDownLatch sans code supplémentaire.

Un java.util.concurrent.CountDownLatch est une construction de concurrence qui permet à un ou plusieurs threads d’attendre qu’un ensemble donné d’opérations se termine.

Un CountDownLatch est initialisé avec un nombre donné. Ce nombre est décrémenté par des appels à la méthode countDown() . Les threads attendant ce nombre pour atteindre zéro peuvent appeler l’une des méthodes await() . L’appel de await() bloque le thread jusqu’à ce que le compte atteigne zéro.

Voici un exemple simple. Une fois que le décrémenteur a appelé countDown() 3 fois sur CountDownLatch , le serveur en attente est libéré de l’appel countDown() .

Vous pouvez également mentionner certains TimeOut à attendre.

 CountDownLatch latch = new CountDownLatch(3); Waiter waiter = new Waiter(latch); Decrementer decrementer = new Decrementer(latch); new Thread(waiter) .start(); new Thread(decrementer).start(); Thread.sleep(4000); public class Waiter implements Runnable{ CountDownLatch latch = null; public Waiter(CountDownLatch latch) { this.latch = latch; } public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Waiter Released"); } } 

// ————–

 public class Decrementer implements Runnable { CountDownLatch latch = null; public Decrementer(CountDownLatch latch) { this.latch = latch; } public void run() { try { Thread.sleep(1000); this.latch.countDown(); Thread.sleep(1000); this.latch.countDown(); Thread.sleep(1000); this.latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } 

Référence

Si vous ne souhaitez pas utiliser CountDownLatch ou si vos besoins correspondent à des fonctionnalités similaires à Facebook. Signifie si une méthode est appelée alors n’appelez pas l’autre méthode.

Dans ce cas, vous pouvez déclarer un

 private volatile Boolean isInprocessOfLikeOrUnLike = false; 

et ensuite vous pouvez vérifier au début de votre appel de méthode que si elle est false méthode d’appel retournera sinon .. dépend de votre implémentation.

Voici une solution plus générique basée sur la réponse de Paul Wagland:

 public abstract class AsyncRunnable { protected abstract void run(AtomicReference notifier); protected final void finish(AtomicReference notifier, T result) { synchronized (notifier) { notifier.set(result); notifier.notify(); } } public static  T wait(AsyncRunnable runnable) { final AtomicReference notifier = new AtomicReference<>(); // run the asynchronous code runnable.run(notifier); // wait for the asynchronous code to finish synchronized (notifier) { while (notifier.get() == null) { try { notifier.wait(); } catch (InterruptedException ignore) {} } } // return the result of the asynchronous code return notifier.get(); } } 

Voici un exemple d’utilisation:

  Ssortingng result = AsyncRunnable.wait(new AsyncRunnable() { @Override public void run(final AtomicReference notifier) { // here goes your async code, eg: new Thread(new Runnable() { @Override public void run() { finish(notifier, "This was a asynchronous call!"); } }).start(); } }); 

Une version plus détaillée du code peut être trouvée ici: http://pastebin.com/hKHJUBqE

EDIT: L’exemple lié à la question serait:

 public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) { return AsyncRunnable.wait(new AsyncRunnable() { @Override protected void run(final AtomicReference notifier) { doSomethingAsync(fooArg, barArg, new BazComputationSink() { public void onBazResult(Baz result) { synchronized (notifier) { notifier.set(result); notifier.notify(); } } }); } }); } 

C’est simple avec RxJava 2.x:

 try { Baz baz = Single.create((SingleEmitter emitter) -> doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) .toFuture().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } 

Ou sans notation Lambda:

 Baz baz = Single.create(new SingleOnSubscribe() { @Override public void subscribe(SingleEmitter emitter) { doSomethingAsync(fooArg, barArg, new BazComputationSink() { @Override public void onBazResult(Baz result) { emitter.onSuccess(result); } }); } }).toFuture().get(); 

Encore plus simple:

 Baz baz = Single.create((SingleEmitter emitter) -> doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) .blockingGet();