Spark實戰 如何進行選擇去重

2022-02-27 02:47:08 字數 3104 閱讀 6126

業務上有乙份行車軌跡的資料 carrecord.csv 如下:

id;carnum;orgid;captime

1;粵a321;0002;20200512 102010

2;雲a321;0001;20200512 102010

3;粵a321;0001;20200512 103010

4;雲a321;0002;20200512 103010

5;粵a321;0003;20200512 114010

6;京a321;0003;20200512 114011

其中各欄位含義分別為記錄id,車牌號,抓拍卡口,抓拍時間。現在需要篩選出所有車輛最後出現的一條記錄,得到每輛車最後經過的抓拍點資訊,也就是要將其他日期的資料過濾掉,我們可以使用選擇去重。下面分別展示通過 dataframe 和 rdd 如果實現。

具體實現:

匯入行車資料;

首先使用 withcolumn() 新增 num 字段,num 欄位是由 row_number() + window() + orderby() 實現的:開窗函式中進行去重,先對車牌carnum 進行分組,倒序排序,然後取視窗內排在第一位的則為最後的行車記錄,使用 where 做過濾,最後drop掉不再使用的 num 字段;

通過 explain 列印 dataframe 的物理執行過程,show() 作為 action運算元觸發了以上的系列運算。

val cardf = spark.read.format("csv")

.option("sep", ";")

.option("inferschema", "true")

.option("header", "true")

.csv(basepath + "/car.csv")

import org.apache.spark.sql.functions._

import org.apache.spark.sql.expressions.window

// this import is needed to use the $-notation

import spark.implicits._

val lastpasscar = cardf.withcolumn("num",

row_number().over(

window.partitionby($"carnum")

.orderby($"captime" desc)

)).where($"num" === 1).drop($"num")

lastpasscar.explain()

lastpasscar.show()

執行計畫如下:

== physical plan ==

*(3) project [id#10, carnum#11, orgid#12, captime#13]

+- *(3) filter (isnotnull(num#19) && (num#19 = 1))

+- window [row_number() windowspecdefinition(carnum#11, captime#13 desc nulls last, specifiedwindowframe(rowframe, unboundedpreceding$(), currentrow$())) as num#19], [carnum#11], [captime#13 desc nulls last]

+- *(2) sort [carnum#11 asc nulls first, captime#13 desc nulls last], false, 0

+- exchange hashpartitioning(carnum#11, 200)

+- *(1) filescan csv [id#10,carnum#11,orgid#12,captime#13]

結果如下:

// 獲得其中每輛車最後經過的卡口等資訊

+---+------+-----+---------------+

| id|carnum|orgid| captime|

+---+------+-----+---------------+

| 5|粵a321| 3|20200512 114010|

| 6|京a321| 3|20200512 114011|

| 4|雲a321| 2|20200512 103010|

+---+------+-----+---------------+

思路:

載入源資料並封裝到 carrecord 樣例類中,生成rdd;

首先通過 groupby 對 資料做分組後生成 rdd[(string, iterable[carrecord])]物件,隨即使用 map 對每個 key 對應的多組記錄(iterable[carrecord])進行reduce操作(maxby),最後在 maxby 運算元傳入乙個字面量函式(也可寫為x=>x.captime),即提取該carnum下每條記錄中的 captime 進行比對,然後選出最新時間記錄(maxby 為高階函式,依賴 reduceleft 實現);

case class carrecord(id: int, carnum: string, orgid: int, captime: string)

// 構造 schema rdd

val carrdd: rdd[carrecord] =

cardf.rdd.map(x =>

carrecord(x.getint(0), x.getstring(1), x.getint(2), x.getstring(3)))

val res = carrdd.groupby(_.carnum).map

}}res.todebugstring

res.collect.foreach(x => println(x))

實現選擇去重的兩種常用方法:

通過開窗函式 row_number+window+orderby 進行聚合後去重;

通過 groupby + maxby 等運算元進行聚合後去重。

hive 又如何實現選擇去重呢?與上文兩種方法一樣,請自行實現。

Spark實戰 如何進行選擇去重

業務上有乙份行車軌跡的資料 carrecord.csv 如下 id carnum orgid captime 1 粵a321 0002 20200512 102010 2 雲a321 0001 20200512 102010 3 粵a321 0001 20200512 103010 4 雲a321 ...

如何進行特徵選擇

前言 理論部分 乙個典型的機器學習任務是通過樣本的特徵來 樣本所對應的值。特徵過多會導致模型過於複雜,從而導致過擬合 而特徵過少則會導致模型過於簡單,從而導致欠擬合。事實上,如果特徵數大於樣本數,那麼過擬合就不可避免。特徵數比較少的時候,我們需要增加特徵。增加特徵方法很多 依照經驗 利用已有演算法提...

Spark如何進行動態資源分配

對於spark應用來說,資源是影響spark應用執行效率的乙個重要因素。當乙個長期執行的服務,若分配給它多個executor,可是卻沒有任何任務分配給它,而此時有其他的應用卻資源緊張,這就造成了很大的資源浪費和資源不合理的排程。動態資源排程就是為了解決這種場景,根據當前應用任務的負載情況,實時的增減...