Comment démarrer avec Akka Streams?

La bibliothèque Akka Streams est déjà riche en documentation . Cependant, le principal problème pour moi est qu’il fournit trop de matériel – je me sens plutôt dépassé par le nombre de concepts que je dois apprendre. Beaucoup d’exemples présentés ici sont très lourds et ne peuvent pas facilement être traduits dans des cas d’utilisation réels et sont donc assez ésotériques. Je pense que cela donne beaucoup trop de détails sans expliquer comment construire tous les blocs de construction ensemble et comment cela aide à résoudre des problèmes spécifiques.

Il y a des sources, des puits, des stream, des étapes de graphes, des graphes partiels, une matérialisation, un graphe DSL et beaucoup plus et je ne sais pas par où commencer. Le guide de démarrage rapide est censé être un sharepoint départ mais je ne le comprends pas. Il ne fait que jeter les concepts mentionnés ci-dessus sans les expliquer. De plus, les exemples de code ne peuvent pas être exécutés – il manque des parties, ce qui rend plus ou moins impossible le suivi du texte.

Est-ce que n’importe qui peut expliquer les concepts sources, puits, stream, étapes de graphe, graphes partiels, matérialisation et peut-être d’autres choses que j’ai manquées avec des mots simples et des exemples simples qui n’expliquent pas tous les détails? le début)?

Cette réponse est basée sur la version 2.4.2 akka-stream . L’API peut être légèrement différente dans les autres versions. La dépendance peut être consommée par sbt :

 libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2" 

Bon, commençons. L’API d’Akka Streams comprend trois types principaux. Contrairement aux stream réactifs , ces types sont beaucoup plus puissants et donc plus complexes. On suppose que pour tous les exemples de code, les définitions suivantes existent déjà:

 import scala.concurrent._ import akka._ import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.util._ implicit val system = ActorSystem("TestSystem") implicit val materializer = ActorMaterializer() import system.dispatcher 

Les instructions d’ import sont nécessaires pour les déclarations de type. system représente le système acteur d’Akka et materializer représente le contexte d’évaluation du stream. Dans notre cas, nous utilisons un ActorMaterializer , ce qui signifie que les stream sont évalués par-dessus les acteurs. Les deux valeurs sont marquées comme implicit , ce qui donne au compilateur Scala la possibilité d’injecter ces deux dépendances automatiquement chaque fois qu’elles sont nécessaires. Nous importons également system.dispatcher , qui est un contexte d’exécution pour Futures .

Une nouvelle API

Akka Streams possède les propriétés clés suivantes:

  • Ils implémentent la spécification Reactive Streams , dont les trois objectives principaux sont la contre-pression, les limites asynchrones et non-bloquantes et l’interopérabilité entre les différentes implémentations.
  • Ils fournissent une abstraction pour un moteur d’évaluation pour les stream, appelée Materializer .
  • Les programmes sont formulés sous forme de blocs de construction réutilisables, représentés par les trois types principaux Source , Sink et Flow . Les blocs de construction forment un graphique dont l’évaluation est basée sur le Materializer et doit être explicitement déclenchée.

Dans ce qui suit, une introduction plus approfondie sur l’utilisation des trois types principaux doit être donnée.

La source

Une Source est un créateur de données, elle sert de source d’entrée au stream. Chaque Source possède un seul canal de sortie et aucun canal d’entrée. Toutes les données transitent par le canal de sortie vers tout ce qui est connecté à la Source .

La source

Image tirée de boldradius.com .

Une Source peut être créée de plusieurs manières:

 scala> val s = Source.empty s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ... scala> val s = Source.single("single element") s: akka.stream.scaladsl.Source[Ssortingng,akka.NotUsed] = ... scala> val s = Source(1 to 3) s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ... scala> val s = Source(Future("single value from a Future")) s: akka.stream.scaladsl.Source[Ssortingng,akka.NotUsed] = ... scala> s runForeach println res0: scala.concurrent.Future[akka.Done] = ... single value from a Future 

