首先因為hive的儲存是基於hdfs的,所以目標等同於,flume監控檔案並上傳hdfs上
hive建表
create table test(name string,gender string)row format delimited fields terminated by ',';
flume配置檔案如下
監控檔案 /usr/local/hive.log
上傳路徑 hdfs://hadoop:9000/user/hive/warehouse/driver.db/test
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# describe/configure the source
a3.sources.r3.type = exec
a3.sources.r3.command = tail -f /usr/local/hive.log
# describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop:9000/user/hive/warehouse/driver.db/test
#上傳檔案的字首
a3.sinks.k3.hdfs.fileprefix = upload-
#是否按照時間滾動資料夾
a3.sinks.k3.hdfs.round = true
#多少時間單位建立乙個新的資料夾
a3.sinks.k3.hdfs.roundvalue = 1
#重新定義時間單位
a3.sinks.k3.hdfs.roundunit = hour
#是否使用本地時間戳
a3.sinks.k3.hdfs.uselocaltimestamp = true
#積攢多少個event才flush到hdfs一次
a3.sinks.k3.hdfs.batchsize = 100
#設定檔案型別,可支援壓縮
a3.sinks.k3.hdfs.filetype = datastream
#多久生成乙個新的檔案
a3.sinks.k3.hdfs.rollinterval = 30
#設定每個檔案的滾動大小大概是128m
a3.sinks.k3.hdfs.rollsize = 134217700
#檔案的滾動與event數量無關
a3.sinks.k3.hdfs.rollcount = 0
# use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactioncapacity = 100
# bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
啟動flume 監控
flume-ng agent --name a3 --conf $flume_home/conf --conf-file $flume_home/conf/flume-dir-hdfs.conf -dflume.root.logger=info,console
向檔案中寫入資料
[root@hadoop local]# echo lisi,nan >> hive.log
[root@hadoop local]# echo wangwu,nan >> hive.log
[root@hadoop local]# echo xiaoming,nan >> hive.log
[root@hadoop local]# echo xiaohong,nv >> hive.log
檔案通過flume匯入到kafka
a1 agent flume三大元件 source channel sink a1.sources f1 a1.channels c1 a1.sinks k1 檔案 a1.sources.f1.type spooldir a1.sources.f1.channels c1 將users.csv檔案備...
上傳Excel檔案並匯入到資料
上傳檔案 上傳excel檔案 protected void btnfileload click object sender,eventargs e catch exception ex else 匯入檔案 將excel中的檔案匯入到資料庫 protected void btninsertdata c...
通過flume把oracle資料匯入到kafka
版本flume 1.6 kafka2.11 第二步 我用的是oracle所以,就把oracle的jdbc包放到flume的lib目錄下。我放的是ojdbc5.jar 第三步 配置flume的conf配置檔案。vi sql kafka.conf 具體配置如下 agenttest.channels ch...