flume增量讀取mysql資料寫入到hdfs

2021-08-28 02:41:40 字數 4085 閱讀 8901

#宣告source,channel,sink

a1.sources=sqlsource

a1.channels=c1

a1.sinks=s1

#宣告source型別

a1.sources.sqlsource.type=org.keedio.flume.source.sqlsource

a1.sources.sqlsource.hibernate.connection.url=jdbc:mysql:

a1.sources.sqlsource.hibernate.connection.user=root

a1.sources.sqlsource.hibernate.connection.password=root123

#這個引數很重要,預設false,如果設為false就不會自動查詢

a1.sources.sqlsource.hibernate.connection.autocommit=true

#宣告mysql的hibernate方言

a1.sources.sqlsource.hibernate.dialect=org.hibernate.dialect.mysql5dialect

#宣告mysql驅動

a1.sources.sqlsource.hibernate.connection.driver_class=com.mysql.jdbc.driver

#查詢間隔,單位毫秒

a1.sources.sqlsource.run.query.delay=10000

#宣告儲存flume狀態的資料夾位置

a1.sources.sqlsource.status.file.path=/usr/soft/apache-flume-1.5.2-bin/log

a1.sources.sqlsource.status.file.name=sqlsouce.status

#宣告從第一條資料開始查詢

a1.sources.sqlsouce.start.from=0

#sql語句自定義,但是要注意:增量只能針對id欄位即主鍵列,經測試系統預設如此.

#而且必須要將主鍵查詢出來,因為如果不查詢主鍵,flume無法記錄上一次查詢的位置.

#$@$表示增量列上一次查詢的值,記錄在status檔案中

a1.sources.sqlsource.custom.query=

select nsc.servicecheck_id, ns.display_name, nh.alias, nsc.state, nsc.start_time, nsc.end_time, nsc.output, nsc.perfdata

from nagios_servicechecks as nsc

left join nagios_services as ns on nsc.service_object_id = ns.service_object_id

left join nagios_hosts as nh on ns.host_object_id = nh.host_object_id

where ns.display_name = '901_cpu_load'

and nsc.servicecheck_id > $@$ order by nsc.servicecheck_id asc

#設定分批引數

a1.sources.sqlsource.batch.size=1000

a1.sources.sqlsource.max.rows=1000

#設定資料查詢出來後用什麼分隔符隔開,儲存時也用此分隔符

#此處一開始有效,後來沒有效果,未及測試原因,此處功能可以用***進行替換.

#a1.sources.sqlsource.delimiter.entry=|

#設定c3p0連線池引數

a1.sources.sqlsource.hibernate.connection.provider_class = org.hibernate.connection.c3p0connectionprovider

a1.sources.sqlsource.hibernate.c3p0.min_size=1

a1.sources.sqlsource.hibernate.c3p0.max_size=10

#配置***(替換)

a1.sources.sqlsource.interceptors=i1 i2 i3 i4 i5

a1.sources.sqlsource.interceptors.i1.type=search_replace

a1.sources.sqlsource.interceptors.i1.searchpattern="

a1.sources.sqlsource.interceptors.i1.replacestring=

a1.sources.sqlsource.interceptors.i2.type=search_replace

a1.sources.sqlsource.interceptors.i2.searchpattern=ok - cpu used=

a1.sources.sqlsource.interceptors.i2.replacestring=

a1.sources.sqlsource.interceptors.i3.type=search_replace

#注意:此處有空格需要匹配,因為直接寫空格無法直接匹配,外層需要加上小括號

a1.sources.sqlsource.interceptors.i3.searchpattern=( idle=)

a1.sources.sqlsource.interceptors.i3.replacestring=

a1.sources.sqlsource.interceptors.i4.type=search_replace

a1.sources.sqlsource.interceptors.i4.searchpattern=,

a1.sources.sqlsource.interceptors.i4.replacestring=|

a1.sources.sqlsource.interceptors.i5.type=search_replace

a1.sources.sqlsource.interceptors.i5.searchpattern=(% )

a1.sources.sqlsource.interceptors.i5.replacestring=%

#設定通道為記憶體模式

a1.channels.c1.type=memory

a1.channels.c1.capacity=10000

a1.channels.c1.transactioncapacity=10000

a1.channels.c1.bytecapacitybufferpercentage=20

a1.channels.c1.bytecapacity=800000

a1.sinks.s1.type=hdfs

#ns為namenode的命名空間,兩個作用,乙個是防止集群坍塌,另乙個是改引數只能作用在active的namenode節點上

a1.sinks.s1.hdfs.path=hdfs://ns/nagios/901_cpu_load

a1.sinks.s1.hdfs.filetype=datastream

a1.sinks.s1.hdfs.writeformat=text

#設定滾動時間,每隔多少時間生成乙個檔案.如果設定成0,則禁止滾動,可以使所有資料被寫到乙個檔案中.

a1.sinks.s1.hdfs.rollinterval=0

#設定檔案儲存資料多大的時候生成下乙個檔案,建議設定成128m和塊大小相同

a1.sinks.s1.hdfs.rollsize=134217728

#設定檔案多少行時,滾動生成下乙個檔案,設定成0時禁止滾動

a1.sinks.s1.hdfs.rollcount=0

#連線source,channel,sink

Flume監聽oracle表增量

需求 獲取oracle表增量資訊,傳送至udp514埠,支援ip配置 步驟 1 需要的jar oracle的 odbc5.jar oracle安裝目錄 jdbc lib下查詢 這兩個jar 都拷貝到flume的lib下 3 flume配置檔案 4 遞增欄位要放在select的第一位 切記 a1.so...

flume 增量上傳日誌檔案到HDFS中

1.採集日誌檔案時乙個很常見的現象 採集需求 比如業務系統使用log4j生成日誌,日誌內容不斷增加,需要把追加到日誌檔案中的資料實時採集到hdfs中。1.1.根據需求,首先定義一下3大要素 採集源,即source 監控日誌檔案內容更新 exec tail f file 下沉目標,即sink hdfs...

python實現增量讀取檔案

最近在做乙個小工具,生產者不斷的往乙個txt檔案寫入資料,需要解析增量寫入的資料。實現如下 def read file filename global location location 0 with open filename as fd while true cur location fd.te...