使用python連線kafka

2021-09-24 20:55:37 字數 3278 閱讀 8638

kafka是高吞吐的訊息佇列系統,輕鬆支援每秒百萬級的寫入請求,這種特性也使得kafka在日誌處理等海量資料場景廣泛應用。kafka依賴於zookeeper執行,zookeeper充當了協調和管理kafka集群的任務,並且儲存一些meta資訊。此處,因作者能力有限,不詳細討論kafka與zookeeper的內部細節。

kafka採用的是一種發布-訂閱的訊息發布模型:訊息模型中,首先存在一些producer,即訊息的生產者,然後將訊息寫入broker,即kafka集群中的乙個或多個節點,進行訊息的快取,最後由consumer(消費者)訂閱訊息,進行後處理。在broker中,訊息的管理通過topic和partition來實現。topic為乙個邏輯概念,表示一類訊息,topic可以包含多個partition,實現訊息的分布式儲存及吞吐。producer、topic、partition、consumer的關係如下圖所示(參考部落格

kafka的producer-topic-partition模型有如下幾個需要注意的點:

同乙個producer可以向多個topic寫入訊息

多個producer可以向同乙個topic寫入訊息

producer寫入訊息時可以指定partition,因此1、2兩點中對同乙個topic下的partition也成立

kafka的topic-partition-consumer模型有如下幾個需要注意的點:

consumer具有組的概念,乙個consumer group可以包含多個consumer例項;

對於同乙個topic下的一條訊息,只能被乙個consumer group中的乙個消費者消費;

同乙個topic下的一條訊息,可以被不同consumer group中的消費者消費;

在同乙個topic下,同乙個consumer group中的乙個消費者例項可以訂閱多個partition;

在同乙個topic下,乙個partition在某一時刻只能被乙個consumer group中的乙個消費者例項訂閱。

因此再進行訊息訂閱時,最優方案是確保乙個topic下的partition數量大於或等於訂閱該topic的consumer group中的消費者例項數量,不然就會出現消費者競爭消費的情況。

上面簡述了kafka的一些基礎知識,下面進入實際**環節,主要包含四個部分:

依賴:

- kafka 2.11

- python 3.7

- python package: kafka-python 1.4.6

nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &
然後啟動kafka:

nohup ./bin/kafka-server-start.sh config/server.properties &
執行kafka中topic管理的主要是kafkaadminclient類,簡單建立topic的命令如下:

# -*- coding:utf-8 -*-

import kafka

from kafka import kafkaadminclient

admin = kafkaadminclient(bootstrap_servers="127.0.0.1:9092")

# admin.delete_topics(["test"]) # 刪除特定topic

topic = kafka.admin.newtopic("test", 3, 1) # 例項化topic物件,3表示分割槽數量,1為副本數量

admin.create_topics([topic]) # 建立topic

**執行後,可進入到kafka根路徑,檢視其中topic的詳細資訊:

./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
上述**建立了名為test的topic,下面我們利用生產者推送訊息:

# -*- coding:utf-8 -*-

from kafka import kafkaproducer

import time

producer = kafkaproducer(bootstrap_servers="127.0.0.1:9092") # 建立生產者例項

while true:

current = time.time()

message = "welcome to %s" % current

print(message)

producer.send("test", value=message.encode(), partition=2) # 傳送訊息,指定topic和partition

time.sleep(1)

上述**建立了乙個生產者,並向名為test的topic下的partition 2推送訊息,下面我們利用消費者消費訊息:

# -*- coding:utf-8 -*-

import kafka

from kafka import kafkaconsumer

consumer = kafkaconsumer(bootstrap_servers="localhost:9092", group_id="my") # 建立消費者例項,並指定group id

consumer.assign([kafka.topicpartition("test", 2)]) # 訂閱「test」 topic下的partition 2訊息,

# 阻塞程序,從消費者不斷拉取訊息

for msg in consumer:

recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) # 列印訊息的topic、partition、offset及其他元資訊

print(recv)

消費結果如下:

本文簡述了kafka的一些基本概念,並展示了如何用python簡單的操作kafka實現訊息佇列模型。

使用python操作kafka

使用python操作kafka目前比較常用的庫是kafka python庫 pip3 install kafka pythonproducer test.py from kafka import kafkaproducer producer kafkaproducer bootstrap serve...

python使用kafka收發訊息

kafka是最近幾年很流行的訊息佇列中介軟體。在大資料以及後端服務領域有很廣泛的應用。廢話不多說,接下來直接上 介紹python如何向kafka傳送資料以及訂閱資料。kafka的訊息是 發布 訂閱 模式的。接下來先介紹向kakfa發布訊息。先安裝python的kafka連線模組。pip instal...

python 連線mongodb 使用

1 連線 import pymongo mongo client pymongo.mongoclient host localhost port 27017 db mongo client.myip table db.myabac 2 新增資料 table.insert 插入指定 id 欄位的文件,...