#宣告source,channel,sink
a1.sources=sqlsource
a1.channels=c1
a1.sinks=s1
#宣告source型別
a1.sources.sqlsource.type=org.keedio.flume.source.sqlsource
a1.sources.sqlsource.hibernate.connection.url=jdbc:mysql:
a1.sources.sqlsource.hibernate.connection.user=root
a1.sources.sqlsource.hibernate.connection.password=root123
#這個引數很重要,預設false,如果設為false就不會自動查詢
a1.sources.sqlsource.hibernate.connection.autocommit=true
#宣告mysql的hibernate方言
a1.sources.sqlsource.hibernate.dialect=org.hibernate.dialect.mysql5dialect
#宣告mysql驅動
a1.sources.sqlsource.hibernate.connection.driver_class=com.mysql.jdbc.driver
#查詢間隔,單位毫秒
a1.sources.sqlsource.run.query.delay=10000
#宣告儲存flume狀態的資料夾位置
a1.sources.sqlsource.status.file.path=/usr/soft/apache-flume-1.5.2-bin/log
a1.sources.sqlsource.status.file.name=sqlsouce.status
#宣告從第一條資料開始查詢
a1.sources.sqlsouce.start.from=0
#sql語句自定義,但是要注意:增量只能針對id欄位即主鍵列,經測試系統預設如此.
#而且必須要將主鍵查詢出來,因為如果不查詢主鍵,flume無法記錄上一次查詢的位置.
#$@$表示增量列上一次查詢的值,記錄在status檔案中
a1.sources.sqlsource.custom.query=
select nsc.servicecheck_id, ns.display_name, nh.alias, nsc.state, nsc.start_time, nsc.end_time, nsc.output, nsc.perfdata
from nagios_servicechecks as nsc
left join nagios_services as ns on nsc.service_object_id = ns.service_object_id
left join nagios_hosts as nh on ns.host_object_id = nh.host_object_id
where ns.display_name = '901_cpu_load'
and nsc.servicecheck_id > $@$ order by nsc.servicecheck_id asc
#設定分批引數
a1.sources.sqlsource.batch.size=1000
a1.sources.sqlsource.max.rows=1000
#設定資料查詢出來後用什麼分隔符隔開,儲存時也用此分隔符
#此處一開始有效,後來沒有效果,未及測試原因,此處功能可以用***進行替換.
#a1.sources.sqlsource.delimiter.entry=|
#設定c3p0連線池引數
a1.sources.sqlsource.hibernate.connection.provider_class = org.hibernate.connection.c3p0connectionprovider
a1.sources.sqlsource.hibernate.c3p0.min_size=1
a1.sources.sqlsource.hibernate.c3p0.max_size=10
#配置***(替換)
a1.sources.sqlsource.interceptors=i1 i2 i3 i4 i5
a1.sources.sqlsource.interceptors.i1.type=search_replace
a1.sources.sqlsource.interceptors.i1.searchpattern="
a1.sources.sqlsource.interceptors.i1.replacestring=
a1.sources.sqlsource.interceptors.i2.type=search_replace
a1.sources.sqlsource.interceptors.i2.searchpattern=ok - cpu used=
a1.sources.sqlsource.interceptors.i2.replacestring=
a1.sources.sqlsource.interceptors.i3.type=search_replace
#注意:此處有空格需要匹配,因為直接寫空格無法直接匹配,外層需要加上小括號
a1.sources.sqlsource.interceptors.i3.searchpattern=( idle=)
a1.sources.sqlsource.interceptors.i3.replacestring=
a1.sources.sqlsource.interceptors.i4.type=search_replace
a1.sources.sqlsource.interceptors.i4.searchpattern=,
a1.sources.sqlsource.interceptors.i4.replacestring=|
a1.sources.sqlsource.interceptors.i5.type=search_replace
a1.sources.sqlsource.interceptors.i5.searchpattern=(% )
a1.sources.sqlsource.interceptors.i5.replacestring=%
#設定通道為記憶體模式
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactioncapacity=10000
a1.channels.c1.bytecapacitybufferpercentage=20
a1.channels.c1.bytecapacity=800000
a1.sinks.s1.type=hdfs
#ns為namenode的命名空間,兩個作用,乙個是防止集群坍塌,另乙個是改引數只能作用在active的namenode節點上
a1.sinks.s1.hdfs.path=hdfs://ns/nagios/901_cpu_load
a1.sinks.s1.hdfs.filetype=datastream
a1.sinks.s1.hdfs.writeformat=text
#設定滾動時間,每隔多少時間生成乙個檔案.如果設定成0,則禁止滾動,可以使所有資料被寫到乙個檔案中.
a1.sinks.s1.hdfs.rollinterval=0
#設定檔案儲存資料多大的時候生成下乙個檔案,建議設定成128m和塊大小相同
a1.sinks.s1.hdfs.rollsize=134217728
#設定檔案多少行時,滾動生成下乙個檔案,設定成0時禁止滾動
a1.sinks.s1.hdfs.rollcount=0
#連線source,channel,sink
Flume監聽oracle表增量
需求 獲取oracle表增量資訊,傳送至udp514埠,支援ip配置 步驟 1 需要的jar oracle的 odbc5.jar oracle安裝目錄 jdbc lib下查詢 這兩個jar 都拷貝到flume的lib下 3 flume配置檔案 4 遞增欄位要放在select的第一位 切記 a1.so...
flume 增量上傳日誌檔案到HDFS中
1.採集日誌檔案時乙個很常見的現象 採集需求 比如業務系統使用log4j生成日誌,日誌內容不斷增加,需要把追加到日誌檔案中的資料實時採集到hdfs中。1.1.根據需求,首先定義一下3大要素 採集源,即source 監控日誌檔案內容更新 exec tail f file 下沉目標,即sink hdfs...
python實現增量讀取檔案
最近在做乙個小工具,生產者不斷的往乙個txt檔案寫入資料,需要解析增量寫入的資料。實現如下 def read file filename global location location 0 with open filename as fd while true cur location fd.te...