RDD行動運算元

2021-09-27 21:33:24 字數 3623 閱讀 6364

作用:通過func函式聚集rdd中的所有元素,先聚合分區內資料,再聚合分區間資料。

需求:建立乙個rdd,將所有元素聚合得到結果。

(1)建立乙個rdd[int]

scala> val rdd1 = sc.makerdd(1 to 10,2)

rdd1: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[85] at makerdd at :24

(2)聚合rdd[int]所有元素

scala> rdd1.reduce(_+_)

res50: int = 55

(3)建立乙個rdd[string]

scala> val rdd2 = sc.makerdd(array(("a",1),("a",3),("c",3),("d",5)))

rdd2: org.apache.spark.rdd.rdd[(string, int)] = parallelcollectionrdd[86] at makerdd at :24

(4)聚合rdd[string]所有資料

scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))

res51: (string, int) = (adca,12)

作用:在驅動程式中,以陣列的形式返回資料集的所有元素。

需求:建立乙個rdd,並將rdd內容收集到driver端列印

(1)建立乙個rdd

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[0] at parallelize at :24

(2)將結果收集到driver端

scala> rdd.collect

res0: array[int] = array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

作用:返回rdd中元素的個數

需求:建立乙個rdd,統計該rdd的條數

(1)建立乙個rdd

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[0] at parallelize at :24

(2)統計該rdd的條數

scala> rdd.count

res1: long = 10

作用:返回rdd中的第乙個元素

需求:建立乙個rdd,返回該rdd中的第乙個元素

(1)建立乙個rdd

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[0] at parallelize at :24

(2)統計該rdd的條數

scala> rdd.first

res2: int = 1

作用:返回乙個由rdd的前n個元素組成的陣列

需求:建立乙個rdd,統計該rdd的條數

(1)建立乙個rdd

scala> val rdd = sc.parallelize(array(2,5,4,6,8,3))

rdd: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[2] at parallelize at :24

(2)統計該rdd的條數

scala> rdd.take(3)

res10: array[int] = array(2, 5, 4)

作用:返回該rdd排序後的前n個元素組成的陣列

需求:建立乙個rdd,統計該rdd的條數

(1)建立乙個rdd

scala> val rdd = sc.parallelize(array(2,5,4,6,8,3))

rdd: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[2] at parallelize at :24

(2)統計該rdd的條數

scala> rdd.takeordered(3)

res18: array[int] = array(2, 3, 4)

引數:(zerovalue: u)(seqop: (u, t) ⇒ u, combop: (u, u) ⇒ u)

作用:aggregate函式將每個分割槽裡面的元素通過seqop和初始值進行聚合,然後用combine函式將每個分割槽的結果和初始值(zerovalue)進行combine操作。這個函式最終返回的型別不需要和rdd中元素型別一致。

需求:建立乙個rdd,將所有元素相加得到結果

(1)建立乙個rdd

scala> var rdd1 = sc.makerdd(1 to 10,2)

rdd1: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[88] at makerdd at :24

(2)將該rdd所有元素相加得到結果

scala> rdd.aggregate(0)(_+_,_+_)

res22: int = 55

注:rdd1.aggregate(初始值)() 其中初始值分區內和每個元素加一次,分區間又得加一次

​ 而aggregatebykey只是分區內加初始值,而分區間不會加初始值。本次案例若使用aggregate答案會是75

scala> var rdd1 = sc.makerdd(1 to 10,2)

rdd1: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[5] at makerdd at :24

scala> rdd1.aggregate(10)(_+_,_+_)

res6: int = 85

作用:摺疊操作,aggregate的簡化操作,seqop和combop一樣。

需求:建立乙個rdd,將所有元素相加得到結果

(1)建立乙個rdd

scala> var rdd1 = sc.makerdd(1 to 10,2)

rdd1: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[88] at makerdd at :24

(2)將該rdd所有元素相加得到結果

scala> rdd.fold(0)(_+_)

res24: int = 55

作用:將資料集的元素以textfile的形式儲存到hdfs檔案系統或者其他支援的檔案系統,對於每個元素,spark將會呼叫tostring方法,將它裝換為檔案中的文字

作用:將資料集中的元素以hadoop sequencefile的格式儲存到指定的目錄下,可以使hdfs或者其他hadoop支援的檔案系統。

作用:用於將rdd中的元素序列化成物件,儲存到檔案中。

RDD運算元怎麼區分轉換運算元和行動運算元

textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...

RDD轉換運算元和行動運算元的區別

textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...

RDD行動操作

行動操作是第二種型別的rdd操作,它們會把最終求得的結果返回到驅動器程式中,或者寫入外部儲存系統中。常見的rdd行動操作 1.reduce 它接收乙個函式作為引數,這個函式要操作兩個相同的元素型別的rdd資料並返回乙個同樣型別的新元素。乙個簡單的例子就是函式 可以用它來對我們的rdd進行累加。使用r...