dstream上的原語與rdd的類似,分為transformations**換)和output operations(輸出)兩種,此外轉換操作中還有一些比較特殊的原語,如:updatestatebykey()、transform()以及各種window相關的原語。
無狀態轉化操作
無狀態轉化操作就是把簡單的rdd轉化操作應用到每個批次上,也就是轉化dstream中的每乙個rdd。部分無狀態轉化操作列在了下表中。注意,針對鍵值對的dstream轉化操作(比如 reducebykey())要新增import streamingcontext._才能在scala中使用。
需要記住的是,儘管這些函式看起來像作用在整個流上一樣,但事實上每個dstream在內部是由許多rdd(批次)組成,且無狀態轉化操作是分別應用到每個rdd上的。例如,reducebykey()會歸約每個時間區間中的資料,但不會歸約不同區間之間的資料。
舉個例子,在之前的wordcount程式中,我們只會統計5秒內接收到的資料的單詞個數,而不會累加。
無狀態轉化操作也能在多個dstream間整合資料,不過也是在各個時間區間內。例如,鍵 值對dstream擁有和rdd一樣的與連線相關的轉化操作,也就是cogroup()、join()、leftouterjoin() 等。我們可以在dstream上使用這些操作,這樣就對每個批次分別執行了對應的rdd操作。
我們還可以像在常規的spark 中一樣使用 dstream的union() 操作將它和另乙個dstream 的內容合併起來,也可以使用streamingcontext.union()來合併多個流。
有狀態轉化操作
updatestatebykey
updatestatebykey原語用於記錄歷史記錄,有時,我們需要在 dstream 中跨批次維護狀態(例如流計算中累加wordcount)。針對這種情況,updatestatebykey() 為我們提供了對乙個狀態變數的訪問,用於鍵值對形式的 dstream。給定乙個由(鍵,事件)對構成的 dstream,並傳遞乙個指定如何根據新的事件 更新每個鍵對應狀態的函式,它可以構建出乙個新的 dstream,其內部資料為(鍵,狀態) 對。
updatestatebykey() 的結果會是乙個新的 dstream,其內部的 rdd 序列是由每個時間區間對應的(鍵,狀態)對組成的。
updatestatebykey操作使得我們可以在用新資訊進行更新時保持任意的狀態。為使用這個功能,你需要做下面兩步:
定義狀態,狀態可以是乙個任意的資料型別。
定義狀態更新函式,用此函式闡明如何使用之前的狀態和來自輸入流的新值對狀態進行更新。
使用updatestatebykey需要對檢查點目錄進行配置,會使用檢查點來儲存狀態。
需要在kafka建立
bin/kafka-topics.sh --zookeeper hadoop102:
2181
--create --topic donglin --partitions 3
--replication-factor 2
執行命令
bin/kafka-console-producer.sh --broker-list hadoop102:
9092
--topic donglin
import org.apache.spark.sparkconf
import org.apache.spark.streaming.kafka.kafkautils
import org.apache.spark.streaming.
object sparkstream_kafka
}
視窗函式
window
**如下
import org.apache.spark.sparkconf
import org.apache.spark.streaming.kafka.kafkautils
import org.apache.spark.streaming.
object sparkstream_window
}
Spark Streaming簡介 有狀態運算元
spark streaming是微批次處理方式,批處理間隔是spark streaming是的核心概念和關鍵引數。spark streaming需要單獨乙個節點來接收資料,所以spark windowlength 視窗長度 視窗的持久時間 執行一次持續多少個時間單位 slideinterval 滑動...
Spark Streaming入門詳解
背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...
Spark Streaming 程式監控
官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...