Component    Version
flume        flume-ng-1.6.0-cdh5.7.0.tar.gz
zookeeper    zookeeper-3.4.5
kafka        kafka_2.11-0.10.0.0.tgz
ZooKeeper deployment: see Part 4
Flume deployment
# Extract
[hadoop@hadoop001 soft]$ cd ~/soft
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ vim config/server.properties
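The text does not show which settings were changed in server.properties. As a rough sketch, the helper below prints three keys that are commonly reviewed before the first start (broker.id, log.dirs, zookeeper.connect). The function name show_broker_settings is hypothetical, and the choice of keys is an assumption, not the author's list.

```shell
# show_broker_settings FILE: print the uncommented broker id, data directory
# and ZooKeeper address lines from a Kafka broker properties file.
show_broker_settings() {
  grep -E '^(broker\.id|log\.dirs|zookeeper\.connect)=' "$1"
}

CONF="config/server.properties"   # path relative to the Kafka install directory
if [ -f "$CONF" ]; then
  show_broker_settings "$CONF"
else
  echo "not found: $CONF (run this from the Kafka install directory)"
fi
```

The zookeeper.connect value should match the address passed to --zookeeper in the topic commands (localhost:2181 in this walkthrough).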
# Add environment variables
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ vim ~/.bash_profile
export PATH=$KAFKA_HOME/bin:$PATH
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ source ~/.bash_profile
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ which kafka-topics.sh
# Start
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-server-start.sh config/server.properties
# Test: create a topic
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wsk_test
# Test: list topics
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
# Test: console producer
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wsk_test
# Test: console consumer
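The consumer command itself is missing from the text. A minimal sketch, assuming the ZooKeeper-based console consumer that ships with Kafka 0.10.0.0 and the same address and topic used in the commands above; the guard around the call is only there so the snippet fails politely outside the Kafka install directory.

```shell
ZK_CONNECT="localhost:2181"               # same ZooKeeper address as the topic commands
TOPIC="wsk_test"                          # topic created earlier
CONSUMER="bin/kafka-console-consumer.sh"  # path relative to the Kafka install directory

if [ -x "$CONSUMER" ]; then
  # --from-beginning replays messages already in the topic, then keeps reading
  "$CONSUMER" --zookeeper "$ZK_CONNECT" --topic "$TOPIC" --from-beginning
else
  echo "not found: $CONSUMER (run this from the Kafka install directory)"
fi
```

Lines typed into the console producer should appear here; the consumer keeps running until interrupted with Ctrl-C.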
Use Flume's TAILDIR source to collect data and send it to both Kafka and HDFS. The configuration is as follows:
taildir-hdfsandkafka-agnet.sources = taildir-source
taildir-hdfsandkafka-agnet.channels = c1 c2
taildir-hdfsandkafka-agnet.sinks = hdfs-sink kafka-sink
taildir-hdfsandkafka-agnet.sources.taildir-source.type = TAILDIR
taildir-hdfsandkafka-agnet.sources.taildir-source.filegroups = f1
taildir-hdfsandkafka-agnet.sources.taildir-source.filegroups.f1 = /home/hadoop/data/flume/hdfsandkafka/input/.*
taildir-hdfsandkafka-agnet.sources.taildir-source.positionFile = /home/hadoop/data/flume/hdfsandkafka/taildir_position/taildir_position.json
taildir-hdfsandkafka-agnet.sources.taildir-source.selector.type = replicating
taildir-hdfsandkafka-agnet.channels.c1.type = memory
taildir-hdfsandkafka-agnet.channels.c2.type = memory
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.type = hdfs
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/hdfsandkafka/%Y%m%d%H%M
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.hdfs.filePrefix = wsktest-
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.hdfs.rollInterval = 10
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.hdfs.rollSize = 100000000
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.hdfs.rollCount = 0
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.hdfs.fileType = DataStream
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.hdfs.writeFormat = Text
taildir-hdfsandkafka-agnet.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
taildir-hdfsandkafka-agnet.sinks.kafka-sink.brokerList = localhost:9092
taildir-hdfsandkafka-agnet.sinks.kafka-sink.topic = wsk_test
taildir-hdfsandkafka-agnet.sources.taildir-source.channels = c1 c2
taildir-hdfsandkafka-agnet.sinks.hdfs-sink.channel = c1
taildir-hdfsandkafka-agnet.sinks.kafka-sink.channel = c2
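In a fan-out setup like this one (one replicating source, two channels, two sinks), a frequent mistake is a source or sink that references a channel the agent never declares. The sketch below checks that wiring with awk. check_channels is a hypothetical helper written for this walkthrough, not part of Flume, and it assumes one key = value pair per line.

```shell
# check_channels FILE AGENT
# Reports every channel that a source (.channels) or sink (.channel) of AGENT
# references but that is missing from "AGENT.channels".
# Exit status: 0 if the wiring is consistent, 1 otherwise.
check_channels() {
  awk -v agent="$2" '
    /^[ \t]*#/ || !/=/ { next }          # skip comments and lines without "="
    {
      eq  = index($0, "=")
      key = substr($0, 1, eq - 1)
      val = substr($0, eq + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", key)   # trim whitespace around key and value
      gsub(/^[ \t]+|[ \t]+$/, "", val)
      if (key == agent ".channels") {
        n = split(val, a, /[ \t]+/)
        for (i = 1; i <= n; i++) declared[a[i]] = 1
      } else if (index(key, agent ".") == 1 &&
                 (key ~ /\.sources\.[^.]+\.channels$/ || key ~ /\.sinks\.[^.]+\.channel$/)) {
        n = split(val, a, /[ \t]+/)
        for (i = 1; i <= n; i++) used[a[i]] = key
      }
    }
    END {
      bad = 0
      for (c in used)
        if (!(c in declared)) { print "undeclared channel " c " in " used[c]; bad = 1 }
      exit bad
    }' "$1"
}
```

For the configuration above, the call would be `check_channels $FLUME_HOME/conf/taildir-hdfsandkafka-agnet.conf taildir-hdfsandkafka-agnet`, which should print nothing when c1 and c2 are wired consistently.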
flume-ng agent \
--name taildir-hdfsandkafka-agnet \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/taildir-hdfsandkafka-agnet.conf \
-Dflume.root.logger=INFO,console
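Once the agent is running, a quick end-to-end check is to append a line to a file in the watched directory and then look for output on the HDFS side. This is a sketch under assumptions: the file name smoke-test.log is hypothetical, the paths are taken from the configuration above, and the 15-second wait is simply chosen to exceed the 10-second roll interval.

```shell
INPUT_DIR="/home/hadoop/data/flume/hdfsandkafka/input"   # directory watched by filegroup f1
HDFS_DIR="hdfs://hadoop001:9000/flume/hdfsandkafka"      # parent of the sink's hdfs.path
SAMPLE_FILE="smoke-test.log"                             # hypothetical file name

# Append one line; the TAILDIR source should pick it up from the watched directory.
if [ -d "$INPUT_DIR" ]; then
  echo "smoke test $(date +%s)" >> "$INPUT_DIR/$SAMPLE_FILE"
else
  echo "input directory not found: $INPUT_DIR"
fi

# The HDFS sink rolls files every 10 seconds, so wait a little before listing.
if command -v hdfs >/dev/null 2>&1; then
  sleep 15
  hdfs dfs -ls -R "$HDFS_DIR"
else
  echo "hdfs command not found on PATH"
fi
```

Because the source replicates events to both channels, the same line should also show up in a console consumer reading the wsk_test topic.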