1. Collecting a directory into HDFS
Collection requirement: a particular directory on a server keeps producing new files; whenever a new file appears, it must be collected into HDFS.
Based on this requirement, first define the following three key elements:
Collection source, i.e. the source: a directory monitor (spooldir)
Sink target, i.e. the sink: the HDFS file system (hdfs sink)
Transfer channel between source and sink, i.e. the channel: either a file channel or a memory channel can be used
Configuration file:
# Define the names of the three components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Configure the source component
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hadoop/logs/
agent1.sources.source1.fileHeader = false
# Configure the interceptor
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Configure the sink component
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
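The host interceptor configured above stamps every event with a hostname header, but the sink path above does not reference it. As a hedged illustration (a variant not in the original config), the HDFS sink can substitute event headers into the path with the %{header} escape:
# Hypothetical variant: partition output by the hostname header set by interceptor i1
agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%{hostname}/%y-%m-%d/%H-%M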
Channel parameter explanation:
capacity: the maximum number of events the channel can hold
transactionCapacity: the maximum number of events taken from the source or delivered to the sink in a single transaction
keep-alive: the time allowed for adding an event to the channel or removing one from it
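If stronger durability is needed than the memory channel offers, the file channel mentioned above can be substituted. A minimal sketch, assuming the checkpoint and data directories below are hypothetical paths writable by the user running the agent:
# File channel alternative: events are persisted to disk instead of held in memory
agent1.channels.channel1.type = file
agent1.channels.channel1.checkpointDir = /home/hadoop/flume/checkpoint
agent1.channels.channel1.dataDirs = /home/hadoop/flume/data
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600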
2. Collecting a file into HDFS
Collection requirement: for example, a business system writes its logs with log4j; the log file keeps growing, and the data appended to it must be collected into HDFS in real time.
Based on this requirement, first define the following three key elements:
Collection source, i.e. the source: monitor file content updates (exec 'tail -f file')
Sink target, i.e. the sink: the HDFS file system (hdfs sink)
Transfer channel between source and sink, i.e. the channel: either a file channel or a memory channel can be used
Configuration file:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# describe/configure tail -f source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1
#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
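Either configuration can be run with the flume-ng launcher shipped with Flume. A minimal sketch, assuming it is executed from the Flume installation directory and the configuration above is saved as conf/tail-hdfs.conf (the file name is an assumption):
# Start the agent named agent1 with the configuration file above
bin/flume-ng agent --conf conf --conf-file conf/tail-hdfs.conf --name agent1 -Dflume.root.logger=INFO,console
# Verify that events are being written to HDFS
hdfs dfs -ls /weblog/flume-collection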
Flume supports many other source and sink types; see the official documentation for the full reference.