Comment un threadpool peut-il être réutilisé après l’arrêt

J’ai un fichier .csv contenant plus de 70 millions de lignes dont chaque ligne doit générer un Runnable puis être exécutée par threadpool. Ce Runnable insérera un enregistrement dans Mysql.

De plus, je veux enregistrer une position du fichier csv pour le RandomAccessFile à localiser. La position est écrite dans un fichier. Je veux écrire cet enregistrement lorsque tous les threads de threadpool sont terminés. Ainsi, ThreadPoolExecutor.shutdown () est appelé. Mais lorsque plus de lignes arrivent, j’ai besoin d’un threadpool à nouveau. Comment puis-je réutiliser ce threadpool au lieu d’en créer un nouveau?

Le code est comme suit:

public static boolean processPage() throws Exception { long pos = getPosition(); long start = System.currentTimeMillis(); raf.seek(pos); if(pos==0) raf.readLine(); for (int i = 0; i < PAGESIZE; i++) { String lineStr = raf.readLine(); if (lineStr == null) return false; String[] line = lineStr.split(","); final ExperienceLogDO log = CsvExperienceLog.generateLog(line); //System.out.println("userId: "+log.getUserId()%512); pool.execute(new Runnable(){ public void run(){ try { experienceService.insertExperienceLog(log); } catch (BaseException e) { e.printStackTrace(); } } }); long end = System.currentTimeMillis(); } BufferedWriter resultWriter = new BufferedWriter( new OutputStreamWriter(new FileOutputStream(new File( RESULT_FILENAME), true))); resultWriter.write("\n"); resultWriter.write(String.valueOf(raf.getFilePointer())); resultWriter.close(); long time = System.currentTimeMillis()-start; System.out.println(time); return true; } 

Merci !

Comme indiqué dans la documentation , vous ne pouvez pas réutiliser un ExecutorService qui a été arrêté. Je déconseille toute solution de contournement , car (a) ils peuvent ne pas fonctionner comme prévu dans toutes les situations; et (b) vous pouvez obtenir ce que vous voulez en utilisant des classes standard.

Vous devez soit

  1. instancier un nouveau ExecutorService ; ou

  2. ne pas mettre fin à l’ ExecutorService .

La première solution est facilement implémentée, donc je ne vais pas la détailler.

Pour la seconde, puisque vous voulez exécuter une action une fois que toutes les tâches soumises sont terminées, vous pouvez consulter ExecutorCompletionService et l’utiliser à la place. Il encapsule un ExecutorService qui fera la gestion des threads, mais les exécutables seront intégrés à quelque chose qui indiquera à ExecutorCompletionService quand ils auront fini, de sorte qu’il puisse vous rendre compte:

 ExecutorService executor = ...; ExecutorCompletionService ecs = new ExecutorCompletionService(executor); for (int i = 0; i < totalTasks; i++) { ... ecs.submit(...); ... } for (int i = 0; i < totalTasks; i++) { ecs.take(); } 

La méthode take() de la classe ExecutorCompletionService sera bloquée jusqu'à ce qu'une tâche soit terminée (normalement ou brutalement). Il retournera un Future , vous pouvez donc vérifier les résultats si vous le souhaitez.

J'espère que cela peut vous aider, car je n'ai pas complètement compris votre problème.

créer et regrouper toutes les tâches et les soumettre au pool avec invokeAll (qui ne retourne que lorsque toutes les tâches sont terminées avec succès)

Après avoir appelé l’arrêt sur un ExecutorService, aucune nouvelle tâche ne sera acceptée . Cela signifie que vous devez créer un nouveau ExecutorService pour chaque cycle de tâches.

Cependant, avec Java 8, ForkJoinPool.awaitQuiescence a été introduit. Si vous pouvez passer d’un ExecutorService normal à ForkJoinPool, vous pouvez utiliser cette méthode pour attendre que plus aucune tâche ne s’exécute dans un ForkJoinPool sans avoir à appeler shutdown. Cela signifie que vous pouvez remplir un ForkJoinPool avec des tâches, en attendant qu’il soit vide (quiescent), puis recommencez à le remplir avec Tasks, etc.