1.本地除錯
a.步驟:生成topology——實現spout介面——實現bolt介面——編譯執行
b.加入依賴
<
dependency
>
<
groupid
>com.alibaba.jstorm
groupid
>
<
artifactid
>jstorm-core
artifactid
>
<
version
>2.1.1
version
>
<
exclusions
>
<
exclusion
>
<
groupid
>org.slf4j
groupid
>
<
artifactid
>slf4j-jdk14
artifactid
>
exclusion
>
<
exclusion
>
<
groupid
>org.slf4j
groupid
>
<
artifactid
>slf4j-nop
artifactid
>
exclusion
>
exclusions
>
dependency
>
注意:jstorm依賴包類含有slf4j包,可能與log4j依賴包中的slf4j包衝突,所有要掉衝突的依賴包
c.新建topology類
//建立topology的生成器
topologybuilder builder = new
topologybuilder();
//建立spout,第乙個引數為名字,注意不要含有空格,第二個引數為spout實體
builder.setspout("wordreader",new
wordreader());
//建立bolt,第乙個引數為名字,第二個引數為bolt實體,第三個引數為bolt的併發數
//shufflegrouping表示以隨機的方式接受"wordreader"傳來的資料
builder.setbolt("wordnormalizer", new wordnormalizer()).shufflegrouping("wordreader");
//fieldsgrouping表示以欄位分組的方式接受"wordnormalizer"傳來額資料
builder.setbolt("wordcounter", new wordcounter(), 2).fieldsgrouping("wordnormalizer", new fields("word"));
//配置
config conf = new
config();
conf.put("wordsfile", "e:/test.txt");
conf.setdebug(
false
);
//設定並行數
conf.put(config.topology_max_spout_pending, 1);
//建立乙個本地模式cluster
localcluster cluster = new
localcluster();
//提交拓撲
cluster.submittopology("sequencetest", conf, builder.createtopology());
try
catch
(interruptedexception e)
//結束拓撲
cluster.killtopology("sequencetest");
cluster.shutdown();
d.實現spout介面
publicclass wordreader implements
irichspout
@override
public
void
activate()
@override
public
void
close()
@override
public
void
deactivate()
@override
public
void
fail(object taskid)
@override
public
void
nexttuple()
catch
(interruptedexception e)
return
;
}
string str;
//open the reader
bufferedreader reader = new
bufferedreader(filereader);
try
} catch
(exception e)
finally
}@override
public
void
open(map conf, topologycontext context, spoutoutputcollector collector)
catch
(filenotfoundexception e)
//初始化發射器
this.collector =collector;
}@override
public
void
declareoutputfields(outputfieldsdeclarer declarer)
@override
public mapgetcomponentconfiguration()
e.實現bolt介面
publicclass wordnormalizer implements
irichbolt
@override
public
void
execute(tuple input)
}
//確認成功處理乙個tuple
collector.ack(input);
}@override
public
void
prepare(map stormconf, topologycontext context, outputcollector collector)
@override
public
void
declareoutputfields(outputfieldsdeclarer declarer)
@override
public mapgetcomponentconfiguration()
}
publicclass wordcounter implements
irichbolt
counters.clear();
}@override
public
void
execute(tuple input)
else
//確認成功處理乙個tuple
collector.ack(input);
}@override
public
void
prepare(map stormconf, topologycontext context, outputcollector collector)
@override
public
void
declareoutputfields(outputfieldsdeclarer arg0)
@override
public mapgetcomponentconfiguration()
}
實時流式計算是什麼 實時流式計算運用領域
實時流式計算,也就是realtime,streaming,analyse,在不同的領域有不同的定義,這裡我們說的是大資料領域的實時流式計算。實時流式計算,或者是實時計算,流式計算,在大資料領域都是差不多的概念。那麼,到底什麼是實時流式計算呢?谷歌大神tyler akidau在 the world b...
流式計算框架
s4s4會將資料裡的每一條記錄包裝成event事件,每個事件是乙個kv對,同時有eventtype來標示這個事件的型別。pe是s4中的基本運算單元。每個pe只負責處理自己所關心的eventtype,並且只處理自己所對應的key值的event。pe處理後可能輸出乙個或多個event。就像hadoop上...
高大上的介紹實時流式計算!
實時流式計算,也就是realtime,streaming,analyse,在不同的領域有不同的定義,這裡我們說的是大資料領域的實時流式計算。實時流式計算,或者是實時計算,流式計算,在大資料領域都是差不多的概念。那麼,到底什麼是實時流式計算呢?谷歌大神tyler akidau在 the world b...