Comment puis-je implémenter la hiérarchisation des tâches à l’aide d’un ExecutorService dans Java 5?

Je suis en train de mettre en place un mécanisme de pool de threads dans lequel je voudrais exécuter des tâches de différentes priorités. Je voudrais avoir un bon mécanisme par lequel je peux soumettre une tâche hautement prioritaire au service et le faire programmer avant d’autres tâches. La priorité de la tâche est une propriété insortingnsèque de la tâche elle-même (savoir si j’exprime cette tâche en tant que Callable ou Runnable n’est pas important pour moi).

Maintenant, superficiellement, il semblerait que je puisse utiliser PriorityBlockingQueue comme queue de tâches dans ThreadPoolExecutor , mais cette file contient des objects Runnable , qui peuvent être ou non les tâches Runnable je lui ai soumises. De plus, si j’ai envoyé des tâches Callable , il n’est pas clair comment cela pourrait jamais être mappé.

Y a-t-il un moyen de faire cela? Je préférerais vraiment ne pas rouler comme ça, car je suis beaucoup plus susceptible de me tromper de cette façon.

(À part, oui, je suis au courant de la possibilité de famine pour des emplois moins prioritaires dans un domaine comme celui-ci. Des points supplémentaires (?!) Pour des solutions qui garantissent raisonnablement l’équité)

À première vue, il semblerait que vous puissiez définir une interface pour vos tâches qui étend Runnable ou Callable et Comparable . Enveloppez ensuite un ThreadPoolExecutor avec un PriorityBlockingQueue comme queue et n’acceptez que les tâches qui implémentent votre interface.

Compte tenu de votre commentaire, il semble qu’une option consiste à étendre ThreadPoolExecutor et à remplacer les méthodes submit() . Reportez-vous à AbstractExecutorService pour voir à quoi ressemblent les parameters par défaut; tout ce qu’ils font, c’est d’envelopper Runnable ou Callable dans une FutureTask et de l’ execute() . Je le ferais probablement en écrivant une classe wrapper qui implémente ExecutorService et délègue à un ThreadPoolExecutor interne anonyme. Enveloppez-les dans quelque chose qui a votre priorité, afin que votre Comparator puisse y accéder.

J’ai résolu ce problème de manière raisonnable, et je le décrirai ci-dessous pour référence ultérieure à moi-même et à toute personne qui rencontre ce problème avec les bibliothèques Java Concurrent.

Utiliser PriorityBlockingQueue comme moyen de conserver des tâches pour une exécution ultérieure est en effet un mouvement dans la bonne direction. Le problème est que PriorityBlockingQueue doit être instancié de manière générique pour contenir des instances Runnable , et il est impossible d’appeler compareTo (ou similaire) sur une interface Runnable .

Pour résoudre le problème. Lors de la création de l’exécuteur, il faut lui atsortingbuer un PriorityBlockingQueue . La queue devrait en outre se voir atsortingbuer un comparateur personnalisé pour effectuer le sorting approprié:

 new PriorityBlockingQueue(size, new CustomTaskComparator()); 

Maintenant, un coup d’œil à CustomTaskComparator :

 public class CustomTaskComparator implements Comparator { @Override public int compare(MyType first, MyType second) { return comparison; } } 

Tout semble assez simple jusqu’à présent. Ça devient un peu collant ici. Notre prochain problème consiste à gérer la création de FutureTasks à partir de l’exécuteur. Dans l’Executor, nous devons remplacer newTaskFor comme newTaskFor :

 @Override protected  RunnableFuture newTaskFor(Callable c) { //Override the default FutureTask creation and retrofit it with //a custom task. This is done so that prioritization can be accomplished. return new CustomFutureTask(c); } 

c est la tâche Callable que nous essayons d’exécuter. Maintenant, regardons CustomFutureTask :

 public class CustomFutureTask extends FutureTask { private CustomTask task; public CustomFutureTask(Callable callable) { super(callable); this.task = (CustomTask) callable; } public CustomTask getTask() { return task; } } 

Notez la méthode getTask . Nous allons l’utiliser plus tard pour récupérer la tâche originale de cette CustomFutureTask que nous avons créée.

Et enfin, modifions la tâche d’origine que nous essayions d’exécuter:

 public class CustomTask implements Callable, Comparable { private final MyType myType; public CustomTask(MyType myType) { this.myType = myType; } @Override public MyType call() { //Do some things, return something for FutureTask implementation of `call`. return myType; } @Override public int compareTo(MyType task2) { return new CustomTaskComparator().compare(this.myType, task2.myType); } } 

Vous pouvez voir que nous implémentons Comparable dans la tâche à déléguer au Comparator actuel pour MyType .

Et voilà, une hiérarchisation personnalisée pour un exécuteur utilisant les bibliothèques Java! Il faut un peu de flexion, mais c’est le plus propre que j’ai pu trouver. J’espère que cela sera utile à quelqu’un!

Vous pouvez utiliser ces classes d’assistance:

 public class PriorityFuture implements RunnableFuture { private RunnableFuture src; private int priority; public PriorityFuture(RunnableFuture other, int priority) { this.src = other; this.priority = priority; } public int getPriority() { return priority; } public boolean cancel(boolean mayInterruptIfRunning) { return src.cancel(mayInterruptIfRunning); } public boolean isCancelled() { return src.isCancelled(); } public boolean isDone() { return src.isDone(); } public T get() throws InterruptedException, ExecutionException { return src.get(); } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return src.get(timeout, unit); } public void run() { src.run(); } public static Comparator COMP = new Comparator() { public int compare(Runnable o1, Runnable o2) { if (o1 == null && o2 == null) return 0; else if (o1 == null) return -1; else if (o2 == null) return 1; else { int p1 = ((PriorityFuture) o1).getPriority(); int p2 = ((PriorityFuture) o2).getPriority(); return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1); } } }; } 

ET

 public interface PriorityCallable extends Callable { int getPriority(); } 

ET cette méthode d’assistance:

 public static ThreadPoolExecutor getPriorityExecutor(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue(10, PriorityFuture.COMP)) { protected  RunnableFuture newTaskFor(Callable callable) { RunnableFuture newTaskFor = super.newTaskFor(callable); return new PriorityFuture(newTaskFor, ((PriorityCallable) callable).getPriority()); } }; } 

