Thursday, March 15, 2018

Streaming and Batching in Java

If you work with external APIs a lot, you have probably encountered APIs that support batch operations.  Instead of making a separate API request for each record, you can operate on multiple records at once with a single batch request.  In practice, batching speeds up querying, reduces network overhead and places less load on the remote server.
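To make the overhead argument concrete, here is a minimal sketch that counts round trips for 25 IDs, looked up one at a time versus in batches of 10. FakeLookupApi, lookup() and lookupBatch() are hypothetical stand-ins, not a real client library. A real batch API would also do network I/O and may cap the batch size.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: counts requests for one-by-one lookups versus batch lookups.
 * FakeLookupApi is made up for illustration; it is not a real library.
 */
public class BatchVsSingle {
    // Hypothetical remote API that only counts how many requests it receives.
    static class FakeLookupApi {
        int requests = 0;

        String lookup(String id) {
            requests++; // one request per ID
            return "record-" + id;
        }

        List<String> lookupBatch(List<String> ids) {
            requests++; // one request regardless of how many IDs are in the batch
            List<String> out = new ArrayList<>();
            for (String id : ids) {
                out.add("record-" + id);
            }
            return out;
        }
    }

    static int singleRequests(List<String> ids) {
        FakeLookupApi api = new FakeLookupApi();
        for (String id : ids) {
            api.lookup(id);
        }
        return api.requests;
    }

    static int batchRequests(List<String> ids, int batchSize) {
        FakeLookupApi api = new FakeLookupApi();
        for (int i = 0; i < ids.size(); i += batchSize) {
            // clamp the end index so the last batch may be smaller
            api.lookupBatch(ids.subList(i, Math.min(i + batchSize, ids.size())));
        }
        return api.requests;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            ids.add(String.valueOf(i));
        }
        System.out.println(singleRequests(ids));    // 25
        System.out.println(batchRequests(ids, 10)); // 3
    }
}
```

With 25 IDs and a batch size of 10, the batched version needs three requests (10 + 10 + 5) instead of 25.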

Batching occurs commonly in the real world to streamline processes or to meet process constraints.  For example, your favorite pizza chain batches multiple orders into a single dispatch to save delivery cost and time.  An amusement park ride processes guests waiting in line in batches, with the batch size being the ride capacity.  The list goes on.

Batching is such a pervasive operation that you will likely need to write code implementing or simulating it at some point in your software development career.  This was the case for me in a recent project.  The requirements involved chunking record IDs in Java and submitting each chunk to an external API for fast lookup.  The record IDs are generated in real time, one at a time, so the total number of records is not known in advance.
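Before reaching for streams, it may help to see the requirement in its plainest form. The sketch below uses a hypothetical Batcher class (not part of the original code) that buffers IDs as they arrive and hands off a batch each time it fills. Because the total count is not known in advance, it assumes the caller invokes flush() once the input ends, so that a short final batch is not lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical buffer-and-flush batcher: IDs arrive one at a time, a batch is
 * handed to the sink whenever it fills, and flush() sends any partial batch.
 */
public class Batcher {
    private final int batchSize;
    private final Consumer<List<String>> sink;
    private List<String> buffer = new ArrayList<>();

    public Batcher(int batchSize, Consumer<List<String>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Add one ID; emit the buffer as a batch once it reaches batchSize. */
    public void add(String id) {
        buffer.add(id);
        if (buffer.size() == batchSize) {
            flush();
        }
    }

    /** Emit whatever is buffered (possibly a short final batch). */
    public void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(buffer);
            buffer = new ArrayList<>(); // start a fresh batch; the old list now belongs to the sink
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        Batcher batcher = new Batcher(3, batches::add);
        for (int i = 1; i <= 7; i++) {
            batcher.add(String.valueOf(i)); // IDs arrive one at a time
        }
        batcher.flush(); // input is exhausted: send the remainder
        System.out.println(batches); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

The stream-based version that follows handles the final flush itself, which is one reason it reads more compactly.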

Thankfully, with the addition of streams and lambda expressions in Java 8, the task couldn't be much simpler.  Without further ado, here's the code that does the trick.


import java.util.List;
import java.util.ArrayList;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.function.Consumer;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Read an input file containing one ID per line and process a batch of IDs at once.
 */
public class BatchProcessor {
        public static class BatchSpliterator implements Spliterator<List<String>> {
                private final Spliterator<String> source;
                private final int batchSize;

                BatchSpliterator(Spliterator<String> lineSpliterator, int batchSize) {
                        if (batchSize <= 0) {
                                throw new IllegalArgumentException("batchSize must be positive");
                        }
                        this.source = lineSpliterator;
                        this.batchSize = batchSize;
                }

                @Override
                public boolean tryAdvance(Consumer<? super List<String>> action) {
                        List<String> batch = new ArrayList<>();
                        // pull items until the batch is full or the input data is exhausted
                        while (batch.size() < this.batchSize && this.source.tryAdvance(batch::add)) {
                                // batch::add has already stored the item
                        }
                        if (batch.isEmpty()) {
                                return false; // nothing left to emit
                        }
                        // flush a full batch, or the final partial batch
                        action.accept(batch);
                        return true;
                }

                @Override
                public Spliterator<List<String>> trySplit() {
                        return null; // never split: the source size is unknown
                }

                @Override
                public long estimateSize() {
                        return Long.MAX_VALUE; // unknown size
                }

                @Override
                public int characteristics() {
                        return ORDERED | NONNULL;
                }

                /**
                 * Divide an input stream of IDs into batches.
                 */
                public static Stream<List<String>> batch(Stream<String> lines, int batchSize, boolean parallel) {
                        return StreamSupport.stream(
                                new BatchSpliterator(lines.spliterator(), batchSize),
                                parallel);
                }

                /**
                 * Process a batch of IDs in one call.
                 */
                public static List<String> processBatch(List<String> batch) {
                        List<String> newBatch = new ArrayList<>();
                        for (String str : batch) {
                                newBatch.add(str + "!");
                        }
                        return newBatch;
                }
        }

        public static void main(String[] args) throws IOException {
                // parse command line arguments: <input file> <batch size>
                if (args.length < 2) {
                        System.err.println("Usage: java BatchProcessor <file> <batchSize>");
                        System.exit(1);
                }
                String fileName = args[0];
                int batchSize = Integer.parseInt(args[1]);

                // stream input file; try-with-resources closes the reader afterwards
                try (BufferedReader br = Files.newBufferedReader(Paths.get(fileName))) {
                        Stream<String> stream = br.lines();
                        // trySplit() returns null, so the parallel flag cannot actually split the work
                        BatchSpliterator.batch(stream, batchSize, true).map(BatchSpliterator::processBatch).forEach(System.out::print);
                }
        }
}


The solution implements the java.util.Spliterator interface, with a constructor that accepts a spliterator over the incoming source stream of strings and a batch size.  The key interface methods are:

tryAdvance() - Pull items from the source stream into a batch until the batch is full or the source is exhausted, then pass the batch to the given Consumer action and start a new batch.  Return false once there is nothing left to emit.

trySplit() - Return null because the size of the source stream is not known in advance, so this simple implementation does not split it.  Unfortunately, this also means batching cannot be done in parallel, even when the parallel flag is true.

estimateSize() - Return Long.MAX_VALUE, which the Spliterator contract uses to signal an unknown size, because the size of the source stream is not known in advance.

characteristics() - Return the properties of the Spliterator, which in this case are ORDERED (batches follow the order of the input) and NONNULL (no batch is null).
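If some parallelism matters, one option (an alternative to the hand-written class, not what the code above does) is to extend java.util.Spliterators.AbstractSpliterator. Only tryAdvance() needs to be written; the inherited trySplit() copies a run of elements into an array, which the Javadoc describes as permitting limited parallelism. Reading the source itself stays sequential, so any gain would come from processing batches concurrently. The class name AbstractBatchSpliterator is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Sketch: batching built on Spliterators.AbstractSpliterator (hypothetical class name).
 * Only tryAdvance() is hand-written; the inherited trySplit() buffers batches into
 * arrays so a parallel stream can hand some of them to other threads.
 */
public class AbstractBatchSpliterator extends Spliterators.AbstractSpliterator<List<String>> {
    private final Spliterator<String> source;
    private final int batchSize;

    AbstractBatchSpliterator(Spliterator<String> source, int batchSize) {
        // Long.MAX_VALUE signals an unknown size, as in the original class
        super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<String>> action) {
        List<String> batch = new ArrayList<>();
        // fill the batch until it is full or the source runs dry
        while (batch.size() < batchSize && source.tryAdvance(batch::add)) {
            // batch::add has already stored the element
        }
        if (batch.isEmpty()) {
            return false;
        }
        action.accept(batch);
        return true;
    }

    static Stream<List<String>> batch(Stream<String> lines, int batchSize, boolean parallel) {
        return StreamSupport.stream(new AbstractBatchSpliterator(lines.spliterator(), batchSize), parallel);
    }

    public static void main(String[] args) {
        List<List<String>> batches = batch(Stream.of("a", "b", "c", "d", "e"), 2, true)
                .collect(Collectors.toList()); // collect keeps encounter order for an ORDERED source
        System.out.println(batches); // [[a, b], [c, d], [e]]
    }
}
```

Because the spliterator reports ORDERED, collecting to a list keeps the batches in input order even on a parallel stream.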


The other two static methods in the class are our own helper methods.


batch() - Divide an input stream into batches of the given size.  The last batch holds whatever remains, so it may be smaller.

processBatch() - Process a batch of strings by appending an exclamation mark to each string.
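processBatch() could also be written as a stream pipeline. The sketch below is meant as an equivalent drop-in, assuming Java 8's Collectors.toList(); in a real project this method is where the call to the external batch API would go. The class name ProcessBatchStreams is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Stream-based equivalent of processBatch(): append "!" to every ID in the batch.
 * (Hypothetical class name; a real implementation would call the batch API here.)
 */
public class ProcessBatchStreams {
    public static List<String> processBatch(List<String> batch) {
        return batch.stream()
                .map(id -> id + "!") // same transformation as the loop in the original
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(processBatch(Arrays.asList("1", "2", "3"))); // [1!, 2!, 3!]
    }
}
```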


In the main method, the input file named in the command line arguments serves as the source stream.  The long chained line of code splits the input stream of strings into batches, calls processBatch() on each batch, and then prints each batch to the screen.

BatchSpliterator.batch(stream, batchSize, true).map(BatchSpliterator::processBatch).forEach(System.out::print);
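Because batch() is called with parallel set to true, it is worth knowing that forEach() on a parallel stream makes no ordering promise; in this program the order is preserved in practice only because trySplit() returns null. If batches must be printed in input order once any splitting happens, forEachOrdered() guarantees encounter order. The sketch below uses a hard-coded list of batches as a stand-in for the output of BatchSpliterator.batch(...); the class name OrderedOutput is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Sketch: on a parallel stream, forEach() does not promise encounter order,
 * while forEachOrdered() does. The batches are a hard-coded stand-in.
 */
public class OrderedOutput {
    public static String render(List<List<String>> batches) {
        StringBuilder out = new StringBuilder();
        batches.parallelStream()
                .map(batch -> batch.stream().map(id -> id + "!").collect(Collectors.toList()))
                // forEachOrdered runs the action one element at a time, in encounter order
                .forEachOrdered(out::append);
        return out.toString();
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("1", "2"),
                Arrays.asList("3", "4"),
                Arrays.asList("5"));
        System.out.println(render(batches)); // [1!, 2!][3!, 4!][5!]
    }
}
```

Appending to a plain StringBuilder is safe here because forEachOrdered() establishes a happens-before relation between the actions for successive elements.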

As an example, given an input file containing the numbers 1 through 100, one per line, and a batch size of 10, the output of the program is:


[1!, 2!, 3!, 4!, 5!, 6!, 7!, 8!, 9!, 10!][11!, 12!, 13!, 14!, 15!, 16!, 17!, 18!, 19!, 20!][21!, 22!, 23!, 24!, 25!, 26!, 27!, 28!, 29!, 30!][31!, 32!, 33!, 34!, 35!, 36!, 37!, 38!, 39!, 40!][41!, 42!, 43!, 44!, 45!, 46!, 47!, 48!, 49!, 50!][51!, 52!, 53!, 54!, 55!, 56!, 57!, 58!, 59!, 60!][61!, 62!, 63!, 64!, 65!, 66!, 67!, 68!, 69!, 70!][71!, 72!, 73!, 74!, 75!, 76!, 77!, 78!, 79!, 80!][81!, 82!, 83!, 84!, 85!, 86!, 87!, 88!, 89!, 90!][91!, 92!, 93!, 94!, 95!, 96!, 97!, 98!, 99!, 100!]
        

From the output, we see that the input numbers are batched in groups of 10 as expected, and each number has an exclamation mark appended, as coded in the processBatch() method.  To make use of this code, simply replace the body of processBatch() with your own business logic.  Of course, you would also need to change the input stream source to suit your use case.
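Since the real requirement was IDs generated one at a time, one way to change the input source is to put the IDs on a BlockingQueue and expose the queue as an open-ended Stream<String>, which could then be passed to BatchSpliterator.batch(). This is a sketch under assumptions: the END marker (a "poison pill" signalling that no more IDs will arrive) and the class name QueueSource are hypothetical.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Sketch: IDs produced one at a time onto a BlockingQueue, exposed as a Stream<String>.
 * END is a hypothetical end-of-input marker ("poison pill").
 */
public class QueueSource {
    static final String END = "__END__";

    static Stream<String> fromQueue(BlockingQueue<String> queue) {
        Iterator<String> it = new Iterator<String>() {
            private String next; // the element fetched by hasNext() but not yet returned

            @Override
            public boolean hasNext() {
                if (next == null) {
                    try {
                        next = queue.take(); // blocks until the producer supplies an ID
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
                return !END.equals(next);
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String id = next;
                next = null;
                return id;
            }
        };
        // the size is unknown up front, just as with the file source
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED | Spliterator.NONNULL),
                false);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                queue.add(String.valueOf(i)); // an ID generated "in real time"
            }
            queue.add(END); // no more IDs
        });
        producer.start();
        System.out.println(fromQueue(queue).collect(Collectors.toList())); // [1, 2, 3, 4, 5]
        producer.join();
    }
}
```

One design consequence: the batching spliterator waits for a batch to fill (or for END) before emitting it, so a slow trickle of IDs could sit in a partial batch for a while; flushing on a timeout would need extra logic.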

