flink 建立dataset的幾種方式
package woaixuexi
import aa.myclass
import org.apache.flink.api.scala._
import org.apache.flink.configuration.configuration
/** * @author: 帥逼 * @description:
* @date: create in 20:36 2019/7/17
*/object woaixuexi
//從collection中建立
def fromcollection(env: executionenvironment): unit =
//從檔案中建立
def fromtextfile(env: executionenvironment): unit =
//從csv檔案中建立
def fromcsvfile(env: executionenvironment): unit =
case class myclas1(a:int,b:int)
//遞迴讀取資料夾下檔案建立dataset
def fromrefile(env: executionenvironment): unit =
//從壓縮檔案中讀取
/***
** 讀壓縮檔案
* flink目前支援輸入檔案的透明解壓縮,如果它們標有適當的副檔名。
* 特別是,這意味著不需要進一步配置輸入格式,並且任何fileinputformat支援壓縮,
* 包括自定義輸入格式。請注意,壓縮檔案可能無法並行讀取,從而影響作業可伸縮性。
** 下表列出了當前支援的壓縮方法。**
** 壓縮方法 副檔名 可並行
* deflate .deflate 沒有
* gzip壓縮 .gz, .gzip 沒有
* bzip2的 .bz2 沒有
* xz .xz 沒有**
**/
def fromcompressionfile(env: executionenvironment): unit =
}
強型別的DataSet
dataset大家都比較熟悉了,它就是我們所說的離線資料集。但是不能儲存大量的資料,我們可以使用強型別的dataset也可以使用弱型別的dataset.弱型別的dataset使用如下 dataset dataset new dataset 建立乙個dataset 取得連線字串 string conn...
Flink原理與實現 詳解Flink中的狀態管理
上面flink原理與實現的文章中,有引用word count的例子,但是都沒有包含狀態管理。也就是說,如果乙個task在處理過程中掛掉了,那麼它在記憶體中的狀態都會丟失,所有的資料都需要重新計算。從容錯和訊息處理的語義上 at least once,exactly once flink引入了stat...
Flink學習筆記(六) flink的運算元與富函式
一 flink中的的transformation運算元 flink常用運算元就不自己詳細記錄了,看這裡就夠了。二 富函式 在呼叫datastream的運算元例如map filter時,可以傳入乙個函式,也可以傳入乙個function類,就像這樣 val filterstream stream.fil...