To get my hands on MLflow, I started off by reading the excellent tutorial at MLflow.org to acquaint myself with the steps for packaging an ML model and serving it from a REST API endpoint. The linear regression model and the steps described in the tutorial work flawlessly right out of the box. Kudos to the documentation!
To gain a little more exposure, I then tried to deploy the deep learning model described in this Databricks blog. Unlike the linear regression model, the DL model uses the Keras library and the API input is a tensor rather than a DataFrame.
In this blog post, I will share the steps and the modifications to the original DL code needed to deploy the model.
For reference, my setup is:
Ubuntu 20.04
Python 3.8
Anaconda3
Keras 2.6
The original DL code, which uses Keras 2.7, was throwing errors, so I had to downgrade to version 2.6 and consequently changed all instances of “keras” to “tf.keras” in the code. The complete modified code is:
import keras
from keras.layers import Dense, Flatten, Dropout
import numpy as np
import mlflow
import mlflow.keras
from mlflow.models.signature import infer_signature
The steps to deploy the model are described in the MLflow tutorial, so I won’t repeat them here. Basically, what you need to serve the model is its run ID. A new run ID is generated each time you run the code. You can find the run ID by launching the MLflow UI:
mlflow ui --host 0.0.0.0
Then go to http://<IP addr>:5000
Alternatively, you can find it in the mlruns subdirectory where your script was executed. With the run ID, you can deploy the model to an API endpoint by:
If everything goes well, the API should return a list of 10 probabilities, one for each digit. (The DL model predicts handwritten digits from the MNIST dataset.)
This is my quick stab at setting up Kafka on Ubuntu. As a newbie to Kafka, I started off by following the steps from this excellent but slightly outdated blog. I thought I'd freshen up the steps from that blog and also give a brief introduction to the confluent-kafka library for Python. For basic background info on Kafka, I would still recommend checking out that blog.
We'll also create a Python script that consumes messages from the "sample" Kafka topic and displays them on screen.
from confluent_kafka import Consumer, KafkaError
conf = {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
consumer.subscribe(['sample'])
msg_count = 0
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break
    else:
        msg_count += 1
        print(msg_count, msg.key(), msg.value())
If everything goes well, you should see:
[2021-11-02 17:12:44,424] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group my-group in Empty state. Created a new member id rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,429] INFO [GroupCoordinator 0]: Preparing to rebalance group my-group in state PreparingRebalance with old generation 29 (__consumer_offsets-12) (reason: Adding new member rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,430] INFO [GroupCoordinator 0]: Stabilized group my-group generation 30 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,433] INFO [GroupCoordinator 0]: Assignment received from leader rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 for group my-group for generation 30. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
1 b'dish' b'spaghetti#1'
2 b'dish' b'spaghetti#2'
3 b'dish' b'spaghetti#3'
...
That's it, but of course, you could do a lot more with Kafka, such as encoding and decoding the messages in different ways. For more info, check out the examples directory:
While working on a classification task recently, I started out with the well-known Naive Bayes classifier but soon realized that I wasn't getting the classification performance I needed. At the same time, I was keenly aware that many of the feature variables have causal relationships with one another that the classifier wasn't exploiting.
I needed another classifier with the simplicity and run-time performance of Naive Bayes so I started looking into Tree-augmented Naive Bayes (TAN). My search for a Python library supporting TAN eventually led me to pgmpy.
Pgmpy is a Python Library for learning (Structure and Parameter) and inference (Statistical and Causal) in Bayesian Networks.
Pgmpy is versatile with capabilities beyond what my task needed. It also has some structure learning algorithms but not TAN so I thought to contribute one to the library. In this article, I'll give a tutorial on how to use TAN in pgmpy.
For demonstration purposes, I'll generate sample data from a handcrafted Bayesian Network (BN) model. With the generated data, I'll train a Naive Bayes classifier and a TAN classifier, and compare their prediction performance. Our BN graph is illustrated below. Node C is our class variable while nodes R, S, T, U and V are the feature variables. All variables are discrete.
Generate Sample Data
First, import all the packages we will be needing:
import pandas as pd
import numpy as np
from pgmpy.models.BayesianModel import BayesianModel
from pgmpy.factors.discrete import TabularCPD
from pgmpy.sampling import BayesianModelSampling
from pgmpy.estimators import TreeSearch, BayesianEstimator
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.naive_bayes import MultinomialNB
To encode our BN graph, we instantiate BayesianModel with the directed edges of the graph:
# construct naive bayes graph and add interaction between the features
model = BayesianModel([('C', 'R'), ('C', 'S'), ('C', 'T'), ('C', 'U'), ('C', 'V'),
                       ('R', 'S'), ('R', 'T'), ('R', 'U'), ('R', 'V')])
Next, we parameterize the graph by defining the conditional probabilities for each node. For more information on parameterization, check out the pgmpy tutorials.
# add conditional probability distribution to edges
We'll be using the data to train the Naive Bayes and TAN classifiers to see which one performs better. Before that, we split the data into training and test sets.
# split data into training and test set
cols_features = df_data.columns.tolist()
cols_features.remove('C')
X = df_data[cols_features].values
y = df_data['C'].values  # 1-D label array, the shape scikit-learn expects
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=21, stratify=y)
Now, we are ready to train our Naive Bayes model using the training data and then measure classification performance with the test set. I'll use the classifier from sklearn, but pgmpy also has a Naive Bayes classifier, should you prefer it.
# train naive bayes classifier and predict
model_nb = MultinomialNB().fit(X_train, y_train)
y_pred = model_nb.predict(X_test)
print(classification_report(y_test, y_pred))
In short, Naive Bayes precision and recall are both 72%.
Predicting with TAN
Next up is TAN. First, let's learn the graph structure from the training data. To capture the interaction between the feature variables, TAN casts a tree structure over them. So, you need to pass in the "root_node" parameter. To be fair, we pretend that we don't know what the real graph looks like, so we pick a random node as the root node:
# learn the TAN graph structure from data
est = TreeSearch(df_train, root_node='U')
dag = est.estimate(estimator_type='tan', class_node='C')
Before we can make predictions, we also need to parameterize the model using the training data.
# construct Bayesian network by parameterizing the graph structure
Extract, Transform and Load (ETL) is a common operation in data processing. A recent ETL task of mine involved cleansing CSV files which contain millions of records. The cleansing is done by calling a RESTful API which takes a batch of at most 100 records at a time, processes them and returns the results. The API is accessible via an HTTP request and up to 4 concurrent requests are allowed. Naturally, to process the CSV file quicker, one should take advantage of the concurrency allowed by the API. While this is a relatively straightforward ETL task, a slight complication is that the machine where the Java program is scheduled to run regularly has a memory limitation. It’s not possible to load the entire file into memory at any time during the execution of the Java program. To work around the limitation, the CSV file needs to be loaded into memory a portion at a time.
There are many ways to implement the task in Java. The old way, before Java 8, would be to first split the input CSV file into 4 separate files and then spawn a thread for each separate file. Each thread would read 100 records at a time into memory from the assigned file, pass the records to the API and write the results to a temporary output file. When the threads are finished, the temporary output files are combined into a single output file. This way works just fine and it’s sufficient for the task at hand, but what if you were to use Stream and functional programming in Java 8?
The easiest (but not the most elegant) approach without loading the entire CSV file into memory is to first read the file and divide it into batches of 100 records by recording the line number of the first record of each batch in a list. For example, an input CSV file with 333 rows would produce line numbers 2, 102, 202 and 302 in the list. The first batch starts at line number 2, the second batch at line number 102 and so forth. The code is:
public static List<Integer> partition(Path filePath, int batchSize) throws IOException {
    List<Integer> lineNumbers = new ArrayList<>();
    // try-with-resources closes the reader even if reading fails
    try (LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(filePath))) {
        rdr.readLine(); // skip header row
        while (rdr.readLine() != null) {
            int lineNumber = rdr.getLineNumber();
            // the first data row is line 2, so batches start at 2, 2 + batchSize, ...
            if ((lineNumber - 2) % batchSize == 0) {
                lineNumbers.add(lineNumber);
            }
        }
    }
    return lineNumbers;
}
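To check the arithmetic in the 333-row example, here is a minimal sketch that applies the same line-number test to an in-memory CSV instead of a file. The class name BatchStartDemo, the helper sampleCsv() and the "id,name" header are made up for illustration; only the modulo test comes from partition().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class BatchStartDemo {
    // Same test as partition(), but reading from any BufferedReader
    // so it can be tried without touching the file system.
    static List<Integer> batchStarts(BufferedReader in, int batchSize) {
        List<Integer> starts = new ArrayList<>();
        try (LineNumberReader rdr = new LineNumberReader(in)) {
            rdr.readLine(); // skip header row (line 1)
            while (rdr.readLine() != null) {
                int lineNumber = rdr.getLineNumber();
                if ((lineNumber - 2) % batchSize == 0) {
                    starts.add(lineNumber);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return starts;
    }

    // Hypothetical CSV text: one header line plus (totalLines - 1) data lines.
    static String sampleCsv(int totalLines) {
        StringBuilder sb = new StringBuilder("id,name\n");
        for (int i = 2; i <= totalLines; i++) {
            sb.append(i).append(",row").append(i).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        BufferedReader in = new BufferedReader(new StringReader(sampleCsv(333)));
        System.out.println(batchStarts(in, 100)); // prints [2, 102, 202, 302]
    }
}
```

The last batch is simply shorter than the others: it starts at line 302 and ends wherever the file ends.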
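Next, we create a parallel stream from the list of line numbers by calling:
Stream<Integer> stream = lineNumbers.stream().parallel();
By default, a parallel stream runs on the common fork-join pool, whose parallelism is derived from the number of available processors. The stream is then piped to the extract method: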
Stream<List<String>> data = stream.map(i -> CsvBatchProcessor.extract(filePath, i, batchSize));
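Since the API accepts at most 4 concurrent requests, you may want to cap the parallelism rather than rely on the processor count. One commonly used trick is to start the stream's terminal operation from inside a dedicated ForkJoinPool. Treat the sketch below as an assumption-laden illustration: the stream documentation does not promise that the work stays in that pool, although current JDK implementations behave this way. The class name BoundedParallelism and the doubling step are placeholders, not part of the original program.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BoundedParallelism {
    // Run a parallel stream from inside a dedicated pool of the given size.
    static List<Integer> mapWithPool(List<Integer> input, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<List<Integer>> task = () ->
                    input.parallelStream()
                         .map(i -> i * 2) // placeholder for the extract/transform steps
                         .collect(Collectors.toList());
            // join() waits for the result and rethrows failures unchecked
            return pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> starts = IntStream.range(0, 20).boxed().collect(Collectors.toList());
        // at most 4 worker threads, matching the API's concurrency limit
        System.out.println(mapWithPool(starts, 4));
    }
}
```

Setting the system property java.util.concurrent.ForkJoinPool.common.parallelism at JVM start-up is the documented alternative, at the cost of affecting every parallel stream in the process.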
The extract method returns the next 100 records from the CSV file starting from the given line number:
public static List<String> extract(Path path, int startLineNumber, int batchSize) {
    // load a batch of data rows from file to memory
    List<String> rows = new ArrayList<>();
    // try-with-resources closes the reader even if reading fails
    try (LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(path))) {
        for (String line = null; (line = rdr.readLine()) != null;) {
            int lineNumber = rdr.getLineNumber();
            if (lineNumber >= startLineNumber && lineNumber < startLineNumber + batchSize) {
                rows.add(line);
            }
        }
        return rows;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
The next step is to process the 100 records by calling:
data = data.map(CsvBatchProcessor::transform);
The transform method is where the API would be called. Here it simply upper-cases each row as a stand-in:
public static List<String> transform(List<String> rows) {
    List<String> newRows = new ArrayList<>();
    for (String row : rows) {
        newRows.add(row.toUpperCase());
    }
    return newRows;
}
Finally, we output the processed data to the console so it can be redirected to a file:
data.flatMap(x -> x.stream()).forEach(System.out::println);
The careful reader may have noticed that the output CSV may not retain the same ordering as the input CSV because of the use of a parallel stream. This is definitely the case since there’s no guarantee on the order in which the stream's threads execute. For my requirements, the ordering is not important so I’ve neglected it.
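If the input order does matter, one option is to replace forEach with forEachOrdered, which emits elements in the stream's encounter order even when the earlier stages run in parallel. This is a minimal sketch on in-memory batches; the class name OrderedOutputDemo and the sample data are made up for illustration, and the upper-casing stands in for the API call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class OrderedOutputDemo {
    // Transform batches in parallel but collect the rows in input order.
    static List<String> transformInOrder(List<List<String>> batches) {
        List<String> out = new ArrayList<>();
        batches.parallelStream()
               .map(batch -> batch.stream()
                                  .map(String::toUpperCase) // placeholder for the API call
                                  .collect(Collectors.toList()))
               .flatMap(batch -> batch.stream())
               // forEachOrdered handles one element at a time, in encounter order
               .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Arrays.asList("d", "e"));
        System.out.println(transformInOrder(batches)); // prints [A, B, C, D, E]
    }
}
```

The trade-off is that the final output step is serialized, which may reduce the benefit of parallelism when writing the results is the slow part.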
The full source code is:
import java.util.List;
import java.util.ArrayList;
import java.util.stream.Stream;
import java.io.LineNumberReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.Path;
/**
 * Perform ETL on an input file by a batch of rows at a time to conserve computational resources.
 */
public class CsvBatchProcessor {
    /**
     * Get the starting line number for each batch.
     */
    public static List<Integer> partition(Path filePath, int batchSize) throws IOException {
        List<Integer> lineNumbers = new ArrayList<>();
        // try-with-resources closes the reader even if reading fails
        try (LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(filePath))) {
            rdr.readLine(); // skip header row
            while (rdr.readLine() != null) {
                int lineNumber = rdr.getLineNumber();
                if ((lineNumber - 2) % batchSize == 0) {
                    lineNumbers.add(lineNumber);
                }
            }
        }
        return lineNumbers;
    }

    /**
     * Extract a batch of rows from data file.
     */
    public static List<String> extract(Path path, int startLineNumber, int batchSize) {
        // load a batch of data rows from file to memory
        List<String> rows = new ArrayList<>();
        try (LineNumberReader rdr = new LineNumberReader(Files.newBufferedReader(path))) {
            for (String line = null; (line = rdr.readLine()) != null;) {
                int lineNumber = rdr.getLineNumber();
                if (lineNumber >= startLineNumber && lineNumber < startLineNumber + batchSize) {
                    rows.add(line);
                }
            }
            return rows;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Transform multiple rows of data at a time.
     */
    public static List<String> transform(List<String> rows) {
        List<String> newRows = new ArrayList<>();
        for (String row : rows) {
            newRows.add(row.toUpperCase());
        }
        return newRows;
    }

    public static void main(String[] args) throws IOException {
        // parse command line arguments
        String fileName = args[0];
        int batchSize = Integer.parseInt(args[1]);
        // partition row indices into batches of given size
        Path filePath = Paths.get(fileName);
        List<Integer> lineNumbers = CsvBatchProcessor.partition(filePath, batchSize);
        // extract data
        Stream<Integer> stream = lineNumbers.stream().parallel();
        Stream<List<String>> data = stream.map(i -> CsvBatchProcessor.extract(filePath, i, batchSize));
        // transform data
        data = data.map(CsvBatchProcessor::transform);
        // load data to console
        data.flatMap(x -> x.stream()).forEach(System.out::println);
    }
}
If you work with external APIs a lot, you have probably encountered APIs that support batch operations. Instead of making a separate API request for each record, you can operate on multiple records with one batch request. Computationally, batching helps speed up querying, reduces network overhead and places less load on the remote server.
Batching occurs commonly in the real world to streamline processes or to meet process constraints. For example, your favorite pizza chain batches multiple orders into a single dispatch to save delivery costs and time. An amusement park ride processes guests who are waiting in the queue in batches, with the batch size being the ride capacity. The list goes on.
Batching is such a pervasive operation that you will likely encounter the need to write code implementing or simulating the concept in your software development career. This was the case for me in a recent project. The requirements involved chunking record IDs and submitting each chunk to an external API for fast lookup in Java. The record IDs are generated in real time, one at a time, so the total number of records is not known in advance.
Thankfully, with the addition of streams and lambda expressions in Java 8, the task couldn't be much simpler. Without further ado, here's the code that does the trick.
import java.util.List;
import java.util.ArrayList;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.function.Consumer;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
* Read an input file containing one ID per line and process a batch of IDs at once.
*/
public class BatchProcessor {
    public static class BatchSpliterator implements Spliterator<List<String>> {
        private final Spliterator<String> source;
        private final int batchSize;
        private List<String> batch;

        BatchSpliterator(Spliterator<String> lineSpliterator, int batchSize) {
            this.source = lineSpliterator;
            this.batchSize = batchSize;
            this.batch = new ArrayList<>();
        }

        public boolean tryAdvance(Consumer<? super List<String>> action) {
            boolean success = this.source.tryAdvance(this.batch::add);
            // flush if batch is filled or input data is exhausted
            if ((success && this.batch.size() == this.batchSize) || (!success && !this.batch.isEmpty())) {
                action.accept(this.batch);
                this.batch = new ArrayList<>();
                return true;
            }
            return success;
        }

        public Spliterator<List<String>> trySplit() {
            return null;
        }

        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        public int characteristics() {
            return ORDERED|NONNULL;
        }

        /**
         * Divide an input stream of IDs into batches.
         */
        public static Stream<List<String>> batch(Stream<String> lines, int batchSize, boolean parallel) {
            return StreamSupport.stream(
                new BatchSpliterator(lines.spliterator(), batchSize),
                parallel);
        }

        /**
         * Process a batch of IDs in one call.
         */
        public static List<String> processBatch(List<String> batch) {
            List<String> newBatch = new ArrayList<>();
            for (String str : batch) {
                newBatch.add(str + "!");
            }
            return newBatch;
        }
    }

    public static void main(String[] args) throws IOException {
        // parse command line arguments
        String fileName = args[0];
        Integer batchSize = Integer.parseInt(args[1]);
        // stream input file
        BufferedReader br = Files.newBufferedReader(Paths.get(fileName));
        Stream<String> stream = br.lines();
        BatchSpliterator.batch(stream, batchSize, true).map(BatchSpliterator::processBatch).forEach(System.out::print);
    }
}
The solution implements the java.util.Spliterator interface and provides a constructor that accepts the spliterator of an incoming source stream of strings and a batch size. The key interface methods are:
tryAdvance() - Add the next item from the source stream to the current batch. If the batch is full, or the source is exhausted and the batch is not empty, pass the batch to the given Consumer action and initialize a new batch.
trySplit() - Return null because the source stream size is not known in advance and, therefore, the stream cannot be split. Unfortunately, this also means batching cannot be done in parallel.
estimateSize() - Return Long.MAX_VALUE because the source stream size is not known in advance.
characteristics() - Return the properties of the Spliterator, which in this case involve iterating over an ordered sequence of non-null values.
The other two static methods in the class are our own helper methods.
batch() - Divide an input stream into batches of given size.
processBatch() - Process a batch of strings by appending an exclamation mark to each string.
In the main method, an input file specified in the command line arguments is used as our source stream. The long chained line of code chunks the input stream of strings, calls processBatch() on each batch, and then dumps each batch out to screen.
From the output, we see that the input numbers are batched in groups of 10 as expected and each number is appended with an exclamation mark as coded in the processBatch() method. To make use of this code, simply write your own business logic in the method. Of course, you would also need to change the input stream source according to your use case.
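As a variation on the same idea, the sketch below fills a whole batch inside each tryAdvance() call, so every call that returns true has handed exactly one batch to the Consumer. It is an alternative under stated assumptions, not the code used in the project: the class name EagerBatching, the generic batches() helper and the in-memory source of 25 integers are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class EagerBatching {
    // Wrap a stream so that it yields lists of up to batchSize elements.
    static <T> Stream<List<T>> batches(Stream<T> source, int batchSize) {
        Spliterator<T> src = source.spliterator();
        Spliterator<List<T>> batching = new Spliterators.AbstractSpliterator<List<T>>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                List<T> batch = new ArrayList<>(batchSize);
                // pull items until the batch is full or the source runs dry
                while (batch.size() < batchSize && src.tryAdvance(batch::add)) {
                    // nothing else to do per item
                }
                if (batch.isEmpty()) {
                    return false; // source exhausted, no partial batch left
                }
                action.accept(batch);
                return true;
            }
        };
        // sequential stream: batches are produced one after another
        return StreamSupport.stream(batching, false);
    }

    public static void main(String[] args) {
        Stream<Integer> ids = IntStream.rangeClosed(1, 25).boxed();
        // prints three lists holding 10, 10 and 5 IDs
        batches(ids, 10).forEach(System.out::println);
    }
}
```

Because the source is still consumed one item at a time, this works equally well when the IDs arrive in real time and the total count is unknown.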
So far, I have demonstrated how to initialize, load and parse data in HPCC ECL. We have been working with a toy data set consisting of person IDs, names and dates of birth. In this article, I'll add a second data set and show how to join multiple data sets.
The new data set describes who likes whom denoted by the person ID. The record type of the data set is:
Record_Like := RECORD
UNSIGNED8 SourceId;
UNSIGNED8 TargetId;
END;
It's hard to figure out who likes whom just by looking at a pair of IDs in the likes_file data set. To make it easier, we'll combine the members_file data set we have seen in previous articles with the new likes_file data set so that the person names are spelled out. The joined results will have the record type:
Essentially, Record_Named_Like extends Record_Like with 4 additional fields. The data types for the SourceLastName, SourceFirstName, TargetLastName and TargetFirstName fields are the same as LastName or FirstName from the members_file data set.
First, we are going to join the 2 data sets so that Record_Member.Id equals Record_Like.SourceId. We'll copy the last and first name from the corresponding Record_Member record, together with the IDs from likes_file, to a new Record_Named_Like record.
The JOIN action joins the two data sets based on the condition LEFT.Id=RIGHT.SourceId where LEFT refers to members_file and RIGHT refers to likes_file. During joining, the transform function JoinMemberAndLikes is called to produce the resulting joined record of type Record_Named_Like.
We do another similar join such that Record_Member.Id is equal to Record_Named_Like.TargetId to identify the name of the liked person.
Now, we can easily spot who likes whom by name. Let's go a bit further by doing some summary statistics and reporting. We want to generate a report that tells us how many likes there are for each person and of those likes how many are for Data and Worf. Precisely, the record type for the report is:
In SQL parlance, the first 2 fields (LastName and FirstName) in Record_Likes_Stats are the keys for our upcoming group-by query while the remaining fields (LikesData, LikesWorf and TotalLikes) are the aggregate functions performed on each group. For example, TotalLikes contains the count for each unique group of last and first name. LikesData contains the portion of the group count satisfying the filter condition of TargetLastName = 'Data'.
The parameters to the TABLE action are the input data set, the returning record type and the group-by keys. Here is what the output looks like if you output member_likes_stats:
To limit the output to only those who like Data or Worf, you can add a filter to the OUTPUT action:
OUTPUT(member_likes_stats(LikesData > 0 OR LikesWorf > 0));
In this article, I have introduced two new ECL actions which are JOIN and TABLE. There are other ways of using these actions which I haven't described. To learn more, you can check out the ECL Language Reference.