如果乙個task在處理過程中掛掉了,那麼它在記憶體中的狀態都會丟失,所有的資料都需要重新計算。那麼我就需要乙個東西儲存歷史狀態state。
首先區分一下兩個概念,state一般指乙個具體的task/operator的狀態。而checkpoint則表示了乙個job,在乙個特定時刻的乙份全域性狀態快照,即包含了所有task/operator的狀態。我們在這裡討論的是state。
updatestatebykey
updatestatebykey會統計全域性的key的狀態,不管又沒有資料輸入,它會在每乙個批次間隔返回之前的key的狀態。updatestatebykey會對已存在的key進行state的狀態更新,同時還會對每個新出現的key執行相同的更新函式操作。如果通過更新函式對state更新後返回來為none,此時刻key對應的state狀態會被刪除(state可以是任意型別的資料的結構)。
mapwithstate
mapwithstate也會統計全域性的key的狀態,但是如果沒有資料輸入,便不會返回之前的key的狀態,類似於增量的感覺。
updatestatebykey可以在指定的批次間隔內返回之前的全部歷史資料,包括新增的,改變的和沒有改變的。由於updatestatebykey在使用的時候一定要做checkpoint,當資料量過大的時候,checkpoint會佔據龐大的資料量,會影響效能,效率不高。
mapwithstate只返回變化後的key的值,這樣做的好處是,我們可以只是關心那些已經發生的變化的key,對於沒有資料輸入,則不會返回那些沒有變化的key的資料。這樣的話,即使資料量很大,checkpoint也不會像updatestatebykey那樣,占用太多的儲存,效率比較高(再生產環境中建議使用這個)。
updatestatebykey示例:
def updatefunction(currvalues:seq[int],prevalue:option[int]): option[int] =kafkastream.map(r => (r._2,1)).updatestatebykey(updatefunction _)
這裡的updatefunction方法就是需要我們自己去實現的狀態跟新的邏輯,currvalues就是當前批次的所有值,prevalue是歷史維護的狀態,updatestatebykey返回的是包含歷史所有狀態資訊的dstream。
mapwithstate示例:
val initialrdd =ssc.sparkcontext.parallelize(list[(string, int)]())
// val sum = count.getorelse(0) + state.getoption.getorelse(0)
val output =(word, sum)
state.update(sum)
output
}//呼叫mapwithstate進行管理流資料的狀態
vuex的state狀態倉庫管理
vuex,在官網上的解釋是 vuex是乙個專為vue.js應用程式開發的狀態管理模式。它採用集中式儲存管理應用的所有元件的狀態,並以相應的規則保證狀態以一種可 的方式發生變化 state就是vuex中的資料倉儲,用於儲存所有元件的公共資料,資料需初始化且不支援直接修改。直接獲取state中的資料只需...
狀態 State 模式
物件狀態影響物件行為 物件擁有不同的狀態,往往會行使不同的行為.1 動機 在軟體構建過程中,某些物件的狀態如果改變,其行為也會隨之而發生變化。比如文件處於唯讀狀態,其支援的行為和讀寫狀態支援的行為就可能完全不同。如何在執行時根據物件的狀態來透明地更改物件的行為?而不會為物件操作和狀態轉化之前引入緊耦...
狀態模式 State
個人理解 核心是context維護乙個當前狀態,並在invoke狀態方法時,將context維護的當前狀態更新至下一狀態 uml類圖 實現 using system namespace decoratormode public class agecontext public void printag...