Showing posts with label Software Development. Show all posts

Thursday, November 4, 2021

Deploying DL Models with MLflow

To get my hands on MLflow, I started off by reading the excellent tutorial at MLflow.org to acquaint myself with the steps of packaging and serving an ML model to a REST API endpoint.  The linear regression model and the steps described in the tutorial work flawlessly right out of the box.  Kudos to the documentation!

To gain a little more exposure, I then tried to deploy the deep learning model described in this databricks blog.  Unlike the linear regression model, the DL model uses the Keras library, and the API input is a tensor rather than a DataFrame.

In this blog post, I will share the steps needed and the modifications to the original DL code that were necessary in order to deploy the model.

For reference, my setup is:
  • Ubuntu 20.04
  • Python 3.8
  • Anaconda3
  • Keras 2.6

The original DL code, which uses Keras 2.7, was throwing errors, so I had to downgrade to version 2.6 and consequently change all instances of “keras” to “tf.keras” in the code.  The complete modified code is:


from tensorflow.keras.layers import Dense, Flatten, Dropout
import numpy as np
import mlflow
import mlflow.keras
from mlflow.models.signature import infer_signature
import tensorflow as tf
from urllib.parse import urlparse
# Let's prepare the training data!
(train_X, train_Y), (test_X, test_Y) = tf.keras.datasets.mnist.load_data()
trainX, testX = train_X / 255.0, test_X / 255.0
trainY = tf.keras.utils.to_categorical(train_Y)
testY = tf.keras.utils.to_categorical(test_Y)
# Let's define the model!
model = tf.keras.models.Sequential(
    [
      Flatten(),
      Dense(128, activation="relu", name="layer1"),
      Dropout(0.2),
      Dense(10, activation='softmax')
    ]
)
opt = tf.keras.optimizers.SGD(lr=0.01, momentum=0.9)
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
# Let's fit the model!
model.fit(trainX, trainY, epochs=2, batch_size=32, validation_data=(testX, testY))
# Create a model signature using the tensor input to store in the MLflow model registry
signature = infer_signature(testX, model.predict(testX))
# Let's check out how it looks
print(signature)
# inputs:
#    [Tensor('float64', (-1, 28, 28))]
# outputs:
#    [Tensor('float32', (-1, 10))]
# Create an input example to store in the MLflow model registry
input_example = np.expand_dims(trainX[0], axis=0)
# Let's log the model in the MLflow model registry
registered_model_name = "tensor-blog-post"
# Model registry does not work with file store
tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
if tracking_url_type_store != "file":
    # Register the model
    # There are other ways to use the Model Registry, which depends on the use case,
    # please refer to the doc for more information:
    # https://mlflow.org/docs/latest/model-registry.html#api-workflow
    mlflow.keras.log_model(model, "model", signature=signature, input_example=input_example, registered_model_name=registered_model_name)
else:
    mlflow.keras.log_model(model, "model", signature=signature, input_example=input_example)

The steps to deploy the model are described in the MLflow tutorial, so I won’t repeat them here.  Basically, what is needed to serve the model is its run ID.  A new run ID is generated each time you run the code.  You can find the run ID by launching the MLflow UI:

mlflow ui --host 0.0.0.0

Then open http://<IP addr>:5000 in a browser.

Alternatively, you can find it in the mlruns subdirectory where your script was executed. With the run ID, you can deploy the model to an API endpoint by:

mlflow models serve -m mlruns/0/<RUN ID>/artifacts/model -p 1234
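
If you would rather not copy the run ID by hand, the lookup in the mlruns subdirectory can be scripted.  This is only a sketch: it assumes the mlruns/0/<RUN ID>/artifacts layout used in the command above, and the helper name latest_run_id is my own, not part of MLflow.

```python
from pathlib import Path


def latest_run_id(mlruns_dir="mlruns", experiment_id="0"):
    """Return the name of the most recently modified run directory, or None."""
    experiment_dir = Path(mlruns_dir) / experiment_id
    if not experiment_dir.is_dir():
        return None
    # Assumption: a run directory is any subdirectory holding an "artifacts" folder.
    run_dirs = [d for d in experiment_dir.iterdir() if (d / "artifacts").is_dir()]
    if not run_dirs:
        return None
    return max(run_dirs, key=lambda d: d.stat().st_mtime).name
```

Calling latest_run_id() from the directory where the training script was executed should give the value to substitute for <RUN ID>.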

To test the API, the input must be a tensor of shape (-1, 28, 28) and consist of floats such as:

curl -X POST -H "Content-Type:application/json” --data '{“inputs“:[[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 
1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0]]}' http://127.0.0.1:1234/invocations
If everything goes well, the API should return a list of 10 probabilities which are the predictions for each digit.  (The DL model predicts handwritten digits from the MNIST dataset.)

[[0.026583805680274963, 0.0004244133597239852, 0.29056885838508606, 0.4104505479335785, 3.0038367185625248e-05, 0.14565511047840118, 0.002596472157165408, 0.0003772564814426005, 0.12106209993362427, 0.0022514534648507833]]
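
The same request can also be sent from Python.  The sketch below uses only the standard library and mirrors the JSON shape of the curl command above; the function names build_payload, predict and most_likely_digit are my own, and the URL assumes the model is being served locally on port 1234 as in this post.

```python
import json
import urllib.request


def build_payload(image):
    """Wrap one 28x28 image (28 rows of 28 numbers) as {"inputs": [[...], ...]}."""
    if len(image) != 28 or any(len(row) != 28 for row in image):
        raise ValueError("expected a 28x28 image")
    return json.dumps({"inputs": [[float(v) for v in row] for row in image]})


def predict(image, url="http://127.0.0.1:1234/invocations"):
    """POST the image to the model server and return the parsed JSON response."""
    request = urllib.request.Request(
        url,
        data=build_payload(image).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


def most_likely_digit(probabilities):
    """Return the index of the largest probability, i.e. the predicted digit."""
    return max(range(len(probabilities)), key=lambda i: probabilities[i])
```

Applied to the response shown above, most_likely_digit picks index 3, since 0.41 is the largest of the ten probabilities.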


Tuesday, November 2, 2021

Crash Course on Confluent-Kafka

This is my quick stab at setting up Kafka on Ubuntu.  As a newbie to Kafka, I started off by following the steps from this excellent but slightly outdated blog.  I thought I'd freshen up the steps from that blog and also give a brief introduction to the confluent-kafka library for Python.  For basic background info on Kafka, I would still recommend checking out that blog.


For reference, my setup is:

  • Ubuntu 20.04
  • Python 3.8
  • Kafka 3.0.0
  • Confluent-kafka


Setting Up Kafka


Download Kafka by following these steps:

  1. Download the latest 3.0.0 release of Kafka
  2. Un-tar the archive: tar -xzf kafka_2.13-3.0.0.tgz
  3. cd to the Kafka directory

Assuming you're setting up Kafka on your local machine, edit the following line in config/server.properties:

advertised.listeners=PLAINTEXT://127.0.0.1:9092


This line will enable the Python scripts provided in this blog to work properly.


Next, start (and stop) the servers using the shell scripts in the bin directory:

# start ZooKeeper Server

bin/zookeeper-server-start.sh config/zookeeper.properties


# start Kafka Server

bin/kafka-server-start.sh config/server.properties


At this point, your Kafka cluster is set up locally and ready to use.  For demo, let's create a "sample" Kafka topic:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sample




Working with Confluent-Kafka


Install confluent-kafka:

pip install confluent-kafka


We'll create a Python script to publish messages to the "sample" Kafka topic every 2 seconds:


from confluent_kafka import Producer
import time

producer = Producer({
    'bootstrap.servers': '127.0.0.1:9092',
})

topic = 'sample'
msg_count = 0

while True:
    msg_count += 1
    print(msg_count)
    producer.produce(topic, key='dish', value='spaghetti#%d' % msg_count)
    producer.flush()
    time.sleep(2)


We'll also create a Python script to consume messages from the "sample" Kafka topic and display them on screen.


from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['sample'])

msg_count = 0
while True:
    # wait up to 1 second for the next message
    msg = consumer.poll(1.0)
    if msg is None:
        continue

    if msg.error():
        # reaching the end of a partition is not a failure, so keep polling
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break
    else:
        msg_count += 1
        print(msg_count, msg.key(), msg.value())

# leave the consumer group cleanly after a fatal error
consumer.close()


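If everything goes well, you should see the Kafka server log the consumer joining the group, followed by the consumer script printing the messages: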


[2021-11-02 17:12:44,424] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group my-group in Empty state. Created a new member id rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)

[2021-11-02 17:12:44,429] INFO [GroupCoordinator 0]: Preparing to rebalance group my-group in state PreparingRebalance with old generation 29 (__consumer_offsets-12) (reason: Adding new member rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 with group instance id None) (kafka.coordinator.group.GroupCoordinator)

[2021-11-02 17:12:44,430] INFO [GroupCoordinator 0]: Stabilized group my-group generation 30 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)

[2021-11-02 17:12:44,433] INFO [GroupCoordinator 0]: Assignment received from leader rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 for group my-group for generation 30. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

1 b'dish' b'spaghetti#1'

2 b'dish' b'spaghetti#2'

3 b'dish' b'spaghetti#3'

...


That's it, but of course, you could do a lot more with Kafka such as different ways of encoding and decoding the messages.  For more info, check out the examples directory: 

https://github.com/confluentinc/confluent-kafka-python
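
As a small taste of encoding and decoding, here is a sketch that uses JSON as the message format.  It relies only on the standard library; the encode and decode helpers and the sample record are made up for illustration and are not part of confluent-kafka.  The bytes returned by encode could be passed as the value to producer.produce, and decode could be applied to msg.value() on the consumer side.

```python
import json


def encode(record):
    """Serialize a dict to UTF-8 JSON bytes, suitable as a message value."""
    return json.dumps(record).encode("utf-8")


def decode(raw):
    """Turn the bytes of a JSON-encoded message back into Python data."""
    return json.loads(raw.decode("utf-8"))


# Hypothetical record, only to show the round trip.
order = {"dish": "spaghetti", "count": 1}
round_trip = decode(encode(order))

# The plain string payloads used in this post only need .decode():
text = b"spaghetti#1".decode("utf-8")
```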


Saturday, August 1, 2020

Classification with Tree-augmented Naive Bayes (TAN) and Pgmpy


While working on a classification task recently, I started out with the well-known Naive Bayes classifier but soon realized that I wasn't getting the classification performance I needed.  At the same time, I was keenly aware that many of the feature variables carry a causal relationship between them that the classifier wasn't exploiting.

I needed another classifier with the simplicity and run-time performance of Naive Bayes so I started looking into Tree-augmented Naive Bayes (TAN).  My search for a Python library supporting TAN eventually led me to pgmpy.

Pgmpy is a Python Library for learning (Structure and Parameter) and inference (Statistical and Causal) in Bayesian Networks.

Pgmpy is versatile, with capabilities beyond what my task needed.  It also has some structure learning algorithms but not TAN, so I decided to contribute one to the library.  In this article, I'll give a tutorial on how to use TAN in pgmpy.

For demonstration purposes, I'll generate sample data from a handcrafted Bayesian Network (BN) model.  With the generated data, I'll train a Naive Bayes classifier and a TAN classifier, and compare their prediction performance.  In our BN graph, node C is the class variable while nodes R, S, T, U and V are the feature variables.  C has an edge to every feature, and R has an additional edge to each of S, T, U and V.  All variables are discrete.




Generate Sample Data


First, import all the packages we will be needing:


import pandas as pd
import numpy as np
 
from pgmpy.models.BayesianModel import BayesianModel
from pgmpy.factors.discrete import TabularCPD
from pgmpy.sampling import BayesianModelSampling
from pgmpy.estimators import TreeSearch, BayesianEstimator
 
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.naive_bayes import MultinomialNB


To encode our BN graph, we instantiate BayesianModel with the directed edges of the graph:


# construct naive bayes graph and add interaction between the features
model = BayesianModel([('C', 'R'), ('C', 'S'), ('C', 'T'), ('C', 'U'), ('C', 'V'),
                       ('R', 'S'), ('R', 'T'), ('R', 'U'), ('R', 'V')])


Next, we parameterize the graph by defining the conditional probabilities for each node.  For more information on parameterization, check out the pgmpy tutorials.


# add conditional probability distribution to edges
cpd_c = TabularCPD('C', 2, [[0.5], [0.5]])
cpd_r = TabularCPD('R', 3, [[0.6,0.2],[0.3,0.5],[0.1,0.3]], evidence=['C'],
                            evidence_card=[2])
cpd_s = TabularCPD('S', 3, [[0.1,0.1,0.2,0.2,0.7,0.1],
                            [0.1,0.3,0.1,0.2,0.1,0.2],
                            [0.8,0.6,0.7,0.6,0.2,0.7]],
                            evidence=['C','R'], evidence_card=[2,3])
cpd_t = TabularCPD('T', 2, [[0.7,0.2,0.2,0.5,0.1,0.3],
                            [0.3,0.8,0.8,0.5,0.9,0.7]],
                            evidence=['C','R'], evidence_card=[2,3])
cpd_u = TabularCPD('U', 3, [[0.3,0.8,0.2,0.8,0.4,0.7],
                            [0.4,0.1,0.4,0.1,0.1,0.1],
                            [0.3,0.1,0.4,0.1,0.5,0.2]],
                            evidence=['C','R'], evidence_card=[2,3])
cpd_v = TabularCPD('V', 2, [[0.5,0.6,0.6,0.5,0.5,0.4],
                            [0.5,0.4,0.4,0.5,0.5,0.6]],
                            evidence=['C','R'], evidence_card=[2,3])
model.add_cpds(cpd_c, cpd_r, cpd_s, cpd_t, cpd_u, cpd_v)
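
To make the tables a little less abstract, here is a plain-Python sketch (no pgmpy needed) that copies three of the tables from the code above.  Each row is a state of the variable and each column is one combination of evidence states, so every column should sum to 1.  The helper names are my own.  The second helper applies Bayes' rule to the C and R tables to get the probability of each class given only R.

```python
from math import isclose

# Values copied from cpd_c, cpd_r and cpd_s.
p_c = [0.5, 0.5]
p_r_given_c = [[0.6, 0.2], [0.3, 0.5], [0.1, 0.3]]
p_s_given_cr = [[0.1, 0.1, 0.2, 0.2, 0.7, 0.1],
                [0.1, 0.3, 0.1, 0.2, 0.1, 0.2],
                [0.8, 0.6, 0.7, 0.6, 0.2, 0.7]]


def columns_sum_to_one(table):
    """Check that every column is a valid probability distribution."""
    return all(isclose(sum(column), 1.0) for column in zip(*table))


def posterior_c_given_r(r):
    """P(C | R=r) by Bayes' rule: P(C) * P(R=r | C), then normalize."""
    joint = [p_c[c] * p_r_given_c[r][c] for c in range(len(p_c))]
    total = sum(joint)
    return [j / total for j in joint]
```

For example, posterior_c_given_r(0) works out to 0.3 / (0.3 + 0.1) = 0.75 for C=0 and 0.25 for C=1.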


With our BN model defined, we can now generate sample data:


# generate sample data from our BN model
inference = BayesianModelSampling(model)
df_data = inference.forward_sample(size=30000, return_type='dataframe')


We'll be using the data to train Naive Bayes and TAN classifiers to see which one performs better.  Before that, we split the data into training and test sets.


# split data into training and test set
cols_features = df_data.columns.tolist()
cols_features.remove('C')

X = df_data[cols_features].values
y = df_data[['C']].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=21, stratify=y)
df_train = pd.DataFrame(np.concatenate((y_train, X_train), axis=1), columns=df_data.columns)


Predicting with Naive Bayes


Now, we are ready to train our Naive Bayes model using the training data and then measure classification performance with the test set.  I'll use the classifier from sklearn, but pgmpy also has a Naive Bayes classifier should you prefer it.


# train naive bayes classifier and predict
model_nb = MultinomialNB().fit(X_train, y_train)
y_pred = model_nb.predict(X_test)
print(classification_report(y_test, y_pred))


In short, Naive Bayes precision and recall are both 72%.


Predicting with TAN


Next up is TAN.  First, let's learn the graph structure from the training data.  To capture the interaction between the feature variables, TAN casts a tree structure over them.  So, you need to pass in the "root_node" parameter.  To be fair, we pretend that we don't know what the real graph looks like, so we pick a random node as the root node:


# learn the TAN graph structure from data
est = TreeSearch(df_train, root_node='U')
dag = est.estimate(estimator_type='tan', class_node='C')
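
For intuition about what the structure search looks at: as TAN is usually described in the literature, each pair of features is scored by their conditional mutual information given the class, and the tree keeps the strongest links.  I'm assuming here that this is also the weight behind estimator_type='tan'.  The sketch below computes that score from raw counts in plain Python; the function name is my own.

```python
from collections import Counter
from math import log


def conditional_mutual_information(xs, ys, cs):
    """Estimate I(X; Y | C) in nats from three equally long lists of labels."""
    n = len(cs)
    n_xyc = Counter(zip(xs, ys, cs))
    n_xc = Counter(zip(xs, cs))
    n_yc = Counter(zip(ys, cs))
    n_c = Counter(cs)
    total = 0.0
    for (x, y, c), count in n_xyc.items():
        # p(x,y,c) * log( p(c) p(x,y,c) / (p(x,c) p(y,c)) ), written with counts
        total += (count / n) * log(n_c[c] * count / (n_xc[(x, c)] * n_yc[(y, c)]))
    return total
```

Two features that always move together within a class score high, while two features that are independent within each class score zero.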


Before we can make predictions, we also need to parameterize the model using the training data.


# construct Bayesian network by parameterizing the graph structure
model = BayesianModel(dag.edges())
model.fit(df_train, estimator=BayesianEstimator, prior_type='K2')


Now we're ready to classify the test data using the TAN model and measure the performance:


# draw inference from BN
X_test_df = pd.DataFrame(X_test, columns=cols_features)
y_pred = model.predict(X_test_df).values
print(classification_report(y_test, y_pred))


Our TAN model's precision and recall are both 81% compared to 72% for Naive Bayes.  That's quite a significant improvement!


Predicting with Incomplete Data


As a bonus, let's now try to make predictions on incomplete data.  We will ask pgmpy to predict the class when only features S, T and U are given.


from pgmpy.inference import VariableElimination

# predict the class given some of the features
infer = VariableElimination(model) 
query = infer.query(variables=['C'], evidence={'S': 0, 'T': 1, 'U': 2})
print(query)


In this case, pgmpy reports a 93% chance that C=1.

Instead of predicting the class, let's predict what feature R is likely to be given class C=1 and feature S=1:


# predict the feature given the class and another feature
query = infer.query(variables=['R'], evidence={'C': 1, 'S': 1})
print(query)


In this case, pgmpy reports that the chance of R=0 is 27%, R=1 is 32% and R=2 is 41%.

Pretty cool, isn't it?  As I said at the beginning of the article, pgmpy is versatile and I'm only scratching the surface of what it can do.

Saturday, April 7, 2018

Extract, Transform and Load (ETL) in Java


Extract, Transform and Load (ETL) is a common operation in data processing.  A recent ETL task of mine involved cleansing CSV files which contain millions of records.  The cleansing is done by calling a RESTful API which takes a batch of at most 100 records at a time, processes them and returns the results.  The API is accessible via an HTTP request and up to 4 concurrent requests are allowed.  Naturally, to process the CSV file quicker, one should take advantage of the concurrency allowed by the API.  While this is a relatively straightforward ETL task, a slight complication is that the machine where the Java program is scheduled to run regularly has limited memory.  It’s not possible to load the entire file into memory at any time during the execution of the Java program.  To work around the limitation, the CSV file needs to be loaded into memory a portion at a time.

There are many ways to implement the task in Java.  The old way, before Java 8, would be to first split the input CSV file into 4 separate files and then spawn a thread for each separate file.  Each thread would read 100 records at a time into memory from the assigned file, pass the records to the API and write the results to a temporary output file.  When the threads are finished, combine the temporary output files together into a single output file.  This way works just fine and it’s sufficient for the task at hand but what if you were to use Stream and functional programming in Java 8?

The easiest (but not the most elegant) approach without loading the entire CSV file into memory is to first read the file and divide it into batches of 100 records by recording the line number of the first record of each batch in a list.  For example, an input CSV file with 333 rows would produce line numbers 2, 102, 202 and 302 in the list.  The first batch starts at line number 2, the second batch at line number 102 and so forth.  The code is:

public static List<Integer> partition(Path filePath, int batchSize) throws IOException {
    List<Integer> lineNumbers = new ArrayList<>();
    LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(filePath));
    rdr.readLine(); // skip header row
    while (rdr.readLine() != null) {
        int lineNumber = rdr.getLineNumber();
        if ((lineNumber - 2) % batchSize == 0) {
            lineNumbers.add(lineNumber);
        }
    }
    rdr.close();
    return lineNumbers;
}
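
The line-number arithmetic is easy to check in isolation.  The following is a Python sketch of the same rule (not a port of the Java method); the function name is my own, and it assumes, as in the post, that line 1 is the header so the first record sits on line 2.

```python
def batch_start_lines(total_lines, batch_size):
    """1-based line numbers at which each batch starts; line 1 is the header."""
    return [n for n in range(2, total_lines + 1) if (n - 2) % batch_size == 0]


# A file with 333 lines and a batch size of 100 gives four batches.
starts = batch_start_lines(333, 100)
```

Here starts comes out as [2, 102, 202, 302], matching the example in the text.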


Next, we create a parallel stream from the list of line numbers by calling:

Stream<Integer> stream = lineNumbers.stream().parallel();

By default, a parallel stream runs on the common fork/join pool, whose size is derived from the number of available processors.  The stream is then piped to the extract method:

Stream<List<String>> data = stream.map(i -> CsvBatchProcessor.extract(filePath, i, batchSize));

The extract method returns the next 100 records from the CSV file starting from the given line number:

public static List<String> extract(Path path, int startLineNumber, int batchSize) {
    // load a batch of data rows from file to memory
    List<String> rows = new ArrayList<>();
    // try-with-resources closes the reader even if reading fails
    try (LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(path))) {
        for (String line = null; (line = rdr.readLine()) != null;) {
            int lineNumber = rdr.getLineNumber();
            if (lineNumber >= startLineNumber + batchSize) {
                break; // past the end of this batch, no need to read further
            }
            if (lineNumber >= startLineNumber) {
                rows.add(line);
            }
        }
        return rows;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

The next step is to process the 100 records by calling:

data = data.map(CsvBatchProcessor::transform);

The transform method is where the API would be called.  In this example, upper-casing each row stands in for the API call:

public static List<String> transform(List<String> rows) {
    List<String> newRows = new ArrayList<>();
    for (String row : rows) {
        newRows.add(row.toUpperCase());
    }
    return newRows;
}

Finally, we output the processed data to the console so it can be redirected to a file.

data.flatMap(x -> x.stream()).forEach(System.out::println);

The careful reader may have noticed that the output CSV may not retain the same ordering as the input CSV because of the use of a parallel stream.  This is definitely the case, since there’s no guarantee on the order in which the stream’s threads execute.  For my requirements, the ordering is not important so I’ve neglected it.
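
If ordering did matter, or if the limit of 4 concurrent requests had to be enforced explicitly, one option is a fixed-size worker pool whose results are collected in submission order.  The sketch below shows the idea in Python rather than Java, purely as an illustration and not as what my program does; upper-casing again stands in for the API call, and the function names are made up.

```python
from concurrent.futures import ThreadPoolExecutor


def transform(rows):
    """Stand-in for the API call: upper-case every row in the batch."""
    return [row.upper() for row in rows]


def process_in_order(batches, max_workers=4):
    """Run at most max_workers batches at a time, returning results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order the inputs were given,
        # regardless of which worker finishes first.
        return list(pool.map(transform, batches))
```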

The full source code is:

import java.util.List;
import java.util.ArrayList;
import java.util.stream.Stream;
import java.io.LineNumberReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.Path;

/**
 * Perform ETL on an input file by a batch of rows at a time to conserve computational resources.
 */
public class CsvBatchProcessor {
    /**
     * Get the starting line number for each batch.
     */
    public static List<Integer> partition(Path filePath, int batchSize) throws IOException {
        List<Integer> lineNumbers = new ArrayList<>();
        LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(filePath));
        rdr.readLine(); // skip header row
        while (rdr.readLine() != null) {
            int lineNumber = rdr.getLineNumber();
            if ((lineNumber - 2) % batchSize == 0) {
                lineNumbers.add(lineNumber);
            }
        }
        rdr.close();
        return lineNumbers;
    }

    /**
     * Extract a batch of rows from data file.
     */
    public static List<String> extract(Path path, int startLineNumber, int batchSize) {
        // load a batch of data rows from file to memory
        List<String> rows = new ArrayList<>();
        // try-with-resources closes the reader even if reading fails
        try (LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(path))) {
            for (String line = null; (line = rdr.readLine()) != null;) {
                int lineNumber = rdr.getLineNumber();
                if (lineNumber >= startLineNumber + batchSize) {
                    break; // past the end of this batch, no need to read further
                }
                if (lineNumber >= startLineNumber) {
                    rows.add(line);
                }
            }
            return rows;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Transform multiple rows of data at a time.
     */
    public static List<String> transform(List<String> rows) {
        List<String> newRows = new ArrayList<>();
        for (String row : rows) {
            newRows.add(row.toUpperCase());
        }
        return newRows;
    }

    public static void main(String[] args) throws IOException {
        // parse command line arguments
        String fileName = args[0];
        Integer batchSize = Integer.parseInt(args[1]);

        // partition row indices into batches of given size
        Path filePath = Paths.get(fileName);
        List<Integer> lineNumbers = CsvBatchProcessor.partition(filePath, batchSize);

        // extract data
        Stream<Integer> stream = lineNumbers.stream().parallel();
        Stream<List<String>> data = stream.map(i -> CsvBatchProcessor.extract(filePath, i, batchSize));

        // transform data
        data = data.map(CsvBatchProcessor::transform);

        // load data to console
        data.flatMap(x -> x.stream()).forEach(System.out::println);
    }
}



Thursday, March 15, 2018

Streaming and Batching in Java

If you work with external APIs a lot, you have probably encountered APIs that support batch operations.  Instead of making a separate API request for each record, you can operate on multiple records at once with one batch request.  Computationally, batching helps speed up querying, reduces network overhead and places less load on the remote server.

Batching occurs commonly in the real world to streamline processes or meet process constraints.  For example, your favorite pizza chain batches multiple orders into a single dispatch to save delivery costs and time.  An amusement park ride processes guests who are waiting in the queue in batches, with the batch size being the ride capacity.  The list goes on.

Batching is such a pervasive operation that you will likely encounter the need to write code implementing or simulating the concept in your software development career.  This was the case for me in a recent project.  The requirements involved chunking record IDs in Java and submitting each chunk to an external API for fast lookup.  The record IDs are generated in real time, one at a time, so the total number of records is not known in advance.

Thankfully, with the addition of streams and lambda expressions in Java 8, the task could hardly be simpler.  Without further ado, here's the code that does the trick.


import java.util.List;
import java.util.ArrayList;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.function.Consumer;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Read an input file containing one ID per line and process a batch of IDs at once.
 */
public class BatchProcessor {
        public static class BatchSpliterator implements Spliterator<List<String>> {
                private final Spliterator<String> source;
                private final int batchSize;
                private List<String> batch;

                BatchSpliterator(Spliterator<String> lineSpliterator, int batchSize) {
                        this.source = lineSpliterator;
                        this.batchSize = batchSize;
                        this.batch = new ArrayList<>();
                }

                public boolean tryAdvance(Consumer<? super List<String>> action) {
                        boolean success = this.source.tryAdvance(this.batch::add);
                        // flush if batch is filled or input data is exhausted
                        if (success && this.batch.size() == this.batchSize || !success && !this.batch.isEmpty()) {
                                action.accept(this.batch);
                                this.batch = new ArrayList<>();
                                return true;
                        }
                        return success;
                }

                public Spliterator<List<String>> trySplit() {
                        return null;
                }

                public long estimateSize() {
                        return Long.MAX_VALUE;
                }

                public int characteristics() {
                        return ORDERED|NONNULL;
                }

                /**
                 * Divide an input stream of IDs into batches.
                 */
                public static Stream<List<String>> batch(Stream<String> lines, int batchSize, boolean parallel) {
                        return StreamSupport.stream(
                                new BatchSpliterator(lines.spliterator(), batchSize),
                                parallel);
                }

                /**
                 * Process a batch of IDs in one call.
                 */
                public static List<String> processBatch(List<String> batch) {
                        List<String> newBatch = new ArrayList<>();
                        for (String str : batch) {
                                newBatch.add(str + "!");
                        }
                        return newBatch;
                }
        }

        public static void main(String[] args) throws IOException {
                // parse command line arguments
                String fileName = args[0];
                Integer batchSize = Integer.parseInt(args[1]);

                // stream input file
                BufferedReader br = Files.newBufferedReader(Paths.get(fileName));
                Stream<String> stream = br.lines();
                BatchSpliterator.batch(stream, batchSize, true).map(BatchSpliterator::processBatch).forEach(System.out::print);
        }
}


The solution implements the java.util.Spliterator interface and adds a constructor that accepts an incoming source of strings and a batch size.  The key interface methods are:

tryAdvance() - Add the next item from the source stream into a batch.  If the batch is full, or the source is exhausted and the batch is not empty, pass the batch to the given Consumer action and initialize a new batch.

trySplit() - Return null because the source stream size is not known in advance and, therefore, it cannot be split.  Unfortunately, this also means batching cannot be done in parallel.

estimateSize() - Return Long.MAX_VALUE because the source stream size is not known in advance.

characteristics() - Return the properties of the Spliterator which in this case involve iterating over an ordered sequence of non-null values.


The other two static methods in the class are our own helper methods.


batch() - Divide an input stream into batches of given size.

processBatch() - Process a batch of strings by appending an exclamation mark to each string.
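
For comparison, the same batching idea can be sketched in a few lines of Python with a generator.  This is not a translation of the Spliterator above, just the same behaviour under the same assumption that the total number of items is unknown up front; the names batched and process_batch are my own.

```python
from itertools import islice


def batched(items, batch_size):
    """Yield lists of up to batch_size items; the last batch may be shorter."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def process_batch(batch):
    """Append an exclamation mark to every item, like processBatch()."""
    return [f"{item}!" for item in batch]


results = [process_batch(b) for b in batched(range(1, 8), 3)]
```

With the numbers 1 to 7 and a batch size of 3, results holds three batches, the last of which contains only "7!".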


In the main method, an input file specified in the command line arguments is used as our source stream.  The long chained line of code chunks the input stream of strings, calls processBatch() on each batch, and then dumps each batch out to screen.

BatchSpliterator.batch(stream, batchSize, true).map(BatchSpliterator::processBatch).forEach(System.out::print);

As an example, given an input file with one number per line starting from number 1, the output of the program is:


[1!, 2!, 3!, 4!, 5!, 6!, 7!, 8!, 9!, 10!][11!, 12!, 13!, 14!, 15!, 16!, 17!, 18!, 19!, 20!][21!, 22!, 23!, 24!, 25!, 26!, 27!, 28!, 29!, 30!][31!, 32!, 33!, 34!, 35!, 36!, 37!, 38!, 39!, 40!][41!, 42!, 43!, 44!, 45!, 46!, 47!, 48!, 49!, 50!][51!, 52!, 53!, 54!, 55!, 56!, 57!, 58!, 59!, 60!][61!, 62!, 63!, 64!, 65!, 66!, 67!, 68!, 69!, 70!][71!, 72!, 73!, 74!, 75!, 76!, 77!, 78!, 79!, 80!][81!, 82!, 83!, 84!, 85!, 86!, 87!, 88!, 89!, 90!][91!, 92!, 93!, 94!, 95!, 96!, 97!, 98!, 99!, 100!]
        

From the output, we see that the input numbers are batched in groups of 10 as expected and each number is appended with an exclamation mark as coded in the processBatch() method.  To make use of this code, simply write your own business logic in the method.  Of course, you would also need to change the input stream source according to your use case.


Saturday, October 22, 2016

Joining Data Sets in ECL

So far, I have demonstrated how to initialize, load and parse data in HPCC ECL.  We have been working with a toy data set consisting of person IDs, names and dates of birth.  In this article, I'll add a second data set and show how to join multiple data sets.

The new data set describes who likes whom denoted by the person ID.  The record type of the data set is:

Record_Like := RECORD
    UNSIGNED8 SourceId;
    UNSIGNED8 TargetId;
END;

And here are some sample data:

likes_file := DATASET([
        {1,2},{1,3},{1,4},{1,5},{1,6},{1,7},{1,8},{1,9},{1,10},
        {2,3},{2,4},{2,6},{2,7},{2,8},
        {3,8},{3,9},{3,11},
        {4,5},
        {5,7},
        {6,7},{6,9},
        {7,4},{7,5},{7,6},
        {8,3},{8,9},
        {9,3},{9,6},{9,8},
        {12,1}], Record_Like);

It's hard to figure out who likes whom just by looking at a pair of IDs in the likes_file data set.  To make it easier, we'll combine the members_file data set we have seen in previous articles with the new likes_file data set so that the person names are spelled out.  The joined results will have the record type:

Record_Named_Like := RECORD
    SourceLastName   := members_file.LastName;
    SourceFirstName  := members_file.FirstName;
    Record_Like;
    TargetLastName   := members_file.LastName;
    TargetFirstName  := members_file.FirstName;
END;

Essentially, Record_Named_Like extends Record_Like with 4 additional fields.  The data types for the SourceLastName, SourceFirstName, TargetLastName and TargetFirstName fields are the same as LastName or FirstName from the members_file data set.

First, we are going to join the 2 data sets on the condition that Record_Member.Id equals Record_Like.SourceId.  For each match, we'll copy the last and first name from the corresponding Record_Member record in members_file to a new Record_Named_Like record.

Record_Named_Like JoinMembersAndLikes(Record_Member L, Record_Like R) := TRANSFORM
    SELF.SourceLastName    := L.LastName;
    SELF.SourceFirstName   := L.FirstName;
    SELF.TargetLastName    := '';
    SELF.TargetFirstName   := '';    
    SELF := R;
END;

member_likes_file := JOIN(members_file, likes_file, LEFT.Id=RIGHT.SourceId, JoinMembersAndLikes(LEFT, RIGHT));

The JOIN action joins the two data sets based on the condition LEFT.Id=RIGHT.SourceId where LEFT refers to members_file and RIGHT refers to likes_file.  For each matching pair of records, the transform function JoinMembersAndLikes is called to produce the joined record of type Record_Named_Like.
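
For readers who think in general-purpose languages, the semantics of this JOIN can be sketched in Python.  This is only an analogy, not how HPCC executes the join.  The three member rows are the ones visible in the output later in this article, the like with SourceId 99 is a made-up row, and the helper names (join_members_and_likes, inner_join) are hypothetical.  ECL's JOIN defaults to an inner join, so the sketch drops any like whose SourceId has no matching member.

```python
# Python analogy of JOIN(members_file, likes_file, LEFT.Id=RIGHT.SourceId, ...).
# Only the three members visible in the article's output are included here.
members = [
    {"Id": 1, "LastName": "Picard", "FirstName": "Jean-Luc"},
    {"Id": 2, "LastName": "Riker", "FirstName": "William"},
    {"Id": 12, "LastName": "Guinan", "FirstName": ""},
]
likes = [
    {"SourceId": 1, "TargetId": 2},
    {"SourceId": 12, "TargetId": 1},
    {"SourceId": 99, "TargetId": 1},  # hypothetical row with no matching member
]


def join_members_and_likes(left, right):
    """Mirror of the TRANSFORM: copy source names, blank the target names."""
    return {
        "SourceLastName": left["LastName"],
        "SourceFirstName": left["FirstName"],
        "TargetLastName": "",
        "TargetFirstName": "",
        **right,  # like SELF := R, this copies SourceId and TargetId
    }


def inner_join(left_rows, right_rows, condition, transform):
    """Call transform for every (left, right) pair that satisfies condition."""
    return [
        transform(l, r)
        for l in left_rows
        for r in right_rows
        if condition(l, r)
    ]


member_likes = inner_join(
    members,
    likes,
    lambda l, r: l["Id"] == r["SourceId"],
    join_members_and_likes,
)
for row in member_likes:
    print(row)
```

The like from the unknown SourceId 99 does not appear in the result, and the target name fields are still empty at this stage.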

We do another similar join such that Record_Member.Id is equal to Record_Named_Like.TargetId to identify the name of the liked person.

Record_Named_Like JoinLikesAndMembers(Record_Member L, Record_Named_Like R) := TRANSFORM
    SELF.TargetLastName    := L.LastName;
    SELF.TargetFirstName   := L.FirstName;
    SELF := R;
END;

member_likes_member_file := JOIN(members_file, member_likes_file, LEFT.Id = RIGHT.TargetId, JoinLikesAndMembers(LEFT, RIGHT));

After both joins, the final data set member_likes_member_file looks like:

SourceLastName   SourceFirstName   SourceId   TargetId   TargetLastName   TargetFirstName
Guinan                             12         1          Picard           Jean-Luc
Picard           Jean-Luc          1          2          Riker            William
...
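
The second join can be sketched in Python in the same spirit.  Again, this is an analogy under stated assumptions rather than the ECL implementation: it starts from two partially named rows matching the ones shown above, and the function name join_likes_and_members is just a Python spelling of the transform.

```python
# Python analogy of the second JOIN: LEFT is a member, RIGHT is a like whose
# source names are already filled in, matched on LEFT.Id = RIGHT.TargetId.
members = [
    {"Id": 1, "LastName": "Picard", "FirstName": "Jean-Luc"},
    {"Id": 2, "LastName": "Riker", "FirstName": "William"},
    {"Id": 12, "LastName": "Guinan", "FirstName": ""},
]
member_likes = [
    {"SourceLastName": "Picard", "SourceFirstName": "Jean-Luc",
     "SourceId": 1, "TargetId": 2,
     "TargetLastName": "", "TargetFirstName": ""},
    {"SourceLastName": "Guinan", "SourceFirstName": "",
     "SourceId": 12, "TargetId": 1,
     "TargetLastName": "", "TargetFirstName": ""},
]


def join_likes_and_members(left, right):
    """Keep everything from the like (SELF := R) and fill in the target names."""
    return {
        **right,
        "TargetLastName": left["LastName"],
        "TargetFirstName": left["FirstName"],
    }


member_likes_member = [
    join_likes_and_members(m, ml)
    for m in members
    for ml in member_likes
    if m["Id"] == ml["TargetId"]
]
for row in member_likes_member:
    print(row)
```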

Now, we can easily spot who likes whom by name.  Let's go a bit further by doing some summary statistics and reporting.  We want to generate a report that tells us how many people each person likes and, of those likes, how many are for Data and how many are for Worf.  Precisely, the record type for the report is:

Record_Likes_Stats := RECORD
    LastName    := member_likes_member_file.SourceLastName;
    FirstName   := member_likes_member_file.SourceFirstName;
    LikesData   := COUNT(GROUP, member_likes_member_file.TargetLastName = 'Data');
    LikesWorf   := COUNT(GROUP, member_likes_member_file.TargetLastName = 'Worf');
    TotalLikes  := COUNT(GROUP);
END;

In SQL parlance, the first 2 fields (LastName and FirstName) in Record_Likes_Stats are the keys for our upcoming group-by query while the remaining fields (LikesData, LikesWorf and TotalLikes) are aggregates computed over each group.  For example, TotalLikes contains the record count for each unique combination of last and first name.  LikesData contains the number of records in the group that satisfy the filter condition TargetLastName = 'Data'.

The group-by query is done by the TABLE action:

member_likes_stats := TABLE(member_likes_member_file, Record_Likes_Stats, SourceLastName, SourceFirstName);

The parameters to the TABLE action are the input data set, the returning record type and the group-by keys.  Here is what the output looks like if you output member_likes_stats:

LastName   FirstName   LikesData   LikesWorf   TotalLikes
Crusher    Beverly     0           0           2
Crusher    Wesley      1           0           3
... 
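
The grouping and conditional counting that TABLE performs here can be sketched in Python.  This is a rough analogy only.  The input rows are made up so that the counts reproduce the two output lines shown above; the actual likes behind those counts are not listed in this article.

```python
from collections import defaultdict

# Hypothetical joined rows: (SourceLastName, SourceFirstName, TargetLastName).
rows = [
    ("Crusher", "Wesley", "Data"),
    ("Crusher", "Wesley", "Picard"),
    ("Crusher", "Wesley", "Riker"),
    ("Crusher", "Beverly", "Picard"),
    ("Crusher", "Beverly", "Riker"),
]

# One counter record per group key (last name, first name).
stats = defaultdict(lambda: {"LikesData": 0, "LikesWorf": 0, "TotalLikes": 0})
for last, first, target_last in rows:
    group = stats[(last, first)]
    group["LikesData"] += target_last == "Data"   # like COUNT(GROUP, condition)
    group["LikesWorf"] += target_last == "Worf"
    group["TotalLikes"] += 1                      # like COUNT(GROUP)

for (last, first), counts in sorted(stats.items()):
    print(last, first, counts["LikesData"], counts["LikesWorf"], counts["TotalLikes"])
```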

To limit the output to only those who like Data or Worf, you can add a filter to the OUTPUT action:

OUTPUT(member_likes_stats(LikesData > 0 OR LikesWorf > 0));
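
In Python terms, the filter in parentheses behaves like a list comprehension over the stats records.  The sketch below uses only the two rows shown in the output above.

```python
# The two stats rows shown in the article's output.
member_likes_stats = [
    {"LastName": "Crusher", "FirstName": "Beverly",
     "LikesData": 0, "LikesWorf": 0, "TotalLikes": 2},
    {"LastName": "Crusher", "FirstName": "Wesley",
     "LikesData": 1, "LikesWorf": 0, "TotalLikes": 3},
]

# Keep only the people who like Data or Worf.
filtered = [
    r for r in member_likes_stats
    if r["LikesData"] > 0 or r["LikesWorf"] > 0
]
print(filtered)
```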

In this article, I have introduced two new ECL actions, JOIN and TABLE.  Both have other options and forms that I haven't described here.  To learn more, you can check out the ECL Language Reference.