1) 建立配置檔案flume-dir.conf
#1.定義agent a3
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# 2.定義source
a3.sources.r3.type = spooldir
a3.sources.r3.spooldir = /opt/module/flume1.8.0/upload
a3.sources.r3.filesuffix = .completed
a3.sources.r3.fileheader = true
#忽略所有以.tmp結尾的檔案,不上傳
a3.sources.r3.ignorepattern = ([^ ]*\.tmp)
# 3.sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata111:9000/flume/%h
#上傳檔案的字首
a3.sinks.k3.hdfs.fileprefix = upload-
#是否按照時間滾動資料夾
a3.sinks.k3.hdfs.round = true
#多少時間單位建立乙個新的資料夾
a3.sinks.k3.hdfs.roundvalue = 1
#重新定義時間單位
a3.sinks.k3.hdfs.roundunit = hour
#是否使用本地時間戳
a3.sinks.k3.hdfs.uselocaltimestamp = true
#積攢多少個event才flush到hdfs一次
a3.sinks.k3.hdfs.batchsize = 100
#設定檔案型別,可支援壓縮
a3.sinks.k3.hdfs.filetype = datastream
#多久生成乙個新的檔案
a3.sinks.k3.hdfs.rollinterval = 600
#設定每個檔案的滾動大小大概是128m
a3.sinks.k3.hdfs.rollsize = 134217728
#檔案的滾動與event數量無關
a3.sinks.k3.hdfs.rollcount = 0
#最小副本數
a3.sinks.k3.hdfs.minblockreplicas = 1
#4.定義channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactioncapacity = 100
#5.鏈結
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2) 執行測試:執行如下指令碼後,請向upload資料夾中新增檔案試試
/opt/module/flume1.8.0/bin/flume-ng agent \
--conf /opt/module/flume1.8.0/conf/ \
--name a3 \
--conf-file /opt/module/flume1.8.0/jobconf/flume-dir.conf
啟動如圖
測試結果如下:
flume實時讀取檔案到kafka
背景 需要實時讀取log日誌檔案中的記錄到kafka 1.zookeeper服務需要開啟,檢視zookeeper的狀態,zookeeper的安裝及啟動過程可檢視 root master kafka 2.11 0.11 opt soft zookeeper 3.4.13 bin zkserver.sh...
flume監控目錄檔案到hdfs
agent1 name agent1.sources source1 agent1.sinks sink1 agent1.channels channel1 spooling directory set source1 agent1.sources source1.type spooldir age...
05案例 實時讀取目錄檔案
實時讀取目錄檔案到hdfs案例 案例需求 使用flume監聽整個目錄的檔案 1.建立配置檔案 flume dir hdfsconf touch flume dir hdfs.conf vim flume dir hdfs.conf a3.sources r3 a3.sinks k3 a3.chann...