Thursday, November 4, 2021

Deploying DL Models with MLflow

To get my hands on MLflow, I started off by reading the excellent tutorial at MLflow.org to acquaint myself with the steps of packaging and serving an ML model to a REST API endpoint.  The linear regression model and the steps described in the tutorial work flawlessly right out of the box.  Kudos to the documentation!

To gain a little more exposure, I then tried to deploy the deep learning model described in this Databricks blog.  Unlike the linear regression model, the DL model uses the Keras library, and the API input is a tensor rather than a dataframe.

In this blog post, I will share the steps I followed and the modifications to the original DL code necessary to deploy the model.

For reference, my setup is:
  • Ubuntu 20.04
  • Python 3.8
  • Anaconda3
  • Keras 2.6

The original DL code, which uses Keras 2.7, was throwing errors, so I had to downgrade to version 2.6 and consequently change all instances of "keras" to "tf.keras" in the code.  The complete modified code is:


from tensorflow.keras.layers import Dense, Flatten, Dropout
import numpy as np
import mlflow
import mlflow.keras
from mlflow.models.signature import infer_signature
import tensorflow as tf
from urllib.parse import urlparse
# Let's prepare the training data!
(train_X, train_Y), (test_X, test_Y) = tf.keras.datasets.mnist.load_data()
trainX, testX = train_X / 255.0, test_X / 255.0
trainY = tf.keras.utils.to_categorical(train_Y)
testY = tf.keras.utils.to_categorical(test_Y)
# Let's define the model!
model = tf.keras.models.Sequential(
    [
      Flatten(),
      Dense(128, activation="relu", name="layer1"),
      Dropout(0.2),
      Dense(10, activation='softmax')
    ]
)
opt = tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9)
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
# Let's fit the model!
model.fit(trainX, trainY, epochs=2, batch_size=32, validation_data=(testX, testY))
# Create a model signature using the tensor input to store in the MLflow model registry
signature = infer_signature(testX, model.predict(testX))
# Let's check out how it looks
print(signature)
# inputs:
#    [Tensor('float64', (-1, 28, 28))]
# outputs:
#    [Tensor('float32', (-1, 10))]
# Create an input example to store in the MLflow model registry
input_example = np.expand_dims(trainX[0], axis=0)
# Let's log the model in the MLflow model registry
registered_model_name = "tensor-blog-post"
# Model registry does not work with file store
tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
if tracking_url_type_store != "file":
    # Register the model
    # There are other ways to use the Model Registry, which depends on the use case,
    # please refer to the doc for more information:
    # https://mlflow.org/docs/latest/model-registry.html#api-workflow
    mlflow.keras.log_model(model, "model", signature=signature, input_example=input_example, registered_model_name=registered_model_name)
else:
    mlflow.keras.log_model(model, "model", signature=signature, input_example=input_example)

The steps to deploy the model are described in the MLflow tutorial, so I won't repeat them here.  Basically, all you need to serve the model is its run ID, which is generated each time you run the code.  You can find the run ID by launching the MLflow UI:

mlflow ui --host 0.0.0.0

then go to http://<IP addr>:5000
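
You can also look up runs programmatically.  Here is a minimal sketch using mlflow.search_runs(), assuming the default local file store (experiment "0" is the default experiment):

import mlflow

# list the runs of the default experiment; the run_id column is what we need
runs = mlflow.search_runs(experiment_ids=["0"])
print(runs[["run_id", "start_time", "status"]])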

Alternatively, you can find it in the mlruns subdirectory of the directory where your script was executed. With the run ID, you can deploy the model to an API endpoint:

mlflow models serve -m mlruns/0/<RUN ID>/artifacts/model -p 1234

To test the API, the input must be a float tensor of shape (-1, 28, 28), for example:

curl -X POST -H "Content-Type: application/json" --data '{"inputs": [[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0],[1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0]]}' http://127.0.0.1:1234/invocations
If everything goes well, the API should return a list of 10 probabilities, one prediction per digit.  (The DL model predicts handwritten digits from the MNIST dataset.)

[[0.026583805680274963, 0.0004244133597239852, 0.29056885838508606, 0.4104505479335785, 3.0038367185625248e-05, 0.14565511047840118, 0.002596472157165408, 0.0003772564814426005, 0.12106209993362427, 0.0022514534648507833]]
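
If you'd rather exercise the endpoint from Python than curl, here is a minimal sketch using the requests package (the all-zeros input is just an illustrative dummy image):

import json
import numpy as np
import requests

# a dummy all-zeros 28x28 "image"; any batch of shape (N, 28, 28) works
payload = {"inputs": np.zeros((1, 28, 28)).tolist()}
resp = requests.post(
    "http://127.0.0.1:1234/invocations",
    headers={"Content-Type": "application/json"},
    data=json.dumps(payload),
)
print(resp.json())  # ten probabilities, one per digit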


Tuesday, November 2, 2021

Crash Course on Confluent-Kafka

This is my quick stab at setting up Kafka on Ubuntu.  As a newbie to Kafka, I started off by following the steps from this excellent but slightly outdated blog.  I thought I'd freshen up the steps from that blog and also give a brief introduction to the confluent-kafka library for Python.  For basic background info on Kafka, I would still recommend checking out that blog.


For reference, my setup is:

  • Ubuntu 20.04
  • Python 3.8
  • Kafka 3.0.0
  • Confluent-kafka


Setting Up Kafka


Download Kafka by following these steps:

  1. Download the latest 3.0.0 release of Kafka
  2. Un-tar the archive: tar -xzf kafka_2.13-3.0.0.tgz
  3. cd to the Kafka directory

Assuming you're setting up Kafka on your local machine, edit the following line in config/server.properties

advertised.listeners=PLAINTEXT://127.0.0.1:9092


This line will enable the Python scripts provided in this blog to work properly.


Next, start (and stop) the servers using the shell scripts in the bin directory:

# start ZooKeeper Server

bin/zookeeper-server-start.sh config/zookeeper.properties


# start Kafka Server

bin/kafka-server-start.sh config/server.properties


At this point, your Kafka cluster is set up locally and ready to use.  As a demo, let's create a "sample" Kafka topic:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sample
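
Note that --bootstrap-server must point to the Kafka broker (port 9092), not ZooKeeper.  To confirm the topic was created, you can describe it with the same script:

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic sample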




Working with Confluent-Kafka


Install confluent-kafka:

pip install confluent-kafka


We'll create a Python script to publish messages to the "sample" Kafka topic every 2 seconds:


from confluent_kafka import Producer
import time

producer = Producer({
    'bootstrap.servers': '127.0.0.1:9092',
})

topic = 'sample'
msg_count = 0

while True:
    msg_count += 1
    print(msg_count)
    producer.produce(topic, key='dish', value='spaghetti#%d' % msg_count)
    producer.flush()
    time.sleep(2)
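
Calling flush() after every message keeps the demo simple but blocks on each send.  In real code you would typically pass a delivery callback and let the producer batch messages.  A minimal sketch, continuing the script above (delivery_report is just an illustrative name):

def delivery_report(err, msg):
    # called once per message with its delivery outcome
    if err is not None:
        print('delivery failed: %s' % err)
    else:
        print('delivered to %s [%d] at offset %d'
              % (msg.topic(), msg.partition(), msg.offset()))

producer.produce(topic, key='dish', value='spaghetti#42', callback=delivery_report)
producer.poll(0)  # serve pending delivery callbacks without blocking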


We'll also create a Python script to consume messages from the "sample" Kafka topic and display them on the screen.


from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['sample'])

msg_count = 0
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue

    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break
    else:
        msg_count += 1
        print(msg_count, msg.key(), msg.value())
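
One refinement worth making in real code: close the consumer on shutdown so the group rebalances promptly and final offsets are committed.  A minimal sketch of the same loop with cleanup, assuming the conf above:

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            break
        print(msg.key(), msg.value())
except KeyboardInterrupt:
    pass
finally:
    consumer.close()  # leave the group cleanly and commit final offsets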


If everything goes well, the Kafka server console will log the consumer joining the group:

[2021-11-02 17:12:44,424] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group my-group in Empty state. Created a new member id rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,429] INFO [GroupCoordinator 0]: Preparing to rebalance group my-group in state PreparingRebalance with old generation 29 (__consumer_offsets-12) (reason: Adding new member rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,430] INFO [GroupCoordinator 0]: Stabilized group my-group generation 30 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,433] INFO [GroupCoordinator 0]: Assignment received from leader rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 for group my-group for generation 30. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

and the consumer script should print:

1 b'dish' b'spaghetti#1'
2 b'dish' b'spaghetti#2'
3 b'dish' b'spaghetti#3'
...


That's it!  Of course, you can do a lot more with Kafka, such as encoding and decoding messages in different formats.  For more info, check out the examples directory at:

https://github.com/confluentinc/confluent-kafka-python


Saturday, August 1, 2020

Classification with Tree-augmented Naive Bayes (TAN) and Pgmpy


While working on a classification task recently, I started out with the well-known Naive Bayes classifier but soon realized that I wasn't getting the classification performance I needed.  At the same time, I was keenly aware that many of the feature variables carry causal relationships between them that the classifier wasn't exploiting.

I needed another classifier with the simplicity and run-time performance of Naive Bayes so I started looking into Tree-augmented Naive Bayes (TAN).  My search for a Python library supporting TAN eventually led me to pgmpy.

Pgmpy is a Python Library for learning (Structure and Parameter) and inference (Statistical and Causal) in Bayesian Networks.

Pgmpy is versatile, with capabilities beyond what my task needed.  It has several structure learning algorithms, but not TAN, so I decided to contribute one to the library.  In this article, I'll give a tutorial on how to use TAN in pgmpy.

For demonstration purposes, I'll generate sample data from a handcrafted Bayesian Network (BN) model.  With the generated data, I'll train a Naive Bayes classifier and a TAN classifier and compare their prediction performance.  Our BN graph is illustrated below.  Node C is our class variable, while nodes R, S, T, U, and V are the feature variables.  All variables are discrete.

[Figure: the BN graph, with edges C -> R, C -> S, C -> T, C -> U, C -> V, plus R -> S, R -> T, R -> U, R -> V]

Generate Sample Data


First, import all the packages we will be needing:


import pandas as pd
import numpy as np
 
from pgmpy.models import BayesianModel
from pgmpy.factors.discrete import TabularCPD
from pgmpy.sampling import BayesianModelSampling
from pgmpy.estimators import TreeSearch, BayesianEstimator
 
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.naive_bayes import MultinomialNB


To encode our BN graph, we instantiate BayesianModel with the directed edges of the graph:


# construct naive bayes graph and add interaction between the features
model = BayesianModel([('C', 'R'), ('C', 'S'), ('C', 'T'), ('C', 'U'), ('C', 'V'),
                       ('R', 'S'), ('R', 'T'), ('R', 'U'), ('R', 'V')])


Next, we parameterize the graph by defining the conditional probabilities for each node.  For more information on parameterization, check out the pgmpy tutorials.


# add conditional probability distribution to edges
cpd_c = TabularCPD('C', 2, [[0.5], [0.5]])
cpd_r = TabularCPD('R', 3, [[0.6,0.2],[0.3,0.5],[0.1,0.3]], evidence=['C'],
                            evidence_card=[2])
cpd_s = TabularCPD('S', 3, [[0.1,0.1,0.2,0.2,0.7,0.1],
                            [0.1,0.3,0.1,0.2,0.1,0.2],
                            [0.8,0.6,0.7,0.6,0.2,0.7]],
                            evidence=['C','R'], evidence_card=[2,3])
cpd_t = TabularCPD('T', 2, [[0.7,0.2,0.2,0.5,0.1,0.3],
                            [0.3,0.8,0.8,0.5,0.9,0.7]],
                            evidence=['C','R'], evidence_card=[2,3])
cpd_u = TabularCPD('U', 3, [[0.3,0.8,0.2,0.8,0.4,0.7],
                            [0.4,0.1,0.4,0.1,0.1,0.1],
                            [0.3,0.1,0.4,0.1,0.5,0.2]],
                            evidence=['C','R'], evidence_card=[2,3])
cpd_v = TabularCPD('V', 2, [[0.5,0.6,0.6,0.5,0.5,0.4],
                            [0.5,0.4,0.4,0.5,0.5,0.6]],
                            evidence=['C','R'], evidence_card=[2,3])
model.add_cpds(cpd_c, cpd_r, cpd_s, cpd_t, cpd_u, cpd_v)
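
Before sampling, it's a good idea to verify that the CPDs we just attached are consistent with the graph:

# raises an error if a CPD is missing or its distributions don't sum to 1
assert model.check_model()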


With our BN model defined, we can now generate sample data:


# generate sample data from our BN model
inference = BayesianModelSampling(model)
df_data = inference.forward_sample(size=30000, return_type='dataframe')


We'll be using the data to train Naive Bayes and TAN classifier to see which one performs better.  Before that, we split the data into training and test set.


# split data into training and test set
cols_features = df_data.columns.tolist()
cols_features.remove('C')

X = df_data[cols_features].values
y = df_data[['C']].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=21, stratify=y)
df_train = pd.DataFrame(np.concatenate((y_train, X_train), axis=1), columns=df_data.columns)


Predicting with Naive Bayes


Now we are ready to train our Naive Bayes model using the training data and then measure classification performance on the test set.  I'll use the classifier from sklearn, but pgmpy has a Naive Bayes classifier as well, should you choose to use it.


# train naive bayes classifier and predict
model_nb = MultinomialNB().fit(X_train, y_train.ravel())  # ravel() gives the 1-d labels sklearn expects
y_pred = model_nb.predict(X_test)
print(classification_report(y_test, y_pred))


In short, Naive Bayes precision and recall are both 72%.


Predicting with TAN


Next up is TAN.  First, let's learn the graph structure from the training data.  To capture the interactions between the feature variables, TAN casts a tree structure over them, so you need to pass in the "root_node" parameter.  To be fair, we pretend that we don't know what the real graph looks like and pick a random node as the root:


# learn the TAN graph structure from data
est = TreeSearch(df_train, root_node='U')
dag = est.estimate(estimator_type='tan', class_node='C')
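
If you're curious, you can peek at the learned structure and compare it against the true graph we defined earlier:

# edges recovered by TAN; ideally close to the true graph above
print(sorted(dag.edges()))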


Before we can make predictions, we also need to parameterize the model using the training data.


# construct Bayesian network by parameterizing the graph structure
model = BayesianModel(dag.edges())
model.fit(df_train, estimator=BayesianEstimator, prior_type='K2')


Now we're ready to classify the test data using the TAN model and measure the performance:


# draw inference from BN
X_test_df = pd.DataFrame(X_test, columns=cols_features)
y_pred = model.predict(X_test_df).values
print(classification_report(y_test, y_pred))


Our TAN model's precision and recall are both 81%, compared to 72% for Naive Bayes.  That's quite a significant improvement!


Predicting with Incomplete Data


As a bonus, let's now try making predictions on incomplete data.  We will ask pgmpy to predict the class when only features S, T, and U are given.


from pgmpy.inference import VariableElimination

# predict the class given some of the features
infer = VariableElimination(model) 
query = infer.query(variables=['C'], evidence={'S': 0, 'T': 1, 'U': 2})
print(query)


In this case, pgmpy returns the chance of C=1 is 93%.

Instead of predicting the class, let's predict what feature R is likely to be given class C=1 and feature S=1:


# predict the feature given the class and another feature
query = infer.query(variables=['R'], evidence={'C': 1, 'S': 1})
print(query)


In this case, pgmpy returns the chance of R=0 is 27%, R=1 is 32% and R=2 is 41%.

Pretty cool, isn't it?  As I said at the beginning of the article, pgmpy is versatile, and I'm only scratching the surface of what it can do.

Saturday, July 4, 2020

Identifying Forename and Surname for Different Ethnicities using Machine Learning

In the previous post, I used logistic regression and tf-idf from Python's sklearn to predict ethnicity based on a person's forename, middle name, and surname.  In this post, I'll continue to use the same Wikipedia dataset, but this time the task is to predict whether a name is a forename or a surname.  The motivation is to use such a classifier for input form validation and ETL.


To start with, let’s summarize the number of records available:


Ethnicity             Forenames    Surnames     Total
Nordic                    1,691       3,230      4,921
EastAsian                 2,331       2,784      5,115
Germanic                  1,975       3,329      5,304
Africans                  2,969       3,170      6,139
Muslim                    3,539       4,557      8,096
Japanese                  3,864       4,236      8,100
IndianSubContinent        5,133       3,839      8,972
EastEuropean              2,888       6,627      9,515
Hispanic                  3,662       6,254      9,916
Jewish                    3,676       6,304      9,980
Italian                   4,315       8,781     13,096
French                    3,802       9,630     13,432
British                   6,933      16,062     22,995



We observe that:

  1. Some datasets, like Nordic, have far fewer records than others
  2. The French dataset has 2.5 times more surnames than forenames


Our intuition tells us that a single classifier to predict forename and surname across all ethnicities is probably a bad idea, as it ignores the subtle differences in name spelling among ethnicities.  The confusion matrix from the previous post on ethnicity prediction speaks to this point.  Therefore, I'll build a separate forename-surname classifier for each ethnicity.


I will use the same logistic regression approach from the previous post, which feature-engineers bigrams, trigrams, and quadgrams from names.  To address the data imbalance in observation #2, I'll upsample the minority class using the handy Imblearn package.  To address the curse of dimensionality in observation #1, I'll employ the following approaches to avoid overfitting the training data (a sketch of the full pipeline follows the list):

  1. Penalize large coefficients in logistic regression
  2. Reduce the number of dimensions using SVD
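
Since the original training code isn't reproduced here, below is a minimal sketch of that pipeline for one ethnicity.  The file name, column names, and n_components value are illustrative assumptions:

import pandas as pd
from imblearn.over_sampling import RandomOverSampler
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

df = pd.read_csv('nordic_names.csv')  # columns: name, part ('forename' or 'surname')

# tf-idf over character bigrams, trigrams and quadgrams of each name
vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
X = vectorizer.fit_transform(df['name'].str.lower())
y = df['part'].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=21, stratify=y)

# observation #2: upsample the minority class in the training set only
X_train, y_train = RandomOverSampler(random_state=21).fit_resample(X_train, y_train)

# approach #2: SVD dimensionality reduction
svd = TruncatedSVD(n_components=500, random_state=21)
X_train = svd.fit_transform(X_train)
X_test = svd.transform(X_test)

# approach #1: L2 is sklearn's default penalty; smaller C penalizes large coefficients harder
clf = LogisticRegression(penalty='l2', C=1.0, max_iter=1000)
clf.fit(X_train, y_train)
print(clf.score(X_test, y_test))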


Without further ado, here is the classification performance on a 20% holdout set using upsampling and L2 penalization:


Ethnicity             Precision    Recall    F1
Nordic                    79.0%     78.2%    78.4%
EastAsian                 63.4%     62.8%    62.8%
Germanic                  76.9%     75.9%    76.1%
Africans                  59.7%     59.7%    59.7%
Muslim                    67.2%     66.4%    66.6%
Japanese                  60.4%     60.3%    60.3%
IndianSubContinent        60.6%     60.1%    60.3%
EastEuropean              79.6%     77.2%    77.9%
Hispanic                  71.1%     70.6%    70.8%
Jewish                    74.8%     74.0%    74.3%
Italian                   76.4%     74.0%    74.6%
French                    76.4%     75.0%    75.6%
British                   74.8%     73.7%    74.2%


The performance is about the same with SVD dimensionality reduction capturing 80 to 90% of the variance in the data, which generally reduces the number of features by about 80%.


Ethnicity             Precision    Recall    F1
Nordic                    78.4%     77.6%    77.8%
EastAsian                 64.1%     63.4%    63.5%
Germanic                  77.0%     76.0%    76.2%
Africans                  60.0%     59.9%    59.9%
Muslim                    67.2%     66.4%    66.5%
Japanese                  61.3%     61.2%    61.2%
IndianSubContinent        60.6%     60.2%    60.3%
EastEuropean              80.2%     77.9%    78.5%
Hispanic                  71.8%     70.9%    71.2%
Jewish                    74.8%     73.9%    74.2%
Italian                   76.4%     73.7%    74.4%
French                    76.8%     75.1%    75.7%


The results are sorted by dataset size, from smallest to largest.  The result for British is missing because my old laptop ran out of memory during the SVD matrix factorization.


Interestingly, focusing on the F1 score, we see that poor performance is not necessarily correlated with small data.  For example, Nordic, the smallest dataset, yields a rather high F1 score.  The ethnicities with F1 scores below 70% are Africans, IndianSubContinent, Japanese, EastAsian, and Muslim.


Some of the Muslim names misclassified are:

  • navi, rabia, bapsi, khar, ravi, fadela, szold


Some of the EastAsian names misclassified are:

  • guanqiu, huang, heshen, dai, eitel, gagnon, ming, kimble, liang, samata


Besides insufficient data being a possible cause of poor classification performance, it's conceivable that some of these misclassified names would be tricky even for humans to classify as a forename or surname.  For example, "Davis" can be either a forename or a surname.


In the meantime, a classifier with 60% accuracy can still be useful.  For example, for input form validation where a forename and a surname are entered and the goal is to validate that each went into the right field, we can use the classifier to predict whether the user accidentally swapped them.


A swap occurs if the classifier predicts that the input forename is a surname and the input surname is a forename.  For a classifier with 60% accuracy, the chance of erroneously predicting a swap and alerting the user is only 100 x (0.4 x 0.4) = 16%.