Spark機器學習

2021-08-21 00:13:42 字數 3015 閱讀 8539

spark機器學習

注:spark簡介

spark是乙個分布式計算框架,旨在簡化執行於計算集群上的並行程式的編寫。該框架對資源排程,任務的提交/執行和跟蹤,節點間的通訊以及資料並行處理的內在底層操作都進行了抽象。它提供了乙個更高階別的api用於處理分布式資料。

spark支援的四種執行模式:

本地單機模式:所有spark程序都執行在同乙個jvm中。

集群單機模式:使用spark自己內建的任務排程框架。

基於mesos:mesos是乙個流行的開源集群計算框架。

基於yarn:與hadoop關聯的集群計算和資源排程框架。

spark python安裝

2、 在cmd模式下執行pyspark命令,驗證是否安裝成功

3、 將spark安裝包下的pyhon/pyspark包拷貝到python安裝包下的lib/site-packages下

spark程式設計模式

sparkcontext類與sparkconf類

sparkconf類:包含了spark集群配置的各種引數(比如主節點的url)

sparkcontext類:可用sparkcontext物件所包含的各種方法來建和操作分布式資料集和共享變數。

例如scala**:

val sc = new sparkcontext(conf)

spark shell

spark支援用scala或python repl(read-eval-print-loop,即互動式shell)來進行互動式的程式編寫。由於輸入的**會被立即計算,shell能在輸入**時給出實時反饋。

通過scala來使用spark shell,只需從spark的主目錄執行./bin/spark-shell。命令執行結果的值與型別在**執行完後也會顯示出來。

在python shell中使用spark,直接執行./bin/pyspark命令即可,如果配置了pyspark的環境變數,則直接執行pyspark命令即可。

啟動scala shell/python shell並初始化乙個sparkcontext物件。我們可以通過sc這個scala值來呼叫這個物件。

彈性分布式資料集rdd

rdd(resilient distributed dataset,彈性分布式資料集)是spark的核心概念之一。乙個rdd代表一系列的「記錄」(嚴格來說,某種型別的物件)。這些記錄被分配或分割槽到乙個集群的多個節點上(在本地模式下,可以類似地理解為單個程序裡的多個執行緒上)。spark中的rdd具備容錯性,即當某個節點或任務失敗時(因非使用者**錯誤的原因而引起,如硬體故障、網路不通等),rdd會在餘下的節點上自動重建,以便任務能最終完成。

rdd也可以基於hadoop的輸入源建立,比如本地檔案系統、hdfs和amazon s3。基於hadoop的rdd可以使用任何實現了hadoop inputformat介面的輸入格式,包括文字檔案、其他hadoop標準格式、hbase

、cassandra等。

建立rdd:

集合建立

collection = list(["a", "b", "c", "d", "e"])

rddfromcollection = sc.parallelize(collection)

用乙個本地檔案系統裡的檔案建立rdd:

rddfromtextfile = sc.textfile("license")

spark操作:

建立rdd後,便有了乙個可供操作的分布式記錄集。在spark程式設計模式下,所有的操作分為兩種:

1、 轉換操作(最常用的轉換操作便是map操作):對乙個資料集裡的所有記錄執行某種函式,從而使記錄發生改變。

2、 執行:執行某些計算或聚合操作,並將結果返回執行sparkcontext的那個驅動程式。

map操作:對乙個rdd裡的每一條記錄都執行某個函式,從而將輸入對映成為新的輸出。例如:將每乙個字串都轉換為乙個整數,從而返回乙個由若干int構成的rdd物件。

intsfromstringsrdd = rddfromtextfile.map(lambda line: line.size)

rdd快取策略

呼叫rdd的cache函式把資料快取在集群的記憶體裡。例如:rddfromtextfile.cache

呼叫乙個rdd的cache函式將會告訴spark將這個rdd快取在記憶體中。在rdd首次呼叫乙個執行操作時,這個操作對應的計算會立即執行,資料會從資料來源裡讀出並儲存到記憶體。因此,首次呼叫cache函式所需要的時間會部分取決於spark從輸入源讀取資料所需要的時間。但是,當下一次訪問該資料集的時候,資料可以直接從記憶體中讀出從而減少低效的i/o操作,加快計算。多數情況下,這會取得數倍的速度提公升。

spark的另乙個核心功能是能建立兩種特殊型別的變數:廣播變數和累加器。

1、 廣播變數(broadcast variable)為唯讀變數,它由執行sparkcontext的驅動程式建立後傳送給會參與計算的節點。對那些需要讓各工作節點高效地訪問相同資料的應用場景,比如機器學習,這非常有用。例如:broadcastalist = sc.broadcast(list(["a", "b", "c", "d", "e"]))//建立廣播變數

廣播變數也可以被非驅動程式所在的節點(即工作節點)訪問,訪問的方法是呼叫該變數的value方法:

sc.parallelize(list(["1", "2", "3"])).map(lambda x: broadcastalist.value).collect()

2、 累加器(accumulator)也是一種被廣播到工作節點的變數。累加器與廣播變數的關鍵不同,是後者只能讀取而前者卻可累加。但支援的累加操作有一定的限制。具體來說,這種累加必須是一種有關聯的操作,即它得能保證在全域性範圍內累加起來的值能被正確地平行計算以及返回驅動程式。每乙個工作節點只能訪問和操作其自己本地的累加器,全域性累加器則只允許驅動程式訪問。累加器同樣可以在spark**中通過value訪問。

Spark機器學習過程梳理

最近半個月開始研究spark的機器學習演算法,由於工作原因,其實現在還沒有真正開始機器學習演算法的研究,只是做了前期大量的準備,現在把早年學習的,正在學習的和將要學習的一起做個梳理,整理乙個spark機器學習完整流程。本文推薦的書籍注重通俗和實戰。linux的學習推薦 鳥哥的linux私房菜 基礎篇...

spark機器學習實現之fpgrowth

很久之前就像寫一些關於資料探勘演算法的東西,因為懶現在才開始動手,因為fpgrowth演算法在mlib中的實現 相對比較簡單,所以打算先拿它下手。關於fpgrowth的原理本人說的也不專業,推薦 這裡主要寫一下在mlib當中,實現的乙個過程 先上 logger.getlogger org setle...

spark機器學習 七 推薦模型

協同過濾簡單來說就是利用某個興趣相投 擁有共同經驗之群體的喜好來為使使用者推薦其感興趣的資訊,個人通過合作的機制給予諮詢相當程度的回應並記錄下來達到過濾的目的,進而幫助別人篩選。import org.apache.spark.ml.pipeline import org.apache.spark.m...