Spark優化一則 減少Shuffle

2021-07-09 15:56:00 字數 3674 閱讀 8734

破砂鍋用自己3節點的spark集群試驗了這個優化演算法,並進一步找到更快的演算法。測試資料是sogou實驗室的日誌檔案前10000000條資料。目標是對日誌第2列資料,按照第乙個字母合併,得到每個首字母有幾條記錄。

所有的方案都重新啟動spark shell,先用以下**把日誌第2列資料cache到記憶體裡,spark gui顯示cache有8個partition,約1gb記憶體。

val rdd = sc.textfile("

hdfs://hadoop1:8000/input/sogouq3.txt

").map(_.split("

\t")).map(_(1

))rdd.cache()

rdd.count()

//res1: long = 10000000

spark gui

rdd name

storage level

cached partitions

fraction cached

size in memory

size in tachyon

size on disk 3

memory deserialized 1x replicated

8100%

1089.4 mb

0.0 b

0.0 b

rdd.map(x => (x.charat(0), x)).groupbykey().mapvalues().collect()

//res2: array[(char, int)] = array((8,168189), (0,168338), (a,168228), (9,168018), (1,167647), (b,168404), (2,168731), (3,168206), (c,168991), (d,168095), (4,167523), (e,168179), (5,167967), (6,167907), (f,168174), (7,168718))

spark stage gui顯示有關stage id是1-2,累計耗時5s,產生140mb shuffle read和208mb shuffle write。

stage id

description

submitted

duration

tasks: succeeded/total

shuffle read

shuffle write 1

collect at :15      

2014/09/03 20:51:58

3 s8/8                     

140.2 mb

2map at :15      

2014/09/03 20:51:55

2 s8/8                     

208.4 mb

0count at :15      

2014/09/03 20:51:46

8 s8/8                     

rdd.distinct(numpartitions = 6).map(x => (x.charat(0), 1)).reducebykey(_+_).collect()

//res2: array[(char, int)] = array((6,167907), (0,168338), (f,168174), (7,168718), (a,168228), (1,167647), (8,168189), (b,168404), (2,168731), (9,168018), (3,168206), (c,168991), (d,168095), (4,167523), (e,168179), (5,167967))

spark stage gui顯示有關stage id是1-3,累計耗時4.2s,生成50mb shuffle read和75mb shuffle write。雖然多了1個stage,shuffle read/write比原始方案減少超過60%,從而速度加快16%。

stage id

description

submitted

duration

tasks: succeeded/total

shuffle read

shuffle write 1

collect at :15      

2014/09/03 20:24:17

0.2 s

6/6                     

4.9 kb

2reducebykey at :15      

2014/09/03 20:24:15

2 s6/6                     

50.4 mb

7.4 kb

3distinct at :15      

2014/09/03 20:24:13

2 s8/8                     

75.6 mb

0count at :15      

2014/09/03 20:23:55

7 s8/8                     

既然減少shuffle可以加快速度,破砂鍋想出以下的zero shuffle方案來。

rdd.map(x => (x.charat(0

), x)).countbykey()

//res2: scala.collection.map[char,long] = map(e -> 623689, 2 -> 623914, 5 -> 619840, b -> 626111, 8 -> 620738, d -> 623515, 7 -> 620222, 1 -> 616184, 4 -> 616628, a -> 641623, c -> 630514, 6 -> 621346, f -> 624447, 0 -> 632735, 9 -> 637770, 3 -> 620724)

spark stage gui顯示有關stage id是1,累計耗時只有0.3s,沒有shuffle read/write。這個方案有關的rdd只有narrow dependency,所以只有1個stage。

stage id

description

submitted

duration

tasks: succeeded/total

shuffle read

shuffle write 1

countbykey at :15      

2014/09/03 20:45:02

0.3 s

8/8                     

0count at :15      

2014/09/03 20:44:32

8 s比較3種方案方案

shuffle read

shuffle write

time

slides原始方案

140.2 mb

208.4 mb

5sslides優化方案

50.4 mb

75.6 mb

4.2s

zero shuffle優化方案00

0.3s

spark的優化之一是盡可能減少shuffle從而大幅減少緩慢的網路傳輸。熟悉rdd的函式對spark優化有很大幫助。

本文**所有權力歸原作者所有。

Spark優化一則 減少Shuffle

破砂鍋用自己3節點的spark集群試驗了這個優化演算法,並進一步找到更快的演算法。測試資料是sogou實驗室的日誌檔案前10000000條資料。目標是對日誌第2列資料,按照第乙個字母合併,得到每個首字母有幾條記錄。所有的方案都重新啟動spark shell,先用以下 把日誌第2列資料cache到記憶...

Mysql的優化一則

目的在於這麼乙個sql語句 select w.from wall w inner join wall category relation r on w.wall id r.wall id where r.category level1 id 39 and w.is online 1 order by...

Mysql的優化一則

目的在於這麼乙個sql語句 1 select w.from wall w inner join wall category relation r on w.wall id r.wall id where r.category level1 id 39 and w.is online 1 order ...