Dans les cas ci-dessus, nous avons alimenté la Source avec des données finies, ce qui signifie qu’elles se termineront éventuellement. Il ne faut pas oublier que les stream réactifs sont paresseux et asynchrones par défaut. Cela signifie qu’il faut explicitement demander l’évaluation du stream. Dans Akka Streams, cela peut se faire via les méthodes run* . Le runForeach ne serait pas différent de la fonction foreach bien connue – grâce à l’ajout d’ run il est explicite que nous demandons une évaluation du stream. Les données finies étant ennuyeuses, on continue avec une infinie:

 scala> val s = Source.repeat(5) s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ... scala> s take 3 runForeach println res1: scala.concurrent.Future[akka.Done] = ... 5 5 5 

Avec la méthode de take , nous pouvons créer un point d’arrêt artificiel qui nous empêche d’évaluer indéfiniment. Étant donné que le support des acteurs est intégré, nous pouvons également facilement alimenter le stream avec des messages envoyés à un acteur:

 def run(actor: ActorRef) = { Future { Thread.sleep(300); actor ! 1 } Future { Thread.sleep(200); actor ! 2 } Future { Thread.sleep(100); actor ! 3 } } val s = Source .actorRef[Int](bufferSize = 0, OverflowStrategy.fail) .mapMaterializedValue(run) scala> s runForeach println res1: scala.concurrent.Future[akka.Done] = ... 3 2 1 

Nous pouvons voir que les Futures sont exécutés de manière asynchrone sur différents threads, ce qui explique le résultat. Dans l’exemple ci-dessus, un tampon pour les éléments entrants n’est pas nécessaire et, par conséquent, avec OverflowStrategy.fail nous pouvons configurer le stream en cas d’échec du tampon. Surtout via cette interface d’acteur, nous pouvons alimenter le stream via n’importe quelle source de données. Peu importe si les données sont créées par le même thread, par un autre, par un autre processus ou si elles proviennent d’un système distant via Internet.

Évier

Un Sink est fondamentalement le contraire d’une Source . C’est le point final d’un stream et consum donc des données. Un Sink possède un seul canal d’entrée et aucun canal de sortie. Sinks sont particulièrement nécessaires lorsque nous souhaitons spécifier le comportement du collecteur de données de manière réutilisable et sans évaluer le stream. Les méthodes run* déjà connues ne nous permettent pas ces propriétés, il est donc préférable d’utiliser Sink place.

Évier

Image tirée de boldradius.com .

Un petit exemple d’un Sink en action:

 scala> val source = Source(1 to 3) source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ... scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem")) sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ... scala> val flow = source to sink flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> flow.run() res3: akka.NotUsed = NotUsed sink received: 1 sink received: 2 sink received: 3 

La connexion d’une Source à un Sink peut être effectuée avec la méthode to . Il retourne un ” RunnableFlow , comme nous verrons plus tard une forme spéciale de Flow – un stream qui peut être exécuté en appelant simplement sa méthode run() .

Flux runable

Image tirée de boldradius.com .

Il est bien entendu possible de transmettre toutes les valeurs qui parviennent à un évier à un acteur:

 val actor = system.actorOf(Props(new Actor { override def receive = { case msg => println(s"actor received: $msg") } })) scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed") sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ... scala> val runnable = Source(1 to 3) to sink runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> runnable.run() res3: akka.NotUsed = NotUsed actor received: 1 actor received: 2 actor received: 3 actor received: stream completed 

Couler

Les sources de données et les puits sont parfaits si vous avez besoin d’une connexion entre les stream Akka et un système existant, mais vous ne pouvez vraiment rien faire avec eux. Les stream sont la dernière pièce manquante dans l’abstraction de base des Akka Streams. Ils agissent comme un connecteur entre différents stream et peuvent être utilisés pour transformer ses éléments.

Couler

Image tirée de boldradius.com .

Si un Flow est connecté à une Source le résultat est une nouvelle Source . De même, un Flow connecté à un Sink crée un nouvel Sink . Et un Flow connecté à la fois à une Source et à un Sink génère un Flow RunnableFlow . Par conséquent, ils se situent entre le canal d’entrée et le canal de sortie, mais ne correspondent pas, par eux-mêmes, à l’une des variantes, du moment qu’ils ne sont pas connectés à une Source ou à un Sink .

