Learning objective
When I first learned Spark I had no intuitive feel for partitions and could not get the results I expected from sortBy, so I worked through the behaviour of Spark partitioning and sortBy with a few hands-on experiments. The tests below run with the master set to local[4], i.e. 4 threads acting as executors to simulate a distributed environment.
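The code that produced the per-partition printout below is not included in this excerpt; a minimal sketch that would generate equivalent output, assuming the local[4] setup mentioned above and using mapPartitionsWithIndex (the variable names here are mine, not the original article's), looks like this:

import org.apache.spark.{SparkConf, SparkContext}

// local[4] gives 4 executor threads, so parallelize(1 to 100) creates
// 4 partitions of 25 consecutive numbers each.
val conf = new SparkConf().setAppName("partition-demo").setMaster("local[4]")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 100)

// Print each partition's index together with its contents.
rdd.mapPartitionsWithIndex { (index, iter) =>
  Iterator("partitionindex" + index + " " + iter.mkString(","))
}.foreach(println)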
partitionindex1 26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50
partitionindex3 76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100
partitionindex0 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25
partitionindex2 51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75
The printout shows 4 partitions, and the data inside each partition is ordered, which matches the expected result.
Printing all the data directly
val rdd = sc.parallelize(1 to 100)
rdd.foreach(i => print(i + ","))
Output:
26,76,51,27,1,2,3,4,5,28,52,77,53,29,6,7,8,30,54,78,55,31,9,10,11,32,56,57,79,58,59,33,12,13,14,15,16,17,34,60,80,61,62,63,64,35,18,19,20,21,36,65,81,66,37,22,23,24,38,67,82,68,83,39,40,41,42,43,44,45,46,47,48,25,49,84,69,85,50,86,70,87,71,88,72,89,73,90,74,91,75,92,93,94,95,96,97,98,99,100,
Looking at this output, the data within each partition is still ordered, but the overall output is not. The reason is that foreach on an RDD runs on the executors rather than on the driver, and the executors (here, the 4 local threads) run concurrently, so their print statements interleave.
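To make the interleaving visible, each printed element can be tagged with the partition it comes from. This is not in the original article; it is a small sketch using TaskContext.getPartitionId(), which reads the current partition id inside a running task:

import org.apache.spark.TaskContext

// Each executor thread prints its own partition's elements, so lines from
// different partitions interleave in the console output.
rdd.foreach(i => print("[p" + TaskContext.getPartitionId() + "]" + i + ","))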
Output after adding collect
val rdd = sc.parallelize(1 to 100)
rdd.collect().foreach(i => print(i + ","))
1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,
After collect, the data is gathered back to the driver and iterated there in a single thread; collect returns the partitions in partition order, and since each partition holds a consecutive range of numbers, the result prints in order.
partitionindex3 123,134,152,126,171,105,99,131,172,183,125,148,178,141,174,94,147,103,101,162,153,192,102,101,167
partitionindex2 127,129,115,77,140,150,94,124,79,124,116,143,70,86,131,74,142,77,71,153,153,155,124,84,146
partitionindex1 46,119,69,40,95,84,128,71,51,68,76,131,67,50,103,93,121,46,127,115,109,93,124,75,136
partitionindex0 37,86,63,98,36,30,90,79,69,28,91,95,16,53,27,56,66,41,29,23,76,78,114,84,32
Here a random number has been added to every element of the RDD so that the data inside each partition is unordered; the printout above shows the partitions before sorting, and the printout below shows them after sortBy.
partitionindex2 98,98,98,101,102,105,106,108,108,108,109,110,110,111,118,119,121,122,124,124,126,126
partitionindex3 128,129,135,138,139,141,141,143,149,151,153,154,158,158,161,161,161,162,164,168,172,173,175,177,179
partitionindex1 66,66,69,70,71,72,73,75,75,75,77,78,78,79,80,81,84,84,84,86,87,87,88,90,90,92,93,93,95,95,96,97
partitionindex0 27,29,29,30,32,33,34,39,42,43,44,44,45,46,47,48,56,59,62,62,64
After sortBy, the data inside each partition is sorted. Notice also that the partitions now cover increasing, non-overlapping value ranges (partition 0 holds the smallest values, partition 3 the largest), because sortBy reshuffles the data with a range partitioner.
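A sketch of how the two printouts above could be produced, reusing the mapPartitionsWithIndex helper from earlier (again, the variable names are assumptions rather than the article's original code):

import scala.util.Random

val rdd = sc.parallelize(1 to 100)
val randomRdd = rdd.map(i => i + Random.nextInt(100)) // add a random offset to break the ordering

// Before sortBy: each partition still holds 25 elements, but they are unsorted.
randomRdd.mapPartitionsWithIndex { (index, iter) =>
  Iterator("partitionindex" + index + " " + iter.mkString(","))
}.foreach(println)

// After sortBy: each partition is sorted, and the partitions cover increasing,
// non-overlapping ranges because sortBy shuffles with a range partitioner.
randomRdd.sortBy(i => i).mapPartitionsWithIndex { (index, iter) =>
  Iterator("partitionindex" + index + " " + iter.mkString(","))
}.foreach(println)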
Printing all the data directly
import scala.util.Random
val rdd = sc.parallelize(1 to 100)
val randomRdd = rdd.map(i => i + Random.nextInt(100)) // add a random number to each element
randomRdd.sortBy(i => i, true).foreach(i => print(i + ","))
Output:
93,127,97,95,98,103,106,97,69,58,152,123,148,119,53,72,103,56,57,86,32,92,82,41,10,70,161,181,132,68,150,70,100,110,102,182,120,152,114,72,104,65,40,48,56,60,84,102,71,183,123,65,68,129,193,85,63,75,55,82,116,117,106,99,145,135,56,142,110,79,69,20,72,87,110,34,16,59,70,76,20,70,87,25,39,120,149,187,108,158,73,142,167,195,140,180,84,89,132,78,
The overall output is unordered, for the reason explained above: foreach runs concurrently on the executors.
Output after adding collect
val rdd = sc.parallelize(1 to 100)
val randomRdd = rdd.map(i => i + Random.nextInt(100)) // add a random number to each element
randomRdd.sortBy(i => i, true).collect().foreach(i => print(i + ","))
Output:
2,8,13,13,25,29,32,33,34,37,39,43,46,51,52,53,54,59,59,60,60,62,63,64,64,68,70,70,73,74,77,79,80,84,84,86,87,87,89,90,91,92,92,94,95,96,97,97,98,99,100,100,104,105,105,105,105,108,109,110,111,112,113,113,115,116,116,117,118,120,121,122,125,129,130,132,133,134,135,138,138,144,147,148,149,152,154,154,155,159,161,164,170,171,177,183,184,185,186,192,
This gives an ordered list.
A note: collect matters a lot when learning and testing; without it, the data you see may not match your expectations.
Verifying how things execute is also much easier on a small dataset.
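On a small dataset, one convenient way to inspect every partition from the driver (an alternative to the per-partition printing used above, not something the article itself shows) is glom, which turns each partition into an array:

// glom() preserves the partition structure: one array per partition.
val partitions = sc.parallelize(1 to 100).glom().collect()
partitions.zipWithIndex.foreach { case (data, index) =>
  println("partitionindex" + index + " " + data.mkString(","))
}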