ExecutorService, comment attendre que toutes les tâches se terminent

Quel est le moyen le plus simple d’attendre que toutes les tâches d’ ExecutorService se terminent? Ma tâche est principalement informatique, alors je veux simplement exécuter un grand nombre de tâches – une sur chaque cœur. En ce moment, ma configuration ressemble à ceci:

 ExecutorService es = Executors.newFixedThreadPool(2); for (DataTable singleTable : uniquePhrases) { es.execute(new ComputeDTask(singleTable)); } try{ es.wait(); } catch (InterruptedException e){ e.printStackTrace(); } 

ComputeDTask implémente ComputeDTask . Cela semble exécuter les tâches correctement, mais le code se bloque sur wait() avec IllegalMonitorStateException . C’est étrange, car j’ai joué avec quelques exemples de jouets et cela semblait fonctionner.

uniquePhrases contient plusieurs dizaines de milliers d’éléments. Devrais-je utiliser une autre méthode? Je cherche quelque chose d’aussi simple que possible

L’approche la plus simple consiste à utiliser ExecutorService.invokeAll() qui fait ce que vous voulez dans une ligne simple. Dans votre langage, vous devrez modifier ou envelopper ComputeDTask pour implémenter Callable<> , ce qui peut vous donner un peu plus de flexibilité. Probablement dans votre application, il y a une implémentation significative de Callable.call() , mais voici un moyen de l’envelopper si vous n’utilisez pas Executors.callable() .

 ExecutorService es = Executors.newFixedThreadPool(2); List> todo = new ArrayList>(singleTable.size()); for (DataTable singleTable: uniquePhrases) { todo.add(Executors.callable(new ComputeDTask(singleTable))); } List> answers = es.invokeAll(todo); 

Comme d’autres l’ont fait remarquer, vous pouvez utiliser la version de délai d’attente d’ invokeAll() cas échéant. Dans cet exemple, les answers vont contenir un tas de Future s qui renverront des valeurs nulles (voir la définition de Executors.callable() . Ce que vous voulez probablement faire, c’est un léger refactoring pour obtenir une réponse utile ou une référence). à la ComputeDTask sous- ComputeDTask , mais je ne peux pas dire de votre exemple.

Si ce n’est pas clair, notez que invokeAll() ne reviendra pas tant que toutes les tâches ne seront pas terminées. (c.-à-d. que tous les Future de votre collection de answers signaleront .isDone() si demandé). Cela évite tout arrêt manuel, waitTermination, etc.

Il y a quelques questions connexes sur SO:

  • Comment attendre que tous les threads se terminent

  • Renvoie les valeurs des threads java

  • invokeAll () ne veut pas accepter une collection >

  • Dois-je synchroniser?

Aucun d’entre eux n’est ssortingctement lié à votre question, mais ils fournissent un peu de couleur sur la manière dont les personnes pensent Executor / ExecutorService doit être utilisé.

Si vous souhaitez attendre que toutes les tâches soient terminées, utilisez la méthode d’ shutdown au lieu d’ wait . Ensuite, suivez-le en awaitTermination .

En outre, vous pouvez utiliser Runtime.availableProcessors pour obtenir le nombre de threads matériels afin de pouvoir initialiser correctement votre pool de threads.

Si l’attente de la fin de toutes les tâches dans ExecutorService n’est pas précisément votre objective, vous pouvez plutôt utiliser CompletionService , en l’ ExecutorCompletionService un ExecutorCompletionService , plutôt que d’attendre la fin d’un lot de tâches spécifique.

L’idée est de créer un ExecutorCompletionService encapsulant votre Executor , de soumettre un certain nombre de tâches connues via CompletionService , puis de tirer le même nombre de résultats de la queue d’achèvement à l’ aide de take() (quels blocs) ou poll() ) . Une fois que vous avez dessiné tous les résultats attendus correspondant aux tâches que vous avez soumises, vous savez qu’elles sont toutes terminées.

Permettez-moi de le dire encore une fois, car ce n’est pas évident à partir de l’interface: vous devez savoir combien de choses vous mettez dans le CompletionService pour savoir combien de choses vous devez essayer. Cela est particulièrement important avec la méthode take() : appelez-la une fois de trop et cela bloquera votre thread d’appel jusqu’à ce qu’un autre thread soumette un autre travail au même CompletionService .

