accumulator簡介
accumulator是spark提供的累加器,顧名思義,該變數只能夠增加。
只有driver能獲取到accumulator的值(使用value方法),task只能對其做增加操作(使用 +=)。你也可以在為accumulator命名(不支援python),這樣就會在spark web ui中顯示,可以幫助你了解程式執行的情況。
accumulator使用
使用示例
舉個最簡單的accumulator的使用例子:
//在driver中定義
val accum = sc.accumulator(0, "example accumulator")
//在task中進行累加
sc.parallelize(1 to 10).foreach(x=> accum += 1)
//在driver中輸出
accum.value
//結果將返回10
res: 10
累加器的錯誤用法
val accum= sc.accumulator(0, "error accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator統計偶數出現的次數,同時偶數返回0,奇數返回1
val newdata = data.mapelse 1
}}
看了上面的分析,大家都有這種印象了,那就是使用累加器的過程中只能使用一次action的操作才能保證結果的準確性。
事實上,還是有解決方案的,只要將任務之間的依賴關係切斷就可以了。什麼方法有這種功能呢?你們肯定都想到了,cache,persist。呼叫這個方法的時候會將之前的依賴切除,後續的累加器就不會再被之前的transfrom操作影響到了。
//
val accum= sc.accumulator(0, "error accumulator")
val data = sc.parallelize(1 to 10)
//**和上方相同
val newdata = data.map}
//使用cache快取資料,切斷依賴。
newdata.cache.count
//此時accum的值為5
accum.value
newdata.foreach(println)
//此時的accum依舊是5
accum.value
總結
使用accumulator時,為了保證準確性,只使用一次action操作。如果需要使用多次則使用cache或persist操作切斷依賴。
Spark中executor memory引數詳解
我們知道,spark執行的時候,可以通過 executor memory 來設定executor執行時所需的memory。但如果設定的過大,程式是會報錯的,如下 555.png 那麼這個值最大能設定多少呢?本文來分析一下。文中安裝的是spark1.6.1,安裝在hadoop2.7上。1 相關的2個引...
Spark基礎(三)Spark中的任務執行
容錯機制 spark的架構特點 根據客戶端提交的jar包劃分出來乙個個的rdd,根據rdd之間的lineage關係劃分dag。劃分dag的目的是為了劃分stage。2 dag通過dagscheller劃分為stage 再劃分為taskset 根據劃分出來的dag,將dag送個dagscheduler...
spark更改分割槽 Spark中的分割槽方法詳解
一 spark資料分割槽方式簡要 在spark中,rdd resilient distributed dataset 是其最基本的抽象資料集,其中每個rdd是由若干個partition組成。在job執行期間,參與運算的partition資料分布在多台機器的記憶體當中。這裡可將rdd看成乙個非常大的陣...