kafka的簡單demo實現

2021-09-21 13:31:25 字數 3029 閱讀 1430

在一文中,已經在本地安裝好了kafka,接下來就是介紹kafka開發環境的搭建與最簡單的demo 開發。

一、kafka.properties

首先是生成乙個kafka.properties,裡面配置的是生產者、消費者、topic、group的基本引數。具體含義如下:

【生產者】

(1)kafka.producer.servers

用於建立與kafka集群的連線,這個list僅僅影響用於初始化的hosts,來發現全部的servers。格式:host1:port1,host2:port2,…,數量盡量不止乙個,以防其中乙個down了。

(2)kafka.producer.retries

發生錯誤時的重傳次數。當開啟重傳時,需要將max.in.flight.requests.per.connection設定為1,否則可能導致失序。

(3)kafka.producer.batch.size

producer可以將發往同乙個partition的資料做成乙個produce request傳送請求,即batch批處理,以減少請求次數,該值即為每次批處理的大小。另外每個request請求包含多個batch,每個batch對應乙個partition,且乙個request傳送的目的broker均為這些partition的leader副本。若將該值設為0,則不會進行批處理。

(4)kafka.producer.linger

producer缺省會把兩次傳送時間間隔內收集到的所有requests進行一次聚合然後再傳送,以此提高吞吐量,而linger.ms則更進一步,這個引數為每次傳送增加一些delay,以此來聚合更多的message。

官網解釋翻譯:producer會將request傳輸之間到達的所有records聚合到乙個批請求。通常這個值發生在欠負載情況下,record到達速度快於傳送。但是在某些場景下,client即使在正常負載下也期望減少請求數量。這個設定就是如此,通過人工新增少量時延,而不是立馬傳送乙個record,producer會等待所給的時延,以讓其他records傳送出去,這樣就會被聚合在一起。這個類似於tcp的nagle演算法。該設定給了batch的時延上限:當我們獲得乙個partition的batch.size大小的records,就會立即傳送出去,而不管該設定;但是如果對於這個partition沒有累積到足夠的record,會linger指定的時間等待更多的records出現。該設定的預設值為0(無時延)。例如,設定linger.ms=5,會減少request傳送的數量,但是在無負載下會增加5ms的傳送時延。

(5)kafka.producer.buffer.memory

producer可以用來快取資料的記憶體大小。該值實際為recordaccumulator類中的bufferpool,即producer所管理的最大記憶體。

【消費者】

(1)kafka.consumer.zookeeper.connect

用於建立與zookeeper的連線。

(2)kafka.consumer.servers

消費者初始連線kafka集群時的位址列表。不管這邊配置的什麼位址,消費者會使用所有的kafka集群伺服器。消費者會通過這些位址列表,找到所有的kafka集群機器。

(3)kafka.consumer.enable.auto.commit

如果設為true,消費者的偏移量會定期在後台提交。

(4)kafka.consumer.session.timeout

使用kafka集群管理工具時檢測失敗的超時時間。如果在session超時時間範圍內,沒有收到消費者的心跳,broker會把這個消費者置為失效,並觸發消費者負載均衡。因為只有在呼叫poll方法時才會傳送心跳,更大的session超時時間允許消費者在poll迴圈週期內處理訊息內容,儘管這會有花費更長時間檢測失效的代價。如果想控制消費者處理訊息的時間,還可以參考max.poll.records。注意這個值的大小應該在group.min.session.timeout.msgroup.max.session.timeout.ms範圍內。

(5)kafka.consumer.auto.commit.interval

自動提交offset到zookeeper的時間間隔

(6)kafka.consumer.auto.offset.reset

當kafka的初始偏移量沒了,或者當前的偏移量不存在的情況下,應該怎麼辦?下面有幾種策略:earliest(將偏移量自動重置為最初的值)、latest(自動將偏移量置為最新的值)、none(如果在消費者組中沒有發現前乙個偏移量,就向消費者丟擲乙個異常)、anything else(向消費者丟擲異常)

【topic配置】

kafka.consumer.topic,topic是kafka資料寫入操作的基本單元,它是kafka訊息的佇列。producer需要關心訊息發往哪個topic,而consumer需要關心自己訂閱哪個topic。

【group.id配置】

kafka.consumer.group.id,監聽訊息時,groupid用於指定消費者組的名稱,如果同組中存在多個***物件則只有乙個***物件能收到訊息。

二、kafkaproducer

三、kafkaconsumer

四、listener

參考文件:

Flume整合Kafka的簡單demo記錄

啟動zookeeper和kafka,單節點 bin zookeeper server start.sh config zookeeper.properties bin kafka server start.sh config server.properties 2.建立主題 建立乙個主題 flume...

vuex的理解以及簡單的demo實現

存 mutations 只能由它來操作state 中 actions 處理非同步任務給mutation 取 getters 類似計算屬性,依賴state資料 庫 state 即資料來源 注意 解構出來的方法只能有乙個引數,如果有多個要傳遞,可以使用陣列或者物件。import computed met...

基於TCP套接字實現的簡單Demo

由於 的注釋已經很詳盡了,所以這裡不再作過多說明.僅僅貼出 和結果圖.值得注意的是必須先啟動server程式再啟動client.server include 套接字型檔 include define port 6000 伺服器端口 define msgsize 1024 收發緩衝區的大小 pragm...