最近使用spark簡單的處理一些實際中的場景,感覺簡單實用,就記錄下來了。
部門使用者業績表(1000w測試資料)
使用者、部門、業績
資料載入:
val context = new sparkcontext(conf)
var data = context.textfile("data.txt")
場景1:求每個部門的總業績,並從大到小排序
data.map(_.split(",")).map(p => (p(1), p(2).toint)).reducebykey(_ + _).sortby(_._2, false).foreach(println)
//取出部門和業績字段,對其進行按部門彙總,並進行排序即可。
場景2:求每個部門中業績最高的人
data.map(_.split(",")).map(p => (p(0), p(1), p(2).toint)).groupby(_._2)
.map(p => (p._2.map(p => (p._1,p._3)).toarray.maxby(_._2),p._1))
.sortby(p => p._1._2, false).map .foreach(println)
//step1:先對部門進行分組
//step2:找出部門中最高業績和人
//step3:排序,並去除格式
載入使用者訂單表使用者、月份、訂單
var data1 = context.textfile("/userdt.txt")
場景3:求每季度的訂單金額
data1.map(_.split(",")).map(p => else
if (p(1).toint > 3 && p(1).toint <= 6) else
if (p(1).toint > 6 && p(1).toint <= 9) else
}).reducebykey(_ + _).foreach(println)
//這裡用if else 對每季度進行了分組,應該可以用match匹配的,個人對match不熟。
//分完組,按組進行彙總即可
場景4:求使用者每季度訂單金額
data1.map(_.split(",")).map(p => else
if (p(1).toint > 3 && p(1).toint <= 6) else
if (p(1).toint > 6 && p(1).toint <= 9) else
}).groupby(p => (p._1, p._2)).map(p => (p._1, p._2.map(p => p._3).reduce(_ + _)))
.map(p => (p._1._1, p._1._2, p._2)).sortby(p => (p._1, p._2)).foreach(println)
//這裡只有月份,有年也是一樣,先用filter過濾出那一年,再按季度進行彙總
//根據使用者和季度進行分組,再使用者和季度的組對金額進行彙總
//格式化資料,並按使用者和季度排序
場景5:緯度關聯,求全國各省份2023年各季度銷售業績資料表城市資訊表(城市id,城市,省份)
訂單資訊表(使用者,年份,月份,城市id,業績)
//把城市id和省份轉化成map形式,方便獲取
var dd = context.textfile("dingdan.txt").mapelse
if(quarter>3 && quarter<=6)else
if(quarter>6 && quarter<=9)else
( sp(1),quarter, sp(3), sp(4))
}.filter(_._1.toint.equals(2015))
//對月份按季度進行分組,並過濾出2015的訂單
var data2 = dd.map(p => (p._1, p._2, cy.getorelse(p._3.toint,"其他"),p._4)).groupby(p=>(p._1,p._2,p._3))
.map(p=>(p._1,p._2.map(_._4.toint).reduce(_+_))).sortby(p=>(p._1._3,p._1._1,p._1._2))
.map
.foreach(println)
//step1:兩表關聯,訂單的城市id通過城市緯度表轉換成省份
//step2:對年份、季度、省份進行聯合分組
//step3:對訂單金額進行合計,並按省份、年份、季度進行排序,格式化後輸出
通過sqlcontext(隱式轉換的方式)來處理場景,這裡只舉乙個例子場景:求每個部門中業績最高的人
val sqlcontext=new sqlcontext(context)
import sqlcontext.implicits._ //隱式轉換
case
class
temptable
(id:string,dept:string,sar:int)
var sql=data.map(_.split(",")).map(p=>temptable(p(0),p(1),p(2).toint)).todf()
sql.registertemptable("deptinfo")
val jh= sqlcontext.sql("select a.id,a.sar,a.dept from deptinfo a inner join (select max(sar) aaa,dept from deptinfo group by dept)b on a.sar=b.aaa and a.dept=b.dept order by a.sar ")
jh.foreach(println)
//把內容轉換成臨時表,並通過sql直接編寫
spark學習筆記
1 缺省會寫成一堆小檔案,需要將其重新分割槽,直接指定幾個分割槽 spark.sql select row number over partition by depid order by salary rownum from emp repartition 2 write.parquet hdfs ...
Spark學習筆記
spark不僅僅支援mapreduce,還支援sql machine learning graph運算等,比起hadoop應用更靈活寬泛。spark 中的rdd 資料結構應對mapreduce中data replication disk io serialization引起的低效問題。rdd 類似於...
Spark學習筆記
hadoop中mapreduce計算框架是基於磁碟的,每次計算結果都會直接儲存到磁碟,下一次計算又要從磁碟中讀取,因而io消耗大,迭代計算效率很低,且模型單一,不能適應複雜需求。spark是一種基於記憶體的開源計算框架,迭代計算效率非常高。另外,mapreduce的計算是一步一步來的,而spark將...