一、基於receiver的方式
這種方式使用receiver來獲取資料。receiver是使用kafka的高層次consumer api來實現的。receiver從kafka中獲取的資料都是儲存在spark executor的記憶體中的(如果突然資料暴增,大量batch堆積,很容易出現記憶體溢位的問題),然後spark streaming啟動的job會去處理那些資料。
然而,在預設的配置下,這種方式可能會因為底層的失敗而丟失資料。如果要啟用高可靠機制,讓資料零丟失,就必須啟用spark streaming的預寫日誌機制(write ahead log,wal)。該機制會同步地將接收到的kafka資料寫入分布式檔案系統(比如hdfs)上的預寫日誌中。所以,即使底層節點出現了失敗,也可以使用預寫日誌中的資料進行恢復。
需要注意的要點
1、kafka中的topic的partition,與spark中的rdd的partition是沒有關係的。所以,在kafkautils.createstream()中,提高partition的數量,只會增加乙個receiver中,讀取partition的執行緒的數量。不會增加spark處理資料的並行度。
2、可以建立多個kafka輸入dstream,使用不同的consumer group和topic,來通過多個receiver並行接收資料。
3、如果基於容錯的檔案系統,比如hdfs,啟用了預寫日誌機制,接收到的資料都會被複製乙份到預寫日誌中。因此,在kafkautils.createstream()中,設定的持久化級別是storagelevel.memory_and_disk_ser。
二、基於direct的方式
這種新的不基於receiver的直接方式,是在spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用receiver來接收資料後,這種方式會周期性地查詢kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的範圍。當處理資料的job啟動時,就會使用kafka的簡單consumer api來獲取kafka指定offset範圍的資料。
這種方式有如下優點:
1、簡化並行讀取:如果要讀取多個partition,不需要建立多個輸入dstream然後對它們進行union操作。spark會建立跟kafka partition一樣多的rdd partition,並且會並行從kafka中讀取資料。所以在kafka partition和rdd partition之間,有乙個一對一的對映關係。
2、高效能:如果要保證零資料丟失,在基於receiver的方式中,需要開啟wal機制。這種方式其實效率低下,因為資料實際上被複製了兩份,kafka自己本身就有高可靠的機制,會對資料複製乙份,而這裡又會複製乙份到wal中。而基於direct的方式,不依賴receiver,不需要開啟wal機制,只要kafka中作了資料的複製,那麼就可以通過kafka的副本進行恢復。
3、一次且僅一次的事務機制:
對比:基於receiver的方式,是使用kafka的高階api來在zookeeper中儲存消費過的offset的。這是消費kafka資料的傳統方式。這種方式配合著wal機制可以保證資料零丟失的高可靠性,但是卻無法保證資料被處理一次且僅一次,可能會處理兩次。因為spark和zookeeper之間可能是不同步的。
基於direct的方式,使用kafka的簡單api,spark streaming自己就負責追蹤消費的offset,並儲存在checkpoint中。spark自己一定是同步的,因此可以保證資料是消費一次且僅消費一次。
在實際生產環境中大都用direct方式
org.apache.spark
spark-core_2.11
1.6.1
org.apache.spark
spark-streaming_2.11
1.6.1
org.apache.spark
spark-streaming-kafka_2.11
1.6.1
完整測試**
import kafka.serializer.stringdecoder
import org.apache.spark.sparkconf
import org.apache.spark.streaming.
import org.apache.spark.streaming.kafka._
/** * created by administrator on 2017/6/16.
*/object kafkaconsumer
/***bin/kafka-console-producer.sh –broker-list localhost:9092 –topic mytest
*/def createstream()=
def direct()=
}
spark兩種kafka偏移量維護方式
1.spark可以通過checkpoint的方式來維護kafka的偏移量,配置簡單,只需要配置checkpoint的路徑就可以完成偏移量的維護,如果本身spark業務就採用了state狀態,那麼既不需要額外配置即可確保偏移量的維護。原理 spark會將kafka spark straming處理的t...
RocketMQ的兩種消費模式和重置消費位點
消費模式 第一種 消費者為同乙個組下的,訂閱的是同乙個topic和tag的情況 這樣的就是消費者組成了乙個集群,有多個例項,之後就是topic中的訊息只會被這些例項中的其中乙個消費 第二種 多個不同組的消費者訂閱同乙個topic和tag的情況 這樣的就是所謂的廣播模式,每個消費者都會接收到topic...
兩種消費觀念
兩種消費觀念 2006年10月 大學同班同學在北京的很多,大約十幾個人。十一期間大學同學聚會,許多人都是久未謀面。北京真的是太大了,以前租房的時候,大家都住在城裡,來往頻繁些。漸漸地,大家都買房成家,於是大都住在郊區,更甚者,五環之外佔大多數。我住京西,你住京東,他住京北.於是見一次面不亞於城際旅行...