spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。
當批處理時間(batch processing time)大於批次間隔(batch interval,即 batchduration)時,說明處理資料的速度小於資料攝入的速度,持續時間過長或源頭資料暴增,容易造成資料在記憶體中堆積,最終導致executor oom或任務奔潰。
在這種情況下,若是基於kafka receiver的資料來源,可以通過設定spark.streaming.receiver.maxrate來控制最大輸入速率;若是基於direct的資料來源(如kafka direct stream),則可以通過設定spark.streaming.kafka.maxrateperpartition來控制最大輸入速率。當然,在事先經過壓測,且流量高峰不會超過預期的情況下,設定這些引數一般沒什麼問題。但最大值,不代表是最優值,最好還能根據每個批次處理情況來動態預估下個批次最優速率。在spark 1.5.0以上,就可通過背壓機制來實現。開啟反壓機制,即設定spark.streaming.backpressure.enabled為true,spark streaming會自動根據處理能力來調整輸入速率,從而在流量高峰時仍能保證最大的吞吐和效能。
override
def onbatchcompleted(batchcompleted: streaminglistenerbatchcompleted)
computeandpublish(processingend, elems, workdelay, waitdelay)
}
可以看到,接著又呼叫的是computeandpublish方法,如下:
private
def computeandpublish(time:
long
, elems:
long
, workdelay:
long
, waitdelay:
long):
unit
= future[
unit
]}
更深一層,具體呼叫的是rateestimator.compute方法來預估新速率,如下:
def compute(
time:
long
, elements:
long
, processingdelay:
long
, schedulingdelay:
long
): option[
double
]
spark.streaming.backpressure.enabled
預設值false,是否啟用反壓機制。
spark.streaming.backpressure.initialrate
預設值無,初始最大接收速率。只適用於receiver stream,不適用於direct stream。型別為整數,預設直接讀取所有,在1開啟的情況下,限制第一次批處理應該消費的資料,因為程式冷啟動佇列裡面有大量積壓,防止第一次全部讀取,造成系統阻塞
spark.streaming.kafka.maxrateperpartition
型別為整數,預設直接讀取所有,限制每秒每個消費執行緒讀取每個kafka分割槽最大的資料量
spark.streaming.stopgracefullyonshutdown
優雅關閉,確保在kill任務時,能夠處理完最後一批資料,再關閉程式,不會發生強制kill導致資料處理中斷,沒處理完的資料丟失
只有 1+3 啟用的時候,每次消費讀取的數量最大會等於3設定的值,最小是spark根據系統負載自動推斷的值,消費的資料量會在這兩個範圍之內變化根據系統情況,但第一次啟動會有多少讀多少資料。此後按 1+3 設定規則執行
1+2+3 同時啟用的時候,跟上乙個消費情況基本一樣,但第一次消費會得到限制,因為我們設定第一次消費的頻率了。
spark.streaming.backpressure.rateestimator
預設值pid,速率控制器,spark 預設只支援此控制器,可自定義。
spark.streaming.backpressure.pid.proportional
預設值1.0,只能為非負值。當前速率與最後一批速率之間的差值對總控制訊號貢獻的權重。用預設值即可。
spark.streaming.backpressure.pid.integral
預設值0.2,只能為非負值。比例誤差累積對總控制訊號貢獻的權重。用預設值即可。
spark.streaming.backpressure.pid.derived
預設值0.0,只能為非負值。比例誤差變化對總控制訊號貢獻的權重。用預設值即可。
spark.streaming.backpressure.pid.minrate
預設值100,只能為正數,最小速率。
//啟用反壓機制
conf.set(
"spark.streaming.backpressure.enabled"
,"true"
)//最小攝入條數控制
conf.set(
"spark.streaming.backpressure.pid.minrate"
,"1"
)//最大攝入條數控制
conf.set(
"spark.streaming.kafka.maxrateperpartition"
,"12"
)//初始最大接收速率控制
conf.set(
"spark.streaming.backpressure.initialrate"
,"10"
)
要保證反壓機制真正起作用前spark 應用程式不會崩潰,需要控制每個批次最大攝入速率。以direct stream為例,如kafka direct stream,則可以通過spark.streaming.kafka.maxrateperpartition引數來控制。此引數代表了 每秒每個分割槽最大攝入的資料條數。假設batchduration為10秒,spark.streaming.kafka.maxrateperpartition為12條,kafka topic 分割槽數為3個,則乙個批(batch)最大讀取的資料條數為360條(31210=360)。同時,需要注意,該引數也代表了整個應用生命週期中的最大速率,即使是背壓調整的最大值也不會超過該引數。 Spark Streaming反壓機制
反壓 back pressure 機制主要用來解決流處理系統中,處理速度比攝入速度慢的情況。是控制流處理中批次流量過載的有效手段。spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。當批處理時間 batch processing time...
Spark Streaming反壓機制探秘
spark streaming中的反壓機制是spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。當批處理時間 batch processing time 大於批次間隔 batch interval,即 batchduration 時,說明處理資料的速度小於資料攝入的速度,持續時間過...
Spark Streaming入門詳解
背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...