flink 1.11 引入了寫hive的功能後,已經在上線了一段時間。下面就聊聊我自己對flink 寫hive 小檔案的一些問題和看法。
1.flink 寫 hive 可能會產生小檔案嗎?
簡單的說,答案是會。
簡單總結下flink 讀kafka寫hive的流程
1.flink 將kafka資料根據設定的分割槽策略,實時寫入對應分割槽hdfs 目錄的臨時檔案 inprogress,如下圖所示。
在inprogress檔案的資料,通過hive是無法查詢到的。
2.打checkpoint時,將inprogress檔案的資料刷到正式檔案中,並提交kafka offset。
這裡有乙個關鍵的點:flink中乙個寫hive的並行度,同一時間只能寫乙個hdfs檔案
那麼就有乙個問題,如果我設定3個並行度,乙個checkpoint週期是不是會生成3個檔案呢?
這裡有乙個關鍵的引數:'sink.shuffle-by-partition.enable'
以上圖hive表為例,一級分割槽為年月日,二級分割槽為小時,按照資料的event時間分割槽。
設定3個並行度,如下圖所示
我們看到,3個並行度都有資料在寫,那麼hdfs上對應就有3個inprogress檔案,checkpoint後會有對應的3個hdfs檔案。(1.12的小檔案合併功能下面再討論)
我們可以看到,即使設定了3個並行度,也只有乙個並行度有資料在寫。這樣的話,乙個checkpoint週期中,只會生成乙個hdfs檔案。
但是這裡有乙個前提,當前所有資料都落在統一分割槽中,即當前小時,也就是說沒有跨小時的延遲。
如果正好是跨小時的時候,由於我們使用的是eventtime 作為分割槽,例如10:00:01s時,既有10點的資料,也有9點的資料,那麼就會有多個並行度有資料在寫,這也很容易理解,因為不同分割槽,肯定是不同的hdfs檔案。
通過這個特點,在'sink.shuffle-by-partition.enable'=true的情況下,我們也很容易看出,資料落地hive是否有延遲。
那麼問題又來了。如果考慮到小檔案的問題,什麼情況下需要將'sink.shuffle-by-partition.enable' 設定為false?
我的答案是,當單個分割槽的寫入速度超過單個並行度寫入hive速度極限時,因為如果這時還設定為true,則會永遠反壓,消費的速度跟不上生產的速度。不過好在flink寫hive在1.11.3版本之後,效能還是不錯的。所以大部分情況,建議設定為true。
場景一
任務編號
業務資料量
寫hive單個並行度極限
並行度sink.shuffle-by-partition.enable
110w/s
4w/s
3+flase
23w/s
4w/s
2+true
任務1: 10w資料會被平均分配到3個hdfs檔案中,每個檔案資料行數為3.3w/s*checkpoint週期。
任務2:在資料不跨小時延遲的情況下,3w資料會在1個hdfs檔案中,每個檔案資料行數為3w/s*checkpoint週期。
再通過設定checkpoint週期,我們可以大大減少小檔案產生的概率。
但是如果時間到了凌晨,資料量很少的情況
場景二
任務編號
業務資料量
寫hive單個並行度極限
並行度sink.shuffle-by-partition.enable
11k/s
4w/s
3+flase
21k/s
4w/s
2+true
任務1: 1k資料會被平均分配到3個hdfs檔案中,每個檔案資料行數為0.33k/s*checkpoint週期。
任務2:在資料不跨小時延遲的情況下,1k資料會在1個hdfs檔案中,每個檔案資料行數為1k/s*checkpoint週期。
這種情況下任務1的小檔案會是任務2的3倍。這也是為什麼我建議在效能跟得上時,將sink.shuffle-by-partition.enable設定為true。
關於flink 1.12的小檔案自動合併。
引數很簡單,就是按照設定的檔案大小合併。我想著重說下的是only files in a single checkpoint are compacted 這句話,只有同乙個checkpoint週期的檔案會合併。
所以對於資料量少的凌晨(場景二),任務1的小檔案數量會改善到任務2的水平,但是也無法完全避免小檔案的存在。
為了減少flink 寫hive的小檔案
1.效能滿足的情況下,盡量設定'sink.shuffle-by-partition.enable'=true
2.如果設定了'sink.shuffle-by-partition.enable'=false,建議使用flink 1.12版本的自動合併小檔案功能。
3.設定合理的checkpoint週期,業務允許的情況下,可以加大checkpoint週期,減少生成檔案的數量。
4.可以最大限度降低flink產生小檔案的情況,但是無法完全避免,根據實際情況定期合併小檔案。
附上使用spark3 合併 小檔案的攻略。
聊一聊hive資料傾斜
info基本資訊表 user id name agegender 1henry16男 2jack17男 3anny18女 4candy19女 5kate20女 burke 21frank 22ellen 23ken 24mili 25.score成績表 user id subject id scor...
聊一聊小甜餅
cookies程式設計 cookie是儲存在客戶端的小文字,儲存的位置分為兩種 cookie可能儲存在客戶端瀏覽器的所佔記憶體中,關閉瀏覽器後,cookies就不再存在。cookie也可能儲存在客戶pc機的硬碟上,設定有效時間,超過有效時間後失效。cookie的常見應用 簡化登入 很多 在登入時,可...
聊一聊 Flask 的 jsonify
首先我們來看一段 python from flask import flask,jsonify tasks api v1.0 tasks methods get defget tasks return jsonify if name main true 在這段 裡面,我們看到了今天的主角jsonif...