SparkStreaming讀取KafKa資料

2021-10-03 09:48:57 字數 890 閱讀 8926

import org.apache.kafka.common.serialization.stringdeserializer

import org.apache.spark.sparkconf

import org.apache.spark.streaming.kafka010.kafkautils

import org.apache.spark.streaming.kafka010.locationstrategies.preferconsistent

import org.apache.spark.streaming.kafka010.consumerstrategies.subscribe

import org.apache.spark.streaming.

object getmsg )

val wordcount=lines.flatmap(x=>).reducebykey(_+_)

wordcount.print()

ssc.start()

ssc.awaittermination()

}}/* auto.offset.reset: 可理解為kafka consumer讀取資料的策略,本地用的kafka版本為0.10,因此該引數可填earliest|latest|none。

earliest: 當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費

latest: 當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料

none: topic各分割槽都存在已提交的offset時,從offset後開始消費;只要有乙個分割槽不存在已提交的offset,則丟擲異常

*/

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...

Spark Streaming 程式監控

官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...

spark streaming讀取kafka示例

spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...