我們使用flinkkafkaconumser,並且啟用checkpoint,偏移量會通過checkpoint儲存到state裡
面,並且缺省會寫入到kafka的特殊主體中,也就是__consumer_offset
setcommitoffsetsoncheckpoints 缺省會true,就是把偏移量寫入特殊主題中
flink自動重啟的過程中,讀取的偏移量是state中的偏移量,如果state裡面沒有那麼從
__consumer_offset裡讀取偏移量,如果__consumer_offset裡面沒有那麼就會從earliest或者lastest讀取資料
redis通過冪等性實現僅一次語義
4.1.5 寫kafka保證exactly once
兩階段提交
flink的兩階段提交
核心原始碼
通過冪等性實現僅一次語義
在分布式系統中,可以使用兩階段提交來實現事務性從而保證資料的一致性,兩階段提交分為:預提交階段與
提交階段,通常包含兩個角色:協調者與執行者,協調者用於用於管理所有執行者的操作,執行者用於執行具
體的提交操作,具體的操作流程:
首先協調者會送預提交(pre-commit)命令有的執行者
執行者執行預提交操作然後傳送一條反饋(ack)訊息給協調者
待協調者收到所有執行者的成功反饋,則傳送一條提交資訊(commit)給執行者
執行者執行提交操作
如果在流程2中部分預提交失敗,那麼協調者就會收到一條失敗的反饋,則會傳送一條rollback訊息給所有執
行者,執行回滾操作,保證資料一致性;但是如果在流程4中,出現部分提交成功部分提交失敗,那麼就會造
成資料的不一致,因此後面也提出了3pc或者通過其他補償機制來保證資料最終一致性
flink中兩階段提交是為了保證端到端的exactly once,主要依託checkpoint機制來實現,先看一下
checkpoint的整體流程,
1.jobmanager會週期性的傳送執行checkpoint命令(start checkpoint);
2.當source端收到執行指令後會產生一條barrier訊息插入到input訊息佇列中,當處理到barrier時
會執行本地checkpoint, 並且會將barrier傳送到下乙個節點,當checkpoint完成之後會傳送一條ack信
息給jobmanager;
當所有節點都完成checkpoint之後,jobmanager會收到來自所有節點的ack資訊,那麼就表示一次
完整的checkpoint的完成;
jobmanager會給所有節點傳送一條callback資訊,表示通知checkpoint完成訊息。接下來就可以
提交事務了
對比flink整個checkpoint機制呼叫流程可以發現與2pc非常相似,jobmanager相當於協調者,flink提
供了checkpointedfunction與checkpointlistener這樣兩個介面,checkpointedfunction中有
snapshotstate方法,每次checkpoint觸發執行方法,通常會將快取資料放入狀態中,可以理解為是乙個
hook,這個方法裡面可以實現預提交,checkpointlistener中有notifycheckpointcomplete方法,
checkpoint完成之後的通知方法,這裡可以做一些額外的操作,比如真正提交kafka的事務;在2pc中提到
如果對應流程2預提交失敗,那麼本次checkpoint就被取消不會執行,不會影響資料一致性.如果流程4失
敗,那麼重啟從上一次的checkpoints重新計算。
第一種【yarn-session.sh(開闢資源)+flink run(提交任務)】
啟動乙個一直執行的flink集群
/bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
•執行任務
•./bin/flink run wordcount.jar --hostname *** --port 8888
停止任務 【web介面或者命令列執行cancel命令】
第二種【flink run -m yarn-cluster(開闢資源+提交任務)】
• 啟動集群,執行任務
•./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024
./examples/batch/wordcount.jar
注意:client端必須要設定yarn_conf_dir或者hadoop_conf_dir或者hadoop_home環境變數,通過這
個環境變數來讀取yarn和hdfs的配置資訊,否則啟動會失敗
基於袋鼠雲
Flink 狀態一致性
當在分布式系統中引入狀態時,自然也引入了一致性問題。一致性實際上是 正確性級別 的另一種說法,也就是說在成功處理故障並恢復之後得到的結果,與沒有發生任何故障時得到的結果相比,前者到底有多正確?舉例來說,假設要對最近一小時登入的使用者計數。在系統經歷故障之後,計數結果是多少?如果有偏差,是有漏掉的計數...
flink 狀態一致性(十三)
狀態一致性1.有狀態的流處理,內部每個運算元任務都可以有自己的狀態 2.對於流處理內部來說,所謂的狀態一致性就是我們所說的計算結果要保證準確 3.一條資料不丟失,也不重複計算 4.在遇到故障時可以恢復狀態,恢復以後的重新計算,結果應該也是完成正確的狀態一致性分類 1.exactly once 恰好處...
架構師之路 3 session一致性架構設計實踐
一 緣起 什麼是session?伺服器為每個使用者建立乙個會話,儲存使用者的相關資訊,以便多次請求能夠定位到同乙個上下文。web開發中,web server 可以自動為同乙個瀏覽器的訪問使用者自動建立 session 提供資料儲存功能。最常見的,會把使用者的登入資訊 使用者資訊儲存在 session...