sparksession.readstream()
返回乙個datastreamreader
介面物件,可以通過該物件對輸入源進行引數配置,最後返回dataframe/dataset物件。
val csvdf = spark
.readstream
.option("sep", ";")
.schema(userschema)
.csv("/path/to/directory")
val inputstream = spark.readstream
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "testss")
.load()
val socketdf = spark
.readstream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
具體輸入配置參考建立
complete
模式:每次會把整個result table
輸出,所以只支援聚合操作。
update
模式:只有更新的資料才會輸出到輸出端(記憶體中維護了上次觸發後的結果)。
不同的流查詢操作支援不同的輸出模式,如下表所示:
查詢型別
支援的模式
原因非聚合操作
update
complete
模式不支援是因為需要在result table
中維護所有資料,這是不太現實的
基於watermark的視窗聚合操作
update
complete
其他聚合操作
update
complete
writestream
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
.start()
writestream
.foreach(...)
.start()
writestream
.format("console")
.start()
writestream
.format("memory")
.queryname("tablename")
.start()
val query = wordcounts.writestream.trigger(processingtime(5.seconds))
.outputmode("complete")
.foreach(new foreachwriter[row]
override def
close
(errorornull: throwable): unit =
override def
open
(partitionid: long, version: long): boolean = "))
filewriter = new filewriter(new file(s"/tmp/example/$/temp"))
true
}}).start()
Structured Streaming 開發入門
structured streaming 作為 spark 家族的新成員,通過 spark sql dataframe 來處理 batch streaming 資料,基本的 sparksql api 即可實現離線處理和流式處理,大大的方便了流式計算的開發,另外還提供了豐富的功能。structured...
Linux shell shell的輸入與輸出
大多數使用標準輸入的命令都指定乙個檔案作為標準輸入 1.echo echo hello word 將輸出hello word 如果想把hello word輸出到檔案中中 使用重定向符號 下面命令將helloword字元寫入myfile檔案中 echo hello word myfile 2.read...
CPrimerPlus學習(十三) 檔案輸入輸出
程式清單13.1 count.c程式 count.c 使用標準 i o include include 提供 exit 的原型 intmain int argc,char ar if fp fopen ar 1 r null while ch getc fp eof fclose fp printf...