1. Write the configuration file (create tail-hdfs.conf under the flume directory)
spooldir: a source that ships with Flume for watching a directory; any new file that appears there is read and collected.
# Define the names of the three components
ag1.sources = source1
ag1.sinks = sink1
ag1.channels = channel1

# Configure the source component
ag1.sources.source1.type = spooldir
ag1.sources.source1.spoolDir = /root/log/
ag1.sources.source1.fileSuffix = .finished
ag1.sources.source1.deserializer.maxLineLength = 5129

# Configure the sink component: where the collected data is written
ag1.sinks.sink1.type = hdfs
ag1.sinks.sink1.hdfs.path = hdfs://hdp-1:9000/access_log/%y-%m-%d/%H-%M
ag1.sinks.sink1.hdfs.fileSuffix = .log
ag1.sinks.sink1.hdfs.batchSize = 100
ag1.sinks.sink1.hdfs.fileType = DataStream
ag1.sinks.sink1.hdfs.writeFormat = Text

## roll: rules that control when the sink rolls over to a new file in HDFS
ag1.sinks.sink1.hdfs.rollSize = 512000
ag1.sinks.sink1.hdfs.rollCount = 1000000
ag1.sinks.sink1.hdfs.rollInterval = 60

## Rules for directory generation: how often a new directory is cut
ag1.sinks.sink1.hdfs.round = true
ag1.sinks.sink1.hdfs.roundValue = 10
ag1.sinks.sink1.hdfs.roundUnit = minute
ag1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Configure the channel component
ag1.channels.channel1.type = memory
ag1.channels.channel1.capacity = 500000
ag1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
ag1.sources.source1.channels = channel1
ag1.sinks.sink1.channel = channel1
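Once the agent is running (step 5 below), you can check the spooldir behaviour by dropping a file into the watched directory and confirming that Flume renames it after ingestion. A minimal sketch; the file name is arbitrary and /root/log/ matches the spoolDir above:

cp /etc/hosts /root/log/test-$(date +%s).log    # any new file in the directory is picked up automatically
ls /root/log/    # after collection the file carries the configured suffix, e.g. test-....log.finished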
The final tail-hdfs.conf on the machine uses an exec source that tails the nginx access log instead of a spooldir source:
[root@hdp-1 flume-1.6.0]# cat tail-hdfs.conf
ag1.sources = source1
ag1.sinks = sink1
ag1.channels = channel1
ag1.sources.source1.type = exec
ag1.sources.source1.command = tail -f /usr/local/nginx/logs/log.frame.access.log
ag1.sinks.sink1.type = hdfs
ag1.sinks.sink1.hdfs.path = hdfs://hdp-1:9000/access_log/%y-%m-%d/%H-%M
ag1.sinks.sink1.hdfs.fileSuffix = .log
ag1.sinks.sink1.hdfs.batchSize = 100
ag1.sinks.sink1.hdfs.fileType = DataStream
ag1.sinks.sink1.hdfs.writeFormat = Text
ag1.sinks.sink1.hdfs.rollSize = 512000
ag1.sinks.sink1.hdfs.rollCount = 1000000
ag1.sinks.sink1.hdfs.rollInterval = 60
ag1.sinks.sink1.hdfs.round = true
ag1.sinks.sink1.hdfs.roundValue = 10
ag1.sinks.sink1.hdfs.roundUnit = minute
ag1.sinks.sink1.hdfs.useLocalTimeStamp = true
ag1.channels.channel1.type = memory
ag1.channels.channel1.capacity = 500000
ag1.channels.channel1.transactionCapacity = 600
ag1.sources.source1.channels = channel1
ag1.sinks.sink1.channel = channel1
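Because hdfs.path contains the time escapes %y-%m-%d/%H-%M and round is enabled with roundValue = 10, events are bucketed into a new HDFS directory every 10 minutes; useLocalTimeStamp = true stamps each event with the agent's local time so those escapes can be resolved. The resulting layout looks roughly like this (dates and times are illustrative only; FlumeData is the default hdfs.filePrefix):

/access_log/19-07-01/10-00/FlumeData.1561946400000.log
/access_log/19-07-01/10-10/FlumeData.1561947000000.log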
2. Create the script file makelog.sh in the log directory
#!/bin/bash
# Append a line of dummy data every 0.1 s to simulate log generation
while true
do
    echo '00000000' >> access.log
    sleep 0.1
done
3. Make makelog.sh executable
Command: chmod +x makelog.sh
4. Run makelog.sh (sh makelog.sh)
This simulates log generation; follow the output with tail -f access.log, as shown below.
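For example (a minimal sketch; the working directory is assumed to be the one holding makelog.sh, so access.log is created next to it):

sh makelog.sh &    # generate log lines in the background
tail -f access.log    # follow the simulated log as it grows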
5. Start the Flume collection agent
From Flume's bin directory:
./flume-ng agent -c ../conf/ -f ../tail-hdfs.conf -n ag1 -Dflume.root.logger=INFO,console
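For a long-running collection you may prefer to start the agent in the background; a hedged variant of the same command (the output file path is an assumption):

nohup ./flume-ng agent -c ../conf/ -f ../tail-hdfs.conf -n ag1 -Dflume.root.logger=INFO,console > /root/flume-ag1.out 2>&1 &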
6. Flume collects the files successfully
Files ending in .tmp are temporary files that are still being written.
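To verify, list the target path in HDFS; the file currently being written carries the .tmp suffix and is renamed to end in .log once a roll condition (rollSize / rollCount / rollInterval) is met:

hdfs dfs -ls -R /access_log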