sc.textfile(「裡面放路徑」) 這兩個都是建立乙個rdd
sc.parallelize(裡面放集合)
val arr =
arry(1
,2,3
,4,5
,6,7
,8,9
)val rdd = sc.
parallelize
(arr)
rdd.partitions.length 這是檢視有幾個分割槽
用parallelize建立分割槽的時候不指定分割槽,分割槽的數量跟total-executor-cores指定幾個核有關
分割槽數量決定了任務的並行度 並行度越高task越多
rdd.collect 提交結果 就會觸發一次action 觸發一次action就會有乙個job
也可以在建立rdd的時候指定分割槽
val rdd = sc.
parallelize
(arr,
100) 這邊在第二個引數指定了有100個分割槽
也可以用makerdd的方式建立rdd,當然也可以指定分割槽
val rdd = sc.
parallelice
(arr)
怎麼進行分割槽的嫩
先totalsize(所有檔案大小都加在一起)goalsize(指定的話就是1)不指定就是傳進來的預設值2,所以如果不是指定的goalsize就是totalsize除以2,如果這個檔案除以goalsize大於1.1就切
之前的對資料的map,split操作路徑都可以不存在因為並沒有真正的對資料進行操作,reducebykey要路徑存在因為要聚合從別的機器上抓取資料,sortby會生成乙個job,因為他要排序就要這道資料的資訊,對資料進行取樣構建乙個新的特殊的分割槽器
記住:對rdd進行操作就是對rdd裡的每乙個分割槽進行操作,對rdd進行map本質上就是對rdd裡每乙個分割槽對應的迭代器進行map
transformation: 即轉換運算元,呼叫轉換運算元會生成乙個新的rdd, transformation是 lazy 的,不會觸發job執行
action: 行動運算元,呼叫行動運算元會觸發job執行, 本質上是呼叫了sc.runjob方法, 該方法從最後乙個rdd,根據
其依賴關係,從後往前,劃分stage,生成乙個taskset
1.建立rdd的方法
(1)通過並行化方式,將dirver端的集合轉成rdd ,可以指定分割槽的數量
val rdd: rdd[int]
= sc.
parallelize
(arr)
rdd.partitions.length //檢視分割槽數量
(2)從hdfs指定目錄建立rdd ,也可以指定分割槽的數量
val lines: rdd[string]
= sc.
textfile
("hdfs://linux01:8020/log"
)rdd.partitions.length //檢視分割槽數量
(2.1)從hdfs中讀取資料分割槽數量 : 預設情況下 目錄檔案下所有檔案的 totalsize(檔案總大小)/numsplits (切片數量) 得到 goalsize()
使用 檔案大小/goalsize 結果大於1.1 就分為多個切片.
如果想要幾個block塊(檔案)就有幾個分割槽, 在建立rdd時指定計算goalsize的除數為1就可以了
val rdd1 = sc.textfile(「hdfs://linux01:8020/wc」,1)
srage
任務執行階段
乙個stage對應乙個taskset
乙個taskset中的task的數量取決於stage中最後乙個rdd分割槽的數量
dependency
依賴關係,指的是父rdd和子rdd之間的依賴關係
窄依賴:沒有shfuffle產生,多個運算元會被合併到乙個task中,即在乙個pipeline中
寬依賴:有shuffle產生,是劃分stage的依據
job
觸發一次action形成乙個完整的dag,乙個dag對應乙個job
乙個job中有一到多個stage,乙個stage對應乙個taskset,乙個taskset中有一到多個task
使用sparksubmit提交的任務
package cn.doit.spark.day02
/* hashpartitioner(分割槽器)原始碼分析
*/object hashpartitionerdemo
def main
(args: array[string]
): unit =
}
rdd運算元之transformation 轉換運算元, lazy的,呼叫後生成乙個新的rdd
不產生shuffle的
map filter
map filter
package org.apache.spark.day02
import org.apache.spark.
/* map filter
對rdd進行操作,其實是對rdd中每個分割槽對應的迭代器進行操作,迭代器會對每乙個分區內的資料進行操作
*/ def main
(args: array[string]
): unit =
}
reducebykey groupbykey groupby
reducebykey
呼叫reducebykey底層呼叫的是shuffledrdd 特殊情況:呼叫reducebykey不一定進行shuffle,如果前面的資料已經分好區了,
那就不需要在進行分割槽了(使用同樣的分割槽器(列如都是hashpartitioner),並且分割槽數量相同)
package cn.doit.spark.day02.demo02
import org.apache.spark.rdd.rdd
import org.apache.spark.
/* reducebykey 有shuffle
*/object reducebykeydemo
}groupbykey
(分組) 底層也是先區域性再全域性 效率高於groupby 推薦使用
package cn.doit.spark.day02.demo03
import org.apache.spark.rdd.
import org.apache.spark.
import scala.collection.mutable.arraybuffer
object groupbykeydemo
}
package cn.doit.spark.day02.demo03
import org.apache.spark.rdd.rdd
import org.apache.spark.
object groupby )
//// val rdd3: rdd[(string, iterable[(string, string, double)])] = rdd2.groupby(t => t._1)
// val tuples = rdd3.collect()
// println(tuples.tobuffer)
//groupbykey效率高於groupby, 推薦使用
val rdd2: rdd[
(string,
(string, double))]
= rdd1.
map(e =
>
) val rdd3: rdd[
(string, iterable[
(string, double)])
]= rdd2.
groupbykey()
val result = rdd3.
collect()
println
(result.tobuffer)
}}
package cn.doit.spark.day02.demo04
import org.apache.spark.rdd.rdd
import org.apache.spark.
object distinctdemo
}
RDD原理詳解
transformations型別的操作 action型別的操作 rdd中提供的cache 方法只是簡單的把該rdd放到cache列表中。當rdd的iterator被呼叫時,通過cachemanager把rdd計算出來,並儲存到blockmanager中,下次獲取該rdd的資料時便可直接通過cach...
RDDAPI詳解rdd資料模型及rdd的sql實現
5個元素 1.rdd返回的partition物件集合 2.資料本地性 driver master 資料本地性 3.返回依賴關係,只需要關注parents 簡化了模型計算 4.迭代器,不同框架和計算時讀取父rdd都是一樣的,基於同一種型別的迴圈非常高效 5.partitioner rdd不變性儲存,也...
spark底層核心 RDD詳解
spark底層核心rdd 是什麼?彈性分布式資料集 簡單點就理解成乙個list集合 rdd 1,2,3 有什麼屬性?用idea中注釋的話來解釋有5大屬性 1 乙個分割槽列表,資料集的基本組成單位 rdd以分割槽為單位,乙個分割槽乙個task任務來處理執行,可以在建立rdd時指定rdd的分割槽個數,如...