Some commonly used Apache Flume configurations

2021-08-19 08:14:14

1. HTTP source to HDFS sink (routing incoming JSON to different Hive tables; two methods). Note: a Hive table is just a folder on HDFS.

(1) First method: create multiple channels.

HDFS sinks (a different hdfs.path for each sink):

agent.sinks.kafka2hive_general.type = hdfs
agent.sinks.kafka2hive_general.channel = memorychannel
agent.sinks.kafka2hive_general.hdfs.path = /user/hive/warehouse/db.db/table
agent.sinks.kafka2hive_general.hdfs.rollSize = 10485760
agent.sinks.kafka2hive_general.hdfs.rollInterval = 0
agent.sinks.kafka2hive_general.hdfs.rollCount = 0
agent.sinks.kafka2hive_general.hdfs.fileType = DataStream
agent.sinks.kafka2hive_general.hdfs.writeFormat = Text
agent.sinks.kafka2hive_general.hdfs.idleTimeout = 600

# The second sink must have a distinct name; it writes to a different path
# through a different channel.
agent.sinks.kafka2hive_general1.type = hdfs
agent.sinks.kafka2hive_general1.channel = memorychannel1
agent.sinks.kafka2hive_general1.hdfs.path = /user/hive/warehouse/db.db/table1
agent.sinks.kafka2hive_general1.hdfs.rollSize = 10485760
agent.sinks.kafka2hive_general1.hdfs.rollInterval = 0
agent.sinks.kafka2hive_general1.hdfs.rollCount = 0
agent.sinks.kafka2hive_general1.hdfs.fileType = DataStream
agent.sinks.kafka2hive_general1.hdfs.writeFormat = Text
agent.sinks.kafka2hive_general1.hdfs.idleTimeout = 600
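The two sinks above only receive the right events if the source fans them out to the right channel. A minimal sketch of that missing piece, assuming an HTTP source and a multiplexing channel selector keyed on a `table_name` header (the source name, port, and mapping values here are illustrative, not from the original):

```properties
agent.sources = http_src

agent.sources.http_src.type = http
agent.sources.http_src.port = 9000
agent.sources.http_src.channels = memorychannel memorychannel1

# Route each event by the value of its table_name header;
# unknown values fall back to the default channel.
agent.sources.http_src.selector.type = multiplexing
agent.sources.http_src.selector.header = table_name
agent.sources.http_src.selector.mapping.table = memorychannel
agent.sources.http_src.selector.mapping.table1 = memorychannel1
agent.sources.http_src.selector.default = memorychannel
```

With this selector in place, each HDFS sink drains one channel and therefore writes only the events destined for its own Hive-table directory.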

(2) Second method: pass a table_name parameter in the HTTP event headers, and read its value in the sink's hdfs.path.

curl -X POST -d '[{ "headers" : { "table_name" : "table1" }, "body" : "random_body" }]' localhost:9000

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/hive/warehouse/db.db/%{table_name}
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
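To see how the HDFS sink expands its path, here is a small Python sketch (not Flume code) of the two kinds of escape it applies: `%{name}` is replaced from the event headers, and strftime-style escapes like `%Y-%m-%d-%H` are replaced from the event timestamp:

```python
import re
from datetime import datetime

def resolve_hdfs_path(path_template, headers, ts=None):
    """Mimic Flume HDFS-sink path expansion:
    %{header} -> value from the event headers,
    %Y/%m/%d/%H etc. -> formatted from the event timestamp."""
    ts = ts or datetime.now()
    # Substitute header-based escapes first.
    path = re.sub(r"%\{(\w+)\}", lambda m: headers[m.group(1)], path_template)
    # The remaining % escapes are strftime-compatible.
    return ts.strftime(path)

headers = {"table_name": "table1"}
ts = datetime(2021, 8, 19, 8, 0)
print(resolve_hdfs_path("/user/hive/warehouse/a.db/%{table_name}/ds=%Y-%m-%d-%H/", headers, ts))
# /user/hive/warehouse/a.db/table1/ds=2021-08-19-08/
```

This is why the curl example above carries `table_name` in the event headers rather than in the body: only headers are visible to path expansion.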

2. Kafka source to HDFS sink

api_channel.sources = kafka2hive_general
api_channel.channels = kafka2hive_general
api_channel.sinks = kafka2hive_general

api_channel.sources.kafka2hive_general.type = org.apache.flume.source.kafka.KafkaSource
api_channel.sources.kafka2hive_general.zookeeperConnect = *******
api_channel.sources.kafka2hive_general.topic = ***
api_channel.sources.kafka2hive_general.groupId = ****
api_channel.sources.kafka2hive_general.channels = kafka2hive_general
api_channel.sources.kafka2hive_general.kafka.consumer.auto.offset.reset = smallest

api_channel.sources.kafka2hive_general.interceptors = i1
api_channel.sources.kafka2hive_general.interceptors.i1.type = regex_extractor
api_channel.sources.kafka2hive_general.interceptors.i1.regex = ^(\\w*),.*$
api_channel.sources.kafka2hive_general.interceptors.i1.serializers = extract
api_channel.sources.kafka2hive_general.interceptors.i1.serializers.extract.name = table_name

api_channel.channels.kafka2hive_general.type = memory
api_channel.channels.kafka2hive_general.capacity = 10000
api_channel.channels.kafka2hive_general.transactionCapacity = 5000
api_channel.channels.kafka2hive_general.keep-alive = 60

api_channel.sinks.kafka2hive_general.type = hdfs
api_channel.sinks.kafka2hive_general.channel = kafka2hive_general
api_channel.sinks.kafka2hive_general.hdfs.path = /user/hive/warehouse/a.db/%{table_name}/ds=%Y-%m-%d-%H/
api_channel.sinks.kafka2hive_general.hdfs.rollSize = 10485760
api_channel.sinks.kafka2hive_general.hdfs.rollInterval = 0
api_channel.sinks.kafka2hive_general.hdfs.rollCount = 0
api_channel.sinks.kafka2hive_general.hdfs.fileType = DataStream
api_channel.sinks.kafka2hive_general.hdfs.writeFormat = Text
api_channel.sinks.kafka2hive_general.hdfs.idleTimeout = 600
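The regex_extractor interceptor is what makes the `%{table_name}` path escape work for Kafka events: it matches the event body against `^(\w*),.*$` and stores capture group 1 in a header named table_name. A quick Python sketch of that behavior (note the config needs `\\w` because Java properties files escape backslashes, while Python's raw string needs only one):

```python
import re

# Same pattern the interceptor uses in the config above.
PATTERN = re.compile(r"^(\w*),.*$")

def extract_table_name(event_body):
    """Mimic regex_extractor: put capture group 1 into a table_name header.
    Bodies without a comma produce no header at all."""
    m = PATTERN.match(event_body)
    return {"table_name": m.group(1)} if m else {}

print(extract_table_name('orders,{"id": 1}'))  # {'table_name': 'orders'}
print(extract_table_name("no-comma-here"))     # {}
```

So each Kafka message is expected to start with the target table name followed by a comma; events that don't match land with an empty header set and would fail path expansion, which is worth guarding against upstream.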

3. For other sources, such as the spooling-directory source (monitoring a folder), the exec source with tail (monitoring a file), and other data sources, refer to the Flume documentation.

4. For defining custom sources and sinks, refer to the Flume developer documentation.
