RDD詳解課堂筆記

2021-10-12 17:18:59 字數 4293 閱讀 4359

sc.textfile(「裡面放路徑」) 這兩個都是建立乙個rdd

sc.parallelize(裡面放集合)

val arr =

arry(1

,2,3

,4,5

,6,7

,8,9

)val rdd = sc.

parallelize

(arr)

rdd.partitions.length 這是檢視有幾個分割槽

用parallelize建立分割槽的時候不指定分割槽,分割槽的數量跟total-executor-cores指定幾個核有關

分割槽數量決定了任務的並行度 並行度越高task越多

rdd.collect 提交結果 就會觸發一次action 觸發一次action就會有乙個job

也可以在建立rdd的時候指定分割槽

val rdd = sc.

parallelize

(arr,

100) 這邊在第二個引數指定了有100個分割槽

也可以用makerdd的方式建立rdd,當然也可以指定分割槽

val rdd = sc.

parallelice

(arr)

怎麼進行分割槽的嫩

先totalsize(所有檔案大小都加在一起)goalsize(指定的話就是1)不指定就是傳進來的預設值2,所以如果不是指定的goalsize就是totalsize除以2,如果這個檔案除以goalsize大於1.1就切

之前的對資料的map,split操作路徑都可以不存在因為並沒有真正的對資料進行操作,reducebykey要路徑存在因為要聚合從別的機器上抓取資料,sortby會生成乙個job,因為他要排序就要這道資料的資訊,對資料進行取樣構建乙個新的特殊的分割槽器

記住:對rdd進行操作就是對rdd裡的每乙個分割槽進行操作,對rdd進行map本質上就是對rdd裡每乙個分割槽對應的迭代器進行map

transformation: 即轉換運算元,呼叫轉換運算元會生成乙個新的rdd, transformation是 lazy 的,不會觸發job執行

action: 行動運算元,呼叫行動運算元會觸發job執行, 本質上是呼叫了sc.runjob方法, 該方法從最後乙個rdd,根據

其依賴關係,從後往前,劃分stage,生成乙個taskset

1.建立rdd的方法

(1)通過並行化方式,將dirver端的集合轉成rdd ,可以指定分割槽的數量

val rdd: rdd[int]

= sc.

parallelize

(arr)

rdd.partitions.length //檢視分割槽數量

(2)從hdfs指定目錄建立rdd ,也可以指定分割槽的數量

val lines: rdd[string]

= sc.

textfile

("hdfs://linux01:8020/log"

)rdd.partitions.length //檢視分割槽數量

(2.1)從hdfs中讀取資料分割槽數量 : 預設情況下 目錄檔案下所有檔案的 totalsize(檔案總大小)/numsplits (切片數量) 得到 goalsize()

使用 檔案大小/goalsize 結果大於1.1 就分為多個切片.

如果想要幾個block塊(檔案)就有幾個分割槽, 在建立rdd時指定計算goalsize的除數為1就可以了

val rdd1 = sc.textfile(「hdfs://linux01:8020/wc」,1)

srage

任務執行階段

乙個stage對應乙個taskset

乙個taskset中的task的數量取決於stage中最後乙個rdd分割槽的數量

dependency

依賴關係,指的是父rdd和子rdd之間的依賴關係

窄依賴:沒有shfuffle產生,多個運算元會被合併到乙個task中,即在乙個pipeline中

寬依賴:有shuffle產生,是劃分stage的依據

job

觸發一次action形成乙個完整的dag,乙個dag對應乙個job

乙個job中有一到多個stage,乙個stage對應乙個taskset,乙個taskset中有一到多個task

使用sparksubmit提交的任務

package cn.doit.spark.day02

/* hashpartitioner(分割槽器)原始碼分析

*/object hashpartitionerdemo

def main

(args: array[string]

): unit =

}

rdd運算元之transformation 轉換運算元, lazy的,呼叫後生成乙個新的rdd

不產生shuffle的

map filter

map filter

package org.apache.spark.day02

import org.apache.spark.

/* map filter

對rdd進行操作,其實是對rdd中每個分割槽對應的迭代器進行操作,迭代器會對每乙個分區內的資料進行操作

*/ def main

(args: array[string]

): unit =

}

reducebykey groupbykey groupby

reducebykey

呼叫reducebykey底層呼叫的是shuffledrdd 特殊情況:呼叫reducebykey不一定進行shuffle,如果前面的資料已經分好區了,

那就不需要在進行分割槽了(使用同樣的分割槽器(列如都是hashpartitioner),並且分割槽數量相同)

package cn.doit.spark.day02.demo02

import org.apache.spark.rdd.rdd

import org.apache.spark.

/* reducebykey 有shuffle

*/object reducebykeydemo

}groupbykey

(分組) 底層也是先區域性再全域性 效率高於groupby 推薦使用

package cn.doit.spark.day02.demo03

import org.apache.spark.rdd.

import org.apache.spark.

import scala.collection.mutable.arraybuffer

object groupbykeydemo

}

package cn.doit.spark.day02.demo03

import org.apache.spark.rdd.rdd

import org.apache.spark.

object groupby )

//// val rdd3: rdd[(string, iterable[(string, string, double)])] = rdd2.groupby(t => t._1)

// val tuples = rdd3.collect()

// println(tuples.tobuffer)

//groupbykey效率高於groupby, 推薦使用

val rdd2: rdd[

(string,

(string, double))]

= rdd1.

map(e =

>

) val rdd3: rdd[

(string, iterable[

(string, double)])

]= rdd2.

groupbykey()

val result = rdd3.

collect()

println

(result.tobuffer)

}}

package cn.doit.spark.day02.demo04

import org.apache.spark.rdd.rdd

import org.apache.spark.

object distinctdemo

}

RDD原理詳解

transformations型別的操作 action型別的操作 rdd中提供的cache 方法只是簡單的把該rdd放到cache列表中。當rdd的iterator被呼叫時,通過cachemanager把rdd計算出來,並儲存到blockmanager中,下次獲取該rdd的資料時便可直接通過cach...

RDDAPI詳解rdd資料模型及rdd的sql實現

5個元素 1.rdd返回的partition物件集合 2.資料本地性 driver master 資料本地性 3.返回依賴關係,只需要關注parents 簡化了模型計算 4.迭代器,不同框架和計算時讀取父rdd都是一樣的,基於同一種型別的迴圈非常高效 5.partitioner rdd不變性儲存,也...

spark底層核心 RDD詳解

spark底層核心rdd 是什麼?彈性分布式資料集 簡單點就理解成乙個list集合 rdd 1,2,3 有什麼屬性?用idea中注釋的話來解釋有5大屬性 1 乙個分割槽列表,資料集的基本組成單位 rdd以分割槽為單位,乙個分割槽乙個task任務來處理執行,可以在建立rdd時指定rdd的分割槽個數,如...