table 的建立方式
# dataset 和datastream 隱式轉化(case
class)
# sink table
# source table
# externalcatalog table
# table api 用法
val orders = tableenv.scan(
"orders"
)val revenue = orders .filter(
'ccountry === "france") .groupby('cid,
'cname'
).select(
'cid, 'cname,
'revenue.sum as 'revsum)
# sql 用法
tableenv.sqlquery(
"select name,sum(age) from t_table group by name"
)
注意
轉化dataset
datastream
tableapi
sql query
dataset--
totable隱式轉化,fromdataset
registerdataset登錄檔
datastream--
totable隱式轉化,fromdatastream
registerdatastream登錄檔
tableapi
todataset[row]
-registertable
sql query
todataset(string, int)
toretractstream
scan
-
# dataset和datastream 註冊成 sqlquery table
//register the datastream as table "mytable"
with fields "f0"
,"f1"
tableenv.registerdatastream(
"mytable"
, stream)
// register the datastream as table "mytable2"
with fields "mylong"
,"mystring"
tableenv.registerdatastream(
"mytable2"
, stream,
'mylong, 'mystring)
# dataset和datastream 轉化成table
// convert the datastream into a table with default fields '_1, '_2
val table1: table = tableenv.fromdatastream(stream)
// convert the datastream into a table with fields 'mylong, 'mystring
val table2: table = tableenv.fromdatastream(stream,
'mylong, 'mystring)
# table轉轉成dataset和datastream
// convert the table into a dataset of row
val dsrow: dataset[row]
= tableenv.todataset[row]
(table)
// convert the table into a dataset of tuple2[string, int]
val dstuple: dataset[
(string, int)
]= tableenv.todataset[
(string, int)
](table)
val dsrow: datastream[row]
(table)
val dstuple: datastream[
(string, int)
(string, int)
](table)
# 登錄檔轉和table api 轉化
// sql to table api
tableenv.registertable(
"table1",.
..) val tapiresult = tableenv.scan(
"table1"
).select(..
.)//from table api to sql
val projtable: table = tableenv.scan(
"x")
.select(..
.) tableenv.registertable(
"projectedtable"
, projtable)
測試案列
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.tableenvironment
import org.apache.flink.table.api.scala._
object tableapitest
case
class order(user:
long
,product:
string
,amount:
int)
}
Flink 運算元Operators總結
operator 作用流的轉換 map將乙個元素轉換成另外乙個元素 datastream datastream本 flapmap 將幾個的乙個元素轉換為零個,乙個或者多個 datastream datastream filter 保留集合中返回true的元素 datastream datastrea...
Flink學習筆記(六) flink的運算元與富函式
一 flink中的的transformation運算元 flink常用運算元就不自己詳細記錄了,看這裡就夠了。二 富函式 在呼叫datastream的運算元例如map filter時,可以傳入乙個函式,也可以傳入乙個function類,就像這樣 val filterstream stream.fil...
Flink的Aggregate運算元的用法
如果定義了window assigner 之後,下一步就可以定義視窗內資料的計算邏輯,這也就是 window function 的定義。flink 中提供了四種型別的 window function 分別為reducefunction aggregatefunction 以及 processwind...