這個手冊,首先就給了wordcount的例子。wordcount就特麼這麼好理解麼,介面文件上說明少的掉渣啊。吐槽完畢,例子主要分為兩個步驟:1. 構建資料來源;2. 構建拓撲。
1. 創造源資料
效果:產生乙個源資料流。處理過程中至多可以被分成3個batch,batch的name是「sentence」。不停的將上面的4個句子迴圈形成資料流。fixedbatchspout spout = new fixedbatchspout(new fields("sentence"), 3,
new values("the cow jumped over the moon"),
new values("the man went to the store and bought some candy"),
new values("four score and seven years ago"),
2. 構建topology
每行依次的效果:tridenttopology topology = new tridenttopology();
tridentstate wordcounts =
topology.newstream("spout1", spout)
.each(new fields("sentence"), new
split(), new fields("word"))
.groupby(new fields("word"))
.persistentaggregate(new memorymapstate.factory(), new count(), new fields("count"))
1. 以spout為源建立流;
2. 遍歷流中name為「sentence」的batch,作為split函式的輸入,輸出name為「word」的tuple。
3. 對name為「world」的tuple做groupby操作;
4. 對groupby的結果,做count操作,count為內建聚合方法,結果儲存在記憶體中,存為name為」count」的tuple。每個batch的計算結果再進行聚合(疊加)。
5. 併發度為6,也就是最多允許6個執行緒。即executor的數量。
