val input = sc.textfile(file_path)
val wordsrdd = input.map(x => x.split(
" ")
)# method 1
val result = wordsrdd.map(x =>
(x,1))
.reducebykey(
(x, y)
=> x + y)
# method 2
val result = wordsrdd.countbyvalue(
)
rdd.groupbykey(
).map(value => value.reduce(func)
) # 需要為每個鍵建立存放值的列表,占用記憶體
等價於
rdd.reducebykey(func) # 比前者更高效
例:
對於rdda.join(rddb)
操作,該操作會先將rdda和rddb中的值全部求出來,將雜湊值相同的記錄通過網路傳到同一臺機器上,再在各個機器上對鍵相同的元素進行連線操作,即歷經「rdda的hash值計算+跨資料混洗+相同鍵返回」與「rddb的hash值計算+跨資料混洗+相同鍵返回」兩大步驟,再比較低效。
但若先對rdda執行自定義分割槽,即
rdda = rdda.partitionbykey(
new hashpartitioner(n)
).persist(
)
其中n為分割槽個數,persist為持久化操作(必須有,否則後續每次用到rdda時都會重複對資料做分割槽操作,相當於把partitionbykey的作用抵消了,和之前未自定義分割槽效果一樣)。從而join操作時不會對rdda再進行混洗,只對rdda做本地引用,對rddb做混洗,網路傳輸僅存在於混洗rddb上。 spark 學習筆記
最近使用spark簡單的處理一些實際中的場景,感覺簡單實用,就記錄下來了。部門使用者業績表 1000w測試資料 使用者 部門 業績 資料載入 val context new sparkcontext conf var data context.textfile data.txt 場景1 求每個部門的...
spark學習筆記
1 缺省會寫成一堆小檔案,需要將其重新分割槽,直接指定幾個分割槽 spark.sql select row number over partition by depid order by salary rownum from emp repartition 2 write.parquet hdfs ...
Spark學習筆記
spark不僅僅支援mapreduce,還支援sql machine learning graph運算等,比起hadoop應用更靈活寬泛。spark 中的rdd 資料結構應對mapreduce中data replication disk io serialization引起的低效問題。rdd 類似於...