原創 2023年06月05日 16:30:15
程式的pom.xml檔案
org.apache.stormgroupid>
storm-coreartifactid>
1.0.2version>
providedscope>
dependency>
org.apache.stormgroupid>
storm-kafkaartifactid>
1.0.2version>
dependency>
org.apache.kafkagroupid>
kafka_2.10artifactid>
0.8.2.0version>
dependency>
log4jgroupid>
log4jartifactid>
1.2.14version>
dependency>
org.slf4jgroupid>
log4j-over-slf4jartifactid>
1.7.21version>
dependency>
dependencies>
3.spout**
public
class
mykafkaspout catch (alreadyaliveexception e) catch (invalidtopologyexception e)
}else
}}
4.bolt**,這裡為了簡化,只把資料列印出來
public
class
mykafkabolt
implements
ibasicbolt
@override
public
void
execute(tuple input, basicoutputcollector collector)
@override
public
void
prepare(map stormconf, topologycontext context)
@override
public
void
declareoutputfields(outputfieldsdeclarer declarer)
@override
public mapgetcomponentconfiguration()
}
5.如何確定spoutconfig中的zkroot,檢視kafka中的server.properties檔案,如果zookeeper.connect後面沒有跟/bc這種就是,直接為」「,否則zkroot為bc,就類似於zookeeper.connect=localhostlei1:2181,localhostlei2:2181,localhostlei3:2181
/bc
# zookeeper connection string (see zookeeper docs for details).
# this is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
to the urls to specify the
# root directory for
all kafka znodes.
zookeeper.connect=localhostlei1:2181,localhostlei2:2181,localhostlei3:2181
6.開始任務後,嘗試往kafka中寫入資料,資料就能馬上被storm所消費。 Storm 實時性分析
都說storm是乙個實時流處理系統,但storm的實時性體現在什麼方面呢?首先有乙個前提 這裡的實時性和我們通常所說的實時系統 晶元 彙編或c編寫的實時處理軟體 的實時性肯定是沒法比的,也不是同乙個概念。這裡的實時性應該是乙個相對的實時性 相對於hadoop之類 從網上找了一些資料 總結一下,sto...
kafka實時監控
在kafka的開發和維護中,我們經常需要了解kafka topic以及連線在其上的consumer的實時資訊,比如logsize,offset,owner等。為此kafka提供了consumeroffsetchecker,它的用法很簡單 bin kafka run class.sh kafka.to...
kafka消費原理
consumer 採用 pull 拉 模式從 broker 中讀取資料。push 推 模式很難適應消費速率不同的消費者,因為訊息傳送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成 consumer 來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而 pul...