Stream complet

Image tirée de boldradius.com .

Afin de mieux comprendre les Flows , nous examinerons quelques exemples:

 scala> val source = Source(1 to 3) source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ... scala> val sink = Sink.foreach[Int](println) sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ... scala> val invert = Flow[Int].map(elem => elem * -1) invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ... scala> val doubler = Flow[Int].map(elem => elem * 2) doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ... scala> val runnable = source via invert via doubler to sink runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> runnable.run() res10: akka.NotUsed = NotUsed -2 -4 -6 

Via la méthode via , nous pouvons connecter une Source avec un Flow . Nous devons spécifier le type d’entrée car le compilateur ne peut pas le déduire pour nous. Comme nous pouvons déjà le voir dans cet exemple simple, les stream invert et double sont complètement indépendants de tout producteur et consommateur de données. Ils ne transforment que les données et les transmettent au canal de sortie. Cela signifie que nous pouvons réutiliser un stream parmi plusieurs stream:

 scala> val s1 = Source(1 to 3) via invert to sink s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> val s2 = Source(-3 to -1) via invert to sink s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> s1.run() res10: akka.NotUsed = NotUsed -1 -2 -3 scala> s2.run() res11: akka.NotUsed = NotUsed 3 2 1 

s1 et s2 représentent des stream complètement nouveaux – ils ne partagent aucune donnée via leurs blocs de construction.

Flux de données non liés

Avant de passer à autre chose, nous devrions d’abord revoir certains des aspects clés des stream réactifs. Un nombre illimité d’éléments peut arriver à n’importe quel point et peut placer un stream dans différents états. En plus d’un stream exécutable, qui est l’état habituel, un stream peut être arrêté soit par une erreur, soit par un signal indiquant qu’aucune autre donnée n’arrivera. Un stream peut être modélisé de manière graphique en marquant des événements sur une ligne de temps, comme c’est le cas ici:

Montre qu'un flux est une séquence d'événements en cours ordonnés dans le temps

Image de l’introduction à la programmation réactive que vous avez manquée .

Nous avons déjà vu des stream exécutables dans les exemples de la section précédente. Nous obtenons un RunnableGraph chaque fois qu’un stream peut réellement être matérialisé, ce qui signifie qu’un Sink est connecté à une Source . Jusqu’à présent, nous avons toujours matérialisé l’ Unit valeur, que l’on peut voir dans les types:

 val source: Source[Int, NotUsed] = Source(1 to 3) val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println) val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x) 

Pour le paramètre Source et Sink le paramètre de deuxième type et pour le paramètre de troisième Flow la valeur matérialisée. Tout au long de cette réponse, le sens complet de la matérialisation ne doit pas être expliqué. Toutefois, vous trouverez plus de détails sur la matérialisation sur la documentation officielle . Pour l’instant, la seule chose que nous devons savoir, c’est que la valeur matérialisée est ce que nous obtenons lorsque nous exécutons un stream. Comme nous étions seulement intéressés par les effets secondaires jusqu’à présent, nous avons obtenu l’ Unit comme valeur matérialisée. L’exception à cela était une matérialisation d’un évier, qui a abouti à un Future . Cela nous a redonné un Future , car cette valeur peut indiquer que le stream connecté au récepteur a été arrêté. Jusqu’à présent, les exemples de code précédents étaient intéressants pour expliquer le concept, mais ils étaient aussi ennuyeux car nous ne traitions que des stream finis ou des nombres infinis très simples. Pour le rendre plus intéressant, un stream asynchrone complet et non lié doit être expliqué ci-après.

Exemple ClickStream

