flink讀取有界流時開時間窗遇到的問題

2021-10-07 16:19:52 字數 891 閱讀 4605

有界流:

不知道有沒有這個概念,我這裡用它表示以流處理的方式讀取的批資料,比如streamexecutionenvironment.fromcollection(...)

其實這種做法或需求是比較奇怪的,要用流處理,但讀的卻是批資料,最好用流處理api處理流資料,用批處理api處理批資料。

我這裡之所以有這樣***的設計,是出於批處理一次性讀取全部資料有可能會記憶體溢位的情況下考慮的。想通過流的方式讀取批資料來解決。但是後面想了想,這好像簡直是一廂情願。批量讀取資料後交給流處理api,這只是處理的過程按流的方式進行,但讀資料還是一次性讀取的,並不是流的方式一條條讀(不過這只是我個人的分析,沒有找到相關的資料,也沒有驗證),所以,這種想法太愚蠢。

上**:

public static void main(string args) throws exception 

}).timewindow(time.seconds(1))

.sum("money");

sum.print();

env.execute();

}

**很簡單,就是要根據studentid分組,然後開1s的時間窗對money進行聚合。

但現象是壓根沒有資料輸出。這裡資料量太小,當資料量大的時候,更好測試。

分析原因,是因為這種方式讀的資料是沒有時間概念的,是一次性批量讀取的。可能資料還沒有進入視窗,或還沒有達到觸發視窗的條件時,整個程式就結束了。因此不會輸出。(這可以在讀取大資料量的時候,逐步調大時間視窗大小來測試)

因此,這種讀取批資料後開時間窗進行計算的方式是絕對不可取的。

但如果不加時間直接聚合,則沒有問題。或者加計數窗countwindow,但這種需要解決最後乙個視窗資料不足時如何觸發,不然也會由於程式結束導致丟最後乙個視窗的資料。

Spring Batch CSV檔案讀取時的注意點

按照spring batch 之 sample csv檔案操作 四 的方式配置好csvitemreader,發現讀入的資料很是奇怪,通過修改配置檔案發現,commit interval 1 的時候,例程是沒有問題的.如果大於1,例如設為50,則會把第50條資料,讀50遍進來.跟蹤 的getbean ...

又到一年開堵時

今兒個真高興啊,今兒個真高興,可以回家抱寶寶去嘍。話說定的是下午兩點的飛機回北京,在虹橋機場只要聽見 飛往首都北京的旅客,我們很抱歉的通知您 心裡就咯噔一下,生怕是自己這趟晚點,還好這回是準時登機,基本上按點兒起飛了。心裡一踏實,上飛機就睡了,也沒吃加餐,也沒喝水,下午5點多降落北京t3航站樓 我心...

python讀取檔案時位址書寫

程式1 fromnumpyimport defcoatl filename converting array to list ve1 zeros 1,1024 file1 open filename foriinrange 32 line file1.readline forkinrange 32 ...