spark鍵值對操作 一

2021-08-07 11:46:10 字數 4618 閱讀 7335

鍵值對rdd也叫做pair rdd

把乙個普通 rdd轉換為pair rdd的時候使用map()函式來實現

#使用第乙個單詞作為乙個鍵,建立乙個pair rdd

pairs=lines.map(lambda x:(x.split(" ")[0],x))

reducebykey(func)合併具有相同鍵的值

rdd.reducebykey((x, y) => x + y)

groupbykey()對具有相同鍵的值進行分組

rdd.groupbykey()
生成的資料示例如下

combinebykey(createcombiner,mergevalue,mergecombiners,partitioner)

使用不同的返回型別合併具有相同鍵的值

mapvalues(func)

對pair rdd 中的每個值應用乙個函式而不改變鍵

rdd.mapvalues(x => x+1)
flatmapvalues(func)對pair rdd 中的每個值應用乙個返回迭代器的函式,然後對返回的每個元素都生成乙個對應原鍵的鍵值對記錄。通常用於符號化

a.flatmapvalues(_.split(",")).collect()

(animal,pig), (animal,cat), (animal,dog), (animal,tiger))

keys()

返回乙個僅包含鍵的rdd

values()

返回乙個僅包含值的rdd

sortbykey()

返回乙個根據鍵排序的rdd

函式接受乙個ascending引數,表示是否按公升序排序,預設是true

也可以自定義排序順序

#比如雖然rdd是整數,但是利用字串排序

rdd.sortbykey(ascending=true, numpartitions=none, keyfunc = lambda x: str(x))

操作的兩個鍵

(rdd = other = )

subtractbykey

刪掉rdd 中鍵與other rdd 中的鍵相同的元素

rdd.subtractbykey(other)
結果

join

對兩個rdd 進行內聯結

普通的join操作符表示內連線,只有在兩個pair rdd都存在鍵時才輸出。當乙個輸入對應的鍵有多個值時,生成的pair rdd會包括來自兩個輸入rdd的每一組相對應的記錄

rdd.join(other)
結果

rightouterjoin

對兩個rdd 進行連線操作,確保第二個rdd 的鍵必須存在(右外連線)

leftouterjoin

對兩個rdd 進行連線操作,確保第乙個rdd 的鍵必須存在(左外連線)

var rdd1 = sc.makerdd(array(("a","1"),("b","2"),("c","3")),2)

var rdd2 = sc.makerdd(array(("a","a"),("c","c"),("d","d")),2)

rdd1.rightouterjoin(rdd2).collect

結果

array[(string, (option[string], string))] = array((d,(none,d)), (a,(some(1),a)), (c,(some(3),c)))

cogroup

將兩個rdd 中擁有相同鍵的資料分組到一起

rdd.cogroup(other)
結果

reducebykey()

reducebykey()與reduce()類似,reducebykey() 會為資料集中的每個鍵進行並行的歸約操作,每個歸約操作會將鍵相同的值合併起來

foldbykey()

與fold() 一樣,foldbykey() 操作所使用的合併函式對零值與另乙個元素進行合併,結果仍為該元素。

rdd

.mapvalues(lambda

x: (x, 1)).reducebykey(lambda

x, y: (x

[0] + y

[0], x

[1] + y

[1]))

#利用reducebykey實現單詞計數

sc = sparkcontext("local","test")

rdd=sc.textfile("file:///g:/reference/source/spark/text.txt")

words=rdd.flatmap(lambda x:x

.split(" "))

print(words.map(lambda x:(x,1)).take(5))

result=words.map(lambda x:(x,1)).reducebykey(lambda x,y:x+y)

#也可以利用words.countbyvalue()直接進行單詞計數

combinebykey()

#完整的函式定義

defcombinebykey

(self, createcombiner, mergevalue, mergecombiners,

numpartitions=none, partitionfunc=portable_hash):

combinebykey() 會遍歷分割槽中的所有元素,每個元素的鍵要麼還沒有遇到過,要麼就和之前的某個元素的鍵相同。遇到新元素時,combinebykey() 會使用乙個叫作createcombiner() 的函式來建立那個鍵對應的累加器的初始值,如果這是乙個在處理當前分割槽之前已經遇到的鍵,它會使用mergevalue() 方法將該鍵的累加器對應的當前值與這個新的值進行合併,如果有多個分割槽,需要使用mergecombiners()將各個分割槽結果合併

#利用combinebykey()求平均值

#第乙個匿名函式用於生產累加器,第二個匿名函式則處理已有累加器key的value,第三個則用於處理跨分割槽

spark中很多函式都是都是在combinebykey()基礎上實現的

也可以為rdd指定分割槽數進行調優

data = [("a", 3), ("b", 4), ("a", 1)]

sc.parallelize(data).reducebykey(lambda x, y: x + y, 10)

也可以使用repartition()或者coalesce()在除分組操作和聚合操作之外也能改變分割槽,coalesce()是repartition()優化版

groupbykey()

使用rdd 中的鍵來對資料進行

分組。對於乙個由型別k 的鍵和型別v 的值組成的rdd,所得到的結果rdd 型別會是[k, iterable[v]]。

groupby()

用於未成對資料上,可以根據除鍵以外的條件分組,它可以接收乙個函式,對源rdd中的每個元素使用該函式將返回結果作為鍵再進行分組

cogroup()

對多個共享同乙個鍵的rdd 進行分組,如果鍵值不是同一種型別則生成如下的的rdd

[(k, (iterable[v], iterable[w]))]
countbykey()對每個鍵對應的元素分別計數

collectasmap()

將結果以對映表的形式返回,以便查詢

lookup(key)

返回給定鍵對應的所有值

Spark學習筆記 鍵值對操作

鍵值對 rdd是 spark 中許多操作所需要的常見資料型別 鍵值對 rdd 通常用來進行聚合計算。我們一般要先通過一些初始etl 抽取 轉化 裝載 操作來將資料轉化為鍵值對形式。spark 為包含鍵值對型別的 rdd 提供了一些專有的操作。1.建立pair rdd val input sc.par...

Spark學習之鍵值對(pair RDD)操作

1 讀取本身就是鍵值對的資料 2 乙個普通的rdd通過map 轉為pair rdd,傳遞的函式需要返回鍵值對。python中使用第乙個單詞作為鍵建立出乙個pair rddpairs lines.amp lambda x x.split 0 x scala中使用第乙個單詞作為鍵建立出乙個pair rd...

Spark學習筆記 (3 鍵值對操作 )

spark學習筆記3 鍵值對操作 鍵值對rdd通常用來進行聚合計算,spark為包含鍵值對型別的rdd提供了一些專有的操作spark中建立pair rdd的方法 1 儲存鍵值對的資料格式會在讀取時直接返回由其鍵值對資料組成的pair rdd 2 還可以使用map 函式將乙個普通的rdd轉為pair ...