Par exemple, nous voulons avoir un stream qui capture les événements de clic. Pour rendre les choses plus difficiles, disons que nous voulons également regrouper les événements de clics qui se produisent peu de temps après. De cette façon, nous pourrions facilement découvrir des clics doubles, sortingples ou décuplés. De plus, nous voulons filtrer tous les clics simples. Respirez profondément et imaginez comment vous résoudriez ce problème de manière impérative. Je parie que personne ne serait capable de mettre en œuvre une solution qui fonctionne correctement dès le premier essai. De manière réactive, ce problème est sortingvial à résoudre. En fait, la solution est tellement simple et directe à implémenter que nous pouvons même l’exprimer dans un diagramme qui décrit directement le comportement du code:

La logique de l'exemple de flux de clics

Image de l’introduction à la programmation réactive que vous avez manquée .

Les cases grises sont des fonctions qui décrivent comment un stream est transformé en un autre. Avec la fonction d’ throttle , nous accumulons les clics dans les 250 millisecondes, les fonctions de la map et du filter doivent être explicites. Les orbes de couleurs représentent un événement et les flèches décrivent comment elles circulent dans nos fonctions. Plus tard dans les étapes de traitement, nous obtenons de moins en moins d’éléments qui circulent dans notre stream, puisque nous les regroupons et que nous les filtrons. Le code de cette image ressemblerait à ceci:

 val multiClickStream = clickStream .throttle(250.millis) .map(clickEvents => clickEvents.length) .filter(numberOfClicks => numberOfClicks >= 2) 

La logique entière peut être représentée dans seulement quatre lignes de code! En Scala, on pourrait écrire encore plus court:

 val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2) 

La définition de clickStream est un peu plus complexe, mais ce n’est le cas que parce que le programme exemple s’exécute sur la JVM, où la capture des événements de clic n’est pas facile. Une autre complication est que Akka par défaut ne fournit pas la fonction d’ throttle . Au lieu de cela, nous avons dû l’écrire nous-mêmes. Étant donné que cette fonction est (comme c’est le cas pour les fonctions de map ou de filter ) réutilisable dans différents cas d’utilisation, je ne compte pas ces lignes sur le nombre de lignes nécessaires pour implémenter la logique. Dans les langages impératifs, cependant, il est normal que la logique ne puisse pas être réutilisée facilement et que les différentes étapes logiques se produisent au même endroit au lieu d’être appliquées séquentiellement, ce qui signifie que nous aurions probablement déformé notre code avec la logique de limitation. L’exemple de code complet est disponible sous forme de liste et ne sera plus discuté ici.

Exemple SimpleWebServer

Ce qui devrait être discuté à la place est un autre exemple. Bien que le stream de clics soit un bon exemple pour laisser Akka Streams prendre en charge un exemple concret, il lui manque le pouvoir d’exécuter en parallèle une exécution en parallèle. L’exemple suivant représente un petit serveur Web capable de gérer plusieurs requêtes en parallèle. Le serveur Web doit pouvoir accepter les connexions entrantes et recevoir des séquences d’octets représentant des signes ASCII imprimables. Ces séquences d’octets ou chaînes de caractères doivent être divisées en caractères plus petits sur tous les caractères de nouvelle ligne. Après cela, le serveur doit répondre au client avec chacune des lignes fractionnées. Alternativement, il pourrait faire autre chose avec les lignes et donner un jeton de réponse spécial, mais nous voulons restr simple dans cet exemple et donc ne pas introduire de fonctionnalités sophistiquées. Rappelez-vous que le serveur doit être capable de gérer plusieurs requêtes en même temps, ce qui signifie qu’aucune requête ne peut bloquer une autre demande d’exécution ultérieure. Résoudre toutes ces exigences peut être difficile d’une manière impérative – avec Akka Streams, cependant, nous ne devrions pas avoir besoin de plus de quelques lignes pour résoudre ces problèmes. Tout d’abord, voyons le serveur lui-même:

serveur

