架構
nimbus 分配任務、資源排程、上傳jar包
zookeeper **協調、健康檢查(心跳)
supervisor 接收nimbus任務、開啟/關閉自己管理的worker程序(可以開啟n個woker)
worker 執行具體處理運算元件的程序(每個worker對應執行乙個topology的子集)
執行任務(可以執行n個task(spolt–推送資料/bolt–邏輯單元處理)任務)
附:每個worker中有1-n個程序,乙個程序可以執行1-n個task任務(spout,blot)
集群出現的問題以及解決辦法
集群節點宕機--物理問題
nimbus
在交付任務給zk之後短時間內沒有更多實質性任務,此時nimbus可以閒置
當supervisor 和nimbus同時宕機,此時nimbus需要重啟
當客戶端推送新任務時,nimbus宕機,此時nimbus需要重啟
supervisor
當supervisor宕機時,zk會通知nimbus,再重新將任務分配給其他supervisor。當宕機的supervisor恢復後,會心跳到zk尋找自己之前的任務,如果任務被分配則執行快速自毀機制(殺死未完成原任務的程序),重新處於等待
worker
當worker宕掉時,supervisor先發現,並重新啟動worker,如果長時間啟動不了,zk會發現當前worker宕掉,此時zk會將宕掉的worker任務交由其他worker完成。當宕機的worker恢復後,會執行快速自毀機制(殺死未完成原任務的程序),重新處於等待
程序掛掉
當程序掛掉時,由zk將該程序的任務重新分配新的程序
訊息的完整性
將包裝的資料流做16位的二進位制streamid,每發生stream流互動時將前後的id進行異或運算,最終異或結果為0,則資料傳遞成功沒有故障;若傳遞故障則推送給上乙個節點重新傳送。
tips: 與 全1為1,有0則0 要求嚴格
或 有1為1,全0為0 要求不嚴
異或 相異為1,相同為0
ack無法保證資料不被重複計算,但是可以保證資料至少被正確使用一次
分布式遠端過程呼叫 —drpcserver在客戶端和storm之間承擔橋梁作用
客戶端通過drpcserver將所需args傳送給spout,再和bolt進行計算後,經過resultbolt彙總後將結果交由drpcserver,最終由drpcserver將結果展示給客戶端
tops:drpcserver與spout、resultbolt互動時都帶有request-id用於分別不同客戶端需求
定義drpc拓撲:
方法1:
通過lineardrpctopologybuilder (該方法也過期,不建議使用)
該方法會自動為我們設定spout、將結果返回給drpc server等,我們只需要將topology實現
方法2:
直接通過普通的拓撲構造方法topologybuilder來建立drpc拓撲
需要手動設定好開始的drpcspout以及結束的returnresults
集群的分布式訊息佇列系統,遵循先進先出,資料有序原則
角色: producers(生產者)、broker(快取**)、consumers(消費者)
storm處於消費者的地位
在生產資料之前先建立topic,在topic下面生產資料,生產出同乙個主題的東西可以根據實際的業務需求放到不同的partitions(分割槽)上,在同乙個分割槽上生產和消費的順序是有序的(依據偏移量)
架構原理:kafka給producer和consumer提供註冊的介面,資料從producer傳送到broker,broker承擔乙個中間快取和分發的作用,負責分發註冊到系統中的consumer。
乙個分割槽屬於乙個broker,broker是分割槽的master
zk管理broker
集群規劃:
zookeeper集群共三颱伺服器,分別為:node1、node2、node3。
kafka集群共三颱伺服器,分別為:node1、node2、node3。
1、zookeeper集群準備
kafka是乙個分布式訊息佇列,需要依賴zookeeper,請先安裝好zk集群。
zookeeper集群安裝步驟略。
2、安裝kafka
解壓:
tar zxvf kafka_2.10-0.9.0.1.tgz -c /opt/
mv kafka_2.10-0.9.0.1/ kafka
修改配置檔案:config/server.properties
核心配置引數說明:
broker.id: broker集群中唯一標識id,0、1、2、3依次增長(broker即kafka集群中的一台伺服器)
注:當前kafka集群共三颱節點,分別為:node1、node2、node3。對應的broker.id分別為0、1、2。
zookeeper.connect: zk集群位址列表
將當前node1伺服器上的kafka目錄複製到其他node2、node3伺服器上:
scp -r /opt/kafka/ node2:/opt
scp -r /opt/kafka/ node3:/opt
修改node2、node3上kafka配置檔案中的broker.id(分別在node2、3伺服器上執行以下命令修改broker.id)
sed -i -e 『s/broker.id=.*/broker.id=1/』 /opt/kafka/config/server.properties
sed -i -e 『s/broker.id=.*/broker.id=2/』 /opt/kafka/config/server.properties
3、啟動kafka集群
a、啟動zookeeper集群。
b、啟動kafka集群。
分別在三颱伺服器上執行以下命令啟動:
bin/kafka-server-start.sh config/server.properties
4、測試
建立話題(kafka-topics.sh --help檢視幫助手冊)
建立topic:
bin/kafka-topics.sh --zookeeper sxt002:2181,sxt003:2181,sxt004:2181 --create --replication-factor 2 --partitions 3 --topic test
(引數說明:
–replication-factor*:指定每個分割槽的複製因子個數,預設1個*
–partitions*:指定當前建立的kafka分割槽數量,預設為1個*
–topic*:指定新建topic的名稱)*
檢視topic列表:
bin/kafka-topics.sh --zookeeper node4:2181,node2:2181,node3:2181 --list
檢視「test」topic描述:
bin/kafka-topics.sh --zookeeper node4:2181,node2:2181,node3:2181 --describe --topic test
建立生產者:
bin/kafka-console-producer.sh --broker-list sxt002:9092,sxt003:9092,sxt004:9092 --topic test2
建立消費者:
bin/kafka-console-consumer.sh --zookeeper node4:2181,node2:2181,node3:2181 --from-beginning --topic test
檢視幫助手冊:
bin/kafka-console-consumer.sh help
Storm容錯機制
1.bolt任務crash引起的訊息未被應答。此時,acker中所有與此bolt任務關聯的訊息都會因為超時而失敗,對應的spout的fail方法將被呼叫。2.acker任務失敗。如果acker任務本身失敗了,它在失敗之前持有的所有訊息都將超時而失敗。spout的fail方法將被呼叫。3.spout任...
Storm訊息容錯機制(ack fail機制)
storm訊息容錯機制 ack fail 1 介紹 給每個tuple指定 id告訴 storm 系統,無論處理成功還是失敗,spout 都要接收 tuple 樹上所有節點返回的通知。如果處理成功,spout 的ack 方法將會對編號是 msgid 的訊息應答確認 如果處理失敗或者超時,會呼叫 fai...
Spark Spark容錯機制
一般來說,分布式資料集的容錯性有兩種方式 資料檢查點和記錄資料的更新。面向大規模資料分析,資料檢查點操作成本很高,需要通過資料中心的網路連線在機器之間複製龐大的資料集,而網路頻寬往往比記憶體頻寬低得多,同時還需要消耗更多的儲存資源。因此,spark選擇記錄更新的方式。但是,如果更新粒度太細太多,那麼...