Thursday, November 4, 2021

Deploying DL Models with MLflow

To get hands-on with MLflow, I started off by reading the excellent tutorial at MLflow.org to acquaint myself with the steps for packaging an ML model and serving it at a REST API endpoint.  The linear regression model and the steps described in the tutorial work flawlessly right out of the box.  Kudos to the documentation!

To gain a little more exposure, I then tried to deploy the deep learning model described in this Databricks blog.  Unlike the linear regression model, the DL model uses the Keras library and the API input is a tensor rather than a dataframe.

In this blog post, I will share the steps needed to deploy the model and the modifications I had to make to the original DL code.

For reference, my setup is:
  • Ubuntu 20.04
  • Python 3.8
  • Anaconda3
  • Keras 2.6

The original DL code, which uses Keras 2.7, was throwing errors, so I had to downgrade to version 2.6 and consequently change all instances of “keras” to “tf.keras” in the code.  The complete modified code is:


import numpy as np
import mlflow
import mlflow.keras
from mlflow.models.signature import infer_signature
import tensorflow as tf
# Import the layers from tf.keras so the whole script uses one Keras namespace
from tensorflow.keras.layers import Dense, Flatten, Dropout
from urllib.parse import urlparse
# Let's prepare the training data!
(train_X, train_Y), (test_X, test_Y) = tf.keras.datasets.mnist.load_data()
trainX, testX = train_X / 255.0, test_X / 255.0
trainY = tf.keras.utils.to_categorical(train_Y)
testY = tf.keras.utils.to_categorical(test_Y)
# Let's define the model!
model = tf.keras.models.Sequential(
    [
      Flatten(),
      Dense(128, activation="relu", name="layer1"),
      Dropout(0.2),
      Dense(10, activation='softmax')
    ]
)
opt = tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9)
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
# Let's fit the model!
model.fit(trainX, trainY, epochs=2, batch_size=32, validation_data=(testX, testY))
# Create a model signature using the tensor input to store in the MLflow model registry
signature = infer_signature(testX, model.predict(testX))
# Let's check out how it looks
print(signature)
# inputs:
#    [Tensor('float64', (-1, 28, 28))]
# outputs:
#    [Tensor('float32', (-1, 10))]
# Create an input example to store in the MLflow model registry
input_example = np.expand_dims(trainX[0], axis=0)
# Let's log the model in the MLflow model registry
registered_model_name = "tensor-blog-post"
# Model registry does not work with file store
tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
if tracking_url_type_store != "file":
    # Register the model
    # There are other ways to use the Model Registry, which depends on the use case,
    # please refer to the doc for more information:
    # https://mlflow.org/docs/latest/model-registry.html#api-workflow
    mlflow.keras.log_model(model, "model", signature=signature, input_example=input_example, registered_model_name=registered_model_name)
else:
    mlflow.keras.log_model(model, "model", signature=signature, input_example=input_example)

The steps to deploy the model are described in the MLflow tutorial, so I won’t repeat them here.  Basically, all you need to serve the model is its run ID, which is generated each time you run the code.  You can find the run ID by launching the MLflow UI:

mlflow ui --host 0.0.0.0

Then go to http://<IP addr>:5000 in a browser.

Alternatively, you can find it in the mlruns subdirectory of the directory where you ran your script.  With the run ID, you can deploy the model to an API endpoint by running:

mlflow models serve -m mlruns/0/<RUN ID>/artifacts/model -p 1234
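
If you would rather not copy the run ID by hand, here is a minimal sketch that looks it up on disk.  It assumes the default local file store layout used in the command above (mlruns/0/<RUN ID>/artifacts/model); the helper names latest_run_id and model_path are my own and are not part of MLflow.

```python
from pathlib import Path


def latest_run_id(mlruns_dir="mlruns", experiment_id="0"):
    """Return the most recently modified run ID that has a logged model, or None."""
    experiment_dir = Path(mlruns_dir) / experiment_id
    if not experiment_dir.is_dir():
        return None
    # Keep only run directories that actually contain artifacts/model
    runs = [
        run_dir
        for run_dir in experiment_dir.iterdir()
        if (run_dir / "artifacts" / "model").is_dir()
    ]
    if not runs:
        return None
    newest = max(runs, key=lambda run_dir: run_dir.stat().st_mtime)
    return newest.name


def model_path(run_id, mlruns_dir="mlruns", experiment_id="0"):
    """Build the path that is passed to `mlflow models serve -m`."""
    return (Path(mlruns_dir) / experiment_id / run_id / "artifacts" / "model").as_posix()
```

Calling latest_run_id() from the directory where the training script ran and passing the result to model_path() should give the value for the -m option.  Directory modification times are only a rough proxy for "latest run", so double-check against the MLflow UI if you have many runs.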

To test the API, send an input tensor of shape (-1, 28, 28) consisting of floats, such as:

curl -X POST -H "Content-Type:application/json” --data '{“inputs“:[[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 
1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0],[1, 0, 1, 0, 1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0,1, 0, 1, 0]]}' http://127.0.0.1:1234/invocations
If everything goes well, the API should return a list of 10 probabilities which are the predictions for each digit.  (The DL model predicts handwritten digits from the MNIST dataset.)

[[0.026583805680274963, 0.0004244133597239852, 0.29056885838508606, 0.4104505479335785, 3.0038367185625248e-05, 0.14565511047840118, 0.002596472157165408, 0.0003772564814426005, 0.12106209993362427, 0.0022514534648507833]]
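
Typing that curl payload by hand is painful, so here is a rough Python sketch of the same request using only the standard library.  The helper names (build_payload, predict, predicted_digit) are mine, and I am assuming the endpoint accepts a batch of 28x28 images under the "inputs" key, which matches the (-1, 28, 28) signature but adds an explicit batch dimension that the curl example does not have.

```python
import json
import urllib.request


def build_payload(images):
    """Wrap a batch of 28x28 images in the {"inputs": ...} JSON body."""
    for image in images:
        if len(image) != 28 or any(len(row) != 28 for row in image):
            raise ValueError("each image must be 28x28")
    return json.dumps({"inputs": images})


def predict(payload, url="http://127.0.0.1:1234/invocations"):
    """POST the payload to the scoring endpoint and return the parsed JSON reply."""
    request = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def predicted_digit(probabilities):
    """Return the index (digit) with the highest probability."""
    return max(range(len(probabilities)), key=probabilities.__getitem__)


# One image with the same alternating 1/0 rows as the curl example
image = [[float((col + 1) % 2) for col in range(28)] for _ in range(28)]
payload = build_payload([image])
```

With the model server running, predict(payload) should return a list like the one above, and predicted_digit() applied to one row of it picks the most likely digit.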


Tuesday, November 2, 2021

Crash Course on Confluent-Kafka

This is my quick stab at setting up Kafka on Ubuntu.  As a newbie to Kafka, I started off by following the steps from this excellent but slightly outdated blog.  I thought I'd freshen up the steps from that blog and also give a brief introduction to the confluent-kafka library for Python.  For basic background info on Kafka, I would still recommend checking out that blog.


For reference, my setup is:

  • Ubuntu 20.04
  • Python 3.8
  • Kafka 3.0.0
  • Confluent-kafka


Setting Up Kafka


Download Kafka by following these steps:

  1. Download the latest 3.0.0 release of Kafka
  2. Un-tar the archive: tar -xzf kafka_2.13-3.0.0.tgz
  3. cd to the Kafka directory

Assuming you're setting up Kafka on your local machine, edit the following line in config/server.properties:

advertised.listeners=PLAINTEXT://127.0.0.1:9092


This setting tells the broker to advertise 127.0.0.1:9092 to clients, which is the address the Python scripts in this post connect to.


Next, start the servers using the shell scripts in the bin directory, each in its own terminal (the matching zookeeper-server-stop.sh and kafka-server-stop.sh scripts stop them):

# start ZooKeeper Server

bin/zookeeper-server-start.sh config/zookeeper.properties


# start Kafka Server

bin/kafka-server-start.sh config/server.properties
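
Before moving on, it can help to confirm that both servers are actually listening.  Here is a minimal sketch using only the Python standard library.  It assumes the default ports, 2181 for ZooKeeper (the clientPort in config/zookeeper.properties) and 9092 for Kafka; the helper name port_is_open is my own.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example usage (assumed default ports):
#   port_is_open("127.0.0.1", 2181)  # ZooKeeper
#   port_is_open("127.0.0.1", 9092)  # Kafka broker
```

An open port only shows that something is listening there, so treat this as a quick sanity check rather than a full health check.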


At this point, your Kafka cluster is set up locally and ready to use.  As a demo, let's create a "sample" Kafka topic:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sample




Working with Confluent-Kafka


Install confluent-kafka:

pip install confluent-kafka


We'll create a Python script to publish messages to the "sample" Kafka topic every 2 seconds:


from confluent_kafka import Producer
import time

producer = Producer({
    'bootstrap.servers': '127.0.0.1:9092',
})

topic = 'sample'
msg_count = 0

while True:
    msg_count += 1
    print(msg_count)
    producer.produce(topic, key='dish', value='spaghetti#%d' % msg_count)
    producer.flush()
    time.sleep(2)


We'll also create a Python script to consume messages from the "sample" Kafka topic and display them on screen:


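from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'my-group',
    # Start from the oldest message when the group has no committed offset
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['sample'])

msg_count = 0
while True:
    # Wait up to 1 second for the next message
    msg = consumer.poll(1.0)
    if msg is None:
        continue

    if msg.error():
        # Reaching the end of a partition is not a failure; keep polling
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break
    else:
        msg_count += 1
        print(msg_count, msg.key(), msg.value())

# Leave the consumer group cleanly once the loop ends
consumer.close()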


If everything goes well, the Kafka server should log the consumer joining the group, and the consumer script should print the messages:


[2021-11-02 17:12:44,424] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group my-group in Empty state. Created a new member id rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)

[2021-11-02 17:12:44,429] INFO [GroupCoordinator 0]: Preparing to rebalance group my-group in state PreparingRebalance with old generation 29 (__consumer_offsets-12) (reason: Adding new member rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 with group instance id None) (kafka.coordinator.group.GroupCoordinator)

[2021-11-02 17:12:44,430] INFO [GroupCoordinator 0]: Stabilized group my-group generation 30 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)

[2021-11-02 17:12:44,433] INFO [GroupCoordinator 0]: Assignment received from leader rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 for group my-group for generation 30. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

1 b'dish' b'spaghetti#1'

2 b'dish' b'spaghetti#2'

3 b'dish' b'spaghetti#3'

...


That's it.  Of course, you can do a lot more with Kafka, such as encoding and decoding the messages in different ways.  For more info, check out the examples directory:

https://github.com/confluentinc/confluent-kafka-python
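
As one small illustration of encoding and decoding, here is a sketch that sends JSON instead of plain strings.  The helper names encode_value and decode_value and the record fields are my own assumptions; the only Kafka-specific fact relied on is that message values travel as bytes, as the b'spaghetti#1' output above shows.

```python
import json


def encode_value(record):
    """Serialize a dict to UTF-8 JSON bytes, suitable as a message value."""
    return json.dumps(record).encode("utf-8")


def decode_value(raw):
    """Turn the bytes returned by msg.value() back into a dict."""
    return json.loads(raw.decode("utf-8"))


# Hypothetical record; in the producer script this would replace the string value:
#   producer.produce(topic, key='dish', value=encode_value(record))
# and in the consumer script:
#   record = decode_value(msg.value())
record = {"dish": "spaghetti", "count": 1}
message = encode_value(record)
```

JSON keeps the messages human-readable, which is handy while experimenting; the examples directory linked above covers other serialization options.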