Fondamentalement, il n’y a que trois blocs de construction principaux. Le premier doit accepter les connexions entrantes. Le second doit gérer les demandes entrantes et le troisième doit envoyer une réponse. La mise en œuvre de tous ces trois éléments est un peu plus compliquée que la mise en œuvre du stream de clics:

 def mkServer(address: Ssortingng, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = { import system.dispatcher val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] = Sink.foreach[Tcp.IncomingConnection] { conn => println(s"Incoming connection from: ${conn.remoteAddress}") conn.handleWith(serverLogic) } val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] = Tcp().bind(address, port) val binding: Future[Tcp.ServerBinding] = incomingCnnections.to(connectionHandler).run() binding onComplete { case Success(b) => println(s"Server started, listening on: ${b.localAddress}") case Failure(e) => println(s"Server could not be bound to $address:$port: ${e.getMessage}") } } 

La fonction mkServer prend (outre l’adresse et le port du serveur) également un système d’acteur et un matérialiseur comme parameters implicites. Le stream de contrôle du serveur est représenté par une binding , qui prend une source de connexions entrantes et les transmet à un récepteur de connexions entrantes. A l’intérieur de connectionHandler , qui est notre récepteur, nous gérons chaque connexion par le serverLogic , qui sera décrit plus tard. binding renvoie un Future , qui se termine lorsque le serveur a été démarré ou le démarrage a échoué, ce qui peut être le cas lorsque le port est déjà pris par un autre processus. Cependant, le code ne reflète pas complètement le graphique car nous ne pouvons pas voir un bloc de construction qui gère les réponses. La raison en est que la connexion fournit déjà cette logique par elle-même. C’est un stream bidirectionnel et pas seulement unidirectionnel comme les stream que nous avons vus dans les exemples précédents. Comme ce fut le cas pour la matérialisation, ces stream complexes ne seront pas expliqués ici. La documentation officielle contient beaucoup de matériel pour couvrir des graphiques de stream plus complexes. Pour l’instant, il suffit de savoir que Tcp.IncomingConnection représente une connexion qui sait comment recevoir des requêtes et comment envoyer des réponses. La partie qui manque toujours est le bloc de construction serverLogic . Cela peut ressembler à ceci:

la logique du serveur

Encore une fois, nous sums en mesure de diviser la logique en plusieurs blocs de construction simples qui forment le stream de notre programme. Nous voulons d’abord diviser notre séquence d’octets en lignes, ce que nous devons faire chaque fois que nous trouvons un caractère de nouvelle ligne. Après cela, les octets de chaque ligne doivent être convertis en chaîne, car travailler avec des octets bruts est lourd. Dans l’ensemble, nous pourrions recevoir un stream binary d’un protocole compliqué, ce qui rendrait le travail avec les données brutes entrantes extrêmement difficile. Une fois que nous avons une chaîne lisible, nous pouvons créer une réponse. Pour des raisons de simplicité, la réponse peut être quelque chose dans notre cas. En fin de compte, nous devons reconvertir notre réponse en une séquence d’octets pouvant être envoyée sur le réseau. Le code de la logique entière peut ressembler à ceci:

 val serverLogic: Flow[ByteSsortingng, ByteSsortingng, Unit] = { val delimiter = Framing.delimiter( ByteSsortingng("\n"), maximumFrameLength = 256, allowTruncation = true) val receiver = Flow[ByteSsortingng].map { bytes => val message = bytes.utf8Ssortingng println(s"Server received: $message") message } val responder = Flow[Ssortingng].map { message => val answer = s"Server hereby responds to message: $message\n" ByteSsortingng(answer) } Flow[ByteSsortingng] .via(delimiter) .via(receiver) .via(responder) } 

Nous soaps déjà que serverLogic est un stream qui prend un ByteSsortingng et doit produire un ByteSsortingng . Avec delimiter nous pouvons diviser un ByteSsortingng en parties plus petites – dans notre cas, cela doit se produire chaque fois qu’un caractère de nouvelle ligne se produit. receiver est le stream qui prend toutes les séquences d’octets divisés et les convertit en chaîne. C’est bien sûr une conversion dangereuse, car seuls les caractères ASCII imprimables doivent être convertis en chaîne, mais pour nos besoins, cela est suffisant. responder est le dernier composant et est responsable de la création d’une réponse et de la conversion de la réponse en une séquence d’octets. Contrairement au graphique, nous n’avons pas divisé ce dernier composant en deux, car la logique est sortingviale. À la fin, nous connectons tous les stream à travers la fonction via . À ce stade, on peut se demander si nous avons pris en charge la propriété multi-utilisateurs mentionnée au début. Et en effet nous l’avons fait même si cela peut ne pas être évident immédiatement. En regardant ce graphique, il devrait être plus clair:

