反壓(back pressure)機制主要用來解決流處理系統中,處理速度比攝入速度慢的情況。是控制流處理中批次流量過載的有效手段。
spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。
當批處理時間(batch processing time
)大於批次間隔(batch interval,即 batchduration
)時,說明處理資料的速度小於資料攝入的速度,持續時間過長或源頭資料暴增,容易造成資料在記憶體中堆積,最終導致executor oom或任務奔潰。
在這種情況下,若是基於receiver的資料來源,可以通過設定spark.streaming.receiver.maxrate
來控制最大輸入速率;若是基於direct的資料來源(如kafka direct stream),則可以通過設定spark.streaming.kafka.maxrateperpartition
來控制最大輸入速率。當然,在事先經過壓測,且流量高峰不會超過預期的情況下,設定這些引數一般沒什麼問題。但最大值,不代表是最優值,最好還能根據每個批次處理情況來動態預估下個批次最優速率。在spark 1.5.0以上,就可通過背壓機制來實現。開啟反壓機制,即設定spark.streaming.backpressure.enabled
為true
,spark streaming會自動根據處理能力來調整輸入速率,從而在流量高峰時仍能保證最大的吞吐和效能。
spark streaming的反壓機制主要是通過ratecontroller
元件來實現。ratecontroller
繼承自介面streaminglistener
,並實現了onbatchcompleted
方法。每乙個batch處理完成後都會呼叫此方法,具體如下:
override def onbatchcompleted
(batchcompleted: streaminglistenerbatchcompleted)
computeandpublish
(processingend, elems, workdelay, waitdelay)
}
可以看到,接著又呼叫的是computeandpublish
方法,如下:
private def computeandpublish
(time: long, elems: long, workdelay: long, waitdelay: long)
: unit =
future[unit]
}
更深一層,具體呼叫的是rateestimator.compute
方法來預估新速率,如下:
def compute
( time: long,
elements: long,
processingdelay: long,
schedulingdelay: long)
: option[double]
該方法是介面rateestimator
中的方法,會計算出新的批次每秒應攝入的記錄數。pidrateestimator
,即pid速率估算器
,是rateestimator
的唯一實現,具體估算邏輯可看pidrateestimator.compute
方法,邏輯很複雜,用到了微積分相關的知識,總之,一句話,即根據當前batch的結果和期望的差值來估算新的輸入速率。
spark.streaming.backpressure.enabled
預設值false
,是否啟用反壓機制。
spark.streaming.backpressure.initialrate
預設值無,初始最大接收速率。只適用於receiver stream
,不適用於direct stream
。
spark.streaming.backpressure.rateestimator
預設值pid,速率控制器,spark 預設只支援此控制器,可自定義。
spark.streaming.backpressure.pid.proportional
預設值1.0,只能為非負值。當前速率與最後一批速率之間的差值對總控制訊號貢獻的權重。用預設值即可。
spark.streaming.backpressure.pid.integral
預設值0.2,只能為非負值。比例誤差累積對總控制訊號貢獻的權重。用預設值即可。
spark.streaming.backpressure.pid.derived
預設值0.0,只能為非負值。比例誤差變化對總控制訊號貢獻的權重。用預設值即可。
spark.streaming.backpressure.pid.minrate
預設值100,只能為正數,最小速率。
//啟用反壓
conf.
set(
"spark.streaming.backpressure.enabled"
,"true"
)//最小攝入條數控制
conf.
set(
"spark.streaming.backpressure.pid.minrate"
,"1"
)//最大攝入條數控制
conf.
set(
"spark.streaming.kafka.maxrateperpartition"
,"12"
)
注意:
a. 反壓機制真正起作用時需要至少處理乙個批
由於反壓機制需要根據當前批的速率,預估新批的速率,所以反壓機制真正起作用前,應至少保證處理乙個批。
b. 如何保證反壓機制真正起作用前應用不會崩潰
要保證反壓機制真正起作用前應用不會崩潰,需要控制每個批次最大攝入速率。若為direct stream
,如kafka direct stream
,則可以通過spark.streaming.kafka.maxrateperpartition
引數來控制。此引數代表了 每秒每個分割槽最大攝入的資料條數。假設batchduration
為10秒,spark.streaming.kafka.maxrateperpartition
為12條,kafka topic 分割槽數為3個,則乙個批(batch)最大讀取的資料條數為360條(3*12*10=360
)。同時,需要注意,該引數也代表了整個應用生命週期中的最大速率,即使是背壓調整的最大值也不會超過該引數。
建立速率控制器
info pidrateestimator: created pidrateestimator with proportional =
1.0, integral =
0.2, derivative =
0.0, min rate =
1.0
計算當前批次速率// records 記錄數(對應webui: input size)
// processing time 處理時間,毫秒(對應webui: processing time)
// scheduling delay 排程時間,毫秒(對應webui: scheduling delay)
trace pidrateestimator:
time =
1558888897224
, # records =
33, processing time =
24548
, scheduling delay =
8
預估新批次速率trace pidrateestimator:
latestrate =
-1.0
, error =
-2.344305035033404
latesterror =
-1.0
, historicalerror =
0.0010754440280267231
delaysinceupdate =
1.558888897225e9
, derror =
-8.623482003280801e-10
第一次計算跳過速率估計trace pidrateestimator: first run, rate estimation skipped
當前批次沒有記錄或沒有延遲則跳過速率估計trace pidrateestimator: rate estimation skipped
以新的預估速率執行trace pidrateestimator: new rate
=1.0
可以看到,開啟反壓後,攝入速率input rate
可以根據處理時間processing time
來調整,從而保證應用的穩定性。 Spark Streaming反壓機制探秘
spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。當批處理時間 batch processing time 大於批次間隔 batch interval,即 batchduration 時,說明處理資料的速度小於資料攝入的速度,持續時間過...
Spark Streaming反壓機制原理及使用
spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。當批處理時間 batch processing time 大於批次間隔 batch interval,即 batchduration 時,說明處理資料的速度小於資料攝入的速度,持續時間過...
Spark Streaming入門詳解
背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...