該文章記錄使用的spark的基本操作
import breeze.numerics.pow
import org.apache.spark.sparkconf
import org.apache.spark.sql.sparksession
object template
val dot_udf = udf((rateing: int, rateing_v: int) => rateing * rateing_v)
// 資料庫可以儲存集合(使用udf的返回集合)
// 使用udf一般結合withcolumn使用
val dot = udata.withcolumn("dot", kismet_udf(col("rating"), col("rating")))
//*************************===細節操作******************************
// 列轉行使用explode(其中col("itemsimrating")為陣列或列表)
val useritemscore = dot.select(dot("user_id"), explode(dot("dot"))as("dot_list"))
// 倒排使用
useritemscore.orderby(col("sum_score").desc)
// map(x=(x(0),x(1))可以直接tomap
// lit給統一的值要引入(增加一行lable值為1
useritemscore.withcolumn("label", lit(1))
// 啟動執行緒數,至少是兩個。乙個執行緒用於監聽資料來源,其他執行緒用於消費或列印。至少是2個
// rdd的排序中這是false即為倒敘
udata.rdd.map(x => (x(0).tostring, x(2).tostring)).map(x => (x._1, x._2.todouble)).sortby(_._2,false)
// 取出單行資料
udata.head().getas[string]("user_id")
//******************************=join相關*************************=
// join不同欄位時
val udata_v = udata.selectexpr("user_id as user_v")
udata.join(udata_v,udata("user_id")===udata_v("user_v"))
//多欄位join
val traindata = udata_v.join(udata, seq("user_id", "item_id"), "outer").na.fill(0)
//增加自增序列col("id")
val urdd = udata.rdd.zipwithindex()
val rowrdd = dfrdd.map(tp => row.merge(tp._1,row(tp._2)))
spark.createdataframe(rowrdd,udata.schema.add(structfield("id",longtype)))
}}
Spark操作 控制操作
cache和persist操作都是對rdd進行持久化,其中cache是persist採用memory only儲存級別時的乙個特例,scala var rdd sc.textfile users lyf desktop data.txt scala rdd.cache 第一次計算行數,這裡只能從本地...
Spark 常用運算元
官網rdd操作指南 2 key value資料型別的transfromation運算元 三 連線 3 action運算元 val list list 1 2,3 sc.parallelize list map 10 foreach println 輸出結果 10 20 30 這裡為了節省篇幅去掉了換...
Spark入門 常用Spark監控Tab
最近用spark做任務,中間來回配置集群環境,檢視配置後的效果,以及監測程式執行過程中的執行進度等,需要頻繁檢視webui的幾個tab。各個tab功能不一,從不同方面顯示了spark的各方面效能引數和執行進度。特意記錄一下,方便以後用得到的時候能夠快速回顧知識點。第乙個tab是在配置好hadoop之...