Flink 建立DataSet的幾種方式

2021-09-25 08:41:42 字數 1186 閱讀 6690

flink 建立dataset的幾種方式

package woaixuexi

import aa.myclass

import org.apache.flink.api.scala._

import org.apache.flink.configuration.configuration

/** * @author: 帥逼 * @description:

* @date: create in 20:36 2019/7/17

*/object woaixuexi

//從collection中建立

def fromcollection(env: executionenvironment): unit =

//從檔案中建立

def fromtextfile(env: executionenvironment): unit =

//從csv檔案中建立

def fromcsvfile(env: executionenvironment): unit =

case class myclas1(a:int,b:int)

//遞迴讀取資料夾下檔案建立dataset

def fromrefile(env: executionenvironment): unit =

//從壓縮檔案中讀取

/***

** 讀壓縮檔案

* flink目前支援輸入檔案的透明解壓縮,如果它們標有適當的副檔名。

* 特別是,這意味著不需要進一步配置輸入格式,並且任何fileinputformat支援壓縮,

* 包括自定義輸入格式。請注意,壓縮檔案可能無法並行讀取,從而影響作業可伸縮性。

** 下表列出了當前支援的壓縮方法。**

** 壓縮方法 副檔名 可並行

* deflate .deflate 沒有

* gzip壓縮 .gz, .gzip 沒有

* bzip2的 .bz2 沒有

* xz .xz 沒有**

**/

def fromcompressionfile(env: executionenvironment): unit =

}

強型別的DataSet

dataset大家都比較熟悉了,它就是我們所說的離線資料集。但是不能儲存大量的資料,我們可以使用強型別的dataset也可以使用弱型別的dataset.弱型別的dataset使用如下 dataset dataset new dataset 建立乙個dataset 取得連線字串 string conn...

Flink原理與實現 詳解Flink中的狀態管理

上面flink原理與實現的文章中,有引用word count的例子,但是都沒有包含狀態管理。也就是說,如果乙個task在處理過程中掛掉了,那麼它在記憶體中的狀態都會丟失,所有的資料都需要重新計算。從容錯和訊息處理的語義上 at least once,exactly once flink引入了stat...

Flink學習筆記(六) flink的運算元與富函式

一 flink中的的transformation運算元 flink常用運算元就不自己詳細記錄了,看這裡就夠了。二 富函式 在呼叫datastream的運算元例如map filter時,可以傳入乙個函式,也可以傳入乙個function類,就像這樣 val filterstream stream.fil...