安裝kafka-python
pip install kafka-python
kafka-python官網
生產 productor
import json
from kafka import kafkaproducer
from time import sleep
defstart_producer()
: producer = kafkaproducer(
bootstrap_servers=
'localhost:9092'
,# kafka服務位址
value_serializer=
lambda m: json.dumps(m)
.encode(
'utf-8'),
)for i in
range(0
,100):
msg =
future=producer.send(
'topic_test'
, msg, partition=0)
sleep(1)
# future.get(timeout=10)
if __name__ ==
'__main__'
: start_producer(
)
消費端kafka消費
1.相同group_id的消費者,只有乙個消費者能夠消費到訊息
2.不同group_id的消費者,接受到的訊息都是一樣的
以下**消費一次(接收到訊息),就close,其他消費者就能開始消費了
from kafka import kafkaconsumer, topicpartition
import time
import random
defstart_consumer()
: consumer = kafkaconsumer(
'topic2'
, bootstrap_servers =
'localhost:9092'
, group_id=
'group1'
) msg =
next
(consumer)
print
(msg.value.decode())
consumer.close(
)def
main()
:while
true
: start_consumer(
) r_num = random.randint(1,
4)print
(f'mpc1 doing[s]...'
) time.sleep(r_num)
if __name__ ==
'__main__'
: main(
)
效果如下:...
mpc1 doing[2s]...
mpc1 doing[2s]...
mpc1 doing[1s]...
mpc1 doing[4s]...
mpc1 doing[3s]...
mpc1 doing[1s]...
mpc1 doing[4s]...
mpc1 doing[4s]...
...
...
mpc2 doing[2s]...
mpc2 doing[4s]...
mpc2 doing[3s]...
mpc2 doing[3s]...
mpc2 doing[3s]...
mpc2 doing[3s]...
mpc2 doing[2s]...
mpc2 doing[4s]...
mpc2 doing[1s]...
...
Kafka 客戶端 Produce 常用配置
配置類值作用 demo org.apache.kafka.clients.commonclientconfigs bootstrap servers config bootstrap.servers 配置kafka 伺服器位址資訊 localhost 9092,localhost 9092 org....
socket模型處理多個客戶端
最近學完了簡單的socket程式設計,發現其實socket的網路程式設計其實並沒有什麼難度,只是簡單的函式呼叫,記住客戶端與服務端的步驟,寫起來基本沒有什麼問題。在伺服器程式的設計中,乙個伺服器不可能只相應乙個客戶端的鏈結,為了響應多個客戶端的鏈結,需要使用多執行緒的方式,每當有乙個客戶端連線進來,...
kafka模擬客戶端傳送 接受訊息
producer 訊息的生成者,即發布訊息 consumer 訊息的消費者,即訂閱訊息 broker kafka以集群的方式執行,可以由乙個或多個服務組成,服務即broker zookeeper 協調 一 建立topic 建立完後可以使用list檢視下 二 重新開啟兩個終端 假設乙個終端傳送訊息 乙...