map:對rdd中每個元素都執行乙個指定函式從而形成乙個新的rdd
from pyspark import sparkconf, sparkcontext
sc = sparkcontext(conf = conf)
def func(x):
return x*2
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
maprdd2 = rdd.map(func)
print(maprdd2.collect())# [2, 4, 6, 8, 10]
map依賴圖關係如下,紅框代表整個資料集,黑框代表乙個rdd分割槽,裡面是每個分割槽的資料集
flatmap:與map類似,但是每乙個輸入元素會被對映成0個或多個元素,最後達到扁平化效果
data = [[1,2],[3],[4],[5]]
rdd = sc.parallelize(data)
print(rdd.collect()) # [[1, 2], [3], [4], [5]]
flatmaprdd = rdd.flatmap(lambda x: x)
print(flatmaprdd.collect())# [1, 2, 3, 4, 5]
flatmap依賴關係圖如下
map和flatmap對比
rdd = sc.parallelize([("a",1),("b",2),("c",3)])
flatmaprdd = rdd.flatmap(lambda x:x)
print(flatmaprdd.collect()) # ['a', 1, 'b', 2, 'c', 3]
maprdd = rdd.map(lambda x:x)
print(maprdd.collect()) # [('a', 1), ('b', 2), ('c', 3)]
rdd = sc.parallelize([1, 2, 3, 4],2)
def f(iterator):
yield sum(iterator)
from pyspark import sparkcontext
rdd1 = sc.parallelize([("a",1), ("b",1), ("c",2), ("d",3),("e",6),("f",7),("g",8)])
rdd2 = sc.parallelize([("a",2), ("b",3), ("c",4), ("d",5)])
rdd4 = sc.parallelize([("a",2), ("b",3), ("c",4), ("d",5)])
rdd3 = rdd2.union(rdd1).union(rdd4)
print(rdd3.collect())
rdd5 = rdd2.union(rdd1).union(rdd4).repartition(3)
print(rdd5.getnumpartitions())
def myfunc(x):
res =
for item in x:
return res
print(rdd6.collect())
1.filter:按照條件進行過濾
rdd = sc.parallelize([("a",2), ("b",3), ("c",4), ("d",5)])
# 條件為true的元素留下,捨棄為false的元素
rdd = rdd.filter(lambda x: x[1]>=5)
對於已經排序好的rdd,配合zipwithindex(),可以使用 filter()來獲取前n個資料組成的rdd,而不是take()或者top()這些行動運算元。
1.sortby:排序
def sortby(self, keyfunc, ascending=true, numpartitions=none):
"""sorts this rdd by the given keyfunc
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortby(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortby(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
"""return self.keyby(keyfunc).sortbykey(ascending, numpartitions).values()
原始碼中,預設是正序排列,一般業務需要倒敘排列,引數為false,即大的數在前面。
排序條件可以設定多個,當第乙個相同時,以此按照後續條件進行排序
from pyspark import sparkcontext
rdd = sc.parallelize([(1,"a",1),(1,"b",2),(3,"d",4),(3,"c",3)])
rdd = rdd.sortby(lambda x:(x[0],x[1],x[2]),false)
print(rdd.collect())
2.sortbykey
針對 key-value資料,根據key進行排序
list = ["14", "134", "1244"]
rdd = sc.parallelize(list)
pairrdd = rdd.map(lambda word: (word, len(word)))
# x是key,x[1] 是key對應字串中索引為1的字母
aa = pairrdd.sortbykey(keyfunc=lambda x: x[1])
aa.foreach(print)
# ('1244', 4)
# ('134', 3)
# ('14', 2)
#注意上下的結果區別
list = ["14", "134", "1244"]
rdd = sc.parallelize(list)
pairrdd = rdd.map(lambda word: (word, len(word)))
aa = pairrdd.sortby(keyfunc=lambda x: x[1])
aa.foreach(print)
# ('14', 2)
# ('134', 3)
# ('1244', 4)
1、zip
兩個rdd具有相同個數的分割槽,並且每個分區內的個數相等
例子:
x=sc.parallelize(range(5),2)
y=sc.parallelize(range(1000,1005),2)
a=x.zip(y).glom().collect()
print(a)
a=x.zip(y).collect()
print(a)
2、zipwithindex()
給rdd的每個元素加上索引。排序後的rdd加上元素對應的順序序號
# zipwithindex()的結果為[((1,2),0),((1,3),1)]
sortby(lambda x: (x[0], x[1]), false).zipwithindex().map(lambda x: (x[0][0],x[1]+1))
3、zipwithuniqueid
返回k-v,與分割槽有關係
k, n+k, 2n+k,
n為分割槽總數,下例 n=2
k其屬於第幾個分割槽,從0開始計數。下例 k=0,1
對於k=0的分割槽:\(0+0*2,0+1*2,0+2*2\)
rdd=sc.parallelize(list('123456'),2)
print(rdd.glom().collect()) # [['1', '2', '3'], ['4', '5', '6']]
a=rdd.zipwithuniqueid().glom().collect()
print(a) # [[('1', 0), ('2', 2), ('3', 4)], [('4', 1), ('5', 3), ('6', 5)]]
pyspark特徵工程常用方法(一)
本文記錄特徵工程中常用的五種方法 minmaxscaler,normalization,onehotencoding,pca以及quantilediscretizer 用於分箱 原有資料集如下圖 首先將c2列轉換為vector的形式 vecassembler vectorassembler inpu...
PySpark入門三 常用的函式 上
在jupyter notebook中如何使用pyspark?開啟anaconda prompt 並使用pip 安裝好pyspark第三方庫。pip install pyspark 導包 from pyspark import sparkcontext 建立會話 sc sparkcontext.get...
pyspark中讀取檔案後的RDD操作
本文記錄下在python環境中對rdd的一些操作方法 1.建立rdd 關於讀取檔案建立rdd的方法在前面文章已經介紹過來,這裡就不做介紹了。還有一種自定義的 data rdd sc.parallelize alina 26 tom 22 sky 12 blue 21 2.lambda 表示式 在rd...