This is my quick stab at setting up Kafka on Ubuntu. As a newcomer to Kafka, I started off by following the steps from this excellent but slightly outdated blog. I thought I'd freshen up those steps and also give a brief introduction to the confluent-kafka library for Python. For basic background info on Kafka, I would still recommend checking out that blog.
For reference, my setup is:
- Ubuntu 20.04
- Python 3.8
- Kafka 3.0.0
- confluent-kafka (Python client)
Setting Up Kafka
Download Kafka by following these steps:
- Download the 3.0.0 release of Kafka (kafka_2.13-3.0.0.tgz)
- Un-tar the archive: tar -xzf kafka_2.13-3.0.0.tgz
- cd into the extracted Kafka directory: cd kafka_2.13-3.0.0
Assuming you're setting up Kafka on your local machine, edit the following line in config/server.properties (uncomment it if it is commented out):
advertised.listeners=PLAINTEXT://127.0.0.1:9092
This line tells clients to reach the broker at 127.0.0.1:9092, which is the address the Python scripts in this blog connect to.
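To double-check the edit, here is a minimal sketch that reads the properties file and reports the active advertised.listeners value. The helper names read_properties and advertised_listener are my own, and the parser is deliberately simplified: it only handles key=value lines and # comments, not the full Java properties syntax.

```python
def read_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def advertised_listener(path="config/server.properties"):
    """Return the active advertised.listeners value, or None if it is unset."""
    with open(path, encoding="utf-8") as f:
        return read_properties(f.read()).get("advertised.listeners")
```

Running advertised_listener() from the Kafka directory should return PLAINTEXT://127.0.0.1:9092 after the edit, and None if the line is still commented out.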
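Next, start the servers using the shell scripts in the bin directory, each in its own terminal (the matching zookeeper-server-stop.sh and kafka-server-stop.sh scripts stop them), and then create the "sample" topic. Note that --bootstrap-server must point at the Kafka broker (port 9092), not at ZooKeeper (port 2181):
# start ZooKeeper Server
bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka Server
bin/kafka-server-start.sh config/server.properties
# create the "sample" topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 \
--partitions 1 --topic sample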
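If a command above hangs or fails, it helps to confirm that both servers are actually listening. The following is a rough sketch using only the standard library. It assumes the default ports (2181 for ZooKeeper, 9092 for Kafka), and port_is_open is a name I made up for illustration.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Default ports are assumed here; adjust them if you changed the configs.
for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
    state = "listening" if port_is_open("127.0.0.1", port) else "not reachable"
    print(name, state)
```

An open port only shows that something accepted the connection; it does not prove the broker is healthy, but it quickly rules out a server that never started.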
Install confluent-kafka:
pip install confluent-kafka
We'll create a Python script to publish messages to the "sample" Kafka topic every 2 seconds:
from confluent_kafka import Producer
import time

# connect to the broker address set in advertised.listeners
producer = Producer({
    'bootstrap.servers': '127.0.0.1:9092',
})
topic = 'sample'
msg_count = 0
while True:
    msg_count += 1
    print(msg_count)
    # queue one message, then block until it has been delivered
    producer.produce(topic, key='dish', value='spaghetti#%d' % msg_count)
    producer.flush()
    time.sleep(2)
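The script above relies on flush() and never learns whether a message actually arrived. As a sketch of one alternative, confluent-kafka's produce() accepts a callback that is invoked with (err, msg) once delivery succeeds or fails. The function names format_delivery, delivery_report and send are my own, and send expects an already constructed Producer to be passed in.

```python
def format_delivery(err, msg):
    """Describe the outcome of one produce() call as a single line."""
    if err is not None:
        return "delivery failed: %s" % err
    return "delivered to %s [%d] at offset %d" % (
        msg.topic(), msg.partition(), msg.offset())


def delivery_report(err, msg):
    """Delivery callback: the client calls it with (err, msg) per message."""
    print(format_delivery(err, msg))


def send(producer, topic, key, value):
    """Queue one message and serve any delivery callbacks that are ready."""
    producer.produce(topic, key=key, value=value, callback=delivery_report)
    # poll(0) returns immediately; it only runs callbacks that are pending
    producer.poll(0)
```

In the loop above you could replace the produce() and flush() lines with send(producer, topic, 'dish', 'spaghetti#%d' % msg_count), keeping a final producer.flush() for when the script exits.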
We'll also create a Python script to consume messages from the "sample" Kafka topic and display them on screen:
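from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'my-group',
    # start from the oldest message when the group has no committed offset
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
consumer.subscribe(['sample'])
msg_count = 0
while True:
    # wait up to 1 second for the next message
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # reached the end of the partition; keep waiting for new messages
            continue
        else:
            print(msg.error())
            break
    else:
        msg_count += 1
        print(msg_count, msg.key(), msg.value())
# leave the consumer group cleanly after a fatal error
consumer.close()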
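If everything goes well, the Kafka server terminal should log the consumer joining my-group, and the consumer script should print the messages: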
[2021-11-02 17:12:44,424] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group my-group in Empty state. Created a new member id rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,429] INFO [GroupCoordinator 0]: Preparing to rebalance group my-group in state PreparingRebalance with old generation 29 (__consumer_offsets-12) (reason: Adding new member rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,430] INFO [GroupCoordinator 0]: Stabilized group my-group generation 30 (__consumer_offsets-12) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2021-11-02 17:12:44,433] INFO [GroupCoordinator 0]: Assignment received from leader rdkafka-3df1f0f2-93c2-477b-9af1-bff1260befe5 for group my-group for generation 30. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
1 b'dish' b'spaghetti#1'
2 b'dish' b'spaghetti#2'
3 b'dish' b'spaghetti#3'
...
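The b'...' prefix in the output shows that keys and values arrive as raw bytes. Here is a small sketch of turning one of the messages above back into Python values. parse_dish is a hypothetical helper, and it assumes every message has a key and a value of the form name#number, as produced by the script in this post.

```python
def parse_dish(key, value):
    """Decode a (key, value) pair such as (b'dish', b'spaghetti#3')."""
    # split on the last '#', so names containing '#' still parse
    name, _, number = value.decode("utf-8").rpartition("#")
    return key.decode("utf-8"), name, int(number)


print(parse_dish(b'dish', b'spaghetti#3'))  # ('dish', 'spaghetti', 3)
```

In the consumer you would call it as parse_dish(msg.key(), msg.value()). A message without a key, or with a differently shaped value, would raise an error here, so real code should guard against that.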
That's it. Of course, you can do a lot more with Kafka, such as encoding and decoding the messages in different ways. For more info, check out the examples directory in the confluent-kafka-python repository:
https://github.com/confluentinc/confluent-kafka-python
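As one simple illustration of encoding and decoding, here is a sketch that sends structured data as JSON using only the standard library. The names encode_value and decode_value and the record fields are made up for this example; this is plain JSON over bytes, not the serializer classes that ship with confluent-kafka.

```python
import json


def encode_value(record):
    """Serialize a dict to UTF-8 JSON bytes, usable as produce(value=...)."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def decode_value(raw):
    """Parse the bytes returned by msg.value() back into a dict."""
    return json.loads(raw.decode("utf-8"))


payload = encode_value({"dish": "spaghetti", "count": 1})
print(payload)                # b'{"count": 1, "dish": "spaghetti"}'
print(decode_value(payload))  # {'count': 1, 'dish': 'spaghetti'}
```

The producer would pass encode_value(...) as the value, and the consumer would call decode_value(msg.value()) on each message.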