takeWhile () fonctionne différemment avec flatmap

Je crée des extraits avec du temps pour explorer ses possibilités. Lorsqu’il est utilisé avec flatMap, le comportement n’est pas conforme aux attentes. Veuillez trouver l’extrait de code ci-dessous.

Ssortingng[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}}; Arrays.stream(strArray) .flatMap(indStream -> Arrays.stream(indStream)) .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4")) .forEach(ele -> System.out.println(ele)); 

Sortie réelle:

 Sample1 Sample2 Sample3 Sample5 

Production attendue:

 Sample1 Sample2 Sample3 

La raison de cette attente est que takeWhile devrait s’exécuter jusqu’au moment où la condition à l’intérieur devient vraie. J’ai également ajouté des instructions imprimées à l’intérieur de la carte pour le débogage. Les stream sont renvoyés deux fois seulement, ce qui correspond à l’attente.

Cependant, cela fonctionne très bien sans flatmap dans la chaîne.

 Ssortingng[] strArraySingle = {"Sample3", "Sample4", "Sample5"}; Arrays.stream(strArraySingle) .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4")) .forEach(ele -> System.out.println(ele)); 

Sortie réelle:

 Sample3 

Ici, la sortie réelle correspond à la sortie attendue.

Clause de non-responsabilité: ces extraits de code ne sont utilisés que pour la pratique du code et ne servent aucun cas d’utilisation valide.

Mise à jour: Bug JDK-8193856 : le correctif sera disponible dans le cadre du JDK 10. Le changement consistera à corriger whileOps Sink :: accept

 @Override public void accept(T t) { if (take = predicate.test(t)) { downstream.accept(t); } } 

Implantation modifiée:

 @Override public void accept(T t) { if (take && (take = predicate.test(t))) { downstream.accept(t); } } 

Ceci est un bogue dans JDK 9 – à partir du numéro # 8193856 :

takeWhile suppose à tort qu’une opération en amont prend en charge et honore l’annulation, ce qui n’est malheureusement pas le cas pour flatMap .

Explication

Si le stream est commandé, takeWhile doit afficher le comportement attendu. Ce n’est pas tout à fait le cas dans votre code car vous utilisez forEach , qui forEach commande. Si vous vous en souciez, ce que vous faites dans cet exemple, vous devriez plutôt utiliser forEachOrdered . Chose amusante: ça ne change rien. 🤔

Alors peut-être que le stream n’est pas commandé en premier lieu? (Dans ce cas, le comportement est correct .) Si vous créez une variable temporaire pour le stream créé à partir de strArray et vérifiez si elle est ordonnée en exécutant l’expression ((StatefulOp) stream).isOrdered(); au point d’arrêt, vous constaterez qu’il est en effet commandé:

 Ssortingng[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}}; Stream stream = Arrays.stream(strArray) .flatMap(indStream -> Arrays.stream(indStream)) .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4")); // breakpoint here System.out.println(stream); 

Cela signifie qu’il s’agit très probablement d’une erreur de mise en œuvre.

Dans le code

Comme d’autres l’ont suspecté, je pense maintenant que cela pourrait être lié au fait que flatMap soit impatient. Plus précisément, les deux problèmes peuvent avoir la même cause.

En regardant la source de WhileOps , nous pouvons voir ces méthodes:

 @Override public void accept(T t) { if (take = predicate.test(t)) { downstream.accept(t); } } @Override public boolean cancellationRequested() { return !take || downstream.cancellationRequested(); } 

Ce code est utilisé par takeWhile pour vérifier pour un élément de stream donné t si le predicate est rempli:

  • Si c’est le cas, il transmet l’élément à l’opération en downstream , dans ce cas System.out::println .
  • Si ce n’est pas le cas, il définit la valeur sur false, de sorte que si l’on demande la prochaine fois si le pipeline doit être annulé (c’est-à-dire si c’est fait), il renvoie true .

Cela couvre l’opération takeWhile . L’autre chose que vous devez savoir est que forEachOrdered mène à l’opération de terminal exécutant la méthode ReferencePipeline::forEachWithCancel :

 @Override final boolean forEachWithCancel(Spliterator spliterator, Sink sink) { boolean cancelled; do { } while ( !(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink)); return cancelled; } 

