kafkaconsumer 對於多執行緒訪問是不安全的,通過使用acquire()
跟release()
方法來操作atomiclong currentthread
字段(儲存當前訪問執行緒id), 有多個執行緒同時訪問丟擲concurrentmodificationexception
, 來防止對個執行緒同時訪問。
fetcher: 資料請求類
consumernetworkclient: 消費者的網路客戶端,負責網路傳輸的流程
subscriptionstate: 訂閱狀態類
metadata: 集群的元資料管理類,使用租約機制
kafka是以拉模式去消費資料,可由使用者自由控制消費速度,對使用者的消費位置可以選擇自動非同步commit,或者由使用者主動同步commit, 例項**如下:
kafkaconsumer consumer = ...
consumer.subscribe(arrays.aslist("topic"));
while (!closed.get())
時序圖: Kafka原始碼分析(一)
apache kafka 是 乙個分布式流處理平台.這到底意味著什麼呢?我們知道流處理平台有以下三種特性 它可以用於兩大類別的應用 為了理解kafka是如何做到以上所說的功能,從下面開始,我們將深入探索kafka的特性。首先是一些概念 kafka有四個核心的api 讓我們首先深入了解下kafka的核...
Kafka原始碼分析之KafkaProducer
kafkaproducer是乙個kafka客戶端實現,可以發布記錄records至kafka集群。kafkaproducer是執行緒安全的,多執行緒之間共享單獨乙個producer例項通常會比多個producer例項要快。kafkaproducer包含一組快取池空間,儲存尚未傳輸到集群的記錄reco...
kafka原始碼分析 scheduler分析
kafka scheduler用於執行作業的排程程式。控制乙個在後台定期重複執行或者延遲排程的作業!主要有一下操作 初始化任務以便可以接受任務排程 def startup 當任務排程完成關閉。def shutdown 延遲佇列實現 排程任務 def schedule name string,fun ...