一、背景與架構改造
1.1 問題描述
有一塊業務主要是做爬蟲抓取與資料輸出,通過大資料這邊提供的soa服務入庫到hbase,架構大致如下:
架構改造之前
以對於以上的架構存在一些問題,我們可以看見資料在dubbox服務階段處理後直接通過hbase api入庫了hbase,中間並沒做任何緩衝,要是hbase出現了問題整個集群都完蛋,沒法寫入資料,資料還丟失,hbase這邊壓力也相當大,針對這一點,對入庫hbase這個階段做了一些改造。
1.2 架構改造
改造後的架構,爬蟲通過介面服務,入庫到kafka,spark streaming去消費kafka的資料,入庫到hbase.核心元件如下圖所示:
架構改造圖
為什麼不直接入庫到hbase,這樣做有什麼好處?
緩解了hbase這邊峰值的壓力,並且流量可控;
hbase集群出現問題或者掛掉,都不會照成資料丟失的問題;
增加了吞吐量。
1.3 為什麼選擇kafka和spark streaming
由於kafka它簡單的架構以及出色的吞吐量;
kafka與spark streaming也有專門的整合模組;
spark的容錯,以及現在技術相當的成熟。
二、通過**實現具體細節,並執行專案
然後就開始寫**了,總體思路就是:
put資料構造json資料,寫入kafka;
spark streaming任務啟動後首先去zookeeper中去讀取offset,組裝成fromoffsets;
spark streaming 獲取到fromoffsets後通過kafkautils.createdirectstream去消費kafka的資料;
讀取kafka資料返回乙個inputdstream的資訊,foreachrdd遍歷,同時記錄讀取到的offset到zk中;
寫入資料到hbase。
詳細一點的架構圖
2.1 初始化與配置載入
下面是一些接收引數,載入配置,獲取配置中的topic,還有初始化配置,**如下:
//接收引數
val array(kafka_topic, timewindow, maxrateperpartition) = args
//載入配置
val prop: properties = new properties()
prop.load(this.getclass().getresourceasstream("/kafka.properties"))
val groupname = prop.getproperty("group.id")
//獲取配置檔案中的topic
val kafkatopics: string = prop.getproperty("kafka.topic." + kafka_topic)
if (kafkatopics == null || kafkatopics.length <= 0) /$i"
if (zkclient.exists(path)) else $ $ $")
//topicandpartition 主構造引數第乙個是topic,第二個是kafka partition id
val topicandpartition = topicandpartition(offsetrange.topic, offsetrange.partition)
val either = kc.setconsumeroffsets(groupname, map((topicandpartition, offsetrange.untiloffset))) //是
if (either.isleft) ")
partitionrecords.foreach(data => catch catch $ $ $")
//topicandpartition 主構造引數第乙個是topic,第二個是kafka partition id
val topicandpartition = topicandpartition(offsetrange.topic, offsetrange.partition)
val either = kc.setconsumeroffsets(groupname, map((topicandpartition, offsetrange.untiloffset))) //是
if (either.isleft) ")
/** 解析partitionrecords資料 */
if (offsetrange.topic != null) catch error ", e)
false
4.5 執行
剛測試時給它相對很小的記憶體跑一跑:
[[email protected] ~]# /opt/cloudera/parcels/cdh/bin/spark-submit \
--master yarn-client --num-executors 1 \
--driver-memory 256m --conf spark.yarn.driver.memoryoverhead=256 \
--conf spark.yarn.am.memory=256m --conf spark.yarn.am.memoryoverhead=256 \
--executor-memory 256m --conf spark.yarn.executor.memoryoverhead=256 \
--executor-cores 1 \
--class com.creditease.streaming.kafkadatastream hspark-1.0.jar 1 3 30000
五六萬的插入沒什麼壓力,但是到10萬的時候,就有些卡頓了!!
yarn 容器、cpu、記憶體大小
五六萬的插入沒什麼壓力
當然是需要增大記憶體的,修改配置,都增加一倍:
yarn 容器、cpu、記憶體大小
90000的插入沒什麼壓力
檢視插入資料量,能看到修改後插入資料10萬是沒有什麼壓力的:
檢視插入資料量,能看到修改後插入資料10萬是沒有什麼壓力的
當我們再繼續加大壓力測試的時候,效能下降:
當我們再繼續加大壓力測試的時候,效能下降
檢視統計資訊:
檢視統計資訊
Spark Streaming入門詳解
背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...
Spark Streaming 程式監控
官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...
spark streaming讀取kafka示例
spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...