核心概念
下面介紹kafka相關概念,以便執行下面例項的同時,更好地理解kafka.
接下來在ubuntu系統環境下測試簡單的例項。按順序執行如下命令:
進入kafka所在的目錄
命令執行後不會返回shell命令輸入狀態,zookeeper就會按照預設的配置檔案啟動服務,請千萬不要關閉當前終端.啟動新的終端,輸入如下命令:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
kafka服務端就啟動了,請千萬不要關閉當前終端。啟動另外乙個終端,輸入如下命令:
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zhaogw
topic是發布訊息發布的category,以單節點的配置建立了乙個叫zhaogw的topic.可以用list列出所有建立的topics,來檢視剛才建立的主題是否存在。
可以在結果中檢視到zhaogw這個topic存在。接下來用producer生產點資料:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zhaogw
hello world
hello hadoop
然後再次開啟新的終端或者直接按ctrl+c退出。然後使用consumer來接收資料,輸入如下命令:
便可以看到剛才產生的三條資訊。說明kafka安裝成功。
package com.zgw.spark.streaming
import org.apache.spark.sparkconf
import org.apache.spark.streaming.dstream.
import org.apache.spark.streaming.kafka.kafkautils
import org.apache.spark.streaming.
/** * created by zhaogw&lss on 2019/10/22.
*/object sparkstreaming_kafka
}
核心**
//kafka stream
val kafkadsream: receiverinputdstream[
(string, string)
]= kafkautils.
createstream
( streamcontext,
"dblab-virtualbox:2181"
,"zhaogw"
,map
("zhaogw"
->3)
)
檢視該方法的原始碼
def createstream
( ssc: streamingcontext,
zkquorum: string,
groupid: string,
topics: map[string, int]
, storagelevel: storagelevel = storagelevel.memory_and_disk_ser_2
)
這裡使用到了kafka
的工具類kafkautils
,第乙個引數是streamingcontext
物件,第二個引數是zk所在的主機名(與hosts檔案中的配置對應),第三個引數是groupid,第四個引數是topics。
啟動專案,並在linux主機傳送兩條訊息
然後再idea中就可以看見消費的資料了
Spark Streaming入門詳解
背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...
Spark Streaming 程式監控
官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...
spark streaming讀取kafka示例
spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...