業務上有乙份行車軌跡的資料 carrecord.csv 如下:
id;carnum;orgid;captime
1;粵a321;0002;20200512 102010
2;雲a321;0001;20200512 102010
3;粵a321;0001;20200512 103010
4;雲a321;0002;20200512 103010
5;粵a321;0003;20200512 114010
6;京a321;0003;20200512 114011
其中各欄位含義分別為記錄id,車牌號,抓拍卡口,抓拍時間。現在需要篩選出所有車輛最後出現的一條記錄,得到每輛車最後經過的抓拍點資訊,也就是要將其他日期的資料過濾掉,我們可以使用選擇去重。下面分別展示通過 dataframe 和 rdd 如果實現。
具體實現:
匯入行車資料;
首先使用 withcolumn() 新增 num 字段,num 欄位是由 row_number() + window() + orderby() 實現的:開窗函式中進行去重,先對車牌carnum 進行分組,倒序排序,然後取視窗內排在第一位的則為最後的行車記錄,使用 where 做過濾,最後drop掉不再使用的 num 字段;
通過 explain 列印 dataframe 的物理執行過程,show() 作為 action運算元觸發了以上的系列運算。
val cardf = spark.read.format("csv")
.option("sep", ";")
.option("inferschema", "true")
.option("header", "true")
.csv(basepath + "/car.csv")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.window
// this import is needed to use the $-notation
import spark.implicits._
val lastpasscar = cardf.withcolumn("num",
row_number().over(
window.partitionby($"carnum")
.orderby($"captime" desc)
)).where($"num" === 1).drop($"num")
lastpasscar.explain()
lastpasscar.show()
執行計畫如下:
== physical plan ==
*(3) project [id#10, carnum#11, orgid#12, captime#13]
+- *(3) filter (isnotnull(num#19) && (num#19 = 1))
+- window [row_number() windowspecdefinition(carnum#11, captime#13 desc nulls last, specifiedwindowframe(rowframe, unboundedpreceding$(), currentrow$())) as num#19], [carnum#11], [captime#13 desc nulls last]
+- *(2) sort [carnum#11 asc nulls first, captime#13 desc nulls last], false, 0
+- exchange hashpartitioning(carnum#11, 200)
+- *(1) filescan csv [id#10,carnum#11,orgid#12,captime#13]
結果如下:
// 獲得其中每輛車最後經過的卡口等資訊
+---+------+-----+---------------+
| id|carnum|orgid| captime|
+---+------+-----+---------------+
| 5|粵a321| 3|20200512 114010|
| 6|京a321| 3|20200512 114011|
| 4|雲a321| 2|20200512 103010|
+---+------+-----+---------------+
思路:
載入源資料並封裝到 carrecord 樣例類中,生成rdd;
首先通過 groupby 對 資料做分組後生成 rdd[(string, iterable[carrecord])]物件,隨即使用 map 對每個 key 對應的多組記錄(iterable[carrecord])進行reduce操作(maxby),最後在 maxby 運算元傳入乙個字面量函式(也可寫為x=>x.captime),即提取該carnum下每條記錄中的 captime 進行比對,然後選出最新時間記錄(maxby 為高階函式,依賴 reduceleft 實現);
case class carrecord(id: int, carnum: string, orgid: int, captime: string)
// 構造 schema rdd
val carrdd: rdd[carrecord] =
cardf.rdd.map(x =>
carrecord(x.getint(0), x.getstring(1), x.getint(2), x.getstring(3)))
val res = carrdd.groupby(_.carnum).map
}}res.todebugstring
res.collect.foreach(x => println(x))
實現選擇去重的兩種常用方法:
通過開窗函式 row_number+window+orderby 進行聚合後去重;
通過 groupby + maxby 等運算元進行聚合後去重。
hive 又如何實現選擇去重呢?與上文兩種方法一樣,請自行實現。
Spark實戰 如何進行選擇去重
業務上有乙份行車軌跡的資料 carrecord.csv 如下 id carnum orgid captime 1 粵a321 0002 20200512 102010 2 雲a321 0001 20200512 102010 3 粵a321 0001 20200512 103010 4 雲a321 ...
如何進行特徵選擇
前言 理論部分 乙個典型的機器學習任務是通過樣本的特徵來 樣本所對應的值。特徵過多會導致模型過於複雜,從而導致過擬合 而特徵過少則會導致模型過於簡單,從而導致欠擬合。事實上,如果特徵數大於樣本數,那麼過擬合就不可避免。特徵數比較少的時候,我們需要增加特徵。增加特徵方法很多 依照經驗 利用已有演算法提...
Spark如何進行動態資源分配
對於spark應用來說,資源是影響spark應用執行效率的乙個重要因素。當乙個長期執行的服務,若分配給它多個executor,可是卻沒有任何任務分配給它,而此時有其他的應用卻資源緊張,這就造成了很大的資源浪費和資源不合理的排程。動態資源排程就是為了解決這種場景,根據當前應用任務的負載情況,實時的增減...