Exécutants Java: comment être averti, sans blocage, à la fin d’une tâche?

Disons que j’ai une queue remplie de tâches que je dois soumettre à un service exécuteur. Je veux qu’ils soient traités un à la fois. La façon la plus simple de penser est de:

  1. Prendre une tâche de la queue
  2. Le soumettre à l’exécuteur
  3. Appelez .get sur le Future retourné et bloquez jusqu’à ce qu’un résultat soit disponible
  4. Prenez une autre tâche de la queue …

Cependant, j’essaie d’éviter de bloquer complètement. Si j’ai 10 000 files d’attente de ce type, qui nécessitent que leurs tâches soient traitées une par une, je vais manquer d’espace de stockage car la plupart d’entre elles conserveront les threads bloqués.

Ce que je voudrais, c’est soumettre une tâche et fournir un rappel qui est appelé lorsque la tâche est terminée. Je vais utiliser cette notification de rappel comme indicateur pour envoyer la tâche suivante. (fonctionnellejava et jetlang utilisent apparemment de tels algorithmes non bloquants, mais je ne peux pas comprendre leur code)

Comment puis-je faire cela en utilisant java.util.concurrent de JDK, sans écrire mon propre service d’exécuteur?

(la queue qui me nourrit ces tâches peut elle-même bloquer, mais c’est un problème à résoudre plus tard)

Définissez une interface de rappel pour recevoir les parameters que vous souhaitez transmettre dans la notification d’achèvement. Puis invoquez-le à la fin de la tâche.

Vous pouvez même écrire un wrapper général pour les tâches Runnable et les envoyer à ExecutorService . Ou, voir ci-dessous pour un mécanisme intégré à Java 8.

 class CallbackTask implements Runnable { private final Runnable task; private final Callback callback; CallbackTask(Runnable task, Callback callback) { this.task = task; this.callback = callback; } public void run() { task.run(); callback.complete(); } } 

Avec CompletableFuture , Java 8 inclut un moyen plus élaboré de composer des pipelines où les processus peuvent être exécutés de manière asynchrone et conditionnelle. Voici un exemple artificiel mais complet de notification.

 import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; public class GetTaskNotificationWithoutBlocking { public static void main(Ssortingng... argv) throws Exception { ExampleService svc = new ExampleService(); GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking(); CompletableFuture f = CompletableFuture.supplyAsync(svc::work); f.thenAccept(listener::notify); System.out.println("Exiting main()"); } void notify(Ssortingng msg) { System.out.println("Received message: " + msg); } } class ExampleService { Ssortingng work() { sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */ char[] str = new char[5]; ThreadLocalRandom current = ThreadLocalRandom.current(); for (int idx = 0; idx < str.length; ++idx) str[idx] = (char) ('A' + current.nextInt(26)); String msg = new String(str); System.out.println("Generated message: " + msg); return msg; } public static void sleep(long average, TimeUnit unit) { String name = Thread.currentThread().getName(); long timeout = Math.min(exponential(average), Math.multiplyExact(10, average)); System.out.printf("%s sleeping %d %s...%n", name, timeout, unit); try { unit.sleep(timeout); System.out.println(name + " awoke."); } catch (InterruptedException abort) { Thread.currentThread().interrupt(); System.out.println(name + " interrupted."); } } public static long exponential(long avg) { return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble())); } } 

