Comment puis-je exécuter plusieurs tâches dans Scala?

J’ai 50 000 tâches et je veux les exécuter avec 10 threads. En Java, je devrais créer Executers.threadPool (10) et passer runnable à est alors attendre pour traiter tout. Scala, si je comprends bien, utile pour cette tâche, mais je ne trouve pas de solution dans Docs.

Scala 2.9.3 et versions ultérieures

L’approche la plus simple consiste à utiliser la classe scala.concurrent.Future et l’infrastructure associée. La méthode scala.concurrent.future évalue de manière asynchrone le bloc qui lui est transmis et retourne immédiatement un Future[A] représentant le calcul asynchrone. Les contrats à terme peuvent être manipulés de différentes manières, y compris le mappage, le flatMapping, le filtrage, la récupération des erreurs, etc.

Par exemple, voici un exemple qui crée 10 tâches, où chaque tâche dort une quantité de temps arbitraire et renvoie ensuite le carré de la valeur qui lui est transmise.

 import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global val tasks: Seq[Future[Int]] = for (i <- 1 to 10) yield future { println("Executing task " + i) Thread.sleep(i * 1000L) i * i } val aggregated: Future[Seq[Int]] = Future.sequence(tasks) val squares: Seq[Int] = Await.result(aggregated, 15.seconds) println("Squares: " + squares) 

Dans cet exemple, nous créons d’abord une séquence de tâches asynchrones individuelles qui, une fois terminées, fournissent un int. Nous utilisons ensuite Future.sequence pour combiner ces tâches asynchrones en une seule tâche asynchrone - en échangeant la position du Future et de la Seq dans le type. Enfin, nous bloquons le thread en cours pendant 15 secondes en attendant le résultat. Dans l'exemple, nous utilisons le contexte d'exécution global, qui est soutenu par un pool de threads fork / join. Pour des exemples non sortingviaux, vous voudrez probablement utiliser un ExecutionContext spécifique à l'application.

Généralement, le blocage doit être évité autant que possible. Il existe d'autres combinateurs disponibles sur la classe Future qui peuvent aider à programmer dans un style asynchrone, y compris onSuccess , onFailure et onComplete .

En outre, envisagez d'étudier la bibliothèque Akka , qui fournit la simultanéité basée sur les acteurs pour Scala et Java, et interagit avec scala.concurrent .

Scala 2.9.2 et avant

Cette approche la plus simple consiste à utiliser la classe Future de Scala, qui est un sous-composant du cadre Actors. La méthode scala.actors.Futures.future crée un Future pour le bloc qui lui est transmis. Vous pouvez ensuite utiliser scala.actors.Futures.awaitAll pour attendre que toutes les tâches soient terminées.

Par exemple, voici un exemple qui crée 10 tâches, où chaque tâche dort une quantité de temps arbitraire et renvoie ensuite le carré de la valeur qui lui est transmise.

 import scala.actors.Futures._ val tasks = for (i <- 1 to 10) yield future { println("Executing task " + i) Thread.sleep(i * 1000L) i * i } val squares = awaitAll(20000L, tasks: _*) println("Squares: " + squares) 

Vous voulez regarder la bibliothèque d’acteurs Scala ou Akka. Akka a une syntaxe plus propre, mais l’un ou l’autre fera l’affaire.

Il semble donc que vous ayez besoin de créer un pool d’acteurs qui savent comment traiter vos tâches. Un acteur peut être n’importe quelle classe avec une méthode de réception – du tutoriel Akka ( http://doc.akkasource.org/tutorial-chat-server-scala ):

 class MyActor extends Actor { def receive = { case "test" => println("received test") case _ => println("received unknown message") }} val myActor = Actor.actorOf[MyActor] myActor.start 

Vous souhaiterez créer un pool d’instances d’acteurs et leur envoyer vos tâches en tant que messages. Voici un article sur le pool d’acteurs Akka qui peut être utile: http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/

Dans votre cas, un acteur par tâche peut être approprié (les acteurs sont extrêmement légers par rapport aux threads, donc vous pouvez en avoir BEAUCOUP dans une seule VM) ou vous pouvez avoir besoin d’un équilibrage de charge plus sophistiqué entre eux.

EDIT: En utilisant l’exemple ci-dessus, envoyer un message est aussi simple que cela:

 myActor ! "test" 

L’acteur va alors sortir “test reçu” à la sortie standard.

Les messages peuvent être de tout type et, combinés à la correspondance de modèle de Scala, vous disposez d’un modèle puissant pour créer des applications simultanées flexibles.

En général, les acteurs d’Akka feront “le bon choix” en termes de partage de threads, et pour les besoins du PO, j’imagine que les défauts sont corrects. Mais si vous en avez besoin, vous pouvez définir le répartiteur que l’acteur doit utiliser sur plusieurs types:

 * Thread-based * Event-based * Work-stealing * HawtDispatch-based event-driven 

Il est sortingvial de placer un répartiteur pour un acteur:

 class MyActor extends Actor { self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("thread-pool-dispatch") .withNewThreadPoolWithBoundedBlockingQueue(100) .setCorePoolSize(10) .setMaxPoolSize(10) .setKeepAliveTimeInMillis(10000) .build } 

Voir http://doc.akkasource.org/dispatchers-scala

De cette façon, vous pourriez limiter la taille du pool de threads, mais là encore, le cas d’utilisation original pourrait probablement être satisfait avec des instances d’acteur Akka 50K utilisant des répartiteurs par défaut et il serait parfaitement parallèle.

Cela ne fait qu’effleurer ce que peut faire Akka. Cela apporte beaucoup de ce que Erlang offre à la langue Scala. Les acteurs peuvent surveiller d’autres acteurs et les redémarrer, créant ainsi des applications auto-réparasortingces. Akka fournit également la mémoire transactionnelle logicielle et de nombreuses autres fonctionnalités. C’est sans doute la “killer app” ou “killer framework” pour Scala.

Si vous voulez “les exécuter avec 10 threads”, utilisez les threads. Le modèle d’acteur de Scala, qui est généralement ce dont les gens parlent lorsqu’ils disent que Scala est bon pour la concurrence, cache ces détails pour que vous ne les voyiez pas.

Utiliser des acteurs ou des futurs avec tout ce que vous avez, ce sont des calculs simples, il vous suffit d’en créer 50000 et de les laisser fonctionner. Vous pourriez être confronté à des problèmes, mais ils sont de nature différente.

Voici une autre réponse similaire à la réponse de mpilquist mais sans API obsolète et incluant les parameters de thread via un contexte d’exécution personnalisé:

 import java.util.concurrent.Executors import scala.concurrent.{ExecutionContext, Await, Future} import scala.concurrent.duration._ val numJobs = 50000 var numThreads = 10 // customize the execution context to use the specified number of threads implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numThreads)) // define the tasks val tasks = for (i <- 1 to numJobs) yield Future { // do something more fancy here i } // aggregate and wait for final result val aggregated = Future.sequence(tasks) val oneToNSum = Await.result(aggregated, 15.seconds).sum