ODPS功能介紹之資料匯入

2021-09-23 23:29:36 字數 4311 閱讀 4011

在使用odps強大的資料處理能力之前,大家最關心的是自己的資料如何匯入到odps中。 下面介紹一款向odps匯入資料的工具-fluentd。

使用這款軟體,向odps匯入資料,需要具備如下環境:

ruby 2.1.0 或更新

gem 2.4.5 或更新

fluentd-0.10.49 或從fluentd 官網 查詢最新,fluentd為不同的os提供了不同的版本,詳見

protobuf-3.5.1 或更新(ruby protobuf)

接下來可以通過以下兩種方式中的任意一種來安裝odps fluentd 匯入外掛程式。

方式一:通過ruby gem安裝:

$ gem install fluent-plugin-aliyun-odps

odps已經將這個外掛程式發布到gem庫中, 名稱為 fluent-plugin-aliyun-odps,只需要通過gem install 命令來安裝即可(大家在使用gem 時在國內可能會遇到gem庫無法訪問,可以在網上搜一下更改gem 庫源來解決)。

方式二:通過外掛程式原始碼安裝:

$ gem install protobuf

$ gem install fluentd –no-ri –no-rdoc

$ git clone

$ cp aliyun-odps-fluentd-plugin/lib/fluent/plugin/* /lib/fluent/plugin/ -r

其中第二條命令是安裝fluentd,如果已經安裝可以省略。 odps fluentd外掛程式原始碼在github上,clone下來之後直接放到fluentd的plugin目錄中即可。

使用fluentd匯入資料時,最主要的是配置fluentd的conf檔案,更多conf檔案 的介紹請參見:

示例一:匯入nginx日誌 。conf中source的配置如下:

type tail

path /opt/log/in/in.log

pos_file /opt/log/in/in.log.pos

refresh_interval 5s

tag in.log

format /^(?[^ ]*) – - \[(?[^\]]*)\] 「(?\s+)(?: +(?[^\"]*?)(?: +\s*)?)?」 (?[^ ]*) (?[^ ]*) 「-」 「(?[^\"]*)」$/

time_format %y%b%d %h:%m:%s %z

fluentd 以tail方式監控指定的檔案內容是否有變化,更多的tail配置參見:

match 配置如下:

type aliyun_odps

aliyun_access_id ************

aliyun_access_key *********

aliyun_odps_endpoint

aliyun_odps_hub_endpoint

buffer_chunk_limit 2m

buffer_queue_limit 128

flush_interval 5s

project projectforlog

table nginx_log

fields remote,method,path,code,size,agent

partition ctime=$

time_format %d/%b/%y:%h:%m:%s %z

資料會匯入到projectforlog project的nginx_log表中,其中會以源中的datetime欄位作為分割槽,外掛程式遇到不同的值時會自動建立分割槽;

示例二:匯入mysql中的資料。匯入mysql中資料時,需要安裝fluent-plugin-sql外掛程式作為source:

$ gem install fluent-plugin-sql

配置conf中的source:

type sql

host 127.0.0.1

database test

adapter mysql

username ***x

password ***x

select_interval 10s

select_limit 100

state_file /path/sql_state

table test_table

tag in.sql

update_column id

這個例子是從test_table中select資料,每間隔10s去讀取100條資料出來,select 時將id列作為主鍵(id欄位是自增型)。關於fluent-plugin-sql的更多說明參見:

match 配置如下:

type aliyun_odps

aliyun_access_id ************

aliyun_access_key *********

aliyun_odps_endpoint

aliyun_odps_hub_endpoint

buffer_chunk_limit 2m

buffer_queue_limit 128

flush_interval 5s

project your_projectforlog

table mysql_data

fields id,field1,field2,fields3

資料會匯出到odps projectforlog project的mysql_data表中,匯入的字段包括id,field1,field2,field3。

通過fluentd匯入資料是走的odps實時資料流入通道-datahub,這個通道需要乙個特殊的odps表,這個表在建立時需要指定為hub table。建立表時可以使用如下語名:

create table shards hublifecycle ;

其中:n1 是指shards數量,有效值為1-20。在匯入資料時,每個shard的流入量是10m/秒。n2是指資料在datahub上的保留期,有效值1-7,主要用於流計算場景中使用歷史資料。 例如:

create table access_log(f1 string, f2 string,f3 string,f4 string,f5 string,f6 string, f7 string) partitioned by(ctime string) into 5 shards hublifecycle 7;

如果向已經存在的表匯入資料,也需要將表修改為hub表, 其命令為:

alter table table_name enable huttable with shards hublifecycle ;

向odps匯入資料,需要將odps外掛程式配置在conf檔案中match項中。外掛程式支援的引數說明如下:

type(fixed): 固定值 aliyun_odps.

aliyun_access_id(required):雲賬號access_id.

aliyun_access_key(required):雲賬號access key.

aliyun_odps_hub_endpoint(required):如果你的服務部署在esc上,請把本值設定為  否則設定為.

aliyunodps_endpoint(required):如果你的服務部署在esc上,請把本值設定為  否則設定為 .

buffer_chunk_limit(optional): 塊大小,支援「k」(kb),「m」(mb),「g」(gb)單位,預設 8mb,建議值2mb.

buffer_queue_limit(optional): 塊佇列大小,此值與buffer_chunk_limit共同決定整個緩衝區大小。

flush_interval(optional): 強制傳送間隔,達到時間後塊資料未滿則強制傳送, 預設 60s.

project(required): project名稱.

table(required): table名稱.

fields(required): 與source對應,欄位名必須存在於source之中.

partition(optional):若為分割槽表,則設定此項.

分割槽名支援的設定模式:

固定值: partition ctime=20150804

關鍵字: partition ctime=$ (其中remote為source中某欄位)

時間格式關鍵字: partition ctime=$ (其中datetime為source中某時間格式字段,輸出為%y%m%d格式作為分割槽名稱)

time_format(optional):如果使用時間格式關鍵字為, 請設定本引數. 例如: source[datetime]=」29/aug/2015:11:10:16 +0800″,則設定為」%d/%b/%y:%h:%m:%s %z」

ODPS功能介紹之CLT

odpscmd 2 分別執行下述命令 檢視幫助資訊 help 切換專案 use aca21104 demo 檢視當前專案的詳細資訊 desc project aca21104 demo 列出表資訊 ls tables 檢視某個表的具體資訊 desc dual 檢視表中記錄數 count dual s...

資料匯入功能實現心得

剛接到這個需求感覺很簡單,看大致意思主要是,從excel中讀取資料並寫入到資料庫中,唯一特殊之處是有一列大字段 超過4000個字元,在資料庫中需要用clob型別儲存 需特殊處理。想了下把 寫好了。開始測試發現,匯入一條資料需要8秒左右,按這速度導20萬資料的話,不敢往下想了,於是想著 調優,優化以後...

資料庫之ODPS中sql語句指南

持續更新中 一 增1 增加一列 向csp hsy count info表中增加sale qty列 alter table csp hsy count info add columns sale qty bigint 2.增加一張表 表為fact hsy panter pay org,分割槽為p da...