Tuesday, November 2, 2021

Crash Course on Confluent-Kafka

This is my quick stab at setting up Kafka on Ubuntu.  As a Kafka newbie, I started off by following the steps from this excellent but slightly outdated blog.  I thought I'd freshen up those steps and also give a brief introduction to the confluent-kafka library for Python.  For basic background on Kafka, I'd still recommend checking out that blog.


For reference, my setup is:

  • Ubuntu 20.04
  • Python 3.8
  • Kafka 3.0.0
  • Confluent-kafka


Setting Up Kafka


Download Kafka by following these steps:

  1. Download the Kafka 3.0.0 release (the kafka_2.13-3.0.0.tgz binary)
  2. Extract the archive: tar -xzf kafka_2.13-3.0.0.tgz
  3. cd into the extracted directory: cd kafka_2.13-3.0.0

Assuming you're setting up Kafka on your local machine, uncomment and edit the advertised.listeners line in config/server.properties so it reads:

advertised.listeners=PLAINTEXT://127.0.0.1:9092


This tells clients to reach the broker at 127.0.0.1:9092, which is the address the Python scripts below connect to.
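To double-check the edit without eyeballing the whole file, here is a minimal sketch that parses a .properties file with plain Python. The `read_properties` helper is hypothetical (not part of Kafka), and it assumes the simple `key=value` lines that server.properties uses; it does not handle `:` separators, escapes, or line continuations.

```python
from pathlib import Path


def read_properties(path):
    """Parse a simple key=value .properties file into a dict.

    Blank lines and lines starting with '#' or '!' (comments) are skipped.
    Only '=' is treated as a separator; escapes and continuations are ignored.
    """
    props = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith(('#', '!')):
            continue
        # split on the first '=' only, so values like URLs stay intact
        key, sep, value = line.partition('=')
        if sep:
            props[key.strip()] = value.strip()
    return props
```

Run from the Kafka directory, `read_properties('config/server.properties').get('advertised.listeners')` should return `'PLAINTEXT://127.0.0.1:9092'`; `None` means the line is still commented out.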


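Next, start the servers using the shell scripts in the bin directory.  Both scripts run in the foreground, so start each one in its own terminal:

# start ZooKeeper Server

bin/zookeeper-server-start.sh config/zookeeper.properties


# start Kafka Server

bin/kafka-server-start.sh config/server.properties


# when you're done, stop the Kafka Server first, then ZooKeeper

bin/kafka-server-stop.sh

bin/zookeeper-server-stop.sh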
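The broker can take a few seconds to come up, so a script launched right afterwards may fail to connect.  A minimal sketch, using only Python's standard library, that polls a TCP port until something is listening.  `wait_for_port` is a hypothetical helper, not part of Kafka or confluent-kafka, and it assumes the default ports (2181 for ZooKeeper, 9092 for Kafka).  An open port only shows that a process is listening, not that the broker has finished starting.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    This only proves something is listening on the port; it does not
    confirm that the Kafka broker is fully started.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # open and immediately close a connection as a liveness probe
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port('127.0.0.1', 9092)` returns True once the Kafka listener is accepting connections.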


At this point, your Kafka cluster is set up locally and ready to use.  For a demo, let's create a "sample" Kafka topic:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sample
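If you'd rather script topic creation from Python, here is a minimal sketch that wraps the same kafka-topics.sh call with the standard subprocess module.  The helper names `create_topic_cmd` and `create_topic` are hypothetical, and the sketch assumes it runs from the Kafka directory.  Note that `--bootstrap-server` expects a Kafka broker address (port 9092 here), not the ZooKeeper port.

```python
import subprocess


def create_topic_cmd(topic, partitions=1, replication_factor=1,
                     bootstrap_server='localhost:9092'):
    """Build the kafka-topics.sh argument list for creating a topic.

    bootstrap_server must point at a Kafka broker, not at ZooKeeper.
    """
    return [
        'bin/kafka-topics.sh', '--create',
        '--bootstrap-server', bootstrap_server,
        '--replication-factor', str(replication_factor),
        '--partitions', str(partitions),
        '--topic', topic,
    ]


def create_topic(topic, **kwargs):
    # check=True raises CalledProcessError if kafka-topics.sh exits non-zero
    subprocess.run(create_topic_cmd(topic, **kwargs), check=True)
```

Calling `create_topic('sample')` runs the same command as above.  Building the argument list separately keeps it easy to inspect and avoids shell quoting issues.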




Working with Confluent-Kafka


Install confluent-kafka:

pip install confluent-kafka


We'll create a Python script that publishes a message to the "sample" Kafka topic every 2 seconds:


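from confluent_kafka import Producer
import time

# connect to the local broker advertised in config/server.properties
producer = Producer({
    'bootstrap.servers': '127.0.0.1:9092',
})

topic = 'sample'
msg_count = 0

while True:
    msg_count += 1
    print(msg_count)
    # str keys and values are sent as UTF-8 bytes
    producer.produce(topic, key='dish', value='spaghetti#%d' % msg_count)
    # wait until all outstanding messages are delivered (fine for a demo, slow at scale)
    producer.flush()
    time.sleep(2)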
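The producer above doesn't report whether each message actually reached the broker.  confluent-kafka's `produce()` accepts a delivery callback that is called with `(err, msg)`; here is a minimal sketch of one.  The name `delivery_report` is just an illustrative choice, and the function uses only the `topic()`, `partition()` and `offset()` methods of the delivered message.

```python
def delivery_report(err, msg):
    """Report the outcome of one produce() call.

    err is None on success; otherwise it describes why delivery failed.
    """
    if err is not None:
        report = 'Delivery failed: {}'.format(err)
    else:
        report = 'Delivered to {} [{}] at offset {}'.format(
            msg.topic(), msg.partition(), msg.offset())
    print(report)
    return report
```

To use it, pass `callback=delivery_report` to `producer.produce(...)`.  Callbacks are served from `producer.poll()` or `producer.flush()`, so the script's existing `flush()` call is enough to trigger them.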


We'll also create a Python script that consumes messages from the "sample" Kafka topic and displays them on screen:


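from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'my-group',
    # start from the oldest message when the group has no committed offset yet
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['sample'])

msg_count = 0
try:
    while True:
        # wait up to 1 second for a message
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            # end-of-partition events are informational, not fatal
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(msg.error())
            break
        msg_count += 1
        # key and value arrive as raw bytes
        print(msg_count, msg.key(), msg.value())
except KeyboardInterrupt:
    pass
finally:
    # leave the group cleanly and commit final offsets
    consumer.close()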


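If everything goes well, the Kafka server's terminal should log the consumer joining the group, and the consumer script should print the messages: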


[2021-11-02 17:12:44,424] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group my-group in Empty state. Created a new member id rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,429] INFO [GroupCoordinator 0]: Preparing to rebalance group my-group in state PreparingRebalance with old generation 29 (__consumer_offsets-12) (reason: Adding new member rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,430] INFO [GroupCoordinator 0]: Stabilized group my-group generation 30 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,433] INFO [GroupCoordinator 0]: Assignment received from leader rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 for group my-group for generation 30. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

1 b'dish' b'spaghetti#1'
2 b'dish' b'spaghetti#2'
3 b'dish' b'spaghetti#3'
...
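The consumer's 'auto.offset.reset' setting only applies when the group has no usable committed offset for a partition.  After that, the group resumes from its last committed position.  Here is a toy model of that rule, a simplification for illustration only and not librdkafka's actual code; `starting_offset` and its parameters are hypothetical names.

```python
def starting_offset(committed, log_start, log_end, reset='latest'):
    """Toy model of where a consumer group starts reading one partition.

    committed: the group's committed offset, or None if it has none yet.
    log_start: oldest offset still stored; log_end: next offset to be written.
    """
    # a valid committed offset always wins; the reset policy is ignored
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if reset == 'earliest':
        return log_start
    if reset == 'latest':
        return log_end
    # mirrors the 'none' policy, which treats a missing offset as an error
    raise ValueError('no valid committed offset and reset policy is %r' % reset)
```

This is why the first run of the consumer starts at spaghetti#1, while later runs of my-group pick up where the previous run left off instead of replaying from the beginning.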


That's it!  Of course, you can do a lot more with Kafka, such as using different ways to encode and decode messages.  For more info, check out the examples directory in the confluent-kafka-python repository:

https://github.com/confluentinc/confluent-kafka-python
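For a small taste of encoding and decoding, here is a minimal sketch that sends structured values as JSON using only Python's standard json module, rather than any serializer classes the library itself may provide.  The `serialize` and `deserialize` helpers are hypothetical names.

```python
import json


def serialize(record):
    """Encode a dict as UTF-8 JSON bytes for use as a Kafka message value."""
    return json.dumps(record, sort_keys=True).encode('utf-8')


def deserialize(payload):
    """Decode bytes from msg.value() back into a dict; None stays None."""
    if payload is None:
        return None
    return json.loads(payload.decode('utf-8'))
```

In the producer, you could pass `value=serialize({'dish': 'spaghetti', 'count': msg_count})`, and in the consumer, print `deserialize(msg.value())` instead of the raw bytes.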

