簡介:滴滴實時計算引擎從 flink-1.4 無縫公升級到 flink-1.10 版本,做到了完全對使用者透明。並且在新版本的指標、排程、sql 引擎等進行了一些優化,在效能和易用性上相較舊版本都有很大提公升。作者|alan在本次公升級之前,我們使用的主要版本為 flink-1.4.2,並且在社群版本上進行了一些增強,提供了 streamsql 和低階 api 兩種服務形式。現有集群規模達到了 1500 臺物理機,執行任務數超過 12000 ,日均處理資料 3 萬億條左右。
不過隨著社群的發展,尤其是 blink 合入 master 後有很多功能和架構上的公升級,我們希望能通過版本公升級提供更好的流計算服務。今年 2 月份,里程碑版本 flink-1.10 發布,我們開始在新版上上進行開發工作,踏上了充滿挑戰的公升級之路。
作為 flink 社群至今為止的最大的一次版本公升級,加入的新特性解決了之前遇到很多的痛點。
flink sql 原生支援了 ddl 語法,比如 create table/create function,可以使用 sql 進行元資料的註冊,而不需要使用**的方式。
也提供了 catalog 的支援,預設使用 inmemorycatalog 將資訊臨時儲存在記憶體中,同時也提供了 hivecatalog 可以與 hivemetastore 進行整合。也可以通過自己拓展 catalog 介面實現自定義的元資料管理。
之前對 flink 記憶體的管理一直是乙個比較頭疼的問題,尤其是在使用 rocksdb 時,因為乙個 taskmanager 中可能存在多個 rocksdb 例項,不好估算記憶體使用量,就導致經常發生記憶體超過限制被殺。
在新版上增加了一些記憶體配置,例如 state.backend.rocksdb.memory.fixed-per-slot 可以輕鬆限制每個 slot的rocksdb 記憶體的使用上限,避免了 oom 的風險。
本次公升級最大的挑戰是,如何保證 streamsql 的相容性。streamsql 的目的就是為了對使用者遮蔽底層細節,能夠更加專注業務邏輯,而我們可以通過版本公升級甚至更換引擎來提供更好的服務。保證任務的平滑公升級是最基本的要求。
由於跨越多個版本架構差距巨大,內部 patch 基本無法直接合入,需要在新版本上重新實現。我們首先整理了所有的歷史 commit,篩選出那些必要的修改並且在新版上進行重新實現,目的是能覆蓋已有的所有功能,確保新版本能支援現有的所有任務需求。
例如:社群在 1.4 版本時,flinksql還處於比較初始的階段,也沒有原生的 ddl 語法支援,我們使用 antlr 實現了一套自定義的 ddl 語法。但是在 flink1.10 版本上,社群已經提供了原生的 ddl 支援,而且與我們內部的語法差別較大。現在擺在我們面前有幾條路可以選擇:
最終我們選用了第三種方案,這樣可以最大限度的減少和引擎的耦合,作為外掛程式執行,未來再有引擎公升級完全可以復用現有的邏輯,能夠降低很多的開發成本。
例如:我們在舊版本上使用 "json-path" 的庫實現了 json 解析,通過在建表語句裡定義類似 $.status 的表示式表示如何提取此欄位。
新版本上原生的 json 型別解析可以使用 row 型別來表示巢狀結構,在轉換為新語法的過程中,將原本的表達是解析為樹並構建出新的字段型別,再使用計算列的方式提取出原始表中的字段,確保表結構與之前一致。型別名稱、配置屬性也通過對映轉換為社群語法。
最後是測試階段,需要進行完善的測試確保所有任務都能做到平滑公升級。我們原本的計畫是準備進行回歸測試,對已有的所有任務替換配置後進行回放,但是在實際操作中有很多問題:
所以我們按任務的提交流程分成多個階段進行測試,只有在當前階段能夠全部測試通過後後進入下乙個階段測試,提前發現問題,將問題定位範圍縮小到當前階段,提高測試效率。
除了對舊版本的相容,我們也結合了新版本的特性,對引擎進行了增強。
我們一直希望能精確衡量任務的負載狀況,使用反壓指標指標只能粗略的判斷任務的資源夠或者不夠。
結合新版的 mailbox 執行緒模型,所有互斥操作全部執行在 taskthread 中,只需統計出線程的占用時間,就可以精確計算任務負載的百分比。
未來可以使用指標進行任務的資源推薦,讓任務負載維持在乙個比較健康的水平。
在 flip-6 後,flink 修改了資源排程模型,移除了--container 引數,slot 按需申請確保不會有閒置資源。但是這也導致了乙個問題,source 的併發數常常是小於最大併發數的,而 subtask 排程是按 dag 的拓撲順序排程,這樣 sourcetask 就會集中在某些 taskmanager 中導致熱點。
我們加入了"最小 slot 數"的配置,保證在 flink session 啟動後立即申請相應數量的 slot,且閒置時也不主動退出,搭配 cluster.evenly-spread-out-slots 引數可以保證在 slot 數充足的情況下,subtask 會均勻分布在所有的 taskmanager 上。
以滾動視窗為例 tumble(time_attr, interval '1' day),視窗為一天時開始和結束時間固定為每天 0 點 -24 點,無法做到生產每天 12 點-次日 12 點的視窗。
對於**可以通過指定偏移量實現,但是 sql 目前還未實現,通過增加引數 tumble(time_attr, interval '1' day, time '12:00:00') 表示偏移時間為 12 小時。
還有另外一種場景,比如統計一天的 uv,同時希望展示當前時刻的計算結果,例如每分鐘觸發視窗計算。對於**開發的方式可以通過自定義 trigger 的方式決定視窗的觸發邏輯,而且 flink 也內建了一些 tigger 實現,比如 continuoustimetrigger 就很適合這種場景。所以我們又在視窗函式裡增加了一種可選引數,代表視窗的觸發週期,tumble(time_attr, interval '1' day, interval '1' minutes) 。
通過增加 offset 和 tiggger 週期引數(tumble(time_attr, size[,offset_time][,trigger_interval])),拓展了 sql 中視窗的使用場景,類似上面的場景可以直接使用 sql 開發而不需要使用**的方式。
在很多 sql 的使用場景裡,會多次使用上乙個計算結果,比如將 json 解析成 map 並提取多個字段 。
雖然通過子查詢,看起來 json 解析只呼叫一次,但是經過引擎的優化後,通過結果表的投影 (projection) 生成函式呼叫鏈 (rexcall),結果類似:
這樣會導致 json 解析的計算重複執行了3次,即使使用檢視分割成兩步操作,經過 planner 的優化一樣會變成上邊的樣子。
對於確定性 (isdeterministic=true) 的函式來說,相同的輸入一定代表相同的結果,重複執行 3 次 json 解析其實是沒有意義的,如何優化才能實現對函式結果的復用呢?
在**生成時,將 rexcall 生成的唯一標識(digest)和變數符號的對映儲存在 codegencontext 中,如果遇到 digest 相同的函式呼叫,則可以復用已經存在的結果變數,這樣解析 json 只需要執行第一次,之後就可以復用第一次的結果。
通過幾個月的努力,新版本已經上線執行,並且作為 streamsql 的預設引擎,任務重啟後直接使用新版本執行。相容性測試的通過率達到 99.9%,可以基本做到對使用者的透明公升級。對於新接觸 streamsql 使用者可以使用社群 sql 語法進行開發,已有任務也可以修改 dml 部分語句來使用新特性。現在新版本已經支援了公司內許多業務場景,例如公司實時資料倉儲團隊依託於新版本更強的表達能力和效能,承接了多種多樣的資料需求做到穩定執行且與離線口徑保持一致。
版本公升級不是我們的終點,隨著實時計算的發展,公司內也有越來越多團隊需要使用 flink 引擎, 也向我們提出了更多的挑戰,例如與 hive 的整合做到將結果直接寫入 hive 或直接使用 flink 作為批處理引擎,這些也是我們探索和發展的方向,通過不斷的迭代向使用者提供更加簡單好用的流計算服務。
滴滴 Flink 1 10 公升級之路
在本次公升級之前,我們使用的主要版本為 flink 1.4.2,並且在社群版本上進行了一些增強,提供了 streamsql 和低階 api 兩種服務形式。現有集群規模達到了 1500 臺物理機,執行任務數超過 12000 日均處理資料 3 萬億條左右。不過隨著社群的發展,尤其是 blink 合入 m...
fedora11公升級筆記之yum update錯誤
忘了前輩的箴言 不要追求新的發行版本,如果你現在可以正常工作的話!下了fedora11的 光碟就把fc10公升級了,但是發現公升級後機子變得很慢,終端開啟要半天,還是從新安裝,把 usr分割槽格了,重新安裝 非公升級式安裝 後,除了開機沒有想象中的那麼快之外,其他還挺好,老規矩又要公升級安裝軟體 s...
mysql 之 版本公升級,從5 5公升級到5 7
首先介紹一下使用安裝yum源的方法 作業系統是centos6.4 64位 wget rpm ivh mysql57 community release el6 8.noarch.rpm 3 檢視可安裝的mysql版本 root host 172 10 2 83 software yum repoli...