kafka是最近幾年很流行的訊息佇列中介軟體。在大資料以及後端服務領域有很廣泛的應用。廢話不多說,接下來直接上**介紹python如何向kafka傳送資料以及訂閱資料。
kafka的訊息是 " 發布--訂閱" 模式的。 接下來先介紹向kakfa發布訊息。先安裝python的kafka連線模組。pip install kafka-python
.
import time
from kafka import kafkaproducer
class kafkamsgproducer:
def __init__(self, server):
self._server = server
self.producer = none
def connect(self):
if self.producer is none:
producer = kafkaproducer(bootstrap_servers=self._server)
self.producer = producer
def close(self):
if self.producer is not none:
self.producer.close()
self.producer = none
def send(self, topic, msg):
if self.producer is not none:
if not isinstance(msg, bytes):
msg = msg.encode("utf-8") # 將str型別轉換為bytes型別
self.producer.send(topic=topic, value=msg)
def run():
producer = kafkamsgproducer("localhost:9092")
producer.connect() # 建立連線
topic = "yanchampion-test"
print("start sending msg to kafka!")
for msg in "hello! this is yanchampion speaking!".split():
producer.send(topic=topic, msg=msg) # 向kafka 指定topic傳送資料
time.sleep(1)
if __name__ == '__main__':
run() # 執行發布訊息程式
以上**即可以向kafka指定topic發布訊息了。注意,為了測試,先不執行producer.py
import time
from kafka import kafkaproducer
class kafkamsgproducer:
def __init__(self, server):
self._server = server
self.producer = none
def connect(self):
if self.producer is none:
producer = kafkaproducer(bootstrap_servers=self._server)
self.producer = producer
def close(self):
if self.producer is not none:
self.producer.close()
self.producer = none
def send(self, topic, msg):
if self.producer is not none:
if not isinstance(msg, bytes):
msg = msg.encode("utf-8") # 將str型別轉換為bytes型別
self.producer.send(topic=topic, value=msg)
def run():
producer = kafkamsgproducer("localhost:9092")
producer.connect() # 建立連線
topic = "yanchampion-test"
print("start sending msg to kafka!")
for msg in " 111 222 333 444".split():
producer.send(topic=topic, msg=msg) # 向kafka 指定topic傳送資料
time.sleep(1)
if __name__ == '__main__':
run() # 執行程式
以上**即可完成訊息的訂閱。
因為kafka是 發布-定於模式。所以,乙個topic可以有多個consumer訂閱,並且,每個consumer都可以收到同一條訊息。那麼讓我們先來執行兩個consumer.py檔案。
開啟不同的終端
python3 consumer.py
接下來再執行producer.py
python3 producer.py
通過觀察,最終可以看到,兩個執行了consumer.py 的終端 都可以收到訊息
[root@yanchampion kafka-demo]# python3 consumer.py
收到訊息: b'111'
收到訊息: b'222'
收到訊息: b'333'
收到訊息: b'444'
使用python連線kafka
kafka是高吞吐的訊息佇列系統,輕鬆支援每秒百萬級的寫入請求,這種特性也使得kafka在日誌處理等海量資料場景廣泛應用。kafka依賴於zookeeper執行,zookeeper充當了協調和管理kafka集群的任務,並且儲存一些meta資訊。此處,因作者能力有限,不詳細討論kafka與zookee...
使用python操作kafka
使用python操作kafka目前比較常用的庫是kafka python庫 pip3 install kafka pythonproducer test.py from kafka import kafkaproducer producer kafkaproducer bootstrap serve...
Python使用多程序實現串列埠收發資料
在之前一篇文章中 python使用多執行緒實現串列埠收發資料,提到了使用多執行緒實現串列埠收發資料,曉得多執行緒的朋友可能會有點疑問 多執行緒是單cpu,雖然在io中速度比較快,但是對於乙個大的專案,多執行緒本身是加速不了太多的 針對這個問題,我用multiprocessing改了一下 window...