一般情況下,針對偶爾出現的提價失敗,不進行重試不會有太大的問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會成功,但如果這是發生在關閉消費者或者再均衡前的最後一次提交,就要確保能夠提交成功。
因此,在消費者關閉前一般都會組合使用commitasync()
和commitsync()
他們的工作原理如下
try
consumer.commitasync();
}}cache(exception e)finally finally
}
提交特定的偏移量
提交偏移量的頻率與處理訊息批次的頻率是一樣的,但是如果想更頻繁的提交該怎麼辦?如果poll()
方法返回一大批資料,為了避免因再均衡引起的重複處理批訊息,想要在批次中間提交偏移量怎麼辦? 這種情況無法通過呼叫commitsync()
和commitasync()
來實現,因為他們只會提交最後乙個偏移量,而此時該批次裡的訊息還沒有處理完。
消費者api允許在呼叫commitsync()
和commitasync()
方法時提交的分割槽和偏移量的map,假設你處理了半個批次的訊息,最後來自主題customers
分割槽3的訊息的偏移量是5000 你可以呼叫commitsync()
來提交他,不過,因為消費者可能不只讀取乙個分割槽,你需要跟蹤所有分割槽。
下面是提交特定偏移量的例子:
private mapcurrentoffsets = new hashmap<>();
int count = 0;
...while(true)
}
同步和非同步
同步執行模式 所謂同步執行模式,是指語句在同步執行模式下,將始終保持對程式流的控制,直至 程式結束。如查詢操作,客戶機上的應用程式在向伺服器發出查詢操作的指令後,將 一直等待伺服器將查詢結果返回客戶機端,然後才繼續進行下一步操作。眾所周知,應用程式要從乙個大表中刪除所有的記錄將是非常耗時的,如果應用...
同步和非同步
同步互動 是指傳送乙個請求,需要等待返回,然後才能傳送另乙個請求,是乙個需要等待的過程。非同步互動 是指傳送乙個請求,不需要等待,隨時可以在傳送另乙個請求,是乙個不需要等待的過程。同步可以避免出現死鎖,讀髒資料的發生,一般共享某一資源的時候用,如果每個人都有修改許可權,同時修改乙個檔案,有可能使乙個...
同步和非同步
同步執行模式 所謂同步執行模式,是指語句在同步執行模式下,將始終保持對程式流的控制,直至 程式結束。如查詢操作,客戶機上的應用程式在向伺服器發出查詢操作的指令後,將 一直等待伺服器將查詢結果返回客戶機端,然後才繼續進行下一步操作。眾所周知,應用程式要從乙個大表中刪除所有的記錄將是非常耗時的,如果應用...