Vài ghi chú trong quá trình sử dụng kafka với python.
Gửi nhận message trong kafka
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# -*- coding: utf-8 -*-
import uuid
from json import dumps
from kafka import KafkaProducer
BOOTSTRAP_SERVERS = ['' ]
USERNAME = ''
PASSWORD = ''
TOPIC = ''
if __name__ == "__main__" :
producer = KafkaProducer(
bootstrap_servers= BOOTSTRAP_SERVERS,
security_protocol= 'SASL_PLAINTEXT' ,
sasl_mechanism= 'SCRAM-SHA-256' ,
sasl_plain_username= USERNAME,
sasl_plain_password= PASSWORD,
value_serializer= lambda x: dumps(x). encode('utf-8' )
)
for _ in range(10 ):
producer. send(TOPIC, {"id" : str(uuid. uuid4())})
producer. flush()
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer, TopicPartition
import json
BOOTSTRAP_SERVERS = ['' ]
USERNAME = ''
PASSWORD = ''
TOPIC = ''
if __name__ == '__main__' :
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers= BOOTSTRAP_SERVERS,
auto_offset_reset= 'earliest' ,
security_protocol= 'SASL_PLAINTEXT' ,
sasl_mechanism= 'SCRAM-SHA-256' ,
sasl_plain_username= USERNAME,
sasl_plain_password= PASSWORD
)
for message in consumer:
print(message. value)
Tham số auto_offset_reset='earliest'
là để consumer nhận tất cả các message từ đầu đang có trong topic. Nếu không set tham số này thì consumer chỉ nhận các message mới.
Khi chạy consumer sẽ in ra nội dung các message như sau:
OUTPUT
1
2
3
4
5
6
7
8
9
10
11
$ python consumer.py
{ 'id' : '939cd4fe-79e9-4050-a9bc-3f94b31f62e3' }
{ 'id' : 'a90c71b3-4516-4df9-871d-047264f1d6b6' }
{ 'id' : '06d45529-6888-4d2d-a4df-fec15b4b1d87' }
{ 'id' : '25642eee-b51c-432a-89d1-d7a17c7ef30a' }
{ 'id' : '03229045-94c9-4825-9cb6-ce495a68e7a9' }
{ 'id' : '3b6875f4-0a64-443f-9206-3bf1a3d31dc8' }
{ 'id' : '9f45a12b-97d2-4585-b296-a80ec5c0223c' }
{ 'id' : '485b4946-18a9-47d1-a849-87a6fc60365a' }
{ 'id' : 'b1c9d75a-d56e-4dd8-9e08-89b3a818fbb1' }
{ 'id' : '14486560-b5cb-41fa-bd0e-845a424c8ed4' }
Nếu chạy lại lệnh code consumer.py thì sẽ tiếp tục nhận được các message tương tự. Để consumer không phải nhận lại các message đã nhận trước đó thì thêm vào tham số group_id như sau:
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer, TopicPartition
import json
BOOTSTRAP_SERVERS = ['' ]
USERNAME = ''
PASSWORD = ''
TOPIC = ''
GROUP_ID = ''
if __name__ == '__main__' :
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers= BOOTSTRAP_SERVERS,
auto_offset_reset= 'earliest' ,
group_id= GROUP_ID,
security_protocol= 'SASL_PLAINTEXT' ,
sasl_mechanism= 'SCRAM-SHA-256' ,
sasl_plain_username= USERNAME,
sasl_plain_password= PASSWORD
)
for message in consumer:
print(message. value)
Khi chạy đoạn code trên lần đầu tiên thì vẫn nhận được các messages có trong topic. Nhưng khi chạy lại đoạn code trên một lần nữa thì sẽ không nhận được thêm bất kỳ messages nào nữa.
Điều này là do khi chúng ta cung cấp tham số group_id thì broker sẽ theo dõi offset của consumer group nhờ đó mà các messages sẽ không bị xử lý lặp lại.