1.2. Environment Setup
Copy the following 4 jars from the Hive installation, under
/opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/hive-hcatalog/share/hcatalog,
into flume_home/lib:
hive-hcatalog-core-2.3.0.jar
hive-hcatalog-pig-adapter-2.3.0.jar
hive-hcatalog-server-extensions-2.3.0.jar
hive-hcatalog-streaming-2.3.0.jar
Then copy all jars under hive_home/lib into flume_home/lib as well.
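A minimal shell sketch of the two copy steps above; the Flume install path /opt/flume is an assumption, and the parcel layout (lib/hive as hive_home) should be adjusted to the actual cluster:

```
# Sketch only: PARCEL and FLUME_HOME are assumed paths, adjust for your cluster.
PARCEL=/opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114
FLUME_HOME=/opt/flume

# the 4 hcatalog jars listed above
cp "$PARCEL"/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-*.jar "$FLUME_HOME/lib/"
# all jars under hive_home/lib
cp "$PARCEL"/lib/hive/lib/*.jar "$FLUME_HOME/lib/"
```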
1.3. Create the Hive Table
Note: the table must be bucketed and created as an ORC transactional table.
Note: used by the netcat-flume-hive approach below.
create table flume_user(
  user_id string, user_name string, age string
)
clustered by (user_id) into 2 buckets
stored as orc
tblproperties('transactional'='true');

# Enable concurrency and transaction support in Hive
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
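The two set commands are session-level. A hedged sketch of applying them and creating/inspecting the table from the shell, assuming the hive CLI is on the PATH (beeline works the same way):

```
# Apply the session settings, create the table if it does not exist yet,
# and confirm it is bucketed, ORC-backed and transactional
# ("Num Buckets: 2" and transactional=true appear in the describe output).
hive -e "
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
create table if not exists flume_user(
  user_id string, user_name string, age string
)
clustered by (user_id) into 2 buckets
stored as orc
tblproperties('transactional'='true');
describe formatted flume_user;
"
```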
Note: the mysql-flume-hive approach below writes to the same table.

# netcat-flume-hive (send data to a port, Hive sink, test)
# flume-ng agent --conf-file sink_hive.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console
a1.sources=r1
a1.channels=c1
a1.sinks=s1

a1.sources.r1.type=netcat
a1.sources.r1.bind=hadoop3
a1.sources.r1.port=44444

a1.sinks.s1.type=hive
a1.sinks.s1.hive.metastore=thrift://hadoop3:9083
a1.sinks.s1.hive.database=default
a1.sinks.s1.hive.table=flume_user
a1.sinks.s1.serializer=DELIMITED
#a1.sinks.s1.hive.partition=%y-%m-%d
#a1.sinks.s1.autoCreatePartitions=false
#a1.sinks.s1.useLocalTimeStamp=false
a1.sinks.s1.serializer.delimiter="\t"
a1.sinks.s1.serializer.serdeSeparator='\t'
a1.sinks.s1.serializer.fieldnames=user_id,user_name,age

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
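A hedged end-to-end test for the config above: start the agent, push a couple of tab-delimited rows to port 44444 with nc, then check the table. The sample rows are made up; everything else comes from the config and the launch command in its comment:

```
# Start the agent (same command as in the comment above), in the background here
flume-ng agent --conf-file sink_hive.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console &

# Send two tab-delimited rows matching fieldnames=user_id,user_name,age
printf '1\ttom\t20\n2\tjerry\t21\n' | nc hadoop3 44444

# After the sink commits its transaction batch, the rows should be visible in Hive
hive -e "select * from flume_user;"
```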
# mysql-flume-hive (test)
# flume-ng agent --conf-file sink_hive.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console
a1.sources=sqlsource
a1.channels=c1
a1.sinks=s1

# declare the source type
a1.sources.sqlsource.type=org.keedio.flume.source.SQLSource
a1.sources.sqlsource.hibernate.connection.url=jdbc:mysql://xx-xx-xx-xx:3306/test?useSSL=false
a1.sources.sqlsource.hibernate.connection.user=***x
a1.sources.sqlsource.hibernate.connection.password=***x
# important: defaults to false, and with false the query will not run automatically
a1.sources.sqlsource.hibernate.connection.autocommit=true
# Hibernate dialect for MySQL
a1.sources.sqlsource.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
# MySQL JDBC driver
a1.sources.sqlsource.hibernate.connection.driver_class=com.mysql.jdbc.Driver
# query interval, in milliseconds
a1.sources.sqlsource.run.query.delay=10000
# directory and file that hold the source's status (last read position)
a1.sources.sqlsource.status.file.path=/var/lib/flume
a1.sources.sqlsource.status.file.name=test_mysql_hive.status
# start querying from the first row
a1.sources.sqlsource.start.from=0
a1.sources.sqlsource.custom.query=select user_id,user_name,age from flume_hive_test where user_id > $@$ order by user_id asc
#a1.sources.sqlsource.columns.to.select = *
#a1.sources.sqlsource.incremental.column.name = user_id
#a1.sources.sqlsource.incremental.value = 0
# batching parameters
a1.sources.sqlsource.batch.size=1000
a1.sources.sqlsource.max.rows=1000
# c3p0 connection pool parameters
a1.sources.sqlsource.hibernate.connection.provider_class=org.hibernate.connection.C3P0ConnectionProvider
a1.sources.sqlsource.hibernate.c3p0.min_size=1
a1.sources.sqlsource.hibernate.c3p0.max_size=10

a1.sinks.s1.type=hive
a1.sinks.s1.hive.metastore=thrift://hadoop3:9083
a1.sinks.s1.hive.database=default
a1.sinks.s1.hive.table=flume_user
a1.sinks.s1.serializer=DELIMITED
#a1.sinks.s1.hive.partition=%y-%m-%d
#a1.sinks.s1.autoCreatePartitions=false
#a1.sinks.s1.useLocalTimeStamp=true
a1.sinks.s1.round=true
a1.sinks.s1.roundValue=10
a1.sinks.s1.roundUnit=minute
a1.sinks.s1.serializer.delimiter="\t"
a1.sinks.s1.serializer.serdeSeparator='\t'
a1.sinks.s1.serializer.fieldnames=user_id,user_name,age

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.sqlsource.channels=c1
a1.sinks.s1.channel=c1
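The SQL source above reads from the MySQL table flume_hive_test named in custom.query. A hedged sketch of preparing that table with a few test rows; host, credentials and column types are assumptions (the host is masked as xx-xx-xx-xx in the config):

```
# Create and seed the MySQL table polled by the SQL source; placeholders throughout.
mysql -h xx-xx-xx-xx -u root -p test <<'SQL'
create table if not exists flume_hive_test (
  user_id   int,
  user_name varchar(64),
  age       varchar(8)
);
insert into flume_hive_test values (1, 'tom', '20'), (2, 'jerry', '21');
SQL
```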