Utilisez la future API accessible de Guava et ajoutez un rappel. Cf. depuis le site:

 ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); ListenableFuture explosion = service.submit(new Callable() { public Explosion call() { return pushBigRedButton(); } }); Futures.addCallback(explosion, new FutureCallback() { // we want this handler to run immediately after we push the big red button! public void onSuccess(Explosion explosion) { walkAwayFrom(explosion); } public void onFailure(Throwable thrown) { battleArchNemesis(); // escaped the explosion! } }); 

En Java 8, vous pouvez utiliser CompletableFuture . Voici un exemple que j’ai utilisé dans mon code pour récupérer des utilisateurs de mon service utilisateur, les mapper sur mes objects de vue, puis mettre à jour ma vue ou afficher une boîte de dialog d’erreur (il s’agit d’une application graphique):

  CompletableFuture.supplyAsync( userService::listUsers ).thenApply( this::mapUsersToUserViews ).thenAccept( this::updateView ).exceptionally( throwable -> { showErrorDialogFor(throwable); return null; } ); 

Il s’exécute de manière asynchrone. J’utilise deux méthodes privées: mapUsersToUserViews et updateView .

Vous pouvez étendre la classe FutureTask et remplacer la méthode done() , puis append l’object FutureTask à ExecutorService , de sorte que la méthode done() invoque lorsque la FutureTask terminée immédiatement.

ThreadPoolExecutor également des méthodes de hook beforeExecute et afterExecute que vous pouvez remplacer et utiliser. Voici la description des ThreadPoolExecutor de ThreadPoolExecutor .

Méthodes de crochet

Cette classe fournit des méthodes beforeExecute(java.lang.Thread, java.lang.Runnable) et afterExecute(java.lang.Runnable, java.lang.Throwable) qui sont appelées avant et après l’exécution de chaque tâche. Ceux-ci peuvent être utilisés pour manipuler l’environnement d’exécution; par exemple, réinitialiser ThreadLocals , rassembler des statistiques ou append des entrées de journal. De plus, la méthode terminated() peut être remplacée pour effectuer tout traitement spécial qui doit être effectué une fois que l’ Executor est complètement terminé. Si les méthodes hook ou callback lancent des exceptions, les threads de travail internes peuvent à leur tour échouer et se terminer brusquement.

Utilisez un CountDownLatch .

C’est à partir de java.util.concurrent et c’est exactement la manière d’attendre que plusieurs threads se terminent avant de continuer.

Pour obtenir l’effet de rappel que vous recherchez, cela nécessite un peu de travail supplémentaire. C’est-à-dire, gérer cela par vous-même dans un thread séparé qui utilise CountDownLatch et qui attend, puis continue à notifier ce que vous devez notifier. Il n’y a pas de support natif pour le rappel, ou quelque chose de similaire à cet effet.


EDIT: maintenant que je comprends mieux votre question, je pense que vous allez trop loin, inutilement. Si vous prenez un SingleThreadExecutor standard, SingleThreadExecutor -le à toutes les tâches, et cela fera la queue en mode natif.

Si vous voulez vous assurer qu’aucune tâche ne sera exécutée en même temps, utilisez un SingleThreadedExecutor . Les tâches seront traitées dans l’ordre dans lequel elles sont soumises. Vous n’avez même pas besoin de tenir les tâches, il suffit de les soumettre à l’exécutable.

Juste pour append à la réponse de Matt, qui a aidé, voici un exemple plus détaillé pour montrer l’utilisation d’un rappel.

 private static Primes primes = new Primes(); public static void main(Ssortingng[] args) throws InterruptedException { getPrimeAsync((p) -> System.out.println("onPrimeListener; p=" + p)); System.out.println("Adios mi amigito"); } public interface OnPrimeListener { void onPrime(int prime); } public static void getPrimeAsync(OnPrimeListener listener) { CompletableFuture.supplyAsync(primes::getNextPrime) .thenApply((prime) -> { System.out.println("getPrimeAsync(); prime=" + prime); if (listener != null) { listener.onPrime(prime); } return prime; }); } 

La sortie est la suivante:

  getPrimeAsync(); prime=241 onPrimeListener; p=241 Adios mi amigito 

Code simple pour implémenter le mécanisme de Callback aide d’ ExecutorService

 import java.util.concurrent.*; import java.util.*; public class CallBackDemo{ public CallBackDemo(){ System.out.println("creating service"); ExecutorService service = Executors.newFixedThreadPool(5); try{ for ( int i=0; i<5; i++){ Callback callback = new Callback(i+1); MyCallable myCallable = new MyCallable((long)i+1,callback); Future future = service.submit(myCallable); //System.out.println("future status:"+future.get()+":"+future.isDone()); } }catch(Exception err){ err.printStackTrace(); } service.shutdown(); } public static void main(Ssortingng args[]){ CallBackDemo demo = new CallBackDemo(); } } class MyCallable implements Callable{ Long id = 0L; Callback callback; public MyCallable(Long val,Callback obj){ this.id = val; this.callback = obj; } public Long call(){ //Add your business logic System.out.println("Callable:"+id+":"+Thread.currentThread().getName()); callback.callbackMethod(); return id; } } class Callback { private int i; public Callback(int i){ this.i = i; } public void callbackMethod(){ System.out.println("Call back:"+i); // Add your business logic } } 

sortie:

 creating service Callable:1:pool-1-thread-1 Call back:1 Callable:3:pool-1-thread-3 Callable:2:pool-1-thread-2 Call back:2 Callable:5:pool-1-thread-5 Call back:5 Call back:3 Callable:4:pool-1-thread-4 Call back:4 

Notes clés:

  1. Si vous souhaitez que les tâches de processus newFixedThreadPool(5) séquencées dans l’ordre FIFO, remplacez newFixedThreadPool(5) par newFixedThreadPool(1)
  2. Si vous souhaitez traiter la tâche suivante après avoir analysé le résultat du callback de la tâche précédente, désélectionnez tout simplement la ligne ci-dessous.

     //System.out.println("future status:"+future.get()+":"+future.isDone()); 
  3. Vous pouvez remplacer newFixedThreadPool() par l’un des

     Executors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor 

    en fonction de votre cas d’utilisation.

  4. Si vous souhaitez gérer la méthode de rappel de manière asynchrone

    une. ExecutorService or ThreadPoolExecutor une tâche ExecutorService or ThreadPoolExecutor à ExecutorService or ThreadPoolExecutor

    b. Convertissez votre méthode Callable/Runnable tâche Callable/Runnable

    c. Poussez la tâche de rappel vers ExecutorService or ThreadPoolExecutor

Ceci est une extension de la réponse de Pache en utilisant ListenableFuture de Guava.

En particulier, Futures.transform() renvoie ListenableFuture et peut donc être utilisé pour enchaîner les appels asynchrones. Futures.addCallback() renvoie un void , il ne peut donc pas être utilisé pour le chaînage, mais il est utile pour gérer le succès / l’échec lors de l’achèvement asynchrone.

 // ListenableFuture1: Open Database ListenableFuture database = service.submit(() -> openDatabase()); // ListenableFuture2: Query Database for Cursor rows ListenableFuture cursor = Futures.transform(database, database -> database.query(table, ...)); // ListenableFuture3: Convert Cursor rows to List ListenableFuture> fooList = Futures.transform(cursor, cursor -> cursorToFooList(cursor)); // Final Callback: Handle the success/errors when final future completes Futures.addCallback(fooList, new FutureCallback>() { public void onSuccess(List foos) { doSomethingWith(foos); } public void onFailure(Throwable thrown) { log.error(thrown); } }); 

REMARQUE: Outre l’enchaînement des tâches asynchrones, Futures.transform() vous permet également de planifier chaque tâche sur un exécuteur distinct (non illustré dans cet exemple).

Vous pouvez utiliser une implémentation de Callable telle que

 public class MyAsyncCallable implements Callable { CallbackInterface ci; public MyAsyncCallable(CallbackInterface ci) { this.ci = ci; } public V call() throws Exception { System.out.println("Call of MyCallable invoked"); System.out.println("Result = " + this.ci.doSomething(10, 20)); return (V) "Good job"; } } 

où CallbackInterface est quelque chose de très basique comme

 public interface CallbackInterface { public int doSomething(int a, int b); } 

et maintenant la classe principale ressemblera à ceci

 ExecutorService ex = Executors.newFixedThreadPool(2); MyAsyncCallable mac = new MyAsyncCallable((a, b) -> a + b); ex.submit(mac);