Learning about Spark partitions and sortBy

2021-09-26

Learning objective

When I first learned Spark I had no intuitive picture of partitions, and sortBy did not give the results I expected, so these notes use small experiments to understand how Spark partitioning and sortBy behave.
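The setup code is not included in this excerpt; below is a minimal sketch of what could produce the per-partition listing that follows, assuming (as the article's own setup describes) the master is set to local[4] so that 4 threads play the role of 4 executors and simulate a distributed environment. The app name and variable names are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

// local[4]: 4 threads act as 4 executors, so parallelize(1 to 100) yields 4 partitions by default
val conf = new SparkConf().setAppName("partition-sortby-demo").setMaster("local[4]")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 100)

// print each partition's index together with its contents;
// every task prints from its own thread, so the partition lines may appear in any order
rdd.mapPartitionsWithIndex { (index, iter) =>
  Iterator("partitionindex" + index + " " + iter.mkString(","))
}.foreach(println)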

partitionindex1 26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50 

partitionindex3 76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100 

partitionindex0 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25 

partitionindex2 51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75

The printout shows 4 partitions, and the data within each partition is in order, which matches the expected result.

Printing all the data directly

val rdd = sc.parallelize(1 to 100)

rdd.foreach(i => print(i + ","))

Output:

26,76,51,27,1,2,3,4,5,28,52,77,53,29,6,7,8,30,54,78,55,31,9,10,11,32,56,57,79,58,59,33,12,13,14,15,16,17,34,60,80,61,62,63,64,35,18,19,20,21,36,65,81,66,37,22,23,24,38,67,82,68,83,39,40,41,42,43,44,45,46,47,48,25,49,84,69,85,50,86,70,87,71,88,72,89,73,90,74,91,75,92,93,94,95,96,97,98,99,100,

Looking at the output, the data within each partition is still in order, but the output as a whole is not. As far as I know, the reason is that rdd.foreach runs on the executors rather than on the driver, and the executors execute concurrently, so their interleaved printing looks unordered.
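If you want the driver itself to report what each partition contains, instead of relying on executor-side print order, one option (not used in the original article) is glom, which packs each partition into an array:

// glom() keeps the partition boundaries: the result is one Array per partition.
// Collecting it lets the driver print the partitions in index order.
rdd.glom().collect().zipWithIndex.foreach { case (data, index) =>
  println("partitionindex" + index + " " + data.mkString(","))
}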

Output after adding collect

val rdd = sc.parallelize(1 to 100)

rdd.collect().foreach(i => print(i + ","))

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,

After collect, the data is gathered back to the driver: collect returns the partitions concatenated in partition-index order, and since parallelize split 1 to 100 into contiguous blocks, the collected array is already sorted. The driver then prints it in a single thread, so the output is ordered.

partitionindex3 123,134,152,126,171,105,99,131,172,183,125,148,178,141,174,94,147,103,101,162,153,192,102,101,167 

partitionindex2 127,129,115,77,140,150,94,124,79,124,116,143,70,86,131,74,142,77,71,153,153,155,124,84,146 

partitionindex1 46,119,69,40,95,84,128,71,51,68,76,131,67,50,103,93,121,46,127,115,109,93,124,75,136 

partitionindex0 37,86,63,98,36,30,90,79,69,28,91,95,16,53,27,56,66,41,29,23,76,78,114,84,32

Here a random number was added to each element of the RDD, so the data inside each partition is unordered, as the listing above shows.

partitionindex2 98,98,98,101,102,105,106,108,108,108,109,110,110,111,118,119,121,122,124,124,126,126 

partitionindex3 128,129,135,138,139,141,141,143,149,151,153,154,158,158,161,161,161,162,164,168,172,173,175,177,179 

partitionindex1 66,66,69,70,71,72,73,75,75,75,77,78,78,79,80,81,84,84,84,86,87,87,88,90,90,92,93,93,95,95,96,97 

partitionindex0 27,29,29,30,32,33,34,39,42,43,44,44,45,46,47,48,56,59,62,62,64

You can see that after sortBy the data inside each partition is sorted.
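The snippets that produced the two per-partition listings above are not in the excerpt; the sketch below shows one way to get them (the variable names are my own). Worth noting: sortBy shuffles the data through a range partitioner, so after sorting each partition is not only internally ordered but also covers a contiguous range of values, which matches the listing above. Also, without caching, the random values are drawn again for every action, which is why the numbers differ between the two listings.

import scala.util.Random

val rdd = sc.parallelize(1 to 100)
val randomRdd = rdd.map(i => i + Random.nextInt(100)) // add a random offset to break the original order

// before sortBy: each partition's data is unordered
randomRdd.mapPartitionsWithIndex { (index, iter) =>
  Iterator("partitionindex" + index + " " + iter.mkString(","))
}.foreach(println)

// after sortBy: each partition is sorted, and partitions hold increasing value ranges
randomRdd.sortBy(i => i, ascending = true).mapPartitionsWithIndex { (index, iter) =>
  Iterator("partitionindex" + index + " " + iter.mkString(","))
}.foreach(println)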

Printing all the data directly

import scala.util.Random

val rdd = sc.parallelize(1 to 100)

val randomRdd = rdd.map(i => i + Random.nextInt(100)) // add a random number to each element

randomRdd.sortBy(i => i, true).foreach(i => print(i + ","))

Output:

93,127,97,95,98,103,106,97,69,58,152,123,148,119,53,72,103,56,57,86,32,92,82,41,10,70,161,181,132,68,150,70,100,110,102,182,120,152,114,72,104,65,40,48,56,60,84,102,71,183,123,65,68,129,193,85,63,75,55,82,116,117,106,99,145,135,56,142,110,79,69,20,72,87,110,34,16,59,70,76,20,70,87,25,39,120,149,187,108,158,73,142,167,195,140,180,84,89,132,78,

As you can see, the output as a whole is still unordered, for the reason explained earlier.

Output after adding collect

val rdd = sc.parallelize(1 to 100)

val randomRdd = rdd.map(i => i + Random.nextInt(100)) // add a random number to each element

randomRdd.sortBy(i => i, true).collect().foreach(i => print(i + ","))

Output:

2,8,13,13,25,29,32,33,34,37,39,43,46,51,52,53,54,59,59,60,60,62,63,64,64,68,70,70,73,74,77,79,80,84,84,86,87,87,89,90,91,92,92,94,95,96,97,97,98,99,100,100,104,105,105,105,105,108,109,110,111,112,113,113,115,116,116,117,118,120,121,122,125,129,130,132,133,134,135,138,138,144,147,148,149,152,154,154,155,159,161,164,170,171,177,183,184,185,186,192,

This time we get a fully sorted list.

Note: when learning and testing, collect matters; without it the data you see may not match what you expect.

It is also easier to verify how things work on a small dataset.
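One extra caveat of my own, not from the article: collect pulls the entire RDD back to the driver, which is fine for 100 test values but can exhaust driver memory on real datasets. take or takeOrdered are safer ways to peek at a few values:

// take(n) fetches only the first n elements, scanning partitions in order,
// and takeOrdered(n) returns the n smallest elements without collecting everything
println(randomRdd.sortBy(i => i).take(10).mkString(","))
println(randomRdd.takeOrdered(10).mkString(","))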
