Disons que j’ai une queue remplie de tâches que je dois soumettre à un service exécuteur. Je veux qu’ils soient traités un à la fois. La façon la plus simple de penser est de:
Cependant, j’essaie d’éviter de bloquer complètement. Si j’ai 10 000 files d’attente de ce type, qui nécessitent que leurs tâches soient traitées une par une, je vais manquer d’espace de stockage car la plupart d’entre elles conserveront les threads bloqués.
Ce que je voudrais, c’est soumettre une tâche et fournir un rappel qui est appelé lorsque la tâche est terminée. Je vais utiliser cette notification de rappel comme indicateur pour envoyer la tâche suivante. (fonctionnellejava et jetlang utilisent apparemment de tels algorithmes non bloquants, mais je ne peux pas comprendre leur code)
Comment puis-je faire cela en utilisant java.util.concurrent de JDK, sans écrire mon propre service d’exécuteur?
(la queue qui me nourrit ces tâches peut elle-même bloquer, mais c’est un problème à résoudre plus tard)
Définissez une interface de rappel pour recevoir les parameters que vous souhaitez transmettre dans la notification d’achèvement. Puis invoquez-le à la fin de la tâche.
Vous pouvez même écrire un wrapper général pour les tâches Runnable et les envoyer à ExecutorService
. Ou, voir ci-dessous pour un mécanisme intégré à Java 8.
class CallbackTask implements Runnable { private final Runnable task; private final Callback callback; CallbackTask(Runnable task, Callback callback) { this.task = task; this.callback = callback; } public void run() { task.run(); callback.complete(); } }
Avec CompletableFuture
, Java 8 inclut un moyen plus élaboré de composer des pipelines où les processus peuvent être exécutés de manière asynchrone et conditionnelle. Voici un exemple artificiel mais complet de notification.
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; public class GetTaskNotificationWithoutBlocking { public static void main(Ssortingng... argv) throws Exception { ExampleService svc = new ExampleService(); GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking(); CompletableFuture f = CompletableFuture.supplyAsync(svc::work); f.thenAccept(listener::notify); System.out.println("Exiting main()"); } void notify(Ssortingng msg) { System.out.println("Received message: " + msg); } } class ExampleService { Ssortingng work() { sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */ char[] str = new char[5]; ThreadLocalRandom current = ThreadLocalRandom.current(); for (int idx = 0; idx < str.length; ++idx) str[idx] = (char) ('A' + current.nextInt(26)); String msg = new String(str); System.out.println("Generated message: " + msg); return msg; } public static void sleep(long average, TimeUnit unit) { String name = Thread.currentThread().getName(); long timeout = Math.min(exponential(average), Math.multiplyExact(10, average)); System.out.printf("%s sleeping %d %s...%n", name, timeout, unit); try { unit.sleep(timeout); System.out.println(name + " awoke."); } catch (InterruptedException abort) { Thread.currentThread().interrupt(); System.out.println(name + " interrupted."); } } public static long exponential(long avg) { return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble())); } }
Utilisez la future API accessible de Guava et ajoutez un rappel. Cf. depuis le site:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); ListenableFuture explosion = service.submit(new Callable () { public Explosion call() { return pushBigRedButton(); } }); Futures.addCallback(explosion, new FutureCallback () { // we want this handler to run immediately after we push the big red button! public void onSuccess(Explosion explosion) { walkAwayFrom(explosion); } public void onFailure(Throwable thrown) { battleArchNemesis(); // escaped the explosion! } });
En Java 8, vous pouvez utiliser CompletableFuture . Voici un exemple que j’ai utilisé dans mon code pour récupérer des utilisateurs de mon service utilisateur, les mapper sur mes objects de vue, puis mettre à jour ma vue ou afficher une boîte de dialog d’erreur (il s’agit d’une application graphique):
CompletableFuture.supplyAsync( userService::listUsers ).thenApply( this::mapUsersToUserViews ).thenAccept( this::updateView ).exceptionally( throwable -> { showErrorDialogFor(throwable); return null; } );
Il s’exécute de manière asynchrone. J’utilise deux méthodes privées: mapUsersToUserViews
et updateView
.
Vous pouvez étendre la classe FutureTask
et remplacer la méthode done()
, puis append l’object FutureTask
à ExecutorService
, de sorte que la méthode done()
invoque lorsque la FutureTask
terminée immédiatement.
ThreadPoolExecutor
également des méthodes de hook beforeExecute
et afterExecute
que vous pouvez remplacer et utiliser. Voici la description des ThreadPoolExecutor
de ThreadPoolExecutor .
Méthodes de crochet
Cette classe fournit des méthodes
beforeExecute(java.lang.Thread, java.lang.Runnable)
etafterExecute(java.lang.Runnable, java.lang.Throwable)
qui sont appelées avant et après l’exécution de chaque tâche. Ceux-ci peuvent être utilisés pour manipuler l’environnement d’exécution; par exemple, réinitialiserThreadLocals
, rassembler des statistiques ou append des entrées de journal. De plus, la méthodeterminated()
peut être remplacée pour effectuer tout traitement spécial qui doit être effectué une fois que l’Executor
est complètement terminé. Si les méthodes hook ou callback lancent des exceptions, les threads de travail internes peuvent à leur tour échouer et se terminer brusquement.
Utilisez un CountDownLatch
.
C’est à partir de java.util.concurrent
et c’est exactement la manière d’attendre que plusieurs threads se terminent avant de continuer.
Pour obtenir l’effet de rappel que vous recherchez, cela nécessite un peu de travail supplémentaire. C’est-à-dire, gérer cela par vous-même dans un thread séparé qui utilise CountDownLatch
et qui attend, puis continue à notifier ce que vous devez notifier. Il n’y a pas de support natif pour le rappel, ou quelque chose de similaire à cet effet.
EDIT: maintenant que je comprends mieux votre question, je pense que vous allez trop loin, inutilement. Si vous prenez un SingleThreadExecutor
standard, SingleThreadExecutor
-le à toutes les tâches, et cela fera la queue en mode natif.
Si vous voulez vous assurer qu’aucune tâche ne sera exécutée en même temps, utilisez un SingleThreadedExecutor . Les tâches seront traitées dans l’ordre dans lequel elles sont soumises. Vous n’avez même pas besoin de tenir les tâches, il suffit de les soumettre à l’exécutable.
Juste pour append à la réponse de Matt, qui a aidé, voici un exemple plus détaillé pour montrer l’utilisation d’un rappel.
private static Primes primes = new Primes(); public static void main(Ssortingng[] args) throws InterruptedException { getPrimeAsync((p) -> System.out.println("onPrimeListener; p=" + p)); System.out.println("Adios mi amigito"); } public interface OnPrimeListener { void onPrime(int prime); } public static void getPrimeAsync(OnPrimeListener listener) { CompletableFuture.supplyAsync(primes::getNextPrime) .thenApply((prime) -> { System.out.println("getPrimeAsync(); prime=" + prime); if (listener != null) { listener.onPrime(prime); } return prime; }); }
La sortie est la suivante:
getPrimeAsync(); prime=241 onPrimeListener; p=241 Adios mi amigito
Code simple pour implémenter le mécanisme de Callback
aide d’ ExecutorService
import java.util.concurrent.*; import java.util.*; public class CallBackDemo{ public CallBackDemo(){ System.out.println("creating service"); ExecutorService service = Executors.newFixedThreadPool(5); try{ for ( int i=0; i<5; i++){ Callback callback = new Callback(i+1); MyCallable myCallable = new MyCallable((long)i+1,callback); Future future = service.submit(myCallable); //System.out.println("future status:"+future.get()+":"+future.isDone()); } }catch(Exception err){ err.printStackTrace(); } service.shutdown(); } public static void main(Ssortingng args[]){ CallBackDemo demo = new CallBackDemo(); } } class MyCallable implements Callable { Long id = 0L; Callback callback; public MyCallable(Long val,Callback obj){ this.id = val; this.callback = obj; } public Long call(){ //Add your business logic System.out.println("Callable:"+id+":"+Thread.currentThread().getName()); callback.callbackMethod(); return id; } } class Callback { private int i; public Callback(int i){ this.i = i; } public void callbackMethod(){ System.out.println("Call back:"+i); // Add your business logic } }
sortie:
creating service Callable:1:pool-1-thread-1 Call back:1 Callable:3:pool-1-thread-3 Callable:2:pool-1-thread-2 Call back:2 Callable:5:pool-1-thread-5 Call back:5 Call back:3 Callable:4:pool-1-thread-4 Call back:4
Notes clés:
newFixedThreadPool(5)
séquencées dans l’ordre FIFO, remplacez newFixedThreadPool(5)
par newFixedThreadPool(1)
Si vous souhaitez traiter la tâche suivante après avoir analysé le résultat du callback
de la tâche précédente, désélectionnez tout simplement la ligne ci-dessous.
//System.out.println("future status:"+future.get()+":"+future.isDone());
Vous pouvez remplacer newFixedThreadPool()
par l’un des
Executors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor
en fonction de votre cas d’utilisation.
Si vous souhaitez gérer la méthode de rappel de manière asynchrone
une. ExecutorService or ThreadPoolExecutor
une tâche ExecutorService or ThreadPoolExecutor
à ExecutorService or ThreadPoolExecutor
b. Convertissez votre méthode Callable/Runnable
tâche Callable/Runnable
c. Poussez la tâche de rappel vers ExecutorService or ThreadPoolExecutor
Ceci est une extension de la réponse de Pache en utilisant ListenableFuture
de Guava.
En particulier, Futures.transform()
renvoie ListenableFuture
et peut donc être utilisé pour enchaîner les appels asynchrones. Futures.addCallback()
renvoie un void
, il ne peut donc pas être utilisé pour le chaînage, mais il est utile pour gérer le succès / l’échec lors de l’achèvement asynchrone.
// ListenableFuture1: Open Database ListenableFuture database = service.submit(() -> openDatabase()); // ListenableFuture2: Query Database for Cursor rows ListenableFuture cursor = Futures.transform(database, database -> database.query(table, ...)); // ListenableFuture3: Convert Cursor rows to List ListenableFuture> fooList = Futures.transform(cursor, cursor -> cursorToFooList(cursor)); // Final Callback: Handle the success/errors when final future completes Futures.addCallback(fooList, new FutureCallback>() { public void onSuccess(List foos) { doSomethingWith(foos); } public void onFailure(Throwable thrown) { log.error(thrown); } });
REMARQUE: Outre l’enchaînement des tâches asynchrones, Futures.transform()
vous permet également de planifier chaque tâche sur un exécuteur distinct (non illustré dans cet exemple).
Vous pouvez utiliser une implémentation de Callable telle que
public class MyAsyncCallable implements Callable { CallbackInterface ci; public MyAsyncCallable(CallbackInterface ci) { this.ci = ci; } public V call() throws Exception { System.out.println("Call of MyCallable invoked"); System.out.println("Result = " + this.ci.doSomething(10, 20)); return (V) "Good job"; } }
où CallbackInterface est quelque chose de très basique comme
public interface CallbackInterface { public int doSomething(int a, int b); }
et maintenant la classe principale ressemblera à ceci
ExecutorService ex = Executors.newFixedThreadPool(2); MyAsyncCallable mac = new MyAsyncCallable ((a, b) -> a + b); ex.submit(mac);