Versions used: Flume 1.6, Kafka 2.11.
Step 2: I am using Oracle, so copy the Oracle JDBC driver jar into Flume's lib directory. I used ojdbc5.jar.
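For reference, a minimal sketch of this step, assuming Flume lives under $FLUME_HOME and the driver is taken from the Oracle client's jdbc/lib directory (both paths are assumptions, adjust to your environment):

# copy the Oracle JDBC driver onto Flume's classpath
cp $ORACLE_HOME/jdbc/lib/ojdbc5.jar $FLUME_HOME/lib/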
Step 3: Set up the Flume configuration file.
vi sql-kafka.conf, with the following contents:
agenttest.channels = channeltest
agenttest.sources = sourcetest
agenttest.sinks = sinktest
###########sql source#################
# for each test of the sources, the type is defined
agenttest.sources.sourcetest.type = org.keedio.flume.source.SQLSource
agenttest.sources.sourcetest.hibernate.connection.url = jdbc:oracle:thin:@192.168.200.8:1521/orcl
# hibernate database connection properties
agenttest.sources.sourcetest.hibernate.connection.user = typpcits
agenttest.sources.sourcetest.hibernate.connection.password = typpcits
agenttest.sources.sourcetest.hibernate.connection.autocommit = true
agenttest.sources.sourcetest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agenttest.sources.sourcetest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agenttest.sources.sourcetest.run.query.delay=1
agenttest.sources.sourcetest.status.file.path = /opt/flume
agenttest.sources.sourcetest.status.file.name = agenttest.sqlsource.status
# custom query
agenttest.sources.sourcetest.custom.query = select * from its_base_area
agenttest.sources.sourcetest.batch.size = 6000
agenttest.sources.sourcetest.max.rows = 1000
agenttest.sources.sourcetest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agenttest.sources.sourcetest.hibernate.c3p0.min_size=1
agenttest.sources.sourcetest.hibernate.c3p0.max_size=10
##############################
agenttest.channels.channeltest.type = memory
agenttest.channels.channeltest.capacity = 10000
agenttest.channels.channeltest.transactionCapacity = 10000
agenttest.channels.channeltest.byteCapacityBufferPercentage = 20
agenttest.channels.channeltest.byteCapacity = 1600000
agenttest.sinks.sinktest.type = org.apache.flume.sink.kafka.KafkaSink
agenttest.sinks.sinktest.topic = testtopic
agenttest.sinks.sinktest.brokerList = 192.168.72.129:9092,192.168.72.130:9092,192.168.72.131:9092
agenttest.sinks.sinktest.requiredAcks = 1
agenttest.sinks.sinktest.batchSize = 20
agenttest.sinks.sinktest.channel = channeltest
agenttest.sources.sourcetest.channels=channeltest
Don't forget to replace the values above (IP addresses, credentials, table name, topic) with your own.
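Note that the custom.query above re-reads the whole its_base_area table on every poll. The keedio SQL source also supports incremental reads: it records the last processed value in the status file and substitutes it for the $@$ placeholder in the query. A hedged sketch, assuming a monotonically increasing column named AREA_ID (a hypothetical column, adjust to your schema, and keep the incremental column first in the select list):

# incremental variant: $@$ is replaced with the last value saved in the status file
agenttest.sources.sourcetest.custom.query = SELECT AREA_ID, AREA_NAME FROM its_base_area WHERE AREA_ID > $@$ ORDER BY AREA_ID ASC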
Step 4: From the Flume home directory, run:
./bin/flume-ng agent -n agenttest -c conf -f conf/sql-kafka.conf -Dflume.root.logger=INFO,console
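While the agent is running, the source writes its progress to the status file configured above, so a quick sanity check is to look at that file (location taken from the config; its exact contents are an implementation detail of the keedio source):

cat /opt/flume/agenttest.sqlsource.status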
Step 5: Check whether data is arriving on the Kafka topic testtopic. From Kafka's bin directory, run:
./kafka-console-consumer.sh --zookeeper 192.168.72.129:2181 --topic testtopic --from-beginning
If everything worked, you should now see the rows queried from Oracle.
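If nothing shows up, one common cause is that the topic does not exist and auto-creation is disabled on the brokers. A hedged example of creating it beforehand from Kafka's bin directory, assuming the same ZooKeeper address as above (the partition and replication counts here are arbitrary choices):

./kafka-topics.sh --create --zookeeper 192.168.72.129:2181 --replication-factor 1 --partitions 3 --topic testtopic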