Storm容錯機制 Drpc kafka集群搭建

2021-09-24 18:02:55 字數 4003 閱讀 7818

架構

​ nimbus 分配任務、資源排程、上傳jar包

​ zookeeper **協調、健康檢查(心跳)

​ supervisor 接收nimbus任務、開啟/關閉自己管理的worker程序(可以開啟n個woker)

​ worker 執行具體處理運算元件的程序(每個worker對應執行乙個topology的子集)

​ 執行任務(可以執行n個task(spolt–推送資料/bolt–邏輯單元處理)任務)

​ 附:每個worker中有1-n個程序,乙個程序可以執行1-n個task任務(spout,blot)

集群出現的問題以及解決辦法

集群節點宕機--物理問題

nimbus

​ 在交付任務給zk之後短時間內沒有更多實質性任務,此時nimbus可以閒置

​ 當supervisor 和nimbus同時宕機,此時nimbus需要重啟

​ 當客戶端推送新任務時,nimbus宕機,此時nimbus需要重啟

supervisor

​ 當supervisor宕機時,zk會通知nimbus,再重新將任務分配給其他supervisor。當宕機的supervisor恢復後,會心跳到zk尋找自己之前的任務,如果任務被分配則執行快速自毀機制(殺死未完成原任務的程序),重新處於等待

worker

​ 當worker宕掉時,supervisor先發現,並重新啟動worker,如果長時間啟動不了,zk會發現當前worker宕掉,此時zk會將宕掉的worker任務交由其他worker完成。當宕機的worker恢復後,會執行快速自毀機制(殺死未完成原任務的程序),重新處於等待

程序掛掉

​ 當程序掛掉時,由zk將該程序的任務重新分配新的程序

訊息的完整性

​ 將包裝的資料流做16位的二進位制streamid,每發生stream流互動時將前後的id進行異或運算,最終異或結果為0,則資料傳遞成功沒有故障;若傳遞故障則推送給上乙個節點重新傳送。

tips: 與 全1為1,有0則0 要求嚴格

​ 或 有1為1,全0為0 要求不嚴

​ 異或 相異為1,相同為0

ack無法保證資料不被重複計算,但是可以保證資料至少被正確使用一次

分布式遠端過程呼叫 —drpcserver在客戶端和storm之間承擔橋梁作用

​ 客戶端通過drpcserver將所需args傳送給spout,再和bolt進行計算後,經過resultbolt彙總後將結果交由drpcserver,最終由drpcserver將結果展示給客戶端

tops:drpcserver與spout、resultbolt互動時都帶有request-id用於分別不同客戶端需求

定義drpc拓撲:

方法1:

​ 通過lineardrpctopologybuilder (該方法也過期,不建議使用)

​ 該方法會自動為我們設定spout、將結果返回給drpc server等,我們只需要將topology實現

方法2:

​ 直接通過普通的拓撲構造方法topologybuilder來建立drpc拓撲

​ 需要手動設定好開始的drpcspout以及結束的returnresults

集群的分布式訊息佇列系統,遵循先進先出,資料有序原則

角色: producers(生產者)、broker(快取**)、consumers(消費者)

​ storm處於消費者的地位

​ 在生產資料之前先建立topic,在topic下面生產資料,生產出同乙個主題的東西可以根據實際的業務需求放到不同的partitions(分割槽)上,在同乙個分割槽上生產和消費的順序是有序的(依據偏移量)

​ 架構原理:kafka給producer和consumer提供註冊的介面,資料從producer傳送到broker,broker承擔乙個中間快取和分發的作用,負責分發註冊到系統中的consumer。

​ 乙個分割槽屬於乙個broker,broker是分割槽的master

​ zk管理broker

集群規劃:

zookeeper集群共三颱伺服器,分別為:node1、node2、node3。

kafka集群共三颱伺服器,分別為:node1、node2、node3。

1、zookeeper集群準備

​ kafka是乙個分布式訊息佇列,需要依賴zookeeper,請先安裝好zk集群。

​ zookeeper集群安裝步驟略。

2、安裝kafka

​ 解壓:

​ tar zxvf kafka_2.10-0.9.0.1.tgz -c /opt/

​ mv kafka_2.10-0.9.0.1/ kafka

修改配置檔案:config/server.properties

核心配置引數說明:

​ broker.id: broker集群中唯一標識id,0、1、2、3依次增長(broker即kafka集群中的一台伺服器)

注:當前kafka集群共三颱節點,分別為:node1、node2、node3。對應的broker.id分別為0、1、2。

zookeeper.connect: zk集群位址列表

將當前node1伺服器上的kafka目錄複製到其他node2、node3伺服器上:

​ scp -r /opt/kafka/ node2:/opt

​ scp -r /opt/kafka/ node3:/opt

修改node2、node3上kafka配置檔案中的broker.id(分別在node2、3伺服器上執行以下命令修改broker.id)

​ sed -i -e 『s/broker.id=.*/broker.id=1/』 /opt/kafka/config/server.properties

​ sed -i -e 『s/broker.id=.*/broker.id=2/』 /opt/kafka/config/server.properties

3、啟動kafka集群

​ a、啟動zookeeper集群。

​ b、啟動kafka集群。

分別在三颱伺服器上執行以下命令啟動:

​ bin/kafka-server-start.sh config/server.properties

4、測試

建立話題(kafka-topics.sh --help檢視幫助手冊)

建立topic:

​ bin/kafka-topics.sh --zookeeper sxt002:2181,sxt003:2181,sxt004:2181 --create --replication-factor 2 --partitions 3 --topic test

(引數說明:

–replication-factor*:指定每個分割槽的複製因子個數,預設1個*

–partitions*:指定當前建立的kafka分割槽數量,預設為1個*

–topic*:指定新建topic的名稱)*

檢視topic列表:

​ bin/kafka-topics.sh --zookeeper node4:2181,node2:2181,node3:2181 --list

檢視「test」topic描述:

bin/kafka-topics.sh --zookeeper node4:2181,node2:2181,node3:2181 --describe --topic test

建立生產者:

​ bin/kafka-console-producer.sh --broker-list sxt002:9092,sxt003:9092,sxt004:9092 --topic test2

建立消費者:

​ bin/kafka-console-consumer.sh --zookeeper node4:2181,node2:2181,node3:2181 --from-beginning --topic test

檢視幫助手冊:

​ bin/kafka-console-consumer.sh help

Storm容錯機制

1.bolt任務crash引起的訊息未被應答。此時,acker中所有與此bolt任務關聯的訊息都會因為超時而失敗,對應的spout的fail方法將被呼叫。2.acker任務失敗。如果acker任務本身失敗了,它在失敗之前持有的所有訊息都將超時而失敗。spout的fail方法將被呼叫。3.spout任...

Storm訊息容錯機制(ack fail機制)

storm訊息容錯機制 ack fail 1 介紹 給每個tuple指定 id告訴 storm 系統,無論處理成功還是失敗,spout 都要接收 tuple 樹上所有節點返回的通知。如果處理成功,spout 的ack 方法將會對編號是 msgid 的訊息應答確認 如果處理失敗或者超時,會呼叫 fai...

Spark Spark容錯機制

一般來說,分布式資料集的容錯性有兩種方式 資料檢查點和記錄資料的更新。面向大規模資料分析,資料檢查點操作成本很高,需要通過資料中心的網路連線在機器之間複製龐大的資料集,而網路頻寬往往比記憶體頻寬低得多,同時還需要消耗更多的儲存資源。因此,spark選擇記錄更新的方式。但是,如果更新粒度太細太多,那麼...