1. HTTP source to HDFS sink (routing to different Hive tables depending on the incoming JSON; two methods). Note: a Hive table is just a directory on HDFS.
(1) First method: create multiple channels, one per target table; the source-side routing is sketched after the sink configs below.
HDFS sinks (each distinct hdfs.path gets its own sink):
agent.sinks.kafka2hive_general.type = hdfs
agent.sinks.kafka2hive_general.hdfs.rollSize = 10485760
agent.sinks.kafka2hive_general.hdfs.rollInterval = 0
agent.sinks.kafka2hive_general.hdfs.rollCount = 0
agent.sinks.kafka2hive_general.hdfs.path = /user/hive/warehouse/db.db/table
agent.sinks.kafka2hive_general.channel = memorychannel
agent.sinks.kafka2hive_general.hdfs.fileType = DataStream
agent.sinks.kafka2hive_general.hdfs.writeFormat = Text
agent.sinks.kafka2hive_general.hdfs.idleTimeout = 600

agent.sinks.kafka2hive_general1.type = hdfs
agent.sinks.kafka2hive_general1.hdfs.rollSize = 10485760
agent.sinks.kafka2hive_general1.hdfs.rollInterval = 0
agent.sinks.kafka2hive_general1.hdfs.rollCount = 0
agent.sinks.kafka2hive_general1.hdfs.path = /user/hive/warehouse/db.db/table1
agent.sinks.kafka2hive_general1.channel = memorychannel1
agent.sinks.kafka2hive_general1.hdfs.fileType = DataStream
agent.sinks.kafka2hive_general1.hdfs.writeFormat = Text
agent.sinks.kafka2hive_general1.hdfs.idleTimeout = 600
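The config above shows only the sinks; the routing itself happens on the source via a multiplexing channel selector. Below is a minimal sketch of the missing source side, assuming an HTTP source named httpsrc on port 9000 and a table_name header in the incoming JSON as the routing key (the source name, port, header name, and mapping values are all assumptions, not part of the original config):

# Hypothetical source side for method one (all names are illustrative).
agent.sources = httpsrc
agent.channels = memorychannel memorychannel1
agent.sinks = kafka2hive_general kafka2hive_general1
agent.channels.memorychannel.type = memory
agent.channels.memorychannel1.type = memory
# HTTP source that feeds both channels.
agent.sources.httpsrc.type = http
agent.sources.httpsrc.port = 9000
agent.sources.httpsrc.channels = memorychannel memorychannel1
# Multiplexing selector: route each event by its table_name header.
agent.sources.httpsrc.selector.type = multiplexing
agent.sources.httpsrc.selector.header = table_name
agent.sources.httpsrc.selector.mapping.table = memorychannel
agent.sources.httpsrc.selector.mapping.table1 = memorychannel1
agent.sources.httpsrc.selector.default = memorychannel

With this, events whose table_name header is table land in /user/hive/warehouse/db.db/table, and table1 events flow through memorychannel1 into .../table1.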
(2) Second method: have the HTTP request carry a table_name parameter in its headers, and read that value in the sink:
curl -X POST -d '[{ "headers" : { "table_name" : "table1" }, "body" : "random_body" }]' localhost:9000
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/hive/warehouse/db.db/%{table_name}
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
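For this method to run end to end, agent a1 also needs an HTTP source and a channel, which the snippet above omits. A minimal sketch, assuming a source named r1 on port 9000 and a channel named c1 (both names are assumptions): Flume's default JSONHandler copies the "headers" object of each posted JSON element onto the Flume event headers, which is what lets %{table_name} in hdfs.path resolve.

# Hypothetical source and channel for agent a1 (names are illustrative).
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = http
a1.sources.r1.port = 9000
# JSONHandler (the default) maps the posted "headers" object onto
# Flume event headers, so table_name becomes usable as %{table_name}.
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks.k1.channel = c1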
2. Kafka source to HDFS sink
api_channel.sources = kafka2hive_general
api_channel.channels = kafka2hive_general
api_channel.sinks = kafka2hive_general

api_channel.sources.kafka2hive_general.type = org.apache.flume.source.kafka.KafkaSource
api_channel.sources.kafka2hive_general.zookeeperConnect = *******
api_channel.sources.kafka2hive_general.topic = ***
api_channel.sources.kafka2hive_general.groupId = ****
api_channel.sources.kafka2hive_general.channels = kafka2hive_general
api_channel.sources.kafka2hive_general.kafka.consumer.auto.offset.reset = smallest
# Extract the leading word of the event body (everything before the first
# comma) into the table_name header, used by %{table_name} in hdfs.path.
api_channel.sources.kafka2hive_general.interceptors = i1
api_channel.sources.kafka2hive_general.interceptors.i1.type = regex_extractor
api_channel.sources.kafka2hive_general.interceptors.i1.regex = ^(\\w*),.*$
api_channel.sources.kafka2hive_general.interceptors.i1.serializers = extract
api_channel.sources.kafka2hive_general.interceptors.i1.serializers.extract.name = table_name

api_channel.channels.kafka2hive_general.type = memory
api_channel.channels.kafka2hive_general.capacity = 10000
api_channel.channels.kafka2hive_general.transactionCapacity = 5000
api_channel.channels.kafka2hive_general.keep-alive = 60

api_channel.sinks.kafka2hive_general.type = hdfs
api_channel.sinks.kafka2hive_general.hdfs.rollSize = 10485760
api_channel.sinks.kafka2hive_general.hdfs.rollInterval = 0
api_channel.sinks.kafka2hive_general.hdfs.rollCount = 0
api_channel.sinks.kafka2hive_general.hdfs.path = /user/hive/warehouse/a.db/%{table_name}/ds=%Y-%m-%d-%H/
api_channel.sinks.kafka2hive_general.channel = kafka2hive_general
api_channel.sinks.kafka2hive_general.hdfs.fileType = DataStream
api_channel.sinks.kafka2hive_general.hdfs.writeFormat = Text
api_channel.sinks.kafka2hive_general.hdfs.idleTimeout = 600
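One caveat on the time escapes (%Y-%m-%d-%H) in hdfs.path: the HDFS sink resolves them from a timestamp event header and errors out when none is present. Neither fix below appears in the original config, so treat both as assumptions; either let the sink stamp events with local time, or chain Flume's timestamp interceptor behind the regex extractor:

# Option A: have the HDFS sink use the local time for path escapes.
api_channel.sinks.kafka2hive_general.hdfs.useLocalTimeStamp = true
# Option B: add a timestamp interceptor after the regex extractor.
api_channel.sources.kafka2hive_general.interceptors = i1 i2
api_channel.sources.kafka2hive_general.interceptors.i2.type = timestamp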
3. For other sources, such as the spooling-directory source (spool) for monitoring a folder, exec tail for monitoring a file, and other data sources, see
4. For defining custom sources and sinks, see and