這裡用到的bolt可能會多一些,乙個spout負責推送資料,乙個bolt負責切詞,再來乙個bolt負責統計。
最關鍵的是,相同的單詞應該交給同乙個bolt來處理,分發策略的選用就得嚴謹一些了,依據分發的單詞來分發(field)。
這個類就負責將準備的資料向後傳送,除此之外,什麼都不做。
public
class
wcspout
extends
baserichspout
; random r=
newrandom()
;@override
public
void
open
(map conf, topologycontext context, spoutoutputcollector collector)
/** * 隨機向後傳送每一行字串
* */
@override
public
void
nexttuple()
@override
public
void
declareoutputfields
(outputfieldsdeclarer declarer)
}
這個類負責對上乙個spout傳遞過來的資料進行切分,並將處理結果繼續向後傳送。需要注意的是,我們接受到的就是乙個單詞,我們還要指定向後傳送的資料的欄位名稱。
public class wcsplitbolt extends baserichbolt
/*** 獲取tuple中每一行資料,並切割
* @param input
*/@override
public void execute(tuple input)
}/**
* 繼續向後宣告
*/@override
public void declareoutputfields(outputfieldsdeclarer declarer)
}
2.3 wccountbolt
這個類負責統計單詞個數,使用指定的欄位名稱來接受,類似於android中intent傳值。使用map來存放資料、做判斷,最後求得結果。
public
class
wccountbolt
extends
baserichbolt
/** * 獲取tuple中每乙個單詞,且按照單詞統計輸出出現的次數
*/@override
public
void
execute
(tuple input)
map.
put(word, count)
;//輸出該單詞、出現次數
system.out.
println
(word+
"出現了***************"
+count);}
@override
public
void
declareoutputfields
(outputfieldsdeclarer declarer)
}
2.4 測試類
需要注意的是,最後一次的分組,不能按照shufflegrouping了,只能採用fieldsgrouping策略,將相同的詞分到一組中。並且對args進行了判斷,使測試類不但能在本地執行,還可以提交到storm集群中執行。
public
class
test
catch
(alreadyaliveexception
| invalidtopologyexception e)
}else
}}
打成jar包,上傳到集群中。
本地模式執行jar:# storm jar wc.jar com.husky.storm.wc.text(包名+類名)
集群模式執行:# storm jar wc.jar com.husky.storm.wc.text wc(包名+類名,這個wc是隨便起的名字,只要有這個wc,就證明args陣列裡有值,走的就是集群提交的方式)
如果沒有配置環境變數,就cd到bin目錄下,執行的是./bin/storm jar ~/wc.jar com.husky.storm.wc.text
成功之後,重新整理web頁面,就有了乙個worker,多出來的乙個執行緒是acker訊息完整性保障線程在執行
使用Storm實現WordSum
3.2 mybolt 類 3.3 測試類 spout類extends baserichspout,baserichspout extends basecomponent implements irichspout,irichspout extends ispout 分析ispout幾個方法 1 vo...
storm使用範例
此案例實現從陣列中隨機讀取字串傳送到bolt,bolt將字串變成大寫傳送到下乙個bolt,bolt將字串加上時間戳然後寫到檔案中 public class randomwordspout extends baserichspout 初始化方法,在spout元件例項化時呼叫一次 override pu...
storm 使用streamid的例子
有時同乙個spout或者bolt需要發出多類不同的訊息。如對乙個字串拆分為單詞後,將各單詞一一傳送給各節點,傳送完後再傳送一條結束的任務。使用方法如下 1.spout的declareoutputfields方法中定義多組stream override publicvoid declareoutput...