kafka是高吞吐的訊息佇列系統,輕鬆支援每秒百萬級的寫入請求,這種特性也使得kafka在日誌處理等海量資料場景廣泛應用。kafka依賴於zookeeper執行,zookeeper充當了協調和管理kafka集群的任務,並且儲存一些meta資訊。此處,因作者能力有限,不詳細討論kafka與zookeeper的內部細節。
kafka採用的是一種發布-訂閱的訊息發布模型:訊息模型中,首先存在一些producer,即訊息的生產者,然後將訊息寫入broker,即kafka集群中的乙個或多個節點,進行訊息的快取,最後由consumer(消費者)訂閱訊息,進行後處理。在broker中,訊息的管理通過topic和partition來實現。topic為乙個邏輯概念,表示一類訊息,topic可以包含多個partition,實現訊息的分布式儲存及吞吐。producer、topic、partition、consumer的關係如下圖所示(參考部落格
kafka的producer-topic-partition模型有如下幾個需要注意的點:
同乙個producer可以向多個topic寫入訊息
多個producer可以向同乙個topic寫入訊息
producer寫入訊息時可以指定partition,因此1、2兩點中對同乙個topic下的partition也成立
kafka的topic-partition-consumer模型有如下幾個需要注意的點:
consumer具有組的概念,乙個consumer group可以包含多個consumer例項;
對於同乙個topic下的一條訊息,只能被乙個consumer group中的乙個消費者消費;
同乙個topic下的一條訊息,可以被不同consumer group中的消費者消費;
在同乙個topic下,同乙個consumer group中的乙個消費者例項可以訂閱多個partition;
在同乙個topic下,乙個partition在某一時刻只能被乙個consumer group中的乙個消費者例項訂閱。
因此再進行訊息訂閱時,最優方案是確保乙個topic下的partition數量大於或等於訂閱該topic的consumer group中的消費者例項數量,不然就會出現消費者競爭消費的情況。
上面簡述了kafka的一些基礎知識,下面進入實際**環節,主要包含四個部分:
依賴:
- kafka 2.11
- python 3.7
- python package: kafka-python 1.4.6
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &
然後啟動kafka:
nohup ./bin/kafka-server-start.sh config/server.properties &
執行kafka中topic管理的主要是kafkaadminclient類,簡單建立topic的命令如下:
# -*- coding:utf-8 -*-
import kafka
from kafka import kafkaadminclient
admin = kafkaadminclient(bootstrap_servers="127.0.0.1:9092")
# admin.delete_topics(["test"]) # 刪除特定topic
topic = kafka.admin.newtopic("test", 3, 1) # 例項化topic物件,3表示分割槽數量,1為副本數量
admin.create_topics([topic]) # 建立topic
**執行後,可進入到kafka根路徑,檢視其中topic的詳細資訊:
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
上述**建立了名為test的topic,下面我們利用生產者推送訊息:
# -*- coding:utf-8 -*-
from kafka import kafkaproducer
import time
producer = kafkaproducer(bootstrap_servers="127.0.0.1:9092") # 建立生產者例項
while true:
current = time.time()
message = "welcome to %s" % current
print(message)
producer.send("test", value=message.encode(), partition=2) # 傳送訊息,指定topic和partition
time.sleep(1)
上述**建立了乙個生產者,並向名為test的topic下的partition 2推送訊息,下面我們利用消費者消費訊息:
# -*- coding:utf-8 -*-
import kafka
from kafka import kafkaconsumer
consumer = kafkaconsumer(bootstrap_servers="localhost:9092", group_id="my") # 建立消費者例項,並指定group id
consumer.assign([kafka.topicpartition("test", 2)]) # 訂閱「test」 topic下的partition 2訊息,
# 阻塞程序,從消費者不斷拉取訊息
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) # 列印訊息的topic、partition、offset及其他元資訊
print(recv)
消費結果如下:
本文簡述了kafka的一些基本概念,並展示了如何用python簡單的操作kafka實現訊息佇列模型。
使用python操作kafka
使用python操作kafka目前比較常用的庫是kafka python庫 pip3 install kafka pythonproducer test.py from kafka import kafkaproducer producer kafkaproducer bootstrap serve...
python使用kafka收發訊息
kafka是最近幾年很流行的訊息佇列中介軟體。在大資料以及後端服務領域有很廣泛的應用。廢話不多說,接下來直接上 介紹python如何向kafka傳送資料以及訂閱資料。kafka的訊息是 發布 訂閱 模式的。接下來先介紹向kakfa發布訊息。先安裝python的kafka連線模組。pip instal...
python 連線mongodb 使用
1 連線 import pymongo mongo client pymongo.mongoclient host localhost port 27017 db mongo client.myip table db.myabac 2 新增資料 table.insert 插入指定 id 欄位的文件,...