Tout ce que fait c’est:

  1. vérifier si le pipeline a été annulé
  2. sinon, avancer le lavabo d’un élément
  3. arrêter si c’était le dernier élément

Semble prometteur, non?

Sans flatMap

Dans le “bon cas” (sans flatMap ; votre deuxième exemple), forEachWithCancel opère directement sur le WhileOp tant que WhileOp et vous pouvez voir comment cela se passe:

  • ReferencePipeline::forEachWithCancel fait sa boucle:
    • WhileOps::accept reçoit chaque élément de stream
    • WhileOps::cancellationRequested est interrogé après chaque élément
  • à un moment donné, "Sample4" échoue au prédicat et le stream est annulé

Yay!

Avec flatMap

Dans le “cas flatMap ” (avec flatMap ; votre premier exemple), forEachWithCancel opère sur l’opération flatMap , cependant, qui appelle simplement forEachRemaining sur ArraySpliterator pour {"Sample3", "Sample4", "Sample5"} , ce qui le fait :

 if ((a = array).length >= (hi = fence) && (i = index) >= 0 && i < (index = hi)) { do { action.accept((T)a[i]); } while (++i < hi); } 

Ignorant tout ce qui concerne les objects hi and- fence , qui n'est utilisé que si le traitement de tableau est divisé pour un stream parallèle, il s'agit d'une simple boucle qui transmet chaque élément à l'opération takeWhile , mais ne vérifie jamais s'il est annulé . Il va donc parcourir tous les éléments de ce "sous-stream" avant de s'arrêter, probablement même dans le rest du stream .

C’est un bug, peu importe comment je le regarde – et merci Holger pour vos commentaires. Je n’ai pas voulu mettre cette réponse ici (sérieusement!), Mais aucune des réponses n’indique clairement qu’il s’agit d’un bug.

Les gens disent que cela doit être ordonné / non ordonné, et ce n’est pas vrai car cela rapportera 3 fois true :

 Stream s1 = Arrays.stream(strArray); System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED)); Stream s2 = Arrays.stream(strArray) .flatMap(indStream -> Arrays.stream(indStream)); System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED)); Stream s3 = Arrays.stream(strArray) .flatMap(indStream -> Arrays.stream(indStream)) .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4")); System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED)); 

C’est très intéressant aussi que si vous le changez pour:

 Ssortingng[][] strArray = { { "Sample1", "Sample2" }, { "Sample3", "Sample5", "Sample4" }, // Sample4 is the last one here { "Sample7", "Sample8" } }; 

alors Sample7 et Sample8 ne feront pas partie de la sortie, sinon ils le feront. Il semble que flatmap ignore un drapeau d’annulation qui serait introduit par dropWhile .

Si vous regardez la documentation pour takeWhile :

Si ce stream est ordonné, [renvoie] un stream constitué du préfixe le plus long des éléments extraits de ce stream et correspondant au prédicat donné.

Si ce stream n’est pas ordonné, [renvoie] un stream constitué d’un sous-ensemble d’éléments extraits de ce stream et correspondant au prédicat donné.

Votre stream est commandé par coïncidence, mais takeWhile ne le sait pas. En tant que tel, il renvoie la 2ème condition – le sous-ensemble. Votre takeWhile agit simplement comme un filter .

Si vous ajoutez un appel à sorted avant takeWhile , vous verrez le résultat attendu:

 Arrays.stream(strArray) .flatMap(indStream -> Arrays.stream(indStream)) .sorted() .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4")) .forEach(ele -> System.out.println(ele)); 

La raison en est que l’opération flatMap également une opération intermédiaire avec laquelle (l’une des) l’ opération intermédiaire de mise en court-circuit avec takeWhile est utilisée est utilisée.

Le comportement de flatMap comme le flatMap Holger dans cette réponse est certainement une référence à ne pas manquer pour comprendre la sortie inattendue de telles opérations de court-circuit.

Votre résultat attendu peut être obtenu en divisant ces deux opérations intermédiaires en introduisant une opération de terminal pour utiliser de manière déterministe un stream ordonné supplémentaire et en les exécutant pour un échantillon en tant que:

 List sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList()); sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4")) .forEach(System.out::println); 

En outre, il semble y avoir un bogue associé JDK-8075939 pour suivre ce comportement déjà enregistré.

Edit : Ceci peut être suivi plus loin à JDK-8193856 accepté comme un bogue.