鍵值對rdd也叫做pair rdd
把乙個普通 rdd轉換為pair rdd的時候使用map()函式來實現
#使用第乙個單詞作為乙個鍵,建立乙個pair rdd
pairs=lines.map(lambda x:(x.split(" ")[0],x))
reducebykey(func)合併具有相同鍵的值
rdd.reducebykey((x, y) => x + y)
groupbykey()對具有相同鍵的值進行分組
rdd.groupbykey()
生成的資料示例如下
combinebykey(createcombiner,mergevalue,mergecombiners,partitioner)
使用不同的返回型別合併具有相同鍵的值
mapvalues(func)
對pair rdd 中的每個值應用乙個函式而不改變鍵
rdd.mapvalues(x => x+1)
flatmapvalues(func)對pair rdd 中的每個值應用乙個返回迭代器的函式,然後對返回的每個元素都生成乙個對應原鍵的鍵值對記錄。通常用於符號化
a.flatmapvalues(_.split(",")).collect()
(animal,pig), (animal,cat), (animal,dog), (animal,tiger))
keys()
返回乙個僅包含鍵的rdd
values()
返回乙個僅包含值的rdd
sortbykey()
返回乙個根據鍵排序的rdd
函式接受乙個ascending引數,表示是否按公升序排序,預設是true
也可以自定義排序順序
#比如雖然rdd是整數,但是利用字串排序
rdd.sortbykey(ascending=true, numpartitions=none, keyfunc = lambda x: str(x))
操作的兩個鍵
(rdd = other = )
subtractbykey
刪掉rdd 中鍵與other rdd 中的鍵相同的元素
rdd.subtractbykey(other)
結果
join
對兩個rdd 進行內聯結
普通的join操作符表示內連線,只有在兩個pair rdd都存在鍵時才輸出。當乙個輸入對應的鍵有多個值時,生成的pair rdd會包括來自兩個輸入rdd的每一組相對應的記錄
rdd.join(other)
結果
rightouterjoin
對兩個rdd 進行連線操作,確保第二個rdd 的鍵必須存在(右外連線)
leftouterjoin
對兩個rdd 進行連線操作,確保第乙個rdd 的鍵必須存在(左外連線)
var rdd1 = sc.makerdd(array(("a","1"),("b","2"),("c","3")),2)
var rdd2 = sc.makerdd(array(("a","a"),("c","c"),("d","d")),2)
rdd1.rightouterjoin(rdd2).collect
結果
array[(string, (option[string], string))] = array((d,(none,d)), (a,(some(1),a)), (c,(some(3),c)))
cogroup
將兩個rdd 中擁有相同鍵的資料分組到一起
rdd.cogroup(other)
結果
reducebykey()
reducebykey()與reduce()類似,reducebykey() 會為資料集中的每個鍵進行並行的歸約操作,每個歸約操作會將鍵相同的值合併起來
foldbykey()
與fold() 一樣,foldbykey() 操作所使用的合併函式對零值與另乙個元素進行合併,結果仍為該元素。
rdd
.mapvalues(lambda
x: (x, 1)).reducebykey(lambda
x, y: (x
[0] + y
[0], x
[1] + y
[1]))
#利用reducebykey實現單詞計數
sc = sparkcontext("local","test")
rdd=sc.textfile("file:///g:/reference/source/spark/text.txt")
words=rdd.flatmap(lambda x:x
.split(" "))
print(words.map(lambda x:(x,1)).take(5))
result=words.map(lambda x:(x,1)).reducebykey(lambda x,y:x+y)
#也可以利用words.countbyvalue()直接進行單詞計數
combinebykey()
#完整的函式定義
defcombinebykey
(self, createcombiner, mergevalue, mergecombiners,
numpartitions=none, partitionfunc=portable_hash):
combinebykey() 會遍歷分割槽中的所有元素,每個元素的鍵要麼還沒有遇到過,要麼就和之前的某個元素的鍵相同。遇到新元素時,combinebykey() 會使用乙個叫作createcombiner() 的函式來建立那個鍵對應的累加器的初始值,如果這是乙個在處理當前分割槽之前已經遇到的鍵,它會使用mergevalue() 方法將該鍵的累加器對應的當前值與這個新的值進行合併,如果有多個分割槽,需要使用mergecombiners()將各個分割槽結果合併
#利用combinebykey()求平均值
#第乙個匿名函式用於生產累加器,第二個匿名函式則處理已有累加器key的value,第三個則用於處理跨分割槽
spark中很多函式都是都是在combinebykey()基礎上實現的
也可以為rdd指定分割槽數進行調優
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reducebykey(lambda x, y: x + y, 10)
也可以使用repartition()或者coalesce()在除分組操作和聚合操作之外也能改變分割槽,coalesce()是repartition()優化版
groupbykey()
使用rdd 中的鍵來對資料進行
分組。對於乙個由型別k 的鍵和型別v 的值組成的rdd,所得到的結果rdd 型別會是[k, iterable[v]]。
groupby()
用於未成對資料上,可以根據除鍵以外的條件分組,它可以接收乙個函式,對源rdd中的每個元素使用該函式將返回結果作為鍵再進行分組
cogroup()
對多個共享同乙個鍵的rdd 進行分組,如果鍵值不是同一種型別則生成如下的的rdd
[(k, (iterable[v], iterable[w]))]
countbykey()對每個鍵對應的元素分別計數
collectasmap()
將結果以對映表的形式返回,以便查詢
lookup(key)
返回給定鍵對應的所有值
Spark學習筆記 鍵值對操作
鍵值對 rdd是 spark 中許多操作所需要的常見資料型別 鍵值對 rdd 通常用來進行聚合計算。我們一般要先通過一些初始etl 抽取 轉化 裝載 操作來將資料轉化為鍵值對形式。spark 為包含鍵值對型別的 rdd 提供了一些專有的操作。1.建立pair rdd val input sc.par...
Spark學習之鍵值對(pair RDD)操作
1 讀取本身就是鍵值對的資料 2 乙個普通的rdd通過map 轉為pair rdd,傳遞的函式需要返回鍵值對。python中使用第乙個單詞作為鍵建立出乙個pair rddpairs lines.amp lambda x x.split 0 x scala中使用第乙個單詞作為鍵建立出乙個pair rd...
Spark學習筆記 (3 鍵值對操作 )
spark學習筆記3 鍵值對操作 鍵值對rdd通常用來進行聚合計算,spark為包含鍵值對型別的rdd提供了一些專有的操作spark中建立pair rdd的方法 1 儲存鍵值對的資料格式會在讀取時直接返回由其鍵值對資料組成的pair rdd 2 還可以使用map 函式將乙個普通的rdd轉為pair ...