資料
a.txt:
u1 12 zs
u2 15 xx
u3 18 aaa
u4 20 xa1
u5 22 xa2
b.txt
u1 2016 9 m1
u2 2017 12 m2
u3 2017 1 m3
u3 2014 2 m4
u3 2012 3 m5
2. 需求
兩個資料集:
資料集a id,age,name
資料集b id,year,month,movie
資料集都是使用空格來分割的
要求,輸出:id,age,name,year,month,movie(同乙個使用者,按year公升序,沒有資料b的id,用null補
邏輯
import org.apache
.spark
.rdd
.rdd
import org.apache
.spark.
import scala.collection
.mutable
object dealdataexample1_3
//輸入引數
val array(input1,input2,output)=args
val conf = new sparkconf()
conf.setmaster("local")
.get******name)
val sc = new sparkcontext(conf)
//獲取資料
val data1: rdd[string] = sc.textfile(input1)
val data2: rdd[string] = sc.textfile(input2)
//切割資料
val cutres1: rdd[(string, string)] = data1.map(,values=$")
(id, values)
})val cutres2: rdd[(string, string)] = data2.map(,values=$")
(id, values)
})//println(cutres2.collect().tobuffer)
//組合資料
val combyres: rdd[(string, (iterable[string], iterable[string]))] = cutres1.cogroup(cutres2)
//println(combyres.collect().tobuffer)
//(u4,(compactbuffer(20 xa1),compactbuffer()))
//資料預處理
val dealres: rdd[(string, string)] = combyres.map()
//println(dealres.collect().tobuffer)
//以key分組
val groupres: rdd[(string, iterable[string])] = dealres.groupbykey(1)
val selectres: rdd[(string, string)] = groupres.mapvalues( else
usermsg.concat(",").concat(moviedata)
})println(selectres.collect().tobuffer)
sc.stop()
}}
結果
u1 12 zs,2016 9 m1
u2 15 xx,2017 12 m2
u3 18 aaa,2012 3 m5 2014 2 m4 2017 1 m3
u4 20 xa1,null,null,null
u5 22 xa2,null,null,null
資料處理案例,資料質量案例
1 名稱錯誤 北京海定區世紀城 北京海淀區 2 全半形替換 北京 餐飲 改為 北京 餐飲 3 空格替換 海淀區世紀城金源時代商務中心 c座 4 資料補齊 樊城區建華路 出現這個位址,應該加上湖北省襄陽市。a 做一張地區mapping表 省份 市 縣 鎮鄉 如果有郵編號,首先我們可以根據郵編號找到對應...
大資料處理平台與案例
大資料能夠在國內得到快速發展,甚至是國家層面的支援,最為重要的一點就是我們純國產大資料處理技術的突破以及跨越式發展。在網際網路深刻改變我們的生活 工作方式的當下,資料就成為了最為重要的資料。尤其是資料安全問題就更為突出,前階段的facebook使用者資料洩漏所引發產生的一系列問題,就充分的說明了資料...
資料處理之資料型別(一)
型別signed char short int和long統稱為符號整型 他們的無符號版本統稱為無符號整型。bool char wchar t 符號整型和無符號整型統稱為整型。float double和long double統稱為浮點型。整數和浮點型統稱算術 arithmetic 型別 一 int s...