flume和kafka的連線參考部落格:flume,kafka,storm,mysql的整合
相關資源在這flume2kafka相關jar包及配置檔案
若想連線起flume和kafka,需要在flume/conf目錄下,建立乙個.conf檔案,在lib目錄下新增相關jar包。
步驟:
1.在flume/conf目錄下建立相關.conf檔案,
(1)建立flume2kafka.conf檔案
vi flume2kafka.conf
############################################
# producer config
###########################################
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
#設定讀取方式
producer.sources
.s.type = exec
#設定讀取資料的位址及命令(tail -f -n+1 /home/storm/work/access.log)
producer.sources
.s.command = tail -f -n+1 /home/storm/work/access.log
producer.sources
.s.channels = c
# each sink's type must be defined
producer.sinks
.r.type = org.apache
.flume
.plugins
.kafkasink
producer.sinks
.r.metadata
.broker
.list=master:9092
producer.sinks
.r.partition
.key=0
producer.sinks
.r.partitioner
.class=org.apache
.flume
.plugins
.singlepartition
producer.sinks
.r.serializer
.class=kafka.serializer
.stringencoder
producer.sinks
.r.request
.required
.acks=0
producer.sinks
.r.max
.message
.size=1000000
producer.sinks
.r.producer
.type=sync
producer.sinks
.r.custom
.encoding=utf-8
#設定kafka的topic為:flume2kafka
producer.sinks
.r.custom
.topic
.name=flume2kafka
#specify the channel the sink should use
producer.sinks
.r.channel = c
# each channel's type is defined.
producer.channels
.c.type = memory
producer.channels
.c.capacity = 1000
2.在lib目錄下新增相關jar包
kafka_2.9
.2-0.8
.0-beta1.jar
metrics-annotation-2.2
.0.jar
scala-compiler-2.9
.2.jar
3.啟動該flume任務
bin/flume-ng agent -n producer -f conf/flume2kafka.conf -dflume
.root.logger=info,console >>logs/flume.
log2
>&
1&
4.啟動kafka及kafka的consumer任務(檢視是否有資料傳輸)
(1)啟動kafka
sbin/start-kafka.sh
(2)啟動consumer任務
bin/kafka
-console
-consumer.sh
--zookeeper
master:2181--
topic
flume2kafka--
from
-beginning
windows系統flume資料傳給kafka
1 安裝zookeeper 更改flume配置為kafka b.編輯系統變數中的path變數,增加 zookeeper home bin conf 新增檔案 zoo.cfg the number of milliseconds of each tick 心跳間隔 毫秒每次 ticktime 2000...
flume執行流程
flume執行流程 source執行channel的doput方法,將接受到的event先放入putlist裡面臨時快取起來,當到達一定的量 batchsize 的時候,執行docommit方法,將putlist中的event放入channel的queue queue的大小是capacity 中,當...
Hadoop元件Zookeeper和Kafka部署
解壓並改名 mv zookeeper 3.4.6 zookeeper 建立相關目錄,並修改許可權 root master zookeeper mkdir data root master zookeeper chmod 777 data 進入zookeeper目錄下的conf目錄修改zoo.cfg檔...