Spark Streaming大規模流式處理

2021-07-07 08:51:06 字數 3965 閱讀 5575

1.1概述:

spark streaming架構概述和原理

spark streaming案例集錦

原始碼分析與效能優化

1.2 spark streaming架構概述和原理

what is spark streaming?

是規模的,可伸縮的,實時流處理。

spark streaming的資料**除了上述kafka,flume,hdfs/s3,kinsesis,twitter之外,還可以**tcp sockets**發來的資料,並且可以使用高階函式例如,map,join,reduce和window,來構建複雜的演算法。最後被處理過的資料也可以被儲存在hdfs,databases, dashboards裡面。並且可以用流處理來處理圖計算和機器學習。

在spark streaming內部實現是接收到輸入資料之後,以時間為分片對資料進行批次處理。,切分好資料分片之後,spark engine對資料進行計算,最後,產生一批又一批的處理後的資料。對於每一批的處理batch是並行處理的。例如,一秒產生一批,如果前一秒的還沒處理完,下一秒的將不會被計算,這時候就會產生阻塞。因此這裡面的時間設定也是乙個優化點。

資料是序列輸入的,每個batch處理是並行的。

discretized streams 離散流,dstream就是一系列rdd的集合,隨著時間的流逝rdd會不斷地產生,這些rdd會被dsream管理和計算。流進來的資料會被dstream劃分為不同的時間段,每個時間段都會產生很多rdd,每個時間段是有自己id的,第乙個時間區間是[0.1),左閉右開區間的,然後依次類推。

每乙個時間段進行單詞統計,進行flatmap操作,單詞統計,然後隨著時間流逝依次類推。

batch處理流程:

這是spark streaming中的 lineage(血統)關係,處理的單位是每個 rdd, 首先對batch進行split操作,把batch轉化成切片產生partition,這裡面的平行計算指的是batch切分成分片partition平行計算,裡面的切片是平行計算。平行計算中都是batch級別的,將最終的結果以batch儲存。隨著時間的流逝,每個時間段都是batch同樣操作。

容錯性:

圖中的每個橢圓是乙個rdd,橢圓裡面的每個小圓是乙個partition,圖中的每一列多個rdd,

表示乙個dstream(圖中有三個rdd),而每一行的最後乙個rdd表示batch size所產生的中間結果。

每個rdd是lineage(血統)關係的,並且spark streaming資料**有多種,可以來自磁碟,如hdfs(多份拷貝)或

是來自網路的資料流(spark streaming 網路輸入的資料拷貝兩份到其他機器上),因此可以保證spark streaming

很高的容錯性。即使某個rdd上的partition出錯,可以並行地將其他機器上將出錯的partition計算出來。y

spark streaming 有3種主要的執行場景:

1. 無狀態操作: 每次操作都只是計算當前時間切片的內容,例如:每次只計算1秒鐘時間切片中產生的資料的rdd。

2. 有狀態操作: 要不斷的把當前和歷史的時間切片的rdd累加計算,隨著時間的流逝,計算的資料規模會越來越大。(例

如: updatestatebykey)

3. window操作: 是針對特定時間段並以特定時間間隔為單位進行的滑動操作,例如:在以1秒為時間切片的情況下,我們要統計最近10分鐘內spark streaming產生的資料,並且每個2分鐘進行一次更新。

1.3 spark streaming案例集錦

使用twitter方式收集資料

tweets是dstream,對它進行flatmap操作之後生成新的dstream.

將產生的資料儲存到hdfs,上圖是把結果儲存到磁碟上。

上圖中每乙個紅色的方框表示乙個視窗,視窗的長度為:3,滑動間隔:視窗操作的

時間間隔為:2,也就是說,每個2,對過去時間為3進行統計。注意:比如視窗長度為3

的話,則是左閉右開的,也就是說 window at time 3 是對time1 和time2 進行統計的,不

包含time3。

假設對過去10分鐘進行統計計算,minutes(10),指的是過去10分鐘,seconds(1)每個1s,這個意思就是,每隔1s對過去10分鐘的資料進行統計。上面的操作很容易就重讀計算,所以繼續往下看,對上面操作進行優化。

假設我們現在要求出 t 到 t + 4 中所有狀態的值。

滑動視窗的疊加處理上面已經介紹過了,我們現在來看增量處理。現在可以把 t-1 到 t+3

總和儲存下來,然後再與t+4時刻相加,最後減去t-1時刻的就可以了,這個過程中我們只需要儲存三個中間結果。

而(a)圖的話,要儲存5個中間結果,效率一下就提公升了40%。如果資料量很大的話,這樣的效果是非常明顯的。

具體**實現:

綜上最後優化後的**為:

1.3 效能優化

batch size 設定的大小.

假設:現在有乙個batch是10s,但是你處理的時候時間超過了10s,但是每個batch之間必須要保證前乙個batch結束下面的batch才可以執行,因此,就會造成阻塞。

如果真的阻塞的話怎麼辦?

優化記憶體使用

資料放入記憶體,要存兩塊副本,而我們只操作乙個副本,預設資料存入到記憶體是需要序列化的。我們對資料進行操作的

時候就需要反序列化,這中間就比較耗cpu。

1. 如果記憶體足夠的話,我們可以將序列化給關閉,直接寫入記憶體。

2. 如果記憶體不足的情況下,可以使用kyro序列化器。

3. 設定spark.cleaner.ttl引數:spark streaming會將接收到的資料全部儲存在記憶體中,因此很多不用的資料任然會被

儲存在記憶體中。因此可以通過設定該引數的時長,來及時清理無用的資料。

總結:

spark streaming提供了一套高效、可容錯的準實時大規模流式處理框架,spark streaming能夠整合spark批處理和互動查詢,通過簡單的介面就可以實現複雜演算法等等優點,spark streaming在未來將會有更大的發展。 期待更多spark好訊息。

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...

Spark Streaming 程式監控

官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...

spark streaming讀取kafka示例

spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...