首先保證flume與kafka正確安裝並啟動,這個比較簡單,直接在ambari中新增新服務即可,不多贅述。
配置flume
新建一配置檔案kafka.conf, 編輯,追加一下內容。
#掃瞄指定檔案配置可以看到,flume是蒐集/root/test.log裡面追加的內容。所以保證/root/test.log檔案存在。hdp 集群kafka broker的預設埠是6667,而不是9092agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type=exec
agent.sources.s1.command=tail -f /root/test.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactioncapacity=100
#設定kafka接收器
agent.sinks.k1.type= org.apache.flume.sink.kafka.kafkasink
#設定kafka的broker位址和埠號
agent.sinks.k1.brokerlist=10.0.13.72:6667
#設定kafka的topic
agent.sinks.k1.topic=test
#設定序列化方式
agent.sinks.k1.serializer.class=kafka.serializer.stringencoder
agent.sinks.k1.channel=c1
kafka配置
通過flume的配置,連線kafka的topic是test。
新建乙個名為test的topic。
bin/kafka-topics.sh –create –zookeeper hortonworks02:2181,hortonworks03:2181,hortonworks04:2181 –replication-factor 1 –partitions 1 –topic test檢視一下。
bin/kafka-topics.sh –list –zookeeper hortonworks02:2181,hortonworks03:2181,hortonworks04:2181然後建立個消費者來消費test裡面的資料。bootstrap-server這裡填主機名或者ip,而不是localhost。
bin/kafka-console-consumer.sh –bootstrap-server hortonworks02:6667 –topic test –from-beginning啟動flume
bin/flume-ng agent –conf-file conf/kafka.conf -c conf/ –name agent -dflume.root.logger=debug,console追加內容到test.log
編輯test.log,隨意新增一些資訊
#vim test.logkafka中已經顯示出test.log新加的東西(只看最後三行即可,上面的是以前topic裡面的資訊)hhhhh
fsoifhdisfhsif
this is a message
ok!
Kafka與flume的整合
為我們的source channel sink起名 a1.sources r1 a1.channels c1 a1.sinks k1 指定我們的source收集到的資料傳送到哪個管道 a1.sources r1.channels c1 指定我們的source資料收集策略 a1.sources r1....
Flume和Kafka的區別與聯絡
目錄 區別點一 區別點二 同樣是流式資料採集框架,flume一般用於日誌採集,可以定製很多資料來源,減少開發量,基本架構是乙個flume程序agent source 選擇器 channel sink 其中傳遞的是原子性的event資料 使用雙層flume架構可以實現一層資料採集,一層資料集合 flu...
Kafka(一) Kafka集群的搭建與使用
kafka基本概念 kafka是乙個分布式的,分割槽的訊息 官方稱之為commit log 服務。它提供乙個訊息系統應該具備的功能,但是確有著獨特的設計。可以這樣來說,kafka借鑑了jms規範的思想,但是確並沒有完全遵循jms規範。首先,讓我們來看一下基礎的訊息 message 相關術語 因此,從...