在一文中,已經在本地安裝好了kafka,接下來就是介紹kafka開發環境的搭建與最簡單的demo 開發。
一、kafka.properties
首先是生成乙個kafka.properties,裡面配置的是生產者、消費者、topic、group的基本引數。具體含義如下:
【生產者】
(1)kafka.producer.servers
用於建立與kafka集群的連線,這個list僅僅影響用於初始化的hosts,來發現全部的servers。格式:host1:port1,host2:port2,…,數量盡量不止乙個,以防其中乙個down了。
(2)kafka.producer.retries
發生錯誤時的重傳次數。當開啟重傳時,需要將max.in.flight.requests.per.connection
設定為1,否則可能導致失序。
(3)kafka.producer.batch.size
producer可以將發往同乙個partition的資料做成乙個produce request傳送請求,即batch批處理,以減少請求次數,該值即為每次批處理的大小。另外每個request請求包含多個batch,每個batch對應乙個partition,且乙個request傳送的目的broker均為這些partition的leader副本。若將該值設為0,則不會進行批處理。
(4)kafka.producer.linger
producer缺省會把兩次傳送時間間隔內收集到的所有requests進行一次聚合然後再傳送,以此提高吞吐量,而linger.ms則更進一步,這個引數為每次傳送增加一些delay,以此來聚合更多的message。
官網解釋翻譯:producer會將request傳輸之間到達的所有records聚合到乙個批請求。通常這個值發生在欠負載情況下,record到達速度快於傳送。但是在某些場景下,client即使在正常負載下也期望減少請求數量。這個設定就是如此,通過人工新增少量時延,而不是立馬傳送乙個record,producer會等待所給的時延,以讓其他records傳送出去,這樣就會被聚合在一起。這個類似於tcp的nagle演算法。該設定給了batch的時延上限:當我們獲得乙個partition的batch.size大小的records,就會立即傳送出去,而不管該設定;但是如果對於這個partition沒有累積到足夠的record,會linger指定的時間等待更多的records出現。該設定的預設值為0(無時延)。例如,設定linger.ms=5,會減少request傳送的數量,但是在無負載下會增加5ms的傳送時延。
(5)kafka.producer.buffer.memory
producer可以用來快取資料的記憶體大小。該值實際為recordaccumulator類中的bufferpool,即producer所管理的最大記憶體。
【消費者】
(1)kafka.consumer.zookeeper.connect
用於建立與zookeeper的連線。
(2)kafka.consumer.servers
消費者初始連線kafka集群時的位址列表。不管這邊配置的什麼位址,消費者會使用所有的kafka集群伺服器。消費者會通過這些位址列表,找到所有的kafka集群機器。
(3)kafka.consumer.enable.auto.commit
如果設為true,消費者的偏移量會定期在後台提交。
(4)kafka.consumer.session.timeout
使用kafka集群管理工具時檢測失敗的超時時間。如果在session超時時間範圍內,沒有收到消費者的心跳,broker會把這個消費者置為失效,並觸發消費者負載均衡。因為只有在呼叫poll方法時才會傳送心跳,更大的session超時時間允許消費者在poll迴圈週期內處理訊息內容,儘管這會有花費更長時間檢測失效的代價。如果想控制消費者處理訊息的時間,還可以參考max.poll.records
。注意這個值的大小應該在group.min.session.timeout.ms
和group.max.session.timeout.ms
範圍內。
(5)kafka.consumer.auto.commit.interval
自動提交offset到zookeeper的時間間隔
(6)kafka.consumer.auto.offset.reset
當kafka的初始偏移量沒了,或者當前的偏移量不存在的情況下,應該怎麼辦?下面有幾種策略:earliest(將偏移量自動重置為最初的值)、latest(自動將偏移量置為最新的值)、none(如果在消費者組中沒有發現前乙個偏移量,就向消費者丟擲乙個異常)、anything else(向消費者丟擲異常)
【topic配置】
kafka.consumer.topic
,topic是kafka資料寫入操作的基本單元,它是kafka訊息的佇列。producer需要關心訊息發往哪個topic,而consumer需要關心自己訂閱哪個topic。
【group.id配置】
kafka.consumer.group.id
,監聽訊息時,groupid用於指定消費者組的名稱,如果同組中存在多個***物件則只有乙個***物件能收到訊息。
二、kafkaproducer
三、kafkaconsumer
四、listener
參考文件:
Flume整合Kafka的簡單demo記錄
啟動zookeeper和kafka,單節點 bin zookeeper server start.sh config zookeeper.properties bin kafka server start.sh config server.properties 2.建立主題 建立乙個主題 flume...
vuex的理解以及簡單的demo實現
存 mutations 只能由它來操作state 中 actions 處理非同步任務給mutation 取 getters 類似計算屬性,依賴state資料 庫 state 即資料來源 注意 解構出來的方法只能有乙個引數,如果有多個要傳遞,可以使用陣列或者物件。import computed met...
基於TCP套接字實現的簡單Demo
由於 的注釋已經很詳盡了,所以這裡不再作過多說明.僅僅貼出 和結果圖.值得注意的是必須先啟動server程式再啟動client.server include 套接字型檔 include define port 6000 伺服器端口 define msgsize 1024 收發緩衝區的大小 pragm...