在 sql 任務裡面經常會遇到一列轉多行的需求,今天就來總結一下在 flink sql 裡面如何實現列轉行的,先來看下面的乙個具體案例.
需求原始資料格式如下:
name
data
jasonlee
[,,]
data 格式化
, ,]}
現在希望得到的資料格式是這樣的:
name
content_type
urljasonlee
flink
111jasonlee
spark
222jasonlee
hadoop
333
這是乙個典型的列轉行或者一行轉多行的場景,需要將 data 列進行拆分成為多行多列,下面介紹兩種實現方式.
使用 flink 自帶的 unnest 函式解析
使用自定義 udtf 函式解析
建表 ddl
create table kafka_table (
name string,
`data` array>
)with (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'test',
'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092', -- broker連線資訊
'properties.group.id' = 'jason_flink_test', -- 消費kafka的group_id
'scan.startup.mode' = 'latest-offset', -- 讀取資料的位置
'format' = 'json', -- 資料來源格式為 json
'json.fail-on-missing-field' = 'false', -- 字段丟失任務不失敗
'json.ignore-parse-errors' = 'true' -- 解析失敗跳過
)
這裡在定義 data 字段型別的時候需要定義為 array 型別,因為 unnest 函式需要乙個陣列型別的引數.
unnest 解析
select name,content_type,url
from kafka_table cross join unnest(`data`) as t (content_type,url)
select name,content_type,url
from kafka_table, unnest(`data`) as t (content_type,url)
select name,content_type,url
from kafka_table left join unnest(`data`) as t (content_type,url) on true
自定義 udtf 解析
自定義錶值函式(udtf),自定義錶值函式,將 0 個、1 個或多個標量值作為輸入引數(可以是變長引數)。與自定義的標量函式類似,但與標量函式不同。錶值函式可以返回任意數量的行作為輸出,而不僅是 1 個值。返回的行可以由 1 個或多個列組成。呼叫一次函式輸出多行或多列資料。必須繼承 tablefunction 基類,並實現乙個或者多個名為 eval 的方法, 在使用 udtf 時,需要帶上 lateral table兩個關鍵字.
@functionhint(output = @datatypehint("row"))
public class parserjsonarraytest extends tablefunction
} catch (exception e) }}
自定義 udtf 解析的時候,就不需要把 data 字段定義成 array 型別了,直接定義成 string 型別就可以了,並且這種方式會更加的靈活,比如還需要過濾資料或者更複雜的一些操作時都可以在 udtf 裡面完成.
flink sql 使用 udtf
select name,content_type,url
from kafka_table cross join lateral table (parserjsonarraytest(`data`)) as t (content_type,url)
select name,content_type,url
from kafka_table, lateral table (parserjsonarraytest(`data`)) as t (content_type,url)
select name,content_type,url
from kafka_table left join lateral table (parserjsonarraytest(`data`)) as t (content_type,url) on true
注意:
unnest 和 自定義 udtf 函式在使用的時候都有 3 種寫法,前面兩種寫法的效果其實是一樣的,第三種寫法相當於 left join 的用法.區別在於 cross join/inner join: 對於左側表的每一行,右側 udtf 不輸出,則這一行不輸出.left join: 對於左側表的每一行,右側 udtf 不輸出,則這一行會輸出,右側 udtf 欄位為 null
列印的結果
2> jasonlee,flink,111
2> jasonlee,spark,222
2> jasonlee,hadoop,333
總結
在實際使用的時候如果 unnest 可以滿足需求就直接用 unnest 不需要帶來額外的開發,如果 unnest 函式滿足不了需求,那麼就自定義 udtf 去完成.
flink sql 解析巢狀的 json 如此簡單
利用TYPE 實現列轉行
首先建立 bject 物件 create or replacetypeobj tableasobject fieldname varchar2 44 fieldvalue number 28,6 create or replace typetype tableas table ofobj table...
SQL 列轉行的實現
列轉行,逗號拼接指定列的值 sql server中寫法 select stuff select field1 from tablea for xml path 1,1,oracle中寫法 方法一 wmsys.wm concat select wmsys.wm concat field1 from t...
MySQL 列轉行用法實現
需求 需要將如下所示原始表資料轉為結構化的資料按行顯示 轉為結構化資料 解決方法 如果是單條記錄通過substring index容易實現,sql語句如下 select name,substring index accounts,1 account from personaccounts where...