解決思路
小結
cannot find meta data file '_metadata'
in directory 'hdfs://nn-ha-service/flink/flink-checkpoints'
首先我們來掃盲一下相關知識點。
官網是這樣定義的:檢查點通過允許恢復狀態和相應的流位置使flink中的狀態容錯,從而為應用程式提供與無故障執行相同的語義。
檢查點的主要目:在意外的作業失敗時提供恢復機制。
checkpoint的生命週期由flink管理,即flink建立,擁有和發布checkpoint - 無需使用者互動。作為一種恢復和定期觸發的方法,checkpoint實現的兩個主要設計目標是:
i)建立輕量級。
ii)盡可能快地恢復。
有兩種方式:
通過配置檔案全域性配置
這種配置方式是全域性的,即預設檢查點路徑。
搭建過集群的同學應該知道,配置檔案一般都在conf目錄下。沒錯,flink也是一樣,我們需要修改的就是 flink/conf/flink-conf.yaml 這個檔案。
在配置檔案裡面將下面這個引數配置上:
state.checkpoints.dir: hdfs:
//nn-
ha-service/flink/flink-checkpoints
通過在作業裡配置
就是在每個作業裡,自定義檢查點路徑,優先順序高於預設檢查點路徑。
如果沒有定義,就走預設檢查點路徑。
streamexecutionenvironment env = streamexecutionenvironment.
getexecutionenvironment()
; env.
setstatebackend
(new
rocksdbstatebackend
("hdfs:///flink/flink-checkpoints/"
);
flink預設是不會啟用檢查點的,這需要我們在**裡面設定。
**如下:
streamexecutionenvironment env = streamexecutionenvironment.
getexecutionenvironment()
;//設定檢查點間隔時間
env.
enablecheckpointing
(60000
);
對,我們只需要設定下檢查點的時間間隔,單位是ms,就說明啟動了了乙個一分鐘完成一次的檢查點策略了。
使用檢查點,即在意外的作業失敗後恢復資料。
這時,我們只需要指定檢查點路徑重啟任務即可。
官網給的示例:
$ bin/flink run -s :checkpointmetadatapath [
:runargs]
checkpointmetadatapath : 這個是檢查點元資料路徑,並不簡單是所配置的檢查點的路徑。筆者遇到的問題關鍵就在於此。
由報出來的錯誤我們不難看出,flink是希望找到_metadata這個元資料檔案,可我並沒有指定此檔案,所以報錯了。
先介紹下我用的啟動命令:
flink run -d -c com.flinkcheckpointtest \
-s hdfs:
//nn-
ha-service/flink/flink-checkpoints/ \
-m yarn-cluster \
-yjm 1024m -ytm 1024m -ynm flinkcheckpointtest \
/home/jar/flink_test.jar
想必這個命令知道flink的同學都很熟悉,不熟悉的可以參考下flink官網(
因此我去檢查點目錄下查詢了一番,終於讓我找到了 _metadata 檔案。
如下圖所示:
我們仔細看下這個目錄:
/flink/flink-checkpoints/a9fcbc1efb8124ae998d0cbea62e7eae/chk-
2836
前面兩層目錄是我們設定的檢查點目錄,後面一串像是id。
最後那個chk-2836應該就是實際檢查點的目錄,2836代表當前作業已經生成過2836次檢查點了。最新的檢查點覆蓋之前的檢查點。
/flink/flink-checkpoints/a9fcbc1efb8124ae998d0cbea62e7eae/ 該目錄下面就乙個chk開頭的目錄,即始終儲存最新的目錄,節省hdfs資源。
中間那一串id經過仔細查閱官網得知是jobid
好,那麼現在主要的問題就是如何尋找對應作業的jobid
有兩種情況:
作業正在執行
這種情況,可以直接去flink的管理介面找到。如下圖:
作業已經停止
這種情況,就需要去看此作業的詳細日誌了。如下圖:
jobmanager.log 日誌裡面多次出現 作業名和 jobid
實際工作中,用到檢查點的時候肯定是因為作業異常中斷,所以大多數情況都是通過第二種方式去尋找作業對應的 jobid
修改後的提交作業命令:
flink run -d -c com.flinkcheckpointtest \
-s hdfs:
//nn-
ha-service/flink/flink-checkpoints/
541eaf031c6f8ed50916342b2baafe98/chk-
13 \
-m yarn-cluster \
-yjm 1024m -ytm 1024m -ynm flinkcheckpointtest \
/home/jar/flink_test.jar
執行了一段時間,作業異常中斷,發現是記憶體溢位錯誤,故適當調節記憶體,最終提交命令如下:
flink run -d -c com.flinkcheckpointtest \
-s hdfs:
//nn-
ha-service/flink/flink-checkpoints/
541eaf031c6f8ed50916342b2baafe98/chk-
13 \
-m yarn-cluster \
-yjm 4096m -ytm 4096m -ynm flinkcheckpointtest \
/home/jar/flink_test.jar
好了,至此,問題圓滿解決。
這裡順便總結下flink作業異常中斷的操作流程。
找出作業對應的jobid
進入hdfs對應目錄,找到目錄下面最新的檢查點目錄
通過指定檢查點目錄的方式重新啟動作業
待作業執行穩定,檢視作業最初異常中斷的原因,記錄下來並總結思考如何解決和避免。
檢查點 為什麼要插入檢查點 檢查點的作用
一 為什麼要插入檢查點 檢查點的作用 檢查點記錄被測系統的預期結果,在執行過程中,qtp將預期結果與實際執行結果進行比較,若一致,測試結果報告中,檢查點為passed,否則為failed。只有插入檢查點的 才具有測試能力,檢查功能點是否實現 二 標準檢查點 standard checkpoint 檢...
功能測試檢查點
測試物件 flight 程式 c s 架構 檢查mercury 是否顯示在 之間 dim a,ba window flight reservation winedit order no 4 getroproperty text b cstr a msgbox b 正規表示式檢查 if語句判定成功或者...
lr VuGen(事務 檢查點)
事務的應用 在每乙個請求前後加上transaction 和 end transaction,選單欄insert new step 記錄請求的響應時間,其中end transaction有status選項,有4個選項,若是auto就不用手工判斷事務的執行結果,但這個結果不夠準確。2 統計事務的成功率 ...