ET ensuite l’utiliser comme ceci:

 class LenthyJob implements PriorityCallable { private int priority; public LenthyJob(int priority) { this.priority = priority; } public Long call() throws Exception { System.out.println("Executing: " + priority); long num = 1000000; for (int i = 0; i < 1000000; i++) { num *= Math.random() * 1000; num /= Math.random() * 1000; if (num == 0) num = 1000000; } return num; } public int getPriority() { return priority; } } public class TestPQ { public static void main(String[] args) throws InterruptedException, ExecutionException { ThreadPoolExecutor exec = getPriorityExecutor(2); for (int i = 0; i < 20; i++) { int priority = (int) (Math.random() * 100); System.out.println("Scheduling: " + priority); LenthyJob job = new LenthyJob(priority); exec.submit(job); } } } 

Je vais essayer d’expliquer ce problème avec un code entièrement fonctionnel. Mais avant de plonger dans le code, je voudrais expliquer à propos de PriorityBlockingQueue

PriorityBlockingQueue : PriorityBlockingQueue est une implémentation de BlockingQueue. Il accepte les tâches avec leur priorité et soumet la tâche avec la plus haute priorité pour exécution en premier. Si deux tâches ont la même priorité, nous devons fournir une logique personnalisée pour décider quelle tâche doit être exécutée en premier.

Maintenant, passons directement au code.

Classe de pilote : cette classe crée un exécuteur qui accepte les tâches et les soumet ultérieurement pour exécution. Ici, nous créons deux tâches l’une avec une priorité LOW et l’autre avec une priorité HIGH. Ici, nous demandons à l’exécuteur d’exécuter un MAX de 1 threads et d’utiliser PriorityBlockingQueue.

  public static void main(Ssortingng[] args) { /* Minimum number of threads that must be running : 0 Maximium number of threads that can be created : 1 If a thread is idle, then the minimum time to keep it alive : 1000 Which queue to use : PriorityBlockingQueue */ PriorityBlockingQueue queue = new PriorityBlockingQueue(); ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1, 1000, TimeUnit.MILLISECONDS,queue); MyTask task = new MyTask(Priority.LOW,"Low"); executor.execute(new MyFutureTask(task)); task = new MyTask(Priority.HIGH,"High"); executor.execute(new MyFutureTask(task)); task = new MyTask(Priority.MEDIUM,"Medium"); executor.execute(new MyFutureTask(task)); } 

