大多數情況下,我們使用 kafka 只是作為訊息處理。在有些情況下,我們需要多次讀取 kafka 集群中的資料。當然,我們可以通過呼叫 kafka 的 api 來完成,但是針對不同的業務需求,我們需要去編寫不同的介面,在經過編譯,打包,發布等一系列流程。最後才能看到我們預想的結果。那麼,我們能不能有一種 簡便的方式去實現這一部分功能,通過編寫 sql 的方式,來視覺化我們的結果。今天,筆者給大家分享一些心得,通過使用 sql 的形式來完成這些需求。
實現這些功能,其架構和思路並不複雜。這裡筆者將整個實現流程,通過乙個原理圖來呈現。如下圖所示:
這裡筆者給大家詳述一下上圖的含義,訊息資料來源存放與 kafka 集群當中,開啟低階和高階兩個消費執行緒,將消費的結果以 rpc 的方式共享出去(即:請求者)。資料共享出去後,回流經到 sql 引擎處,將記憶體中的資料翻譯成 sql tree,這裡使用到了 apache 的 calcite 專案來承擔這一部分工作。然後,我們通過 thrift 協議來響應 web console 的 sql 請求,最後將結果返回給前端,讓其以圖表的實行視覺化。
這裡,我們需要遵循 calcite 的 json models,比如,針對 kafka 集群,我們需要配置一下內容:
} ]}
另外,這裡最好對錶也做乙個表述,配置內容如下所示:
我們,可以將獲取的結果以報表的形式進行匯出。
當然,我們可以在 profile 模組下,瀏覽查詢歷史記錄和當前正在執行的查詢任務。至於其他模組,都屬於輔助功能(展示集群資訊,topic 的 partition 資訊等)這裡就不多贅述了。
分析下來,整體架構和實現的思路都不算太複雜,也不存在太大的難點,需要注意一些實現上的細節,比如消費 api 針對集群訊息引數的調整,特別是低階消費 api,尤為需要注意,其 fetch_size 的大小,以及 offset 是需要我們自己維護的。在使用 calcite 作為 sql 樹時,我們要遵循其 json model 和標準的 sql 語法來運算元據源。
Kafka SQL 引擎分享
大多數情況下,我們使用 kafka 只是作為訊息處理。在有些情況下,我們需要多次讀取 kafka 集群中的資料。當然,我們可以通過呼叫 kafka 的 api 來完成,但是針對不同的業務需求,我們需要去編寫不同的介面,在經過編譯,打包,發布等一系列流程。最後才能看到我們預想的結果。那麼,我們能不能有...
Kafka SQL 引擎分享
大多數情況下,我們使用 kafka 只是作為訊息處理。在有些情況下,我們需要多次讀取 kafka 集群中的資料。當然,我們可以通過呼叫 kafka 的 api 來完成,但是針對不同的業務需求,我們需要去編寫不同的介面,在經過編譯,打包,發布等一系列流程。最後才能看到我們預想的結果。那麼,我們能不能有...
Kafka SQL 引擎分享
大多數情況下,我們使用 kafka 只是作為訊息處理。在有些情況下,我們需要多次讀取 kafka 集群中的資料。當然,我們可以通過呼叫 kafka 的 api 來完成,但是針對不同的業務需求,我們需要去編寫不同的介面,在經過編譯,打包,發布等一系列流程。最後才能看到我們預想的結果。那麼,我們能不能有...