Certains exemples montrent comment utiliser CompletionService dans le livre Java Concurrency in Practice .

Si vous voulez attendre que le service exécutant ait fini d’exécuter, appelez shutdown() puis waitTermination (units, unitType) , par exemple awaitTermination(1, MINUTE) . L’executorService ne bloque pas sur son propre moniteur, vous ne pouvez donc pas utiliser wait etc.

Vous pouvez attendre des travaux pour terminer sur un certain intervalle:

 int maxSecondsPerComputeDTask = 20; try { while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) { // consider giving up with a 'break' statement under certain conditions } } catch (InterruptedException e) { throw new RuntimeException(e); } 

Ou vous pouvez utiliser ExecutorService . submit ( Runnable ) et recueillez les objects Future qu’il retourne et appelez get () à chaque fois pour les attendre à la fin.

 ExecutorService es = Executors.newFixedThreadPool(2); Collection> futures = new LinkedList<>(); for (DataTable singleTable : uniquePhrases) { futures.add(es.submit(new ComputeDTask(singleTable))); } for (Future future : futures) { try { future.get(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } } 

InterruptedException est extrêmement important à gérer correctement. C’est ce qui vous permet, à vous ou aux utilisateurs de votre bibliothèque, de mettre fin à un long processus en toute sécurité.

Juste utiliser

 latch = new CountDownLatch(noThreads) 

Dans chaque fil

 latch.countDown(); 

et comme barrière

 latch.await(); 

Cause racine pour IllegalMonitorStateException :

Émis pour indiquer qu’un thread a tenté d’attendre sur le moniteur d’un object ou pour notifier d’autres threads en attente sur le moniteur d’un object sans posséder le moniteur spécifié.

Depuis votre code, vous venez d’appeler wait () sur ExecutorService sans posséder le verrou.

Le code ci-dessous corrigera IllegalMonitorStateException

 try { synchronized(es){ es.wait(); // Add some condition before you call wait() } } 

Suivez l’une des approches ci-dessous pour attendre l’achèvement de toutes les tâches qui ont été soumises à ExecutorService .

  1. Parcourez toutes les tâches Future de submit sur ExecutorService et vérifiez le statut avec un appel bloquant get() sur un object Future

  2. Utiliser invokeAll sur ExecutorService

  3. Utilisation de CountDownLatch

  4. Utiliser ForkJoinPool ou newWorkStealingPool of Executors (depuis Java 8)

  5. Arrêtez le pool comme recommandé dans la page de documentation d’Oracle

     void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } 

    Si vous souhaitez que toutes les tâches soient terminées correctement lorsque vous utilisez l’option 5 au lieu des options 1 à 4, modifiez

     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 

    à

    un while(condition) qui vérifie toutes les 1 minutes.

J’ai aussi la situation d’avoir un ensemble de documents à parsingr. Je commence par un document initial qui doit être traité, ce document contient des liens vers d’autres documents qui doivent également être traités, et ainsi de suite.

Dans mon programme principal, je veux juste écrire quelque chose comme ce qui suit, où Crawler contrôle un tas de threads.

 Crawler c = new Crawler(); c.schedule(seedDocument); c.waitUntilCompletion() 

La même situation se produirait si je voulais naviguer dans un arbre; Je placerais le nœud racine, le processeur de chaque nœud appendait des enfants à la queue si nécessaire, et un tas de threads traiterait tous les nœuds de l’arborescence, jusqu’à ce qu’il n’y en ait plus.

Je n’ai rien trouvé dans la JVM qui me semblait un peu surprenant. J’ai donc écrit une classe AutoStopThreadPool que l’on peut utiliser directement ou en sous-classe pour append des méthodes adaptées au domaine, par exemple la schedule(Document) . J’espère que cela aide!

AutoStopThreadPool Javadoc | Télécharger

Ajoutez tous les threads de la collection et soumettez-les en utilisant invokeAll . Si vous pouvez utiliser la méthode invokeAll d’ ExecutorService , la JVM ne passera pas à la ligne suivante tant que tous les threads ne sont pas terminés.

Voici un bon exemple: invokeAll via ExecutorService

Soumettez vos tâches dans le Runner , puis attendez d’appeler la méthode waitTillDone () comme ceci:

 Runner runner = Runner.runner(2); for (DataTable singleTable : uniquePhrases) { runner.run(new ComputeDTask(singleTable)); } // blocks until all tasks are finished (or failed) runner.waitTillDone(); runner.shutdown(); 

Pour l’utiliser, ajoutez cette dépendance gradle / maven: 'com.github.matejtymes:javafixes:1.0'

Pour plus de détails, regardez ici: https://github.com/MatejTymes/JavaFixes ou ici: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

Vous pouvez utiliser la méthode ExecutorService.invokeAll . Elle exécutera toutes les tâches et attendra que tous les threads aient terminé leur tâche.

Voici javadoc complet

Vous pouvez également utiliser la version surchargée de cette méthode pour spécifier le délai d’attente.

Voici un exemple de code avec ExecutorService.invokeAll

 public class Test { public static void main(Ssortingng[] args) throws InterruptedException, ExecutionException { ExecutorService service = Executors.newFixedThreadPool(3); List> taskList = new ArrayList<>(); taskList.add(new Task1()); taskList.add(new Task2()); List> results = service.invokeAll(taskList); for (Future f : results) { System.out.println(f.get()); } } } class Task1 implements Callable { @Override public Ssortingng call() throws Exception { try { Thread.sleep(2000); return "Task 1 done"; } catch (Exception e) { e.printStackTrace(); return " error in task1"; } } } class Task2 implements Callable { @Override public Ssortingng call() throws Exception { try { Thread.sleep(3000); return "Task 2 done"; } catch (Exception e) { e.printStackTrace(); return " error in task2"; } } } 

Une alternative simple consiste à utiliser des threads avec join. Reportez-vous à: Joindre des threads

J’attendrai simplement que l’exécuteur se termine avec un délai spécifié que vous jugez approprié pour la réalisation des tâches.

  try { //do stuff here exe.execute(thread); } finally { exe.shutdown(); } boolean result = exe.awaitTermination(4, TimeUnit.HOURS); if (!result) { LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour."); } 

On dirait que vous avez besoin de ForkJoinPool et que vous utilisez le pool global pour exécuter des tâches.

 public static void main(Ssortingng[] args) { // the default `commonPool` should be sufficient for many cases. ForkJoinPool pool = ForkJoinPool.commonPool(); // The root of your task that may spawn other tasks. // Make sure it submits the additional tasks to the same executor that it is in. Runnable rootTask = new YourTask(pool); pool.execute(rootTask); pool.awaitQuiescence(...); // that's it. } 

La beauté réside dans pool.awaitQuiescence où la méthode bloquera l’utilisation du thread de l’appelant pour exécuter ses tâches, puis la renverra lorsqu’elle sera vraiment vide.

Vous pouvez utiliser un ThreadPoolExecutor avec une taille de pool définie sur Runtime.getRuntime (). AvailableProcessors () et .execute() , toutes les tâches que vous souhaitez exécuter, puis appeler tpe.shutdown() et attendre quelques instants while(!tpe.terminated()) { /* waiting for all tasks to complete */} qui bloque toutes les tâches soumises. tpe est une référence à votre instance ThreadPoolExecutor .

Ou, si cela est plus approprié, utilisez un service ExecutorCompletionService CompletionService qui utilise un exécuteur fourni pour exécuter des tâches. Cette classe dispose que les tâches soumises sont, une fois terminées, placées dans une queue accessible à l’aide de la prise. La classe est suffisamment légère pour pouvoir être utilisée de manière transitoire lors du traitement de groupes de tâches.

REMARQUE: cette solution ne bloque pas comme ExecutorService.invokeAll() en attendant que tous les travaux se terminent. Donc, même si le plus simple est aussi le plus limité car il va bloquer le thread principal de l’application. Ce n’est pas bien si vous voulez connaître l’état de la situation ou faire d’autres choses pendant l’exécution de ces tâches, comme le post-traitement des résultats ou la production de résultats agrégés incrémentiels.