Saturday, April 7, 2018

Extract, Transform and Load (ETL) in Java


Extract, Transform and Load (ETL) is a common operation in data processing.  A recent ETL task of mine involves cleansing CSV files that contain millions of records.  The cleansing is done by calling a RESTful API which takes a batch of at most 100 records at a time, processes them and returns the results.  The API is accessible via an HTTP request and up to 4 concurrent requests are allowed.  Naturally, to process the CSV file more quickly, one should take advantage of the concurrency allowed by the API.  While this is a relatively straightforward ETL task, a slight complication is that the machine on which the Java program is scheduled to run regularly has limited memory.  It’s not possible to hold the entire file in memory at any point during the execution of the Java program.  To work around the limitation, the CSV file needs to be loaded into memory a portion at a time.

There are many ways to implement the task in Java.  The old way, before Java 8, would be to first split the input CSV file into 4 separate files and then spawn a thread for each separate file.  Each thread would read 100 records at a time into memory from its assigned file, pass the records to the API and write the results to a temporary output file.  When the threads have finished, the temporary output files are combined into a single output file.  This way works just fine and is sufficient for the task at hand, but what if you were to use streams and functional programming in Java 8?
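For reference, here is a minimal sketch of that pre-Java 8 approach using an ExecutorService.  The class name, the file names, the callApi stand-in and the splitting/merging steps are all assumptions for illustration, not the original program.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LegacyEtlSketch {

    // Hypothetical stand-in for the cleansing API call.
    static List<String> callApi(List<String> batch) {
        List<String> cleansed = new ArrayList<>();
        for (String row : batch) {
            cleansed.add(row.toUpperCase());
        }
        return cleansed;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);        // one thread per split file
        for (int part = 0; part < 4; part++) {
            Path in = Paths.get("input-part-" + part + ".csv");       // pre-split input (assumed)
            Path out = Paths.get("output-part-" + part + ".csv");     // temporary output (assumed)
            pool.submit(() -> {
                try (BufferedReader rdr = Files.newBufferedReader(in);
                     PrintWriter wtr = new PrintWriter(Files.newBufferedWriter(out))) {
                    List<String> batch = new ArrayList<>();
                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
                        batch.add(line);
                        if (batch.size() == 100) {                     // at most 100 records per API call
                            callApi(batch).forEach(wtr::println);
                            batch.clear();
                        }
                    }
                    if (!batch.isEmpty()) {                            // flush the final partial batch
                        callApi(batch).forEach(wtr::println);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        // The four output-part-*.csv files would then be concatenated into one output file.
    }
}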

The easiest (but not the most elegant) approach without loading the entire CSV file into memory is to first read the file and divide it into batches of 100 records by recording the line number of the first record of each batch in a list.  For example, an input CSV file with 333 rows would produce line numbers 2, 102, 202 and 302 in the list.  The first batch starts at line number 2, the second batch at line number 102 and so forth.  The code is:

public static List<Integer> partition(Path filePath, int batchSize) throws IOException {
    List<Integer> lineNumbers = new ArrayList<>();
    LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(filePath));
    rdr.readLine(); // skip header row
    while (rdr.readLine() != null) {
        int lineNumber = rdr.getLineNumber();
        if ((lineNumber - 2) % batchSize == 0) {
            lineNumbers.add(lineNumber);
        }
    }
    rdr.close();
    return lineNumbers;
}
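As a quick check, calling it on the 333-row example above (the file name here is just a placeholder) yields the four batch starting points:

// Hypothetical invocation against the 333-row example file.
List<Integer> lineNumbers = CsvBatchProcessor.partition(Paths.get("input.csv"), 100);
System.out.println(lineNumbers); // prints [2, 102, 202, 302]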


Next, we create a parallel stream from the list of line numbers by calling:

Stream<Integer> stream = lineNumbers.stream().parallel();

By default, a parallel stream runs its work on the common ForkJoinPool, whose parallelism is derived from the number of available processors (a sketch of how to cap this to the API’s limit of 4 concurrent requests follows the next snippet).  The stream is then piped to the extract method:

Stream<List<String>> data = stream.map(i -> CsvBatchProcessor.extract(filePath, i, batchSize));
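As promised above, since the API allows at most 4 concurrent requests, it may be worth capping the common pool so that no more than 4 batches are in flight at once.  One possible way (my own addition, not part of the original task) is to set the common pool’s parallelism via a system property before the pipeline runs:

// Cap the common ForkJoinPool at 3 worker threads; together with the calling
// thread this keeps roughly 4 batches in flight at a time.  Must be set before
// the common pool is first used, e.g. at the very start of main.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "3");

Another commonly used (if unofficial) trick is to run the whole stream pipeline inside a task submitted to a dedicated ForkJoinPool of the desired size.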

The extract method returns the next batch of up to 100 records from the CSV file, starting at the given line number:

public static List<String> extract(Path path, int startLineNumber, int batchSize) {
    // load a batch of data rows from file to memory
    List<String> rows = new ArrayList<>();
    try (LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(path))) {
        for (String line = null; (line = rdr.readLine()) != null;) {
            int lineNumber = rdr.getLineNumber();
            if (lineNumber >= startLineNumber + batchSize) {
                break; // past the end of this batch; no need to read further
            }
            if (lineNumber >= startLineNumber) {
                rows.add(line);
            }
        }
        return rows;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

The next step is to process each batch of records by calling:

data = data.map(CsvBatchProcessor::transform);

The transform method is where the API is called; in the sample code below the call is replaced by a simple upper-casing of each row:

public static List<String> transform(List<String> rows) {
    // placeholder for the real API call: simply upper-case each row
    List<String> newRows = new ArrayList<>();
    for (String row : rows) {
        newRows.add(row.toUpperCase());
    }
    return newRows;
}
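For completeness, a real transform would POST the batch over HTTP and read the cleansed rows back.  The following is only a rough sketch under assumed details (the endpoint URL, the plain-text CSV wire format and the one-row-per-line response are all invented for illustration); it could slot into CsvBatchProcessor in place of the placeholder above:

// additional imports needed in CsvBatchProcessor for this sketch
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public static List<String> transform(List<String> rows) {
    try {
        URL url = new URL("https://example.com/cleanse"); // hypothetical endpoint
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "text/csv");
        // send the batch as newline-separated rows (assumed wire format)
        try (OutputStream out = conn.getOutputStream()) {
            out.write(String.join("\n", rows).getBytes(StandardCharsets.UTF_8));
        }
        // read back one cleansed row per input row (assumed response format)
        List<String> newRows = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            for (String line; (line = in.readLine()) != null;) {
                newRows.add(line);
            }
        }
        return newRows;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}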

Finally, we output the processed data to the console so it can be redirected to a file.

data.flatMap(x -> x.stream()).forEach(System.out::println);

The careful reader may have noticed that the output CSV may not retain the same ordering as the input CSV because of the use of a parallel stream.  This is indeed the case, since forEach makes no guarantee about the order in which elements of a parallel stream are processed.  For my requirements the ordering is not important, so I’ve ignored it.
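If the ordering did matter, one option would be to replace forEach with forEachOrdered, which preserves the encounter order of the stream at the cost of some of the parallel speed-up:

// preserves the input order of the batches, at some cost to parallel throughput
data.flatMap(x -> x.stream()).forEachOrdered(System.out::println);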

The full source code is:

import java.util.List;
import java.util.ArrayList;
import java.util.stream.Stream;
import java.io.LineNumberReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.Path;

/**
 * Perform ETL on an input file by a batch of rows at a time to conserve computational resources.
 */
public class CsvBatchProcessor {

    /**
     * Get the starting line number for each batch.
     */
    public static List<Integer> partition(Path filePath, int batchSize) throws IOException {
        List<Integer> lineNumbers = new ArrayList<>();
        LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(filePath));
        rdr.readLine(); // skip header row
        while (rdr.readLine() != null) {
            int lineNumber = rdr.getLineNumber();
            if ((lineNumber - 2) % batchSize == 0) {
                lineNumbers.add(lineNumber);
            }
        }
        rdr.close();
        return lineNumbers;
    }

    /**
     * Extract a batch of rows from the data file.
     */
    public static List<String> extract(Path path, int startLineNumber, int batchSize) {
        // load a batch of data rows from file to memory
        List<String> rows = new ArrayList<>();
        try (LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(path))) {
            for (String line = null; (line = rdr.readLine()) != null;) {
                int lineNumber = rdr.getLineNumber();
                if (lineNumber >= startLineNumber + batchSize) {
                    break; // past the end of this batch; no need to read further
                }
                if (lineNumber >= startLineNumber) {
                    rows.add(line);
                }
            }
            return rows;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Transform multiple rows of data at a time.
     */
    public static List<String> transform(List<String> rows) {
        // placeholder for the real API call: simply upper-case each row
        List<String> newRows = new ArrayList<>();
        for (String row : rows) {
            newRows.add(row.toUpperCase());
        }
        return newRows;
    }

    public static void main(String[] args) throws IOException {
        // parse command line arguments
        String fileName = args[0];
        int batchSize = Integer.parseInt(args[1]);

        // partition row indices into batches of given size
        Path filePath = Paths.get(fileName);
        List<Integer> lineNumbers = CsvBatchProcessor.partition(filePath, batchSize);

        // extract data
        Stream<Integer> stream = lineNumbers.stream().parallel();
        Stream<List<String>> data = stream.map(i -> CsvBatchProcessor.extract(filePath, i, batchSize));

        // transform data
        data = data.map(CsvBatchProcessor::transform);

        // load data to console
        data.flatMap(x -> x.stream()).forEach(System.out::println);
    }
}
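To try it out, compile the class and run it with the input file name and batch size as arguments, redirecting standard output to the result file (the file names below are just examples):

javac CsvBatchProcessor.java
java CsvBatchProcessor input.csv 100 > output.csv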