Storm 分發策略 與Kafka整合

2022-06-29 05:45:11 字數 3036 閱讀 2832

storm當中的分組策略,一共有八種:

所謂的grouping策略就是在spout與bolt、bolt與bolt之間傳遞tuple的方式。總共有八種方式:

1)shufflegrouping(隨機分組)隨機分組;將tuple隨機分配到bolt中,能夠保證各task中處理的資料均衡;

2)fieldsgrouping(按照字段分組,在這裡即是同乙個單詞只能傳送給乙個bolt)

按欄位分組; 根據設定的字段相同值得tuple被分配到同乙個bolt進行處理;

舉例:builder.setbolt("mybolt", new mystorebolt(),5).fieldsgrouping("checkbolt",new fields("uid"));

說明:該bolt由5個任務task執行,相同uid的元組tuple被分配到同乙個task進行處理;該task接收的元祖欄位是mybolt發射出的字段資訊,不受uid分組的影響。

該分組不僅方便統計而且還可以通過該方式保證相同uid的資料儲存不重複(uid資訊寫入資料庫中唯一);

3)allgrouping(廣播傳送,即每乙個tuple,每乙個bolt都會收到)廣播傳送:所有bolt都可以收到該tuple

4)globalgrouping(全域性分組,將tuple分配到task id值最低的task裡面)全域性分組:tuple被傳送給bolt的同乙個並且最小task_id的任務處理,實現事務性的topology

5)nonegrouping(隨機分派)不分組:效果等同於shuffle grouping.

6)directgrouping(直接分組,指定tuple與bolt的對應傳送關係)

直接分組:由tuple的發射單元直接決定tuple將發射給那個bolt,一般情況下是由接收tuple的bolt決定接收哪個bolt發射的tuple。這是一種比較特別的分組方法,用這種分組意味著訊息的傳送者指定由訊息接收者的哪個task處理這個訊息。 只有被宣告為direct stream的訊息流可以宣告這種分組方法。而且這種訊息tuple必須使用emitdirect方法來發射。訊息處理者可以通過topologycontext來獲取處理它的訊息的taskid (outputcollector.emit方法也會返回taskid)。

7)local or shuffle grouping本地或者隨機分組,優先將資料傳送到本機的處理器executor,如果本機沒有對應的處理器,那麼再傳送給其他機器的executor,避免了網路資源的拷貝,減輕網路傳輸的壓力

8)customgrouping(自定義的grouping)

第一步:匯入jar包

org.apache.storm

storm-kafka-client

1.1.3

org.apache.kafka

kafka-clients

1.0.0

org.apache.storm

storm-core

1.1.3

provided

第二步:編寫我們的主函式入口程式

publicclasskafkstormtopo

}第三步:開發我們的printlnbolt作為訊息處理

public class printlnbolt extends baserichbolt 

@override

public void execute(tuple input)

@override

public void declareoutputfields(outputfieldsdeclarer outputfieldsdeclarer)

}

建立topickafka-topics.sh --create --zookeeper hadoop-001:2181 --replication-factor 3 --partitions 1 --topic four

建立生產者

kafka-console-producer.sh --broker-list hadoop-001:9092,hadoop-002:9092,hadoop-003:9092 --topic four 

傳送訊息與接收訊息

傳送訊息

接收訊息

setfirstpolloffsetstrategy:允許你設定從**開始消費資料. 這在故障恢復和第一次啟動spout的情況下會被使用

earliest :無論之前的消費情況如何,spout會從每個kafka partition能找到的最早的offset開始的讀取

latest :無論之前的消費情況如何,spout會從每個kafka partition當前最新的offset開始的讀取

uncommitted_earliest

(預設值) :spout 會從每個partition的最後一次提交的offset開始讀取. 如果offset不存在或者過期, 則會依照 earliest進行讀取.

uncommitted_latest:spout 會從每個partition的最後一次提交的offset開始讀取, 如果offset不存在或者過期, 則會依照 latest進行讀取

Storm的訊息分發策略

1.shuffle grouping 隨機分組 tuple會被隨機分發到所有bolt,每個bolt會得到相同數量的tuple,使得負載均衡。2.fields grouping 按欄位分組 按field分發,只能傳送給相同field的bolt。例如 builder.setbolt mybolt new...

Storm學習記錄(二 分發策略與架構)

shuffle grouping 隨機分組,隨機派發 stream 裡面的tuple 保證每個 bolt task 接收到的 tuple 數目大致相同。輪詢,平均分配 fields grouping 按欄位分組,比如,按 user id 這個欄位來分組,那麼具有同樣 user id 的 tuple ...

kafka 生產者訊息分發策略

訊息是kafka中最基本的額資料單元,在kafka中,一條訊息由key value兩部分構成,在傳送一條訊息時,我們可以指定這個key,producer會根據key來判斷當前這條訊息應該 路由儲存到哪個partition。預設情況下,kafka採用的是對key進行hash取模計算出分割槽。如果key...