Basic Transformation Operations
map: applies a given function to every element of the RDD, producing a new RDD.
scala> var rdd = sc.textFile("/Users/lyf/Desktop/test/data1.txt")
scala> rdd.map(line => line.split(" ")).collect
res16: Array[Array[String]] = Array(Array(hello, world), Array(hello, tom), Array(hello, jerry))
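RDD `map` has the same semantics as `map` on an ordinary Scala collection, so the transformation can be previewed locally without a Spark cluster; a minimal sketch, assuming the sample lines mirror the contents of data1.txt above:

```scala
// Local preview of map: one output element per input element.
// The sample lines mirror data1.txt from the example above (an assumption).
val lines = List("hello world", "hello tom", "hello jerry")

// Splitting each line yields one Array[String] per line,
// giving a nested structure just like the res16 output.
val words = lines.map(line => line.split(" "))
```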
distinct: removes duplicate elements from the RDD, returning an RDD in which no element repeats.
scala> var rdd = sc.parallelize(List(1,2,2,3,3,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24
scala> rdd.distinct.collect
res18: Array[Int] = Array(4, 1, 5, 2, 3)
distinct also accepts a numPartitions argument that sets the number of partitions of the result:
scala> var rdd = sc.parallelize(List(1,2,2,3,3,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24
scala> var rddDistinct = rdd.distinct
scala> rddDistinct.partitions.size
res21: Int = 4
scala> var rddDistinct = rdd.distinct(3)
scala> rddDistinct.partitions.size
res22: Int = 3
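The deduplication itself can be previewed on a plain Scala collection; one difference is worth flagging, noted in the comment:

```scala
val data = List(1, 2, 2, 3, 3, 3, 4, 5)

// Scala's distinct keeps the first occurrence in encounter order;
// Spark's distinct goes through a shuffle, so the order of its
// output (Array(4, 1, 5, 2, 3) above) is arbitrary.
val unique = data.distinct  // List(1, 2, 3, 4, 5)
```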
flatMap: like map, but each input element may map to zero or more output elements, and the results are flattened into a single RDD.
scala> var rdd = sc.textFile("/Users/lyf/Desktop/test/data1.txt")
scala> rdd.flatMap(line => line.split(" ")).collect
res23: Array[String] = Array(hello, world, hello, tom, hello, jerry)
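The contrast between map and flatMap mirrors the same two methods on Scala collections, which makes the difference easy to see side by side:

```scala
val lines = List("hello world", "hello tom", "hello jerry")

// map keeps one list per input line (nested result, as in res16)...
val perLine = lines.map(_.split(" ").toList)

// ...while flatMap concatenates all the pieces into one flat list,
// matching the flatMap output (res23) above.
val flat = lines.flatMap(_.split(" "))
```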
coalesce and repartition: both repartition an RDD. coalesce partitions the data with a HashPartitioner; its first argument is the target number of partitions and its second is whether to shuffle, defaulting to false. repartition is simply coalesce with shuffle set to true.
scala> var rdd = sc.textFile("/Users/lyf/Desktop/test/data1.txt")
scala> rdd.partitions.size
res24: Int = 2
scala> var rdd_1 = rdd.coalesce(1)
rdd_1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[36] at coalesce at <console>:25
// If the requested number of partitions exceeds the current one, the second argument must be true; otherwise the partition count stays unchanged
scala> var rdd_2 = rdd.coalesce(3)
rdd_2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[37] at coalesce at <console>:25
scala> rdd_2.partitions.size
res26: Int = 2
scala> var rdd_2 = rdd.coalesce(5, true)
scala> rdd_2.partitions.size
res28: Int = 5
scala> var rdd_3 = rdd.repartition(5)
scala> rdd_3.partitions.size
res29: Int = 5
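When shuffle is enabled, elements are redistributed across the new partitions. A toy model of that redistribution on plain collections, dealing elements round-robin; this is an illustrative sketch, not Spark's actual assignment (Spark starts each partition's dealing at a random position and moves data through a shuffle):

```scala
// Toy model of repartition(n): deal elements round-robin into n buckets.
// Real Spark partition contents will differ; only the bucket count and
// the "every element lands in exactly one bucket" property carry over.
def repartitionLike[T](data: Seq[T], n: Int): Seq[Seq[T]] =
  (0 until n).map(i => data.zipWithIndex.collect { case (x, idx) if idx % n == i => x })

val parts = repartitionLike(1 to 10, 5)
// parts.size == 5, and each of 1..10 appears in exactly one bucket
```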
randomSplit: splits an RDD into multiple RDDs according to the given weights, returning an array of RDDs; the higher a weight, the greater the probability that elements are assigned to the corresponding RDD.
scala> var rdd = sc.parallelize(1 to 10, 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at <console>:24
// Split the original RDD into a new array of RDDs according to the weights
scala> var rddSplit = rdd.randomSplit(Array(1.0, 2.0, 3.0, 4.0))
scala> rddSplit.size
res30: Int = 4
scala> rddSplit(0).collect
res31: Array[Int] = Array()
scala> rddSplit(1).collect
res32: Array[Int] = Array(3, 8)
scala> rddSplit(2).collect
res33: Array[Int] = Array(1, 2, 9)
scala> rddSplit(3).collect
res34: Array[Int] = Array(4, 5, 6, 7, 10)
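The weighted assignment can be sketched locally. This is a simplified model under stated assumptions: Spark actually performs per-partition Bernoulli sampling against boundaries derived from the normalized weights, so only the overall behavior (each element lands in exactly one split, higher weights attract more elements) carries over:

```scala
// Simplified model of randomSplit: turn the weights into cumulative
// boundaries in [0, 1], then route each element to the bucket whose
// boundary its random draw falls under.
def randomSplitLike[T](data: Seq[T], weights: Seq[Double], seed: Long): Seq[Seq[T]] = {
  val rng = new scala.util.Random(seed)
  val boundaries = weights.scanLeft(0.0)(_ + _).tail.map(_ / weights.sum)
  data.foldLeft(Vector.fill(weights.size)(Vector.empty[T])) { (buckets, x) =>
    val r = rng.nextDouble()                  // one draw per element
    val i = boundaries.indexWhere(r < _)      // last boundary is 1.0, so i >= 0
    buckets.updated(i, buckets(i) :+ x)
  }
}

val splits = randomSplitLike(1 to 10, Seq(1.0, 2.0, 3.0, 4.0), seed = 42L)
// splits.size == 4; every element of 1..10 lands in exactly one split
```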
glom: gathers the elements of type T in each partition into an array, yielding an RDD[Array[T]].
scala> var rdd = sc.parallelize(1 to 10, 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at parallelize at <console>:24
scala> rdd.collect
res36: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> rdd.glom().collect
res37: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5), Array(6, 7), Array(8, 9, 10))
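The uneven partition sizes shown by glom come from how parallelize slices the input: to my understanding, Spark's ParallelCollectionRDD cuts a sequence of length len into n pieces at boundaries i*len/n using integer division, which can be reproduced locally:

```scala
// Reproduce parallelize's slicing of a local sequence:
// partition i covers indices [i*len/n, (i+1)*len/n).
def sliceLike[T](data: Seq[T], n: Int): Seq[Seq[T]] = {
  val len = data.length
  (0 until n).map(i => data.slice(i * len / n, (i + 1) * len / n))
}

sliceLike(1 to 10, 4)
// Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6, 7), Seq(8, 9, 10)) – the glom output above
```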
union: returns the union of the two RDDs without deduplicating elements.
scala> var rdd1 = sc.makeRDD(1 to 3, 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(2 to 5, 1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at makeRDD at <console>:24
scala> rdd1.union(rdd2).collect
res38: Array[Int] = Array(1, 2, 3, 2, 3, 4, 5)
intersection: returns the intersection of the two RDDs, with the result deduplicated. The optional numPartitions parameter sets the number of result partitions, and partitioner specifies the partitioning function.
scala> rdd1.intersection(rdd2).collect
res39: Array[Int] = Array(3, 2)
subtract: returns the elements of this RDD that do not appear in the other RDD, without deduplication.
scala> rdd1.subtract(rdd2).collect
res40: Array[Int] = Array(1)
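All three set operations can be previewed on plain Scala collections. One caveat, reflected in the code: Scala's intersect preserves duplicate multiplicities, while Spark's intersection deduplicates, hence the extra distinct below:

```scala
val a = List(1, 2, 3)
val b = List(2, 3, 4, 5)

// union keeps duplicates, matching RDD.union above
val u = a ++ b                    // List(1, 2, 3, 2, 3, 4, 5)

// Spark's intersection deduplicates; intersect + distinct yields the
// same element set (Spark's output order is arbitrary)
val i = (a intersect b).distinct  // List(2, 3)

// subtract: elements of a that do not appear in b
val s = a diff b                  // List(1)
```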
References:
[1] Guo Jingzhan. Illustrated Spark: Core Technologies and Case Studies (图解Spark：核心技术与案例实战) [M]. Beijing: Publishing House of Electronics Industry, 2017.