sparks運算元總結:
lvalue型別
1) map
def map[u: classtag](f: t => u): rdd[u] //單值處理邏輯
將處理的資料逐條進行對映轉換,這裡的轉換可以是型別的轉換,也可以是值的轉換。
val datardd: rdd[int] = sparkcontext.makerdd(list(1,2,3,4))
val datardd1: rdd[int] = datardd.map(
num =>
)f: iterator[t] => iterator[u], //迭代方式
preservespartitioning: boolean = false): rdd[u] //是否保留父分割槽
將待處理的資料以分割槽為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料
datas =>
)f: (int, iterator[t]) => iterator[u], //(分割槽號,迭代方式)
preservespartitioning: boolean = false): rdd[u]
將待處理的資料以分割槽為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料,在處理時同時可以獲取當前分割槽索引。
(index, datas) =>
)4) flatmap
def flatmap[u: classtag](f: t => tr**ersableonce[u]): rdd[u]
將處理的資料先進行扁平化後再進行對映處理,所以運算元也稱之為扁平對映
val datardd = sparkcontext.makerdd(list(
list(1,2),list(3,4)
),1)
val datardd1 = datardd.flatmap(
list => list
)5) glom
def glom(): rdd[array[t]]
將同乙個分割槽的資料直接轉換為相同型別的記憶體陣列進行處理,分割槽不變
val datardd = sparkcontext.makerdd(list(
1,2,3,4
),1)
val datardd1:rdd[array[int]] = datardd.glom()
6) groupby
def groupby[k](f: t => k)(implicit kt: classtag[k]): rdd[(k, iterable[t])]
//第乙個引數為分割槽規則,第二個引數預設的隱式轉換
將資料根據指定的規則進行分組, 分割槽預設不變,但是資料會被打亂重新組合,我們將這樣的操作稱之為shuffle。極限情況下,資料可能被分在同乙個分割槽中
乙個組的資料在乙個分割槽中,但是並不是說乙個分割槽中只有乙個組
val datardd = sparkcontext.makerdd(list(1,2,3,4),1)
val datardd1 = datardd.groupby(
_%2)
7) filter
def filter(f: t => boolean): rdd[t]
將資料根據指定的規則進行篩選過濾,符合規則的資料保留,不符合規則的資料丟棄。
當資料進行篩選過濾後,分割槽不變,但是分區內的資料可能不均衡,生產環境下,可能會出現資料傾斜。
val datardd = sparkcontext.makerdd(list(
1,2,3,4
),1)
val datardd1 = datardd.filter(_%2 == 0)
8) sample
def sample(
withreplacement: boolean, //是否放回
fraction: double, //不放回情況為(0~1代表每個元素抽取機率)放回(每個元素期望的抽取次數)
seed: long = utils.random.nextlong): rdd[t] //隨機數種子,種子確定後每次抽取的元素固定
根據指定的規則從資料集中抽取資料
val datardd = sparkcontext.makerdd(list(
1,2,3,4
),1)
// 抽取資料不放回(伯努利演算法)
// 伯努利演算法:又叫0、1分布。例如扔硬幣,要麼正面,要麼反面。
// 具體實現:根據種子和隨機演算法算出乙個數和第二個引數設定機率比較,小於第二個引數要,大於不要
// 第乙個引數:抽取的資料是否放回,false:不放回
// 第二個引數:抽取的機率,範圍在[0,1]之間,0:全不取;1:全取;
// 第三個引數:隨機數種子
val datardd1 = datardd.sample(false, 0.5)
// 抽取資料放回(泊松演算法)
// 第乙個引數:抽取的資料是否放回,true:放回;false:不放回
// 第二個引數:重複資料的機率,範圍大於等於0.表示每乙個元素被期望抽取到的次數
// 第三個引數:隨機數種子
val datardd2 = datardd.sample(true, 2)
9) distinct
def distinct()(implicit ord: ordering[t] = null): rdd[t]
def distinct(numpartitions: int)(implicit ord: ordering[t] = null): rdd[t] //去重後重新分割槽數
將資料集中重複的資料去重
val datardd = sparkcontext.makerdd(list(
1,2,3,4,1,2
),1)
val datardd1 = datardd.distinct()
val datardd2 = datardd.distinct(2)
10) coalesce
def coalesce(numpartitions: int, shuffle: boolean = false,
//第乙個引數為分割槽數,第二個引數是否進行shuffle操作 縮減分割槽使用預設值,擴大分割槽設定為true
partitioncoalescer: option[partitioncoalescer] = option.empty)
(implicit ord: ordering[t] = null)
: rdd[t]
根據資料量縮減分割槽,用於大資料集過濾後,提高小資料集的執行效率
當spark程式中,存在過多的小任務的時候,可以通過coalesce方法,收縮合併分割槽,減少分割槽的個數,減小任務排程成本
val datardd = sparkcontext.makerdd(list(
1,2,3,4,1,2
),6)
val datardd1 = datardd.coalesce(2)
思考乙個問題:我想要擴大分割槽,怎麼辦?
coalesce方法預設情況下無法擴大分割槽,因為預設不會將資料打亂重新組合。擴大分割槽是沒有意義。如果想要擴大分割槽,那麼必須使用shuffle,打亂資料,重新組合。
11) repartition
def repartition(numpartitions: int)(implicit ord: ordering[t] = null): rdd[t]
該操作內部其實執行的是coalesce操作,引數shuffle的預設值為true。無論是將分割槽數多的rdd轉換為分割槽數少的rdd,還是將分割槽數少的rdd轉換為分割槽數多的rdd,repartition操作都可以完成,因為無論如何都會經shuffle過程。
val datardd = sparkcontext.makerdd(list(
1,2,3,4,1,2
),2)
val datardd1 = datardd.repartition(4)
思考乙個問題:coalesce和repartition區別?
repartition方法其實就是coalesce方法,只不過肯定使用了shuffle操作。讓資料更均衡一些,可以有效防止資料傾斜問題。
如果縮減分割槽,一般就採用coalesce, 如果擴大分割槽,就採用repartition
12) sortby
def sortby[k](
f: (t) => k, //函式處理函式
ascending: boolean = true, //true為預設值公升序 false 降序
numpartitions: int = this.partitions.length) //分割槽數
(implicit ord: ordering[k], ctag: classtag[k]): rdd[t]
該操作用於排序資料。在排序之前,可以將資料通過f函式進行處理,之後按照f函式處理的結果進行排序,預設為正序排列。排序後新產生的rdd的分割槽數與原rdd的分割槽數一致。
val datardd = sparkcontext.makerdd(list(
1,2,3,4,1,2
),2)
val datardd1 = datardd.sortby(num=>num, false, 4)
RDD的轉換運算元(Value型別)
value型別 map 每次處理一條資料。作用 將每乙個分割槽形成乙個陣列,形成新的rdd型別時rdd array t 需求 建立乙個4個分割槽的rdd,並將每個分割槽的資料放到乙個陣列 作用 分組,按照傳入函式的返回值進行分組。將相同的key對應的值放入乙個迭代器。需求 建立乙個rdd,按照元素模...
RDD運算元怎麼區分轉換運算元和行動運算元
textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...
RDD轉換運算元和行動運算元的區別
textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...