kafka實時監控

2021-08-27 21:50:27 字數 1605 閱讀 2477

在kafka的開發和維護中,我們經常需要了解kafka topic以及連線在其上的consumer的實時資訊,比如logsize,offset,owner等。為此kafka提供了consumeroffsetchecker,它的用法很簡單

bin/kafka-run-class.sh kafka.tools.consumeroffsetchecker --group

輸出結果類似於

group topic pid offset logsize lag owner

group1 a.topic 0 2 2 0 none

group1 a.topic 1 0 0 0 none

group1 a.topic 2 2 2 0 none

我們也可以通過kafka web console一類的工具直觀地獲取kafka資訊,但如果我們要構建自己的監控系統,需要抓取這些資訊的話,有兩種辦法:一種是執行consumeroffsetchecker然後解析輸出的字串;另一種就是通過******consumer和zookeeper實時抓取資訊(換句話說就是把consumeroffsetchecker翻譯一下:)),以下介紹第二種方法的思路。

首先我們看kafka資訊在zookeeper的儲存結構

1,/brokers/topics/[topic]/partitions/[partitionid]/state

2,/brokers/ids/[0...n]

3,/consumers/[groupid]/ids/[consumerid]

4,/consumers/[groupid]/owners/[topic]/[partitionid]

5,/consumers/[groupid]/offsets/[topic]/[partitionid]

對於指定的topic和groupid,通過(1)可以拿到所有的partition資訊(pid),然後通過(5)可以拿到offset,通過(4)可以拿到owner。就差logsize還沒法拿到,事實上logsize在zookeeper中並沒有記錄,它必須通過kafka consumer的low level的api取得。

private long getlogsize(string topicname, string partitionid, long offsetrequesttime) 

return logsize;

}

第二行,建立****** consumer

第四行,建立topic和partition物件

第六行,建立offset request,第乙個引數是記錄寫入的時間,如果是kafka.api.offsetrequest.earliesttime(),則代表當前最早的一條記錄,也就是當前最小offset;如果是kafka.api.offsetrequest.latesttime(),則代表最新的一條記錄,也就是當前最大offset。第二個引數是獲取offset的個數。

由max offset和current offset,我們可以獲得當前還有多少訊息沒有被消費(lag),由(lag/(maxoffset-minoffset)),我們可以算出當前還沒有被消費的訊息佔的百分比,如果這個百分比接近100%,那麼接下來很可能會導致offset out of range exception而丟失資料。

Kafka 監控調研

1.如何監控kafka?kafka的資料統計是通過metrics的工具進行收集的,kafka大量使用了metrics做各種效能統計。kafka沒有自己的web管理介面 2.kafka的監控工具?1 jmx視覺化工具 metrics以jmx的形式提供了對外檢視資料的介面,所以我們可以在kafka啟動的...

日誌實時監控 上傳

摘要 日誌實時監控 關鍵字 common io monitor包,生產消費模式,非阻塞佇列,鉤子 看點 1.如何監聽檔案改變 common io提供minitor工具,可以監控檔案的改變 file directory new file logpath long interval timeunit.s...

實時監控系統介紹

之所以寫這篇文章,是因為最近遇到了兩個系統問題,都是看監控與實際資料表現不符,覺得還挺有意思,特記錄下。以上兩個問題經檢視伺服器日誌發現,確實都是監控系統曲線有問題。如何配置出正確的監控就是我們要解決的問題。為了方便理解,簡單介紹下一下本文所說的一些概念,先看下監控系統是如何工作的。如果你沒接觸過,...