kafka單分割槽,多個客戶端共同消費乙個topic

2021-10-14 06:50:33 字數 2061 閱讀 5693

安裝kafka-python

pip install kafka-python
kafka-python官網

生產 productor

import json

from kafka import kafkaproducer

from time import sleep

defstart_producer()

: producer = kafkaproducer(

bootstrap_servers=

'localhost:9092'

,# kafka服務位址

value_serializer=

lambda m: json.dumps(m)

.encode(

'utf-8'),

)for i in

range(0

,100):

msg =

future=producer.send(

'topic_test'

, msg, partition=0)

sleep(1)

# future.get(timeout=10)

if __name__ ==

'__main__'

: start_producer(

)

消費端
kafka消費

1.相同group_id的消費者,只有乙個消費者能夠消費到訊息

2.不同group_id的消費者,接受到的訊息都是一樣的

以下**消費一次(接收到訊息),就close,其他消費者就能開始消費了

from kafka import kafkaconsumer, topicpartition

import time

import random

defstart_consumer()

: consumer = kafkaconsumer(

'topic2'

, bootstrap_servers =

'localhost:9092'

, group_id=

'group1'

) msg =

next

(consumer)

print

(msg.value.decode())

consumer.close(

)def

main()

:while

true

: start_consumer(

) r_num = random.randint(1,

4)print

(f'mpc1 doing[s]...'

) time.sleep(r_num)

if __name__ ==

'__main__'

: main(

)

效果如下:
...

mpc1 doing[2s]...

mpc1 doing[2s]...

mpc1 doing[1s]...

mpc1 doing[4s]...

mpc1 doing[3s]...

mpc1 doing[1s]...

mpc1 doing[4s]...

mpc1 doing[4s]...

...

...

mpc2 doing[2s]...

mpc2 doing[4s]...

mpc2 doing[3s]...

mpc2 doing[3s]...

mpc2 doing[3s]...

mpc2 doing[3s]...

mpc2 doing[2s]...

mpc2 doing[4s]...

mpc2 doing[1s]...

...

Kafka 客戶端 Produce 常用配置

配置類值作用 demo org.apache.kafka.clients.commonclientconfigs bootstrap servers config bootstrap.servers 配置kafka 伺服器位址資訊 localhost 9092,localhost 9092 org....

socket模型處理多個客戶端

最近學完了簡單的socket程式設計,發現其實socket的網路程式設計其實並沒有什麼難度,只是簡單的函式呼叫,記住客戶端與服務端的步驟,寫起來基本沒有什麼問題。在伺服器程式的設計中,乙個伺服器不可能只相應乙個客戶端的鏈結,為了響應多個客戶端的鏈結,需要使用多執行緒的方式,每當有乙個客戶端連線進來,...

kafka模擬客戶端傳送 接受訊息

producer 訊息的生成者,即發布訊息 consumer 訊息的消費者,即訂閱訊息 broker kafka以集群的方式執行,可以由乙個或多個服務組成,服務即broker zookeeper 協調 一 建立topic 建立完後可以使用list檢視下 二 重新開啟兩個終端 假設乙個終端傳送訊息 乙...