flink版本為1.10。
flink sql消費kafka訊息,表定義為
create
table start_log_source(
mid_id varchar
, user_id int,.
.. ,-- 13位的時間戳(1587975971431)
interval
'5'second
-- 在ts上定義5 秒延遲的 watermark
)with
('connector.type'
='kafka'
,-- 使用 kafka connector
'connector.version'
='universal'
,-- kafka 版本,universal 支援 0.11 以上的版本
'connector.topic'
='start_log'
,-- kafka topic
'connector.properties.group.id'
='start_log_group'
,'connector.startup-mode'
='earliest-offset'
,-- 從起始 offset 開始讀取
'connector.properties.zookeeper.connect'
='192.168.1.109:2181'
,-- zookeeper 位址
'connector.properties.bootstrap.servers'
='192.168.1.109:9092'
,-- kafka broker 位址
'format.type'
='json'
-- 資料來源格式為 json
);
create
table start_log_source(
mid_id varchar
, user_id int,.
.. ,-- 13位的時間戳(1587975971431)
1000
,'yyyy-mm-dd hh:mm:ss'))
,-- 定義事件時間
watermark for ts as ts -
interval
'5'second
-- 在ts上定義5 秒延遲的 watermark
)with
('connector.type'
='kafka'
,-- 使用 kafka connector
'connector.version'
='universal'
,-- kafka 版本,universal 支援 0.11 以上的版本
'connector.topic'
='start_log'
,-- kafka topic
'connector.properties.group.id'
='start_log_group'
,'connector.startup-mode'
='earliest-offset'
,-- 從起始 offset 開始讀取
'connector.properties.zookeeper.connect'
='192.168.1.109:2181'
,-- zookeeper 位址
'connector.properties.bootstrap.servers'
='192.168.1.109:9092'
,-- kafka broker 位址
'format.type'
='json'
-- 資料來源格式為 json
);
flink sql 新增函式
修改 flink table common專案 org.apache.flink.table.functions.builtinfunctiondefinitions 類 增加乙個成員 public static final builtinfunctiondefinition nvl new bui...
flashback 閃回資料之 timestamp
一 誤刪除表 delete from oa.test 0504 commit 二 往前推大概刪除時間,查詢是否有對應資料 select count 1 from oa.test 0504 as of timestamp to date 20171121 14 45 00 yyyymmdd hh24 ...
Flink SQL 如何實現列轉行
在 sql 任務裡面經常會遇到一列轉多行的需求,今天就來總結一下在 flink sql 裡面如何實現列轉行的,先來看下面的乙個具體案例.需求原始資料格式如下 name data jasonlee data 格式化 現在希望得到的資料格式是這樣的 name content type urljasonl...