Flink SQL中Timestamp使用的坑

2021-10-05 12:51:00 字數 1927 閱讀 3072

flink版本為1.10。

flink sql消費kafka訊息,表定義為

create

table start_log_source(

mid_id varchar

, user_id int,.

.. ,-- 13位的時間戳(1587975971431)

interval

'5'second

-- 在ts上定義5 秒延遲的 watermark

)with

('connector.type'

='kafka'

,-- 使用 kafka connector

'connector.version'

='universal'

,-- kafka 版本,universal 支援 0.11 以上的版本

'connector.topic'

='start_log'

,-- kafka topic

'connector.properties.group.id'

='start_log_group'

,'connector.startup-mode'

='earliest-offset'

,-- 從起始 offset 開始讀取

'connector.properties.zookeeper.connect'

='192.168.1.109:2181'

,-- zookeeper 位址

'connector.properties.bootstrap.servers'

='192.168.1.109:9092'

,-- kafka broker 位址

'format.type'

='json'

-- 資料來源格式為 json

);

create

table start_log_source(

mid_id varchar

, user_id int,.

.. ,-- 13位的時間戳(1587975971431)

1000

,'yyyy-mm-dd hh:mm:ss'))

,-- 定義事件時間

watermark for ts as ts -

interval

'5'second

-- 在ts上定義5 秒延遲的 watermark

)with

('connector.type'

='kafka'

,-- 使用 kafka connector

'connector.version'

='universal'

,-- kafka 版本,universal 支援 0.11 以上的版本

'connector.topic'

='start_log'

,-- kafka topic

'connector.properties.group.id'

='start_log_group'

,'connector.startup-mode'

='earliest-offset'

,-- 從起始 offset 開始讀取

'connector.properties.zookeeper.connect'

='192.168.1.109:2181'

,-- zookeeper 位址

'connector.properties.bootstrap.servers'

='192.168.1.109:9092'

,-- kafka broker 位址

'format.type'

='json'

-- 資料來源格式為 json

);

flink sql 新增函式

修改 flink table common專案 org.apache.flink.table.functions.builtinfunctiondefinitions 類 增加乙個成員 public static final builtinfunctiondefinition nvl new bui...

flashback 閃回資料之 timestamp

一 誤刪除表 delete from oa.test 0504 commit 二 往前推大概刪除時間,查詢是否有對應資料 select count 1 from oa.test 0504 as of timestamp to date 20171121 14 45 00 yyyymmdd hh24 ...

Flink SQL 如何實現列轉行

在 sql 任務裡面經常會遇到一列轉多行的需求,今天就來總結一下在 flink sql 裡面如何實現列轉行的,先來看下面的乙個具體案例.需求原始資料格式如下 name data jasonlee data 格式化 現在希望得到的資料格式是這樣的 name content type urljasonl...