1.spark可以通過checkpoint的方式來維護kafka的偏移量,配置簡單,只需要配置checkpoint的路徑就可以完成偏移量的維護,如果本身spark業務就採用了state狀態,那麼既不需要額外配置即可確保偏移量的維護。
原理:spark會將kafka spark straming處理的topic以及對應消費偏移量持久化到檔案當中,當spark任務崩潰後,儲存在持久化檔案的偏移量將會通過反序列化得到,達到繼續崩潰前的偏移量繼續消費的目的。
優點:配置方便,幾乎不需要額外的**量。
缺點:本身不需要state的任務會有一些額外要注意的點,廣播變數在恢復的時候需要 重新廣播,否則再重新訪問時將會直接崩潰。同一批資料如果存在問題沒有正常trycatch,再下次恢復重啟後將會直接跳過該批資料,對資料的質量存在一定風險。spark一些配置修改之後,需要刪除checkpoint目錄才能起作用,也會導致偏移量的失去。環境中的kafka如果被清空,也需要刪除kafka目錄,否則無法恢復。
2.spark在010的kafka api中給出了非同步提交偏移量的介面,可以通過將偏移量提交的方式來維護偏移量在kafka上。
原理:在kafka stream每批rdd生成的compute()方法中,將會在末尾非同步提交之前的偏移量到kafka上,而傳送的具體偏移量是在rdd處理的末尾通過commitasync()提交到stream的。
優點:可以規避checkpoint帶來的一些約束,修改配置不需要刪除checkpoint檔案也不會導致偏移量的丟失,環境中的kafka被清空只需要簡單重啟就能解決。
缺點:對**的編寫規範具有要求,如果任務的try catch不全面將會導致無法規避掉的崩潰問題,只有修改**或者更換groupid能夠解決。
kafka控制offset偏移量
通過kafkaconsumer.seek 來控制offset.注意如果設定了手動提交需要 commitasync 否則不生效。使用場景 1.消費拋異常 offset 1跳過 2.重新消費前面的資料 訊息寫入hbase失敗 重寫 try if consumerrecords.isempty catch...
kafka指定偏移量拉取與偏移量半自動提交
離去年寫了有關偏移量有關文章快一年了,但最近在偏移量方面遇到了些小問題,在這裡記錄下。還有關於偏移量半自動提交,是個很經典的問題,順便也記錄下。關於拉取指定偏移量 應該只有用consumer.assign topicpartitionlist 和consumer.seek topicpartitio...
spark消費kafka的兩種方式
一 基於receiver的方式 這種方式使用receiver來獲取資料。receiver是使用kafka的高層次consumer api來實現的。receiver從kafka中獲取的資料都是儲存在spark executor的記憶體中的 如果突然資料暴增,大量batch堆積,很容易出現記憶體溢位的問...