la logique serveur et serveur combinée

Le composant serverLogic n’est rien d’autre qu’un stream contenant des stream plus petits. Ce composant prend une entrée, qui est une requête, et produit une sortie, qui est la réponse. Puisque les stream peuvent être construits plusieurs fois et qu’ils fonctionnent tous indépendamment les uns des autres, nous obtenons grâce à cette imbrication notre propriété multi-utilisateurs. Chaque requête est gérée dans sa propre requête et, par conséquent, une demande en cours d’exécution peut dépasser une longue demande en cours d’exécution. Au cas où vous vous serverLogic question, la définition de serverLogic présentée précédemment peut bien sûr être beaucoup plus courte en insérant la plupart de ses définitions internes:

 val serverLogic = Flow[ByteSsortingng] .via(Framing.delimiter( ByteSsortingng("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8Ssortingng) .map(msg => s"Server hereby responds to message: $msg\n") .map(ByteSsortingng(_)) 

Un test du serveur Web peut ressembler à ceci:

 $ # Client $ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666 Server hereby responds to message: Hello World Server hereby responds to message: How are you? 

Pour que l’exemple de code ci-dessus fonctionne correctement, nous devons d’abord démarrer le serveur, qui est représenté par le script startServer :

 $ # Server $ ./startServer 127.0.0.1 6666 [DEBUG] Server started, listening on: /127.0.0.1:6666 [DEBUG] Incoming connection from: /127.0.0.1:37972 [DEBUG] Server received: Hello World [DEBUG] Server received: How are you? 

L’exemple de code complet de ce serveur TCP simple peut être trouvé ici . Nous ne pouvons pas seulement écrire un serveur avec Akka Streams, mais aussi le client. Cela peut ressembler à ceci:

 val connection = Tcp().outgoingConnection(address, port) val flow = Flow[ByteSsortingng] .via(Framing.delimiter( ByteSsortingng("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8Ssortingng) .map(println) .map(_ ⇒ StdIn.readLine("> ")) .map(_+"\n") .map(ByteSsortingng(_)) connection.join(flow).run() 

Le client TCP complet peut être trouvé ici . Le code est assez similaire mais contrairement au serveur, nous n’avons plus besoin de gérer les connexions entrantes.

Graphes complexes

Dans les sections précédentes, nous avons vu comment construire des programmes simples en dehors des stream. Cependant, en réalité, il ne suffit souvent pas de compter sur des fonctions déjà intégrées pour créer des stream plus complexes. Si nous voulons pouvoir utiliser Akka Streams pour des programmes arbitraires, nous devons savoir comment construire nos propres structures de contrôle personnalisées et stream combinables, ce qui nous permet de gérer la complexité de nos applications. La bonne nouvelle est que Akka Streams a été conçu pour s’adapter aux besoins des utilisateurs. Afin de vous présenter brièvement les parties les plus complexes d’Akka Streams, nous ajoutons quelques fonctionnalités supplémentaires à notre exemple client / serveur.

Une chose que nous ne pouvons pas encore faire est de fermer une connexion. À ce stade, il commence à se compliquer un peu car l’API de stream que nous avons vue jusqu’à présent ne nous permet pas d’arrêter un stream à un point arbitraire. Cependant, il existe l’abstraction GraphStage , qui peut être utilisée pour créer des étapes de traitement de graphe arbitraires avec un nombre quelconque de ports d’entrée ou de sortie. Examinons d’abord le côté serveur, où nous introduisons un nouveau composant, appelé closeConnection :

 val closeConnection = new GraphStage[FlowShape[Ssortingng, Ssortingng]] { val in = Inlet[Ssortingng]("closeConnection.in") val out = Outlet[Ssortingng]("closeConnection.out") override val shape = FlowShape(in, out) override def createLogic(inheritedAtsortingbutes: Atsortingbutes) = new GraphStageLogic(shape) { setHandler(in, new InHandler { override def onPush() = grab(in) match { case "q" ⇒ push(out, "BYE") completeStage() case msg ⇒ push(out, s"Server hereby responds to message: $msg\n") } }) setHandler(out, new OutHandler { override def onPull() = pull(in) }) } } 

Cette API est beaucoup plus lourde que l’API de stream. Pas étonnant, nous devons faire beaucoup d’étapes impératives ici. En échange, nous avons plus de contrôle sur le comportement de nos stream. Dans l’exemple ci-dessus, nous spécifions uniquement une entrée et un port de sortie et les mettons à la disposition du système en remplaçant la valeur de la shape . De plus, nous avons défini InHandler et OutHandler , qui sont dans cet ordre responsables de la réception et de l’émission d’éléments. Si vous avez examiné attentivement l’exemple de stream de clics complet, vous devez déjà reconnaître ces composants. Dans InHandler nous InHandler un élément et si c’est une chaîne avec un seul caractère 'q' , nous voulons fermer le stream. Afin de donner au client une chance de découvrir que le stream sera bientôt fermé, nous émettons la chaîne "BYE" et ensuite nous fermons immédiatement l’étape suivante. Le composant closeConnection peut être combiné avec un stream via la méthode via , qui a été introduite dans la section sur les stream.

En plus de pouvoir fermer des connexions, il serait également intéressant de pouvoir afficher un message de bienvenue sur une nouvelle connexion. Pour ce faire, nous devons encore aller un peu plus loin:

 def serverLogic (conn: Tcp.IncomingConnection) (implicit system: ActorSystem) : Flow[ByteSsortingng, ByteSsortingng, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ val welcome = Source.single(ByteSsortingng(s"Welcome port ${conn.remoteAddress}!\n")) val logic = b.add(internalLogic) val concat = b.add(Concat[ByteSsortingng]()) welcome ~> concat.in(0) logic.outlet ~> concat.in(1) FlowShape(logic.in, concat.out) }) 

La fonction serverLogic prend maintenant la connexion entrante comme paramètre. À l’intérieur de son corps, nous utilisons un DSL qui nous permet de décrire un comportement de stream complexe. Avec welcome nous créons un stream qui ne peut émettre qu’un seul élément – le message de bienvenue. logic est ce qui a été décrit comme serverLogic dans la section précédente. La seule différence notable est que nous y avons ajouté closeConnection . Vient maintenant la partie intéressante de la DSL. La fonction GraphDSL.create rend un générateur b disponible, qui est utilisé pour exprimer le stream sous forme de graphique. Avec la fonction ~> , il est possible de connecter les ports d’entrée et de sortie les uns aux autres. Le composant Concat utilisé dans l’exemple peut concaténer des éléments et est utilisé ici pour append le message de bienvenue devant les autres éléments issus de internalLogic . Dans la dernière ligne, nous ne mettons à disposition que le port d’entrée de la logique du serveur et le port de sortie du stream concaténé, car tous les autres ports doivent restr un détail d’ serverLogic composant serverLogic . Pour une introduction détaillée au graphe DSL d’Akka Streams, consultez la section correspondante dans la documentation officielle . L’exemple de code complet du serveur TCP complexe et d’un client pouvant communiquer avec lui peut être trouvé ici . Chaque fois que vous ouvrez une nouvelle connexion à partir du client, vous devriez voir un message de bienvenue et en tapant "q" sur le client, vous devriez voir un message vous indiquant que la connexion a été annulée.

Il y a encore des sujets qui n’ont pas été couverts par cette réponse. En particulier, la matérialisation peut effrayer un lecteur ou un autre, mais je suis sûr que, avec le matériel couvert ici, tout le monde devrait pouvoir aller de l’avant seul. Comme déjà dit, la documentation officielle est un bon endroit pour continuer à apprendre sur Akka Streams.