使用Storm實現WordCount

2021-09-24 17:17:30 字數 2261 閱讀 4935

這裡用到的bolt可能會多一些,乙個spout負責推送資料,乙個bolt負責切詞,再來乙個bolt負責統計。

最關鍵的是,相同的單詞應該交給同乙個bolt來處理,分發策略的選用就得嚴謹一些了,依據分發的單詞來分發(field)。

這個類就負責將準備的資料向後傳送,除此之外,什麼都不做。

public

class

wcspout

extends

baserichspout

; random r=

newrandom()

;@override

public

void

open

(map conf, topologycontext context, spoutoutputcollector collector)

/** * 隨機向後傳送每一行字串

* */

@override

public

void

nexttuple()

@override

public

void

declareoutputfields

(outputfieldsdeclarer declarer)

}

這個類負責對上乙個spout傳遞過來的資料進行切分,並將處理結果繼續向後傳送。需要注意的是,我們接受到的就是乙個單詞,我們還要指定向後傳送的資料的欄位名稱。

public class wcsplitbolt extends baserichbolt

/*** 獲取tuple中每一行資料,並切割

* @param input

*/@override

public void execute(tuple input)

}/**

* 繼續向後宣告

*/@override

public void declareoutputfields(outputfieldsdeclarer declarer)

}

2.3 wccountbolt

這個類負責統計單詞個數,使用指定的欄位名稱來接受,類似於android中intent傳值。使用map來存放資料、做判斷,最後求得結果。

public

class

wccountbolt

extends

baserichbolt

/** * 獲取tuple中每乙個單詞,且按照單詞統計輸出出現的次數

*/@override

public

void

execute

(tuple input)

map.

put(word, count)

;//輸出該單詞、出現次數

system.out.

println

(word+

"出現了***************"

+count);}

@override

public

void

declareoutputfields

(outputfieldsdeclarer declarer)

}

2.4 測試類

需要注意的是,最後一次的分組,不能按照shufflegrouping了,只能採用fieldsgrouping策略,將相同的詞分到一組中。並且對args進行了判斷,使測試類不但能在本地執行,還可以提交到storm集群中執行。

public

class

test

catch

(alreadyaliveexception

| invalidtopologyexception e)

}else

}}

打成jar包,上傳到集群中。

本地模式執行jar:# storm jar wc.jar com.husky.storm.wc.text(包名+類名)

集群模式執行:# storm jar wc.jar com.husky.storm.wc.text wc(包名+類名,這個wc是隨便起的名字,只要有這個wc,就證明args陣列裡有值,走的就是集群提交的方式)

如果沒有配置環境變數,就cd到bin目錄下,執行的是./bin/storm jar ~/wc.jar com.husky.storm.wc.text

成功之後,重新整理web頁面,就有了乙個worker,多出來的乙個執行緒是acker訊息完整性保障線程在執行

使用Storm實現WordSum

3.2 mybolt 類 3.3 測試類 spout類extends baserichspout,baserichspout extends basecomponent implements irichspout,irichspout extends ispout 分析ispout幾個方法 1 vo...

storm使用範例

此案例實現從陣列中隨機讀取字串傳送到bolt,bolt將字串變成大寫傳送到下乙個bolt,bolt將字串加上時間戳然後寫到檔案中 public class randomwordspout extends baserichspout 初始化方法,在spout元件例項化時呼叫一次 override pu...

storm 使用streamid的例子

有時同乙個spout或者bolt需要發出多類不同的訊息。如對乙個字串拆分為單詞後,將各單詞一一傳送給各節點,傳送完後再傳送一條結束的任務。使用方法如下 1.spout的declareoutputfields方法中定義多組stream override publicvoid declareoutput...