在使用odps強大的資料處理能力之前,大家最關心的是自己的資料如何匯入到odps中。 下面介紹一款向odps匯入資料的工具-fluentd。
使用這款軟體,向odps匯入資料,需要具備如下環境:
ruby 2.1.0 或更新
gem 2.4.5 或更新
fluentd-0.10.49 或從fluentd 官網 查詢最新,fluentd為不同的os提供了不同的版本,詳見
protobuf-3.5.1 或更新(ruby protobuf)
接下來可以通過以下兩種方式中的任意一種來安裝odps fluentd 匯入外掛程式。
方式一:通過ruby gem安裝:
$ gem install fluent-plugin-aliyun-odps
odps已經將這個外掛程式發布到gem庫中, 名稱為 fluent-plugin-aliyun-odps,只需要通過gem install 命令來安裝即可(大家在使用gem 時在國內可能會遇到gem庫無法訪問,可以在網上搜一下更改gem 庫源來解決)。
方式二:通過外掛程式原始碼安裝:
$ gem install protobuf
$ gem install fluentd –no-ri –no-rdoc
$ git clone
$ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* /lib/fluent/plugin/ -r
其中第二條命令是安裝fluentd,如果已經安裝可以省略。 odps fluentd外掛程式原始碼在github上,clone下來之後直接放到fluentd的plugin目錄中即可。
使用fluentd匯入資料時,最主要的是配置fluentd的conf檔案,更多conf檔案 的介紹請參見:
示例一:匯入nginx日誌 。conf中source的配置如下:
type tail
path /opt/log/in/in.log
pos_file /opt/log/in/in.log.pos
refresh_interval 5s
tag in.log
format /^(?[^ ]*) – - \[(?[^\]]*)\] 「(?\s+)(?: +(?[^\"]*?)(?: +\s*)?)?」 (?[^ ]*) (?[^ ]*) 「-」 「(?[^\"]*)」$/
time_format %y%b%d %h:%m:%s %z
fluentd 以tail方式監控指定的檔案內容是否有變化,更多的tail配置參見:
match 配置如下:
type aliyun_odps
aliyun_access_id ************
aliyun_access_key *********
aliyun_odps_endpoint
aliyun_odps_hub_endpoint
buffer_chunk_limit 2m
buffer_queue_limit 128
flush_interval 5s
project projectforlog
table nginx_log
fields remote,method,path,code,size,agent
partition ctime=$
time_format %d/%b/%y:%h:%m:%s %z
資料會匯入到projectforlog project的nginx_log表中,其中會以源中的datetime欄位作為分割槽,外掛程式遇到不同的值時會自動建立分割槽;
示例二:匯入mysql中的資料。匯入mysql中資料時,需要安裝fluent-plugin-sql外掛程式作為source:
$ gem install fluent-plugin-sql
配置conf中的source:
type sql
host 127.0.0.1
database test
adapter mysql
username ***x
password ***x
select_interval 10s
select_limit 100
state_file /path/sql_state
table test_table
tag in.sql
update_column id
這個例子是從test_table中select資料,每間隔10s去讀取100條資料出來,select 時將id列作為主鍵(id欄位是自增型)。關於fluent-plugin-sql的更多說明參見:
match 配置如下:
type aliyun_odps
aliyun_access_id ************
aliyun_access_key *********
aliyun_odps_endpoint
aliyun_odps_hub_endpoint
buffer_chunk_limit 2m
buffer_queue_limit 128
flush_interval 5s
project your_projectforlog
table mysql_data
fields id,field1,field2,fields3
資料會匯出到odps projectforlog project的mysql_data表中,匯入的字段包括id,field1,field2,field3。
通過fluentd匯入資料是走的odps實時資料流入通道-datahub,這個通道需要乙個特殊的odps表,這個表在建立時需要指定為hub table。建立表時可以使用如下語名:
create table shards hublifecycle ;
其中:n1 是指shards數量,有效值為1-20。在匯入資料時,每個shard的流入量是10m/秒。n2是指資料在datahub上的保留期,有效值1-7,主要用於流計算場景中使用歷史資料。 例如:
create table access_log(f1 string, f2 string,f3 string,f4 string,f5 string,f6 string, f7 string) partitioned by(ctime string) into 5 shards hublifecycle 7;
如果向已經存在的表匯入資料,也需要將表修改為hub表, 其命令為:
alter table table_name enable huttable with shards hublifecycle ;
向odps匯入資料,需要將odps外掛程式配置在conf檔案中match項中。外掛程式支援的引數說明如下:
type(fixed): 固定值 aliyun_odps.
aliyun_access_id(required):雲賬號access_id.
aliyun_access_key(required):雲賬號access key.
aliyun_odps_hub_endpoint(required):如果你的服務部署在esc上,請把本值設定為 否則設定為.
aliyunodps_endpoint(required):如果你的服務部署在esc上,請把本值設定為 否則設定為 .
buffer_chunk_limit(optional): 塊大小,支援「k」(kb),「m」(mb),「g」(gb)單位,預設 8mb,建議值2mb.
buffer_queue_limit(optional): 塊佇列大小,此值與buffer_chunk_limit共同決定整個緩衝區大小。
flush_interval(optional): 強制傳送間隔,達到時間後塊資料未滿則強制傳送, 預設 60s.
project(required): project名稱.
table(required): table名稱.
fields(required): 與source對應,欄位名必須存在於source之中.
partition(optional):若為分割槽表,則設定此項.
分割槽名支援的設定模式:
固定值: partition ctime=20150804
關鍵字: partition ctime=$ (其中remote為source中某欄位)
時間格式關鍵字: partition ctime=$ (其中datetime為source中某時間格式字段,輸出為%y%m%d格式作為分割槽名稱)
time_format(optional):如果使用時間格式關鍵字為, 請設定本引數. 例如: source[datetime]=」29/aug/2015:11:10:16 +0800″,則設定為」%d/%b/%y:%h:%m:%s %z」
ODPS功能介紹之CLT
odpscmd 2 分別執行下述命令 檢視幫助資訊 help 切換專案 use aca21104 demo 檢視當前專案的詳細資訊 desc project aca21104 demo 列出表資訊 ls tables 檢視某個表的具體資訊 desc dual 檢視表中記錄數 count dual s...
資料匯入功能實現心得
剛接到這個需求感覺很簡單,看大致意思主要是,從excel中讀取資料並寫入到資料庫中,唯一特殊之處是有一列大字段 超過4000個字元,在資料庫中需要用clob型別儲存 需特殊處理。想了下把 寫好了。開始測試發現,匯入一條資料需要8秒左右,按這速度導20萬資料的話,不敢往下想了,於是想著 調優,優化以後...
資料庫之ODPS中sql語句指南
持續更新中 一 增1 增加一列 向csp hsy count info表中增加sale qty列 alter table csp hsy count info add columns sale qty bigint 2.增加一張表 表為fact hsy panter pay org,分割槽為p da...