Akka Actor ne se termine pas si une exception est levée

J’essaie actuellement de commencer avec Akka et je suis confronté à un problème étrange. J’ai le code suivant pour mon acteur:

class AkkaWorkerFT extends Actor { def receive = { case Work(n, c) if n  throw new Exception("Negative number") case Work(n, c) => self reply n.isProbablePrime(c); } } 

Et voici comment je commence mes travailleurs:

 val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start()); val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start() 

Et voici comment je ferme tout:

 futures.foreach( _.await ) router ! Broadcast(PoisonPill) router ! PoisonPill 

Maintenant, ce qui se passe est que si j’envoie des messages aux travailleurs avec n> 0 (aucune exception n’est levée), tout fonctionne correctement et l’application s’arrête correctement. Cependant, dès que je lui envoie un seul message qui génère une exception, l’application ne se termine pas car il y a encore un acteur en cours d’exécution, mais je n’arrive pas à comprendre d’où cela vient.

Au cas où cela vous aiderait, voici la stack du thread en question:

  Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) Unsafe.park(boolean, long) line: not available [native method] LockSupport.park(Object) line: 158 AbstractQueuedSynchronizer$ConditionObject.await() line: 1987 LinkedBlockingQueue.take() line: 399 ThreadPoolExecutor.getTask() line: 947 ThreadPoolExecutor$Worker.run() line: 907 MonitorableThread(Thread).run() line: 680 MonitorableThread.run() line: 182 

PS: le thread qui ne se termine pas n’est aucun des threads de travail, car j’ai ajouté un rappel postStop, chacun s’arrête correctement.

PPS: Actors.registry.shutdownAll solutions de contournement au problème, mais je pense que shutdownAll ne devrait être utilisé qu’en dernier recours, n’est-ce pas?

    La manière appropriée de gérer les problèmes au sein des acteurs Akka n’est pas de lancer une exception mais de définir des hiérarchies de superviseurs.

    “Lancer une exception en code concurrent (supposons que nous utilisions des acteurs non liés) fera simplement sauter le thread qui exécute l’acteur.

    Il n’y a aucun moyen de savoir que les choses ont mal tourné (hormis l’inspection de la trace de la stack). Vous ne pouvez rien y faire. ”

    voir Tolérance aux pannes à travers les hiérarchies du superviseur (1.2)

    * note * ce qui précède est vrai pour les anciennes versions d’Akka (1.2) Dans les nouvelles versions (par exemple, 2.2), vous définiriez toujours une hiérarchie de superviseur, mais cela piégerait les exceptions lancées par les processus enfants. par exemple

     class Child extends Actor { var state = 0 def receive = { case ex: Exception ⇒ throw ex case x: Int ⇒ state = x case "get" ⇒ sender ! state } } 

    et chez le superviseur:

     class Supervisor extends Actor { import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy(maxNrOfResortinges = 10, withinTimeRange = 1 minute) { case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate } def receive = { case p: Props ⇒ sender ! context.actorOf(p) } } 

    voir Tolérance aux pannes par le biais des hiérarchies de superviseur (2.2)

    Désactiver la journalisation pour vous assurer que les choses se terminent, comme proposé par Viktor, est un peu étrange. Ce que vous pouvez faire à la place est:

     EventHandler.shutdown() 

    qui ferme proprement tous les écouteurs (logger) qui font tourner le monde après l’exception:

     def shutdown() { foreachListener(_.stop()) EventHandlerDispatcher.shutdown() } 

    Tour de l’enregistreur dans akka.conf