data sources 是什麼呢?就字面意思其實就可以知道:資料**。 flink 做為一款流式計算框架,它可用來做批處理,即處理靜態的資料集、歷史的資料 集;也可以用來做流處理,即實時的處理些實時資料流,實時的產生資料流結果,只要資料 源源不斷 的過來,flink 就能夠一直計算下去,這個 data sources 就是資料的**地。 flink 在批處理中常見的 source 主要有兩大類。
基於本地集合的 source(collection-based-source)
基於檔案的 source(file-based-source)
在 flink 最常見的建立 dataset 方式有三種。
使用 env.fromelements(),這種方式也支援 tuple,自定義物件等復合形式。
使用 env.fromcollection(),這種方式支援多種 collection 的具體型別
使用 env.generatesequence()方法建立基於 sequence 的 dataset
package com.czxy.flink.batch.source.collection
import org.apache.flink.api.scala.executionenvironment
//使用 env.fromelements(), 這種方式也支援 tuple, 自定義物件等復合形式
object batchfromelementsdemo
}
常見的建立dataset方法
import org.apache.flink.api.scala.executionenvironment
import scala.collection.mutable
import scala.collection.mutable.
/**
* 讀取集合中的批次資料
*/object batchfromcollection
}
1.讀取本地檔案
2.讀取 hdfs 資料
3.讀取 csv 資料
4.讀取壓縮檔案
對於以下壓縮型別,不需要指定任何額外的 inputformat 方法,flink 可以自動識別並 且解 壓。但是,壓縮檔案可能不會並行讀取,可能是順序讀取的,這樣可能會影響作業的 可伸縮性。
樣例**
package com.czxy.flink.batch.source.file
import org.apache.flink.api.scala.
//讀取本地檔案
object batchfromlocalfilesource
}
**實現:
import org.apache.flink.api.scala.
import org.apache.flink.api.scala._
/*** 從檔案構建資料來源
* 1.本地txt檔案
* 2.本地csc檔案
* 3.從hdfs檔案
* 4.壓縮包檔案
*/object batchsourcefromfile
}
flink 支援對乙個檔案目錄內的所有檔案,包括所有子目錄中的所有檔案的遍歷訪問方 式。對於從檔案中讀取資料,當讀取的數個資料夾的時候,巢狀的檔案預設是不會被讀取的, 只會讀取第乙個檔案,其他的都會被忽略。所以我們需要使用 recursive.file.enumeration 進 行遞迴讀取。
import org.apache.flink.api.scala.
import org.apache.flink.configuration.configuration
/** 遍歷目錄的批次資料 */
object batchfromfolder
}
Keras打亂輸入資料集
實驗資料集 2096 351,第一列為y,餘下350列為特徵x 分割輸入x和輸出y x dataset 1 351 y dataset 0 打亂訓練集 index i for i in range len dataset 下面這種寫法也可以 index np.arange len dataset n...
Flink學習筆記(五) flink資料合流
上一章記錄了flink的分流操作,那麼有分流是不是應該有合流呢?當然是有這樣的操作啦 stream1和stream2流需要合併為stream流 1.union合流 2.connect合流 前置配置 streamexecutionenvironment env streamexecutionenvir...
大資料技術 Flink
它既能保證資料一致性 exactly once 又能實時快速的處理海量資料。與生俱來的 watermark 功能讓它能對複雜資料亂序場景應對自如,它充分體現了 批 流 一體的完美結合同時又代表著 流 表 二象性的和諧統一。兩種資料集 無邊界資料集 連續不斷追加 和有邊界資料集 兩種執行模式 流式傳輸...