Puis-je dupliquer un Stream en Java 8?

Parfois, je souhaite effectuer un ensemble d’opérations sur un stream, puis traiter le stream résultant de deux manières différentes avec d’autres opérations.

Puis-je le faire sans avoir à spécifier deux fois les opérations initiales communes?

Par exemple, j’espère qu’une méthode dup() telle que celle-ci existe:

 Stream [] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup(); Stream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14 Stream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10 

Ce n’est pas possible en général.

Si vous souhaitez dupliquer un stream d’entrée ou un iterator d’entrée, vous avez deux options:

A. Gardez tout dans une collection, dites une List<>

Supposons que vous dupliquiez un stream en deux stream s1 et s2 . Si vous avez avancé des éléments n1 dans les éléments s1 et n2 avec s2 , vous devez conserver |n2 - n1| éléments en mémoire, juste pour suivre le rythme. Si votre stream est infini, il ne peut y avoir aucune limite supérieure pour le stockage requirejs.

Jetez un oeil au tee() de Python tee() pour voir ce qu’il faut:

Cet outil peut nécessiter un stockage auxiliaire important (en fonction de la quantité de données temporaires à stocker). En général, si un iterator utilise la plupart ou la totalité des données avant qu’un autre iterator ne démarre, il est plus rapide d’utiliser list() au lieu de tee() .

B. Dans la mesure du possible: copiez l’état du générateur qui crée les éléments

Pour que cette option fonctionne, vous aurez probablement besoin d’accéder au fonctionnement interne du stream. En d’autres termes, le générateur – la partie qui crée les éléments – devrait prendre en charge la copie en premier lieu. [OP: Voir cette excellente réponse , comme exemple de la façon dont cela peut être fait pour l’exemple de la question]

Cela ne marchera pas avec l’entrée de l’utilisateur, car vous devrez copier l’état de l’ensemble du “monde extérieur”. Java Stream ne prend pas en charge la copie, car il est conçu pour être aussi général que possible, en particulier pour les fichiers, le réseau, le clavier, les capteurs, le hasard, etc. [OP: un stream qui lit un capteur de température à la demande. Il ne peut pas être dupliqué sans stocker une copie des lectures]

Ce n’est pas seulement le cas en Java; c’est une règle générale. Vous pouvez voir que std::istream en C ++ ne supporte que la sémantique de déplacement, pas la sémantique de copie (“copy constructeur (supprimé)”), pour cette raison (et d’autres).

Il n’est pas possible de dupliquer un stream de cette manière. Cependant, vous pouvez éviter la duplication de code en déplaçant le composant commun dans une méthode ou une expression lambda.

 Supplier supplier = () -> IntStream.range(1, 100).filter(n -> n % 2 == 0); supplier.get().filter(...); supplier.get().filter(...); 

Il est possible si vous mettez en mémoire tampon des éléments que vous avez consommés en une seule fois, mais pas encore dans l’autre.

Nous avons implémenté une méthode duplicate() pour les stream dans jOOλ , une bibliothèque Open Source créée pour améliorer les tests d’intégration de jOOQ . Essentiellement, vous pouvez simplement écrire:

 Tuple2, Seq> desired_streams = Seq.seq( IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed() ).duplicate(); 

(note: nous avons actuellement besoin de cocher le stream, car nous n’avons pas encore implémenté d’ IntSeq )

En interne, il existe un tampon LinkedList stockant toutes les valeurs consommées par un stream mais pas par l’autre. C’est probablement aussi efficace que si vos deux stream sont consommés au même rythme.

Voici comment fonctionne l’algorithme:

 static  Tuple2, Seq> duplicate(Stream stream) { final LinkedList gap = new LinkedList<>(); final Iterator it = stream.iterator(); @SuppressWarnings("unchecked") final Iterator[] ahead = new Iterator[] { null }; class Duplicate implements Iterator { @Override public boolean hasNext() { if (ahead[0] == null || ahead[0] == this) return it.hasNext(); return !gap.isEmpty(); } @Override public T next() { if (ahead[0] == null) ahead[0] = this; if (ahead[0] == this) { T value = it.next(); gap.offer(value); return value; } return gap.poll(); } } return tuple(seq(new Duplicate()), seq(new Duplicate())); } 

Plus de code source ici

En fait, en utilisant jOOλ , vous pourrez écrire un one-liner complet comme ceci:

 Tuple2, Seq> desired_streams = Seq.seq( IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed() ).duplicate() .map1(s -> s.filter(n -> n % 7 == 0)) .map2(s -> s.filter(n -> n % 5 == 0)); // This will yield 14, 28, 42, 56... desired_streams.v1.forEach(System.out::println) // This will yield 10, 20, 30, 40... desired_streams.v2.forEach(System.out::println); 

Non plus,

  • Déplace l’initialisation dans une méthode et appelle simplement la méthode à nouveau

Cela a l’avantage d’être explicite sur ce que vous faites et fonctionne également pour des stream infinis.

  • Recueillir le stream puis le re-diffuser

Dans votre exemple:

 final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray(); 

alors

 final IntStream s = IntStream.of(arr); 

Mise à jour: Cela ne fonctionne pas . Voir l’explication ci-dessous, après le texte de la réponse originale.

Comme c’est idiot de ma part Tout ce que j’ai besoin de faire c’est:

 Stream desired_stream = IntStream.range(1, 100).filter(n -> n % 2 == 0); Stream stream14 = desired_stream.filter(n -> n % 7 == 0); // multiples of 14 Stream stream10 = desired_stream.filter(n -> n % 5 == 0); // multiples of 10 

Explication pourquoi cela ne fonctionne pas:

Si vous le codez et que vous essayez de collecter les deux stream, le premier collectera correctement, mais en essayant de diffuser le second, il y aura une exception: java.lang.IllegalStateException: stream has already been operated upon or closed .

Pour élaborer, les stream sont des objects avec état (qui ne peuvent pas être réinitialisés ou rembobinés). Vous pouvez les considérer comme des iterators, qui à leur tour sont comme des pointeurs. Ainsi, stream14 et stream10 peuvent être considérés comme des références au même pointeur. Consumr le premier stream tout le chemin fera passer le pointeur “après la fin”. Essayer de consumr le deuxième stream, c’est comme essayer d’accéder à un pointeur déjà “passé”, ce qui est naturellement une opération illégale.

Comme le montre la réponse acceptée, le code pour créer le stream doit être exécuté deux fois mais il peut être compartimenté en un Supplier Lambda ou une construction similaire.

Code de test complet: enregistrer dans Foo.java , puis javac Foo.java , puis java Foo

 import java.util.stream.IntStream; public class Foo { public static void main (Ssortingng [] args) { IntStream s = IntStream.range(0, 100).filter(n -> n % 2 == 0); IntStream s1 = s.filter(n -> n % 5 == 0); s1.forEach(n -> System.out.println(n)); IntStream s2 = s.filter(n -> n % 7 == 0); s2.forEach(n -> System.out.println(n)); } } 

Sortie:

 $ javac Foo.java $ java Foo 0 10 20 30 40 50 60 70 80 90 Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.(AbstractPipeline.java:203) at java.util.stream.IntPipeline.(IntPipeline.java:91) at java.util.stream.IntPipeline$StatelessOp.(IntPipeline.java:592) at java.util.stream.IntPipeline$9.(IntPipeline.java:332) at java.util.stream.IntPipeline.filter(IntPipeline.java:331) at Foo.main(Foo.java:8) 

Vous pouvez également déplacer la génération de stream dans une méthode / fonction distincte qui renvoie ce stream et l’appelle deux fois.