datahub作為乙個流式資料匯流排,為阿里雲數加平台提供了大資料的入口服務,共同構建一站式的資料處理平台。實時計算 flink通常使用datahub作為流式資料儲存頭和輸出目的端。同時,上游眾多流式資料,包括dts、iot等均選擇datahub作為大資料平台的資料入口。datahub本身是流資料儲存,實時計算只能將其作為流式資料輸入。示例如下:
create
table datahub_stream
(
name varchar
,age bigint
,birthday bigint
)
with
(type
='datahub'
,endpoint
='',
project
='blink_datahub_test'
,topic
='test_topic_1'
,accessid
='0i70******xxs'
,accesskey
='yf60ew*********nvqpj2zhcfhu'
,starttime
='2017-07-21 00:00:00'
);
flink sql支援獲取datahub的屬性字段。能夠記錄每條資訊寫入datahub的系統時間。
如圖所示:
欄位名注釋說明
timestamp
每條記錄入datahub的systemtime
通過header
關鍵字獲取屬性字段。
例如,屬性欄位並不存在於datahub的字段宣告裡。想獲取每條記錄入datahub的systemtime,可以將timestamp作為欄位名,在後面加上header
就可以取出想要的屬性值。
name(varchat)
msgid(varchat)
ens_altar_flow
ems0a
create
table datahub_log
(
`timestamp`
varchar header
,result varchar
,
msgid
varchar
)
with
(
type
='datahub'
);
create
table rds_out
(
`timestamp`
varchar
,
msgid
varchar
,
version
varchar
)
with
(
type
='rds'
);
insert
into rds_out
select
`timestamp`
,result
,msgid
from
datahub_log
;time(varchat)
msgid(varchat)
version(varchat)
1522652455625
ems0a
0.0.1
目前只支援tuple模式的topic
引數注釋說明
備註endpoint
消費端點資訊
datahub的endpoint位址
accessid
讀取的accessid
無accesskey
讀取的金鑰
無project
讀取的專案
無topic
project下的具體的topic
無starttime
啟動位點的時間
格式為」yyyy-mm-dd hh:mm:ss」
maxretrytimes
讀取最大嘗試次數
可選,預設為20。
retryintervalms
重試間隔
可選,預設為1000。
batchreadsize
單次讀取條數
可選,預設為10,可設定的最大值為1000。
lengthcheck
單行字段條數檢查策略
可選,預設為skip,其它可選值為exception、pad。skip:字段數目不符合時跳過 。exception:字段數目不符合時丟擲異常。pad:按順序填充,不存在的置為null。
columnerrordebug
是否開啟除錯開關,如果開啟,會把解析異常的log列印出來
可選,預設為false。
datahub和實時計算字段型別對應關係如下,建議使用該對應關係時進行ddl宣告:
datahub欄位型別
實時計算字段型別
bigint
bigint
string
varchar
double
double
timestamp
bigint
boolean
boolean
decimal
decimal
注意:datahub的timestamp是精確到微妙級別的,在unix時間戳裡是16位的。而實時計算定義的timestamp是精確到毫秒級別的,在unix時間戳裡是13位的所以建議大家使用bigint來對映。如果一定是要用timestamp建議使用計算列來做轉換。本文**實時計算——
建立資料匯流排(datahub)源表
建立資料匯流排(DataHub)結果表
datahub作為乙個流式資料匯流排,為阿里雲數加平台提供了大資料的入口服務。結合阿里雲眾多雲產品,可以構建一站式的資料處理平台。實時計算 flink通常使用datahub作為流式資料儲存頭和輸出目的端。同時,上游眾多流式資料,包括dts iot等均選擇datahub作為大資料平台的資料入口。dat...
大資料匯流排 DataHub
本頁目錄 datahub作為乙個流式資料匯流排,為阿里雲數加平台提供了大資料的入口服務。結合阿里雲眾多雲產品,可以構建一站式的資料處理平台。流計算通常使用datahub作為流式資料儲存頭和輸出目的端。注意 datahub在公有雲使用需要使用者授予實時計算代為使用者訪問datahub許可權,具體請參看...
建立訊息佇列(Kafka)源表
本頁目錄 kafka源表的實現 於自社群的kafka版本實現。注意 本文件只適合獨享模式下使用。kafka需要定義的ddl如下。create table kafka stream messagekey varbinary message varbinary topic varchar partiti...