實時流式計算框架 JStorm

2022-08-19 07:24:09 字數 4262 閱讀 9741

1.本地除錯

a.步驟:生成topology——實現spout介面——實現bolt介面——編譯執行

b.加入依賴

<

dependency

>

<

groupid

>com.alibaba.jstorm

groupid

>

<

artifactid

>jstorm-core

artifactid

>

<

version

>2.1.1

version

>

<

exclusions

>

<

exclusion

>

<

groupid

>org.slf4j

groupid

>

<

artifactid

>slf4j-jdk14

artifactid

>

exclusion

>

<

exclusion

>

<

groupid

>org.slf4j

groupid

>

<

artifactid

>slf4j-nop

artifactid

>

exclusion

>

exclusions

>

dependency

>

注意:jstorm依賴包類含有slf4j包,可能與log4j依賴包中的slf4j包衝突,所有要掉衝突的依賴包

c.新建topology類

//

建立topology的生成器

topologybuilder builder = new

topologybuilder();

//建立spout,第乙個引數為名字,注意不要含有空格,第二個引數為spout實體

builder.setspout("wordreader",new

wordreader());

//建立bolt,第乙個引數為名字,第二個引數為bolt實體,第三個引數為bolt的併發數

//shufflegrouping表示以隨機的方式接受"wordreader"傳來的資料

builder.setbolt("wordnormalizer", new wordnormalizer()).shufflegrouping("wordreader");

//fieldsgrouping表示以欄位分組的方式接受"wordnormalizer"傳來額資料

builder.setbolt("wordcounter", new wordcounter(), 2).fieldsgrouping("wordnormalizer", new fields("word"));

//配置

config conf = new

config();

conf.put("wordsfile", "e:/test.txt");

conf.setdebug(

false

);

//設定並行數

conf.put(config.topology_max_spout_pending, 1);

//建立乙個本地模式cluster

localcluster cluster = new

localcluster();

//提交拓撲

cluster.submittopology("sequencetest", conf, builder.createtopology());

try

catch

(interruptedexception e)

//結束拓撲

cluster.killtopology("sequencetest");

cluster.shutdown();

d.實現spout介面

public

class wordreader implements

irichspout

@override

public

void

activate()

@override

public

void

close()

@override

public

void

deactivate()

@override

public

void

fail(object taskid)

@override

public

void

nexttuple()

catch

(interruptedexception e)

return

;

}

string str;

//open the reader

bufferedreader reader = new

bufferedreader(filereader);

try

} catch

(exception e)

finally

}@override

public

void

open(map conf, topologycontext context, spoutoutputcollector collector)

catch

(filenotfoundexception e)

//初始化發射器

this.collector =collector;

}@override

public

void

declareoutputfields(outputfieldsdeclarer declarer)

@override

public mapgetcomponentconfiguration()

e.實現bolt介面

public

class wordnormalizer implements

irichbolt

@override

public

void

execute(tuple input)

}

//確認成功處理乙個tuple

collector.ack(input);

}@override

public

void

prepare(map stormconf, topologycontext context, outputcollector collector)

@override

public

void

declareoutputfields(outputfieldsdeclarer declarer)

@override

public mapgetcomponentconfiguration()

}

public

class wordcounter implements

irichbolt

counters.clear();

}@override

public

void

execute(tuple input)

else

//確認成功處理乙個tuple

collector.ack(input);

}@override

public

void

prepare(map stormconf, topologycontext context, outputcollector collector)

@override

public

void

declareoutputfields(outputfieldsdeclarer arg0)

@override

public mapgetcomponentconfiguration()

}

實時流式計算是什麼 實時流式計算運用領域

實時流式計算,也就是realtime,streaming,analyse,在不同的領域有不同的定義,這裡我們說的是大資料領域的實時流式計算。實時流式計算,或者是實時計算,流式計算,在大資料領域都是差不多的概念。那麼,到底什麼是實時流式計算呢?谷歌大神tyler akidau在 the world b...

流式計算框架

s4s4會將資料裡的每一條記錄包裝成event事件,每個事件是乙個kv對,同時有eventtype來標示這個事件的型別。pe是s4中的基本運算單元。每個pe只負責處理自己所關心的eventtype,並且只處理自己所對應的key值的event。pe處理後可能輸出乙個或多個event。就像hadoop上...

高大上的介紹實時流式計算!

實時流式計算,也就是realtime,streaming,analyse,在不同的領域有不同的定義,這裡我們說的是大資料領域的實時流式計算。實時流式計算,或者是實時計算,流式計算,在大資料領域都是差不多的概念。那麼,到底什麼是實時流式計算呢?谷歌大神tyler akidau在 the world b...