建立資料匯流排(DataHub)源表

2021-09-19 21:35:21 字數 3439 閱讀 3976

datahub作為乙個流式資料匯流排,為阿里雲數加平台提供了大資料的入口服務,共同構建一站式的資料處理平台。實時計算 flink通常使用datahub作為流式資料儲存頭和輸出目的端。同時,上游眾多流式資料,包括dts、iot等均選擇datahub作為大資料平台的資料入口。datahub本身是流資料儲存,實時計算只能將其作為流式資料輸入。示例如下:

create

table datahub_stream

(name varchar,age bigint,birthday bigint)

with

(type

='datahub'

,endpoint

='',

project

='blink_datahub_test'

,topic

='test_topic_1'

,accessid

='0i70******xxs'

,accesskey

='yf60ew*********nvqpj2zhcfhu'

,starttime

='2017-07-21 00:00:00'

);

flink sql支援獲取datahub的屬性字段。能夠記錄每條資訊寫入datahub的系統時間。

如圖所示:

欄位名注釋說明

timestamp

每條記錄入datahub的systemtime

通過header關鍵字獲取屬性字段。

例如,屬性欄位並不存在於datahub的字段宣告裡。想獲取每條記錄入datahub的systemtime,可以將timestamp作為欄位名,在後面加上header就可以取出想要的屬性值。

name(varchat)

msgid(varchat)

ens_altar_flow

ems0a

create

table datahub_log

(`timestamp`

varchar header

,result varchar

msgid

varchar

)

with

(

type

='datahub'

);

create

table rds_out

(

`timestamp`

varchar

,

msgid

varchar

,

version

varchar

)

with

(

type

='rds'

);

insert

into rds_out

select

`timestamp`

,result

,msgid

from

datahub_log

;time(varchat)

msgid(varchat)

version(varchat)

1522652455625

ems0a

0.0.1

目前只支援tuple模式的topic

引數注釋說明

備註endpoint

消費端點資訊

datahub的endpoint位址

accessid

讀取的accessid

無accesskey

讀取的金鑰

無project

讀取的專案

無topic

project下的具體的topic

無starttime

啟動位點的時間

格式為」yyyy-mm-dd hh:mm:ss」

maxretrytimes

讀取最大嘗試次數

可選,預設為20。

retryintervalms

重試間隔

可選,預設為1000。

batchreadsize

單次讀取條數

可選,預設為10,可設定的最大值為1000。

lengthcheck

單行字段條數檢查策略

可選,預設為skip,其它可選值為exception、pad。skip:字段數目不符合時跳過 。exception:字段數目不符合時丟擲異常。pad:按順序填充,不存在的置為null。

columnerrordebug

是否開啟除錯開關,如果開啟,會把解析異常的log列印出來

可選,預設為false。

datahub和實時計算字段型別對應關係如下,建議使用該對應關係時進行ddl宣告:

datahub欄位型別

實時計算字段型別

bigint

bigint

string

varchar

double

double

timestamp

bigint

boolean

boolean

decimal

decimal

注意:datahub的timestamp是精確到微妙級別的,在unix時間戳裡是16位的。而實時計算定義的timestamp是精確到毫秒級別的,在unix時間戳裡是13位的所以建議大家使用bigint來對映。如果一定是要用timestamp建議使用計算列來做轉換。

本文**實時計算——

建立資料匯流排(datahub)源表

建立資料匯流排(DataHub)結果表

datahub作為乙個流式資料匯流排,為阿里雲數加平台提供了大資料的入口服務。結合阿里雲眾多雲產品,可以構建一站式的資料處理平台。實時計算 flink通常使用datahub作為流式資料儲存頭和輸出目的端。同時,上游眾多流式資料,包括dts iot等均選擇datahub作為大資料平台的資料入口。dat...

大資料匯流排 DataHub

本頁目錄 datahub作為乙個流式資料匯流排,為阿里雲數加平台提供了大資料的入口服務。結合阿里雲眾多雲產品,可以構建一站式的資料處理平台。流計算通常使用datahub作為流式資料儲存頭和輸出目的端。注意 datahub在公有雲使用需要使用者授予實時計算代為使用者訪問datahub許可權,具體請參看...

建立訊息佇列(Kafka)源表

本頁目錄 kafka源表的實現 於自社群的kafka版本實現。注意 本文件只適合獨享模式下使用。kafka需要定義的ddl如下。create table kafka stream messagekey varbinary message varbinary topic varchar partiti...