Classe MyTask : MyTask implémente Runnable et accepte la priorité comme argument dans le constructeur. Lorsque cette tâche est exécutée, elle imprime un message, puis met le thread en veille pendant 1 seconde.

  public class MyTask implements Runnable { public int getPriority() { return priority.getValue(); } private Priority priority; public Ssortingng getName() { return name; } private Ssortingng name; public MyTask(Priority priority,Ssortingng name){ this.priority = priority; this.name = name; } @Override public void run() { System.out.println("The following Runnable is getting executed "+getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } 

Classe MyFutureTask : Puisque nous utilisons PriorityBlocingQueue pour conserver nos tâches, nos tâches doivent être encapsulées dans FutureTask et notre implémentation de FutureTask doit implémenter l’interface Comparable. L’interface Comparable compare la priorité de 2 tâches différentes et soumet la tâche avec la priorité la plus élevée pour exécution.

  public class MyFutureTask extends FutureTask implements Comparable { private MyTask task = null; public MyFutureTask(MyTask task){ super(task,null); this.task = task; } @Override public int compareTo(MyFutureTask another) { return task.getPriority() - another.task.getPriority(); } } 

Classe de priorité : Classe de priorité explicite.

 public enum Priority { HIGHEST(0), HIGH(1), MEDIUM(2), LOW(3), LOWEST(4); int value; Priority(int val) { this.value = val; } public int getValue(){ return value; } } 

Maintenant, lorsque nous exécutons cet exemple, nous obtenons la sortie suivante

 The following Runnable is getting executed High The following Runnable is getting executed Medium The following Runnable is getting executed Low 

Même si nous avons d’abord soumis la priorité LOW, mais plus tard la tâche de priorité HIGH, mais comme nous utilisons une PriorityBlockingQueue, toute tâche avec une priorité plus élevée sera exécutée en premier.

Ma solution préserve l’ordre de soumission des tâches pour les mêmes priorités. C’est une amélioration de cette réponse

L’ordre d’exécution des tâches est basé sur:

  1. Priorité
  2. Soumettre la commande (dans la même priorité)

Classe testeur:

 public class Main { public static void main(Ssortingng[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1); //Priority=0 executorService.submit(newCallable("A1", 200)); //Defaults to priority=0 executorService.execute(newRunnable("A2", 200)); //Defaults to priority=0 executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0)); executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0)); executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0)); executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0)); executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0)); executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0)); //Priority=1 executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1)); executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1)); executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1)); executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1)); executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1)); executorService.shutdown(); } private static Runnable newRunnable(Ssortingng name, int delay) { return new Runnable() { @Override public void run() { System.out.println(name); sleep(delay); } }; } private static Callable newCallable(Ssortingng name, int delay) { return new Callable() { @Override public Integer call() throws Exception { System.out.println(name); sleep(delay); return 10; } }; } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } } 

Résultat:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

La première tâche est A1 car il n’y avait pas de priorité plus élevée dans la queue lors de son insertion. Les tâches B sont 1 priorité ainsi exécutée précédemment, les tâches A ont une priorité 0 donc exécutées ultérieurement, mais l’ordre d’exécution est le suivant: B1, B2, B3, … A2, A3, A4 …

La solution:

 public class PriorityExecutors { public static ExecutorService newFixedThreadPool(int nThreads) { return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS); } private static class PriorityExecutor extends ThreadPoolExecutor { private static final int DEFAULT_PRIORITY = 0; private static AtomicLong instanceCounter = new AtomicLong(); @SuppressWarnings({"unchecked"}) public PriorityExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue(10, ComparableTask.comparatorByPriorityAndSequentialOrder())); } @Override public void execute(Runnable command) { // If this is ugly then delegator pattern needed if (command instanceof ComparableTask) //Already wrapped super.execute(command); else { super.execute(newComparableRunnableFor(command)); } } private Runnable newComparableRunnableFor(Runnable runnable) { return new ComparableRunnable(ensurePriorityRunnable(runnable)); } @Override protected  RunnableFuture newTaskFor(Callable callable) { return new ComparableFutureTask<>(ensurePriorityCallable(callable)); } @Override protected  RunnableFuture newTaskFor(Runnable runnable, T value) { return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); } private  PriorityCallable ensurePriorityCallable(Callable callable) { return (callable instanceof PriorityCallable) ? (PriorityCallable) callable : PriorityCallable.of(callable, DEFAULT_PRIORITY); } private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); } private class ComparableFutureTask extends FutureTask implements ComparableTask { private Long sequentialOrder = instanceCounter.getAndIncrement(); private HasPriority hasPriority; public ComparableFutureTask(PriorityCallable priorityCallable) { super(priorityCallable); this.hasPriority = priorityCallable; } public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { super(priorityRunnable, result); this.hasPriority = priorityRunnable; } @Override public long getInstanceCount() { return sequentialOrder; } @Override public int getPriority() { return hasPriority.getPriority(); } } private static class ComparableRunnable implements Runnable, ComparableTask { private Long instanceCount = instanceCounter.getAndIncrement(); private HasPriority hasPriority; private Runnable runnable; public ComparableRunnable(PriorityRunnable priorityRunnable) { this.runnable = priorityRunnable; this.hasPriority = priorityRunnable; } @Override public void run() { runnable.run(); } @Override public int getPriority() { return hasPriority.getPriority(); } @Override public long getInstanceCount() { return instanceCount; } } private interface ComparableTask extends Runnable { int getPriority(); long getInstanceCount(); public static Comparator comparatorByPriorityAndSequentialOrder() { return (o1, o2) -> { int priorityResult = o2.getPriority() - o1.getPriority(); return priorityResult != 0 ? priorityResult : (int) (o1.getInstanceCount() - o2.getInstanceCount()); }; } } } private static interface HasPriority { int getPriority(); } public interface PriorityCallable extends Callable, HasPriority { public static  PriorityCallable of(Callable callable, int priority) { return new PriorityCallable() { @Override public V call() throws Exception { return callable.call(); } @Override public int getPriority() { return priority; } }; } } public interface PriorityRunnable extends Runnable, HasPriority { public static PriorityRunnable of(Runnable runnable, int priority) { return new PriorityRunnable() { @Override public void run() { runnable.run(); } @Override public int getPriority() { return priority; } }; } } } 

Serait-il possible d’avoir un ThreadPoolExecutor pour chaque niveau de priorité? Un ThreadPoolExecutor peut être instancié avec un ThreadFactory et vous pourriez avoir votre propre implémentation d’un ThreadFactory pour définir les différents niveaux de priorité.

  class MaxPriorityThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setPriority(Thread.MAX_PRIORITY); } }