最近flink job出現了背壓的問題, 後果是導致了checkpoint的生成超時, 影響了flink job的執行.
定位問題:
如下圖:
1) flink的checkpoint生成超時, 失敗:
checkpoint超時
2) 檢視jobmanager日誌,定位問題:
jobmanager日誌
3) 找大神幫忙定位問題, 原來是出現了背壓的問題, 緩衝區的資料處理不過來,barrier流動慢,導致checkpoint生成時間長, 出現超時的現象. (checkpoint超時時間設定了30分鐘)
下圖是背壓過高, input 和 output緩衝區都佔滿的情況
buffer緩衝區情況
4) 背壓的情況也可以在flink後台的job的jobgraph中檢視
背壓過高
下面說說flink感應反壓的過程:
下面這張圖簡單展示了兩個 task 之間的資料傳輸以及 flink 如何感知到反壓的:
flink感知背壓
記錄「a」進入了 flink 並且被 task 1 處理。(這裡省略了 netty 接收、反序列化等過程)
記錄被序列化到 buffer 中。
該 buffer 被傳送到 task 2,然後 task 2 從這個 buffer 中讀出記錄。
注意:記錄能被 flink 處理的前提是,必須有空閒可用的 buffer。
結合上面兩張圖看:task 1 在輸出端有乙個相關聯的 localbufferpool(稱緩衝池1),task 2 在輸入端也有乙個相關聯的 localbufferpool(稱緩衝池2)。如果緩衝池1中有空閒可用的 buffer 來序列化記錄 「a」,我們就序列化並傳送該 buffer。
這裡我們需要注意兩個場景:
本地傳輸:如果 task 1 和 task 2 執行在同乙個 worker 節點(taskmanager),該 buffer 可以直接交給下乙個 task。一旦 task 2 消費了該 buffer,則該 buffer 會被緩衝池1**。如果 task 2 的速度比 1 慢,那麼 buffer **的速度就會趕不上 task 1 取 buffer 的速度,導致緩衝池1無可用的 buffer,task 1 等待在可用的 buffer 上。最終形成 task 1 的降速。
遠端傳輸:如果 task 1 和 task 2 執行在不同的 worker 節點上,那麼 buffer 會在傳送到網路(tcp channel)後被**。在接收端,會從 localbufferpool 中申請 buffer,然後拷貝網路中的資料到 buffer 中。如果沒有可用的 buffer,會停止從 tcp 連線中讀取資料。在輸出端,通過 netty 的水位值機制來保證不往網路中寫入太多資料(後面會說)。如果網路中的資料(netty輸出緩衝中的位元組數)超過了高水位值,我們會等到其降到低水位值以下才繼續寫入資料。這保證了網路中不會有太多的資料。如果接收端停止消費網路中的資料(由於接收端緩衝池沒有可用 buffer),網路中的緩衝資料就會堆積,那麼傳送端也會暫停傳送。另外,這會使得傳送端的緩衝池得不到**,writer 阻塞在向 localbufferpool 請求 buffer,阻塞了 writer 往 resultsubpartition 寫資料。
這種固定大小緩衝池就像阻塞佇列一樣,保證了 flink 有一套健壯的反壓機制,使得 task 生產資料的速度不會快於消費的速度。我們上面描述的這個方案可以從兩個 task 之間的資料傳輸自然地擴充套件到更複雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline。
解決辦法:
1) 首先說一下flink原來的jobgraph, 如下圖, 產生背壓的是中間的運算元,
2) 背壓是什麼??
如果您看到任務的背壓警告(例如high),這意味著它生成的資料比下游運算元可以消耗的速度快。下游工作流程中的記錄(例如從源到匯)和背壓沿著相反的方向傳播到流上方。
以乙個簡單的source -> sink工作為例。如果您看到警告source,這意味著sink消耗資料的速度比source生成速度慢。sink正在向上游運算元施加壓力source。
可以得出: 第三個運算元的處理資料速度比第二個運算元生成資料的速度, 明顯的解決方法: 提高第三個運算元的併發度, 問題又出現了: 併發度要上調到多少呢?
3) 第一次上調, 從原來的10併發 上調到 40
觀察快取池對比的情況:
併發是10的buffer情況: (背壓的情況比較嚴重, 曲線持續性地達到峰值, 會導致資源占光)
10併發的buffer情況
併發是40的buffer情況:(有了比較大的改善, 但是還是存在背壓的問題, 因為曲線有達到頂峰的時候)
40併發的buffer情況
4) 從網上了解到flink的併發度的優化策略後, 有了乙個比較好的解決方法, 把第三個運算元的並行度設定成100, 與第二個運算元的併發度一致:
這樣做的好處是,flink會自動將條件合適的運算元鏈化, 形成運算元鏈,
滿足上下游形成運算元鏈的條件比較苛刻的:
1.上下游的並行度一致
2.下游節點的入度為1 (也就是說下游節點沒有來自其他節點的輸入)
3.上下游節點都在同乙個 slot group 中(下面會解釋 slot group)
4.下游節點的 chain 策略為 always(可以與上下游鏈結,map、flatmap、filter等預設是always)
5.上游節點的 chain 策略為 always 或 head(只能與下游鏈結,不能與上游鏈結,source預設是head)
6.兩個節點間資料分割槽方式是 forward(參考理解資料流的分割槽)
7.使用者沒有禁用 chain
運算元鏈的好處: 鏈化成運算元鏈可以減少執行緒與執行緒間的切換和資料緩衝的開銷,並在降低延遲的同時提高整體吞吐量。
flink還有另外一種優化手段就是槽共享,
flink預設開啟slot共享(所有operator都在default共享組)
預設情況下,flink 允許同乙個job裡的不同的子任務可以共享同乙個slot,即使它們是不同任務的子任務但是可以分配到同乙個slot上。 這樣的結果是,乙個 slot 可以儲存整個管道pipeline, 換句話說, flink會安排並行度一樣的運算元子任務在同乙個槽裡執行
意思是每乙個taskmanager的slot裡面都可以執行上述的整個完整的流式任務, 減少了資料在不同機器不同分割槽之間的傳輸損耗, (如果運算元之間的併發度不同, 會造成資料分割槽的重新分配(rebalance, shuffle, hash....等等), 就會導致資料需要在不同機器之間傳輸)
優化後的jobgraph, 如下圖,
合併運算元鏈
taskmanager和slot中的task情況
再次觀察快取池對比的情況:
100併發的buffer情況
背壓正常
checkpoint生成的時間沒有出現超時的情況
checkpoint正常
記錄一次壓測問題
同一套程式,之前放在伺服器上使用,公司內部壓測和發布給客戶使用,均未出現問題。後由於客戶業務需求,將其移植到嵌入式平台。公司內部壓測過程中,出現三種異常。問題1 大併發壓測,服務程序被killed掉。問題2 大併發壓測,服務掛掉,最後的列印為底層的錯誤日誌。問題3 大併發壓測,服務掛掉,列印另外的底...
mysql一次更新多條記錄問題
replace into和insert into on duplicate key 區別 create table test id tinyint 3 unsigned not null auto increment,name char 10 not null default dept char 1...
一次奇葩Hama問題記錄
對hama進行改進,引用了乙個類a a繼承了執行緒類 當該類實現如下時,graphjobrunner 中 override public final void setup bsppeerpeer throws ioexception,syncexception,interruptedexceptio...