Java 8 Stream avec traitement par lots

J’ai un gros fichier qui contient une liste d’éléments.

Je voudrais créer un lot d’éléments, faire une requête HTTP avec ce lot (tous les éléments sont nécessaires en tant que parameters dans la requête HTTP). Je peux le faire très facilement avec une boucle for , mais en tant qu’amoureux de Java 8, je veux essayer d’écrire ceci avec le framework Stream de Java 8 (et profiter des avantages du traitement différé).

Exemple:

 List batch = new ArrayList(BATCH_SIZE); for (int i = 0; i  0) process(batch); 

Je veux faire quelque chose de long la ligne de lazyFileStream.group(500).map(processBatch).collect(toList())

Quelle serait la meilleure façon de faire cela?

    Vous pouvez le faire avec jOOλ , une bibliothèque qui étend les stream Java 8 pour les cas d’utilisation de stream séquentiels à thread unique:

     Seq.seq(lazyFileStream) // Seq .zipWithIndex() // Seq> .groupBy(tuple -> tuple.v2 / 500) // Map> .forEach((index, batch) -> { process(batch); }); 

    Dans les coulisses, zipWithIndex() est juste:

     static  Seq> zipWithIndex(Stream stream) { final Iterator it = stream.iterator(); class ZipWithIndex implements Iterator> { long index; @Override public boolean hasNext() { return it.hasNext(); } @Override public Tuple2 next() { return tuple(it.next(), index++); } } return seq(new ZipWithIndex()); } 

    … alors que groupBy() est pratique pour l’API pour:

     default  Map> groupBy(Function classifier) { return collect(Collectors.groupingBy(classifier)); } 

    (Disclaimer: Je travaille pour l’entreprise derrière jOOλ)

    Pour être complet, voici une solution Guava .

     Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process); 

    Dans la question, la collection est disponible, donc un stream n’est pas nécessaire et peut être écrit comme suit:

     Iterables.partition(data, batchSize).forEach(this::process); 

    Une implémentation Java-8 pure est également possible:

     int BATCH = 500; IntStream.range(0, (data.size()+BATCH-1)/BATCH) .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH))) .forEach(batch -> process(batch)); 

    Notez que contrairement à JOOl, il peut fonctionner en parallèle (à condition que vos data soient une liste d’access aléatoire).

    Pure Java 8 solution :

    Nous pouvons créer un collecteur personnalisé pour le faire avec élégance, qui prend une batch size et un Consumer pour traiter chaque lot:

     import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.*; import java.util.stream.Collector; import static java.util.Objects.requireNonNull; /** * Collects elements in the stream and calls the supplied batch processor * after the configured batch size is reached. * * In case of a parallel stream, the batch processor may be called with * elements less than the batch size. * * The elements are not kept in memory, and the final result will be an * empty list. * * @param  Type of the elements being collected */ class BatchCollector implements Collector, List> { private final int batchSize; private final Consumer> batchProcessor; /** * Constructs the batch collector * * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process */ BatchCollector(int batchSize, Consumer> batchProcessor) { batchProcessor = requireNonNull(batchProcessor); this.batchSize = batchSize; this.batchProcessor = batchProcessor; } public Supplier> supplier() { return ArrayList::new; } public BiConsumer, T> accumulator() { return (ts, t) -> { ts.add(t); if (ts.size() >= batchSize) { batchProcessor.accept(ts); ts.clear(); } }; } public BinaryOperator> combiner() { return (ts, ots) -> { // process each parallel list without checking for batch size // avoids adding all elements of one to another // can be modified if a ssortingct batching mode is required batchProcessor.accept(ts); batchProcessor.accept(ots); return Collections.emptyList(); }; } public Function, List> finisher() { return ts -> { batchProcessor.accept(ts); return Collections.emptyList(); }; } public Set characteristics() { return Collections.emptySet(); } } 

    Si vous le souhaitez, créez une classe utilitaire d’aide:

     import java.util.List; import java.util.function.Consumer; import java.util.stream.Collector; public class StreamUtils { /** * Creates a new batch collector * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process * @param  the type of elements being processed * @return a batch collector instance */ public static  Collector, List> batchCollector(int batchSize, Consumer> batchProcessor) { return new BatchCollector(batchSize, batchProcessor); } } 

    Exemple d’utilisation:

     List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List output = new ArrayList<>(); int batchSize = 3; Consumer> batchProcessor = xs -> output.addAll(xs); input.stream() .collect(StreamUtils.batchCollector(batchSize, batchProcessor)); 

    J’ai aussi posté mon code sur GitHub, si quelqu’un veut regarder:

    Lien vers Github

    Vous pouvez également utiliser RxJava :

     Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch)); 

    ou

     Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList(); 

    ou

     Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList(); 

    J’ai écrit un spliterator personnalisé pour des scénarios comme celui-ci. Il remplira les listes d’une taille donnée à partir du stream d’entrée. L’avantage de cette approche est qu’elle effectuera un traitement différé et fonctionnera avec d’autres fonctions de stream.

     public static  Stream> batches(Stream stream, int batchSize) { return batchSize <= 0 ? Stream.of(stream.collect(Collectors.toList())) : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel()); } private static class BatchSpliterator implements Spliterator> { private final Spliterator base; private final int batchSize; public BatchSpliterator(Spliterator base, int batchSize) { this.base = base; this.batchSize = batchSize; } @Override public boolean tryAdvance(Consumer> action) { final List batch = new ArrayList<>(batchSize); for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++) ; if (batch.isEmpty()) return false; action.accept(batch); return true; } @Override public Spliterator> trySplit() { if (base.estimateSize() <= batchSize) return null; final Spliterator splitBase = this.base.trySplit(); return splitBase == null ? null : new BatchSpliterator<>(splitBase, batchSize); } @Override public long estimateSize() { final double baseSize = base.estimateSize(); return baseSize == 0 ? 0 : (long) Math.ceil(baseSize / (double) batchSize); } @Override public int characteristics() { return base.characteristics(); } } 

    Vous pouvez aussi regarder cyclops-react , je suis l’auteur de cette bibliothèque. Il implémente l’interface jOOλ (et par extension JDK 8 Streams), mais contrairement aux JDK 8 Parallel Streams, il se concentre sur les opérations asynchrones (telles que le blocage potentiel des appels d’E / S Async). JDK Parallel Streams, en revanche, se concentre sur le parallélisme des données pour les opérations liées au processeur. Il fonctionne en gérant des agrégats de tâches basées sur le futur, mais présente une API de stream étendue standard aux utilisateurs finaux.

    Cet exemple de code peut vous aider à démarrer

     LazyFutureStream.parallelCommonBuilder() .react(data) .grouped(BATCH_SIZE) .map(this::process) .run(); 

    Il y a un tutoriel sur le traitement par lots ici

    Et un tutoriel plus général ici

    Pour utiliser votre propre pool de threads (ce qui est probablement plus approprié pour bloquer les E / S), vous pouvez commencer à traiter avec

      LazyReact reactor = new LazyReact(40); reactor.react(data) .grouped(BATCH_SIZE) .map(this::process) .run(); 

    Nous avons eu un problème similaire à résoudre. Nous voulions prendre un stream plus grand que la mémoire système (en parcourant tous les objects de la firebase database) et rendre l’ordre aussi aléatoire que possible – nous avons pensé pouvoir mettre en mémoire tampon 10 000 éléments et les rendre aléatoires.

    La cible était une fonction qui prenait un stream.

    Parmi les solutions proposées ici, il semble y avoir une gamme d’options:

    • Utiliser différentes bibliothèques non Java 8 supplémentaires
    • Commencez avec quelque chose qui n’est pas un stream – par exemple une liste d’access aléatoire
    • Avoir un stream qui peut être divisé facilement dans un spliterator

    Notre instinct était à l’origine d’utiliser un collecteur personnalisé, mais cela signifiait abandonner le streaming. La solution de collecteur personnalisée ci-dessus est très bonne et nous l’avons presque utilisée.

    Voici une solution qui sortingche en utilisant le fait que les Stream peuvent vous donner un Iterator que vous pouvez utiliser comme une trappe d’évasion pour vous permettre de faire quelque chose de plus que les stream ne supportent pas. L’ Iterator est reconverti en un stream utilisant un autre bit de sorcellerie Java 8 StreamSupport .

     /** * An iterator which returns batches of items taken from another iterator */ public class BatchingIterator implements Iterator> { /** * Given a stream, convert it to a stream of batches no greater than the * batchSize. * @param originalStream to convert * @param batchSize maximum size of a batch * @param  type of items in the stream * @return a stream of batches taken sequentially from the original stream */ public static  Stream> batchedStreamOf(Stream originalStream, int batchSize) { return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); } private static  Stream asStream(Iterator iterator) { return StreamSupport.stream( Spliterators.spliteratorUnknownSize(iterator,ORDERED), false); } private int batchSize; private List currentBatch; private Iterator sourceIterator; public BatchingIterator(Iterator sourceIterator, int batchSize) { this.batchSize = batchSize; this.sourceIterator = sourceIterator; } @Override public boolean hasNext() { prepareNextBatch(); return currentBatch!=null && !currentBatch.isEmpty(); } @Override public List next() { return currentBatch; } private void prepareNextBatch() { currentBatch = new ArrayList<>(batchSize); while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { currentBatch.add(sourceIterator.next()); } } } 

    Un exemple simple d'utilisation de ceci ressemblerait à ceci:

     @Test public void getsBatches() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) .forEach(System.out::println); } 

    Les impressions ci-dessus

     [A, B, C] [D, E, F] 

    Pour notre cas d'utilisation, nous voulions mélanger les lots et les conserver comme stream: cela ressemblait à ceci:

     @Test public void howScramblingCouldBeDone() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one .map(list -> { Collections.shuffle(list); return list; }) .flatMap(List::stream) .forEach(System.out::println); } 

    Cela produit quelque chose comme (c'est aléatoire, tellement différent à chaque fois)

     A C B E D F 

    La recette secrète est qu’il ya toujours un stream, vous pouvez donc opérer sur un stream de lots, ou faire quelque chose pour chaque lot, puis le flatMap sur un stream. Mieux encore, toutes les expressions ci-dessus s'exécutent uniquement en tant que final forEach ou collect ou autres expressions terminées PULL les données via le stream.

    Il s'avère que l' iterator est un type particulier d' opération de terminaison sur un stream et ne provoque pas l'exécution du stream entier et sa mémoire! Merci aux gars de Java 8 pour un design shiny!

    Exemple simple utilisant Spliterator

      // read file into stream, try-with-resources try (Stream stream = Files.lines(Paths.get(fileName))) { //skip header Spliterator split = stream.skip(1).spliterator(); Chunker chunker = new Chunker(); while(true) { boolean more = split.tryAdvance(chunker::doSomething); if (!more) { break; } } } catch (IOException e) { e.printStackTrace(); } } static class Chunker { int ct = 0; public void doSomething(T line) { System.out.println(ct++ + " " + line.toSsortingng()); if (ct % 100 == 0) { System.out.println("====================chunk====================="); } } } 

    La réponse de Bruce est plus complète, mais je cherchais quelque chose de rapide et sale pour traiter un tas de fichiers.

    Pure Java 8 exemple qui fonctionne également avec des stream parallèles.

    Comment utiliser:

     Stream integerStream = IntStream.range(0, 45).parallel().boxed(); CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch)); 

    La déclaration et la mise en œuvre de la méthode:

     public static  void processInBatch(Stream stream, int batchSize, Consumer> batchProcessor) { List newBatch = new ArrayList<>(batchSize); stream.forEach(element -> { List fullBatch; synchronized (newBatch) { if (newBatch.size() < batchSize) { newBatch.add(element); return; } else { fullBatch = new ArrayList<>(newBatch); newBatch.clear(); newBatch.add(element); } } batchProcessor.accept(fullBatch); }); if (newBatch.size() > 0) batchProcessor.accept(new ArrayList<>(newBatch)); }