Collecting files into HDFS
Collection requirement: a particular directory on a server keeps receiving new files; whenever a new file appears, it must be collected into HDFS.
Based on this requirement, first define the three key elements: the source (a spooldir source watching the directory), the channel (a memory channel), and the sink (an HDFS sink).
Writing the configuration file
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
mkdir -p /export/servers/dirfile
vim spooldir.conf
# name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# describe/configure the source
## Note: never drop files with the same name into the monitored directory more than once
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/dirfile
a1.sources.r1.fileHeader = true
# describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Generated file type; the default is SequenceFile. Use DataStream for plain text.
a1.sinks.k1.hdfs.fileType = DataStream
# use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Channel parameter explanations:
capacity: the maximum number of events the channel can hold
transactionCapacity: the maximum number of events taken from the source or given to the sink in a single transaction
keep-alive: the time allowed for adding an event to the channel or removing one from it
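The first two already appear in the channel definition above; as a minimal sketch showing all three together (the keep-alive value of 3 seconds below is an illustrative assumption, matching Flume's documented default):
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# seconds a put/take waits on a full/empty channel before the transaction fails
a1.channels.c1.keep-alive = 3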
bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console
Upload different files into the directory below; remember that file names must not repeat:
cd /export/servers/dirfile
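For example, a quick way to drop uniquely named test files and then check the result in HDFS (the timestamp-based file name here is illustrative):
f=test_$(date +%s).log
echo "hello flume" > /tmp/$f
mv /tmp/$f /export/servers/dirfile/    # mv makes the file appear atomically in the spool dir
hdfs dfs -ls /spooldir/files/          # the collected files should show up under this path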
Collection requirement: for example, a business system uses log4j to generate a log whose content keeps growing; the data appended to the log file must be collected into HDFS in real time.
Based on this requirement, first define the three key elements: the source (an exec source running tail -f), the channel (a memory channel), and the sink (an HDFS sink).
Develop the configuration file on node03
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
vim tail-file.conf
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# describe/configure tail -f source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /export/servers/taillogs/access_log
agent1.sources.source1.channels = channel1
#configure host for source
#agent1.sources.source1.interceptors = i1
#agent1.sources.source1.interceptors.i1.type = host
#agent1.sources.source1.interceptors.i1.hostheader = hostname
# describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin
bin/flume-ng agent -c conf -f conf/tail-file.conf -n agent1 -Dflume.root.logger=INFO,console
mkdir -p /export/servers/shells/
cd /export/servers/shells/
vim tail-file.sh
#!/bin/bash
# append a timestamp line to the log every 0.5s to simulate a growing log
while true
do
  date >> /export/servers/taillogs/access_log
  sleep 0.5
done
Create the folder
mkdir -p /export/servers/taillogs
Start the script
sh /export/servers/shells/tail-file.sh
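To verify the pipeline end to end (paths follow the configuration above), watch the log grow locally and confirm that rolled files appear under the HDFS path:
tail -f /export/servers/taillogs/access_log    # a new date line should stream past every 0.5s
hdfs dfs -ls /weblog/flume-collection/         # time-bucketed directories with access_log files appear here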