spark2 4 sql 快速列去重(冗餘列檢查)

2021-10-01 17:45:58 字數 2209 閱讀 2773

一直想做乙個勤奮的人,筆耕方田,將自己在從事spark開發四年來積累的奇淫巧技分享出來。在給大家提供參考方案的同時也在總結和優化之前的設計。如果在有幸碰到大牛忍不住提出更好的優化方案能從中受益,也不枉碼了這麼多字。每當設計出乙個很好的計算方案,就會忍不住開啟部落格想分享出來。然後會一直琢磨該以什麼樣的文字描述出來,想著想著就放棄了。總是想構思乙個比較完美的結構段落,結果到最後什麼也沒有寫出來。分享也就這樣一直拖下去了。可能也有很多勤奮的人,和我一樣毀於強迫症。

回歸正題

背景:最近在做的專案,需要處理大量的列。需要對這些列進行去除冗餘列,保留不相同的列。大概有15000+列左右。

第一版設計

使用了最原始的計算方式,兩兩組合進行比較。算倒是能夠計算的出來,只是效能實在試太差了,並且時不時的會爆出codegene buffer 超出了64k。調整了很久最終也能穩定的計算過去,只是這樣計算實在試太慢。

效能分析

1.5萬列,兩兩組合大概有15000*14999/2=112492500種組合,由於會邊比較邊丟失冗餘列所以組合數大概在0.9億左右。先不考慮行數,列組合比較就需要0.9億組合,甚至列組合比行數還要多。這種計算實在是太消耗資源和時間!

第二版設計

先計算每列的非空count和基元個數(count distinct),只有count和基元個數相同的情況下,再去兩兩組合比較。雖然在前期計算非空count和(count distinct)消耗了不少時間,但是整體計算時間變成了原來的三分之一。

效能分析

如何計算1.5w列,每列的非空count和基元個數,也經歷了兩個版本的調優。後期會寫部落格分享如何只用三次shuffle計算1.5萬列的count和基元數

第三版設計

從做第一版開始,就在思考乙個問題:如何將hash或者hashset應用到這個場景中。使用hashset或者hashmap可定會有一種更快的設計方案,可以快速算出冗餘列。後來慢慢就設計出了乙個方案,使用set自動去重的特性設計出了這一版方案

原理:使用udaf,眾所周知,在spark中編寫udaf需要實現比較重要的幾個函式:

update :相當於aggregatebykey的map端的操作,將每條資料做緣生意轉換並放到初始容器中

merge:相當於aggregatebykey的reduce端操作,做容器容器之間的合併

evaluate:最後轉換容器中資料,返回最最終資料

inputschema:輸入列的資料型別

bufferschema:中間容器的資料型別

datatype:最終返回資料的資料型別

initialize: 初始化容器

起始原理非常簡單,首先使用spark sql內建函式,把需要比較的列放到array中array(colarray:_*).alia("tmp_arr"),然後呼叫編寫的udaf。

udaf update函式:

偽**:array.zipwithpartition.tomap.keyset.toarray

描述:將傳入的值和下標配接在一起,然後把值相同的去掉,最後只留當前row,全部不相同的列和列下標。由於spark sql不支援set資料結構,所以最終需要將資料轉回array。由於在scala中tomap是將array中資料依次新增到map中,所有後面的資料如果和前面的相同,後面的值和下標會覆蓋已經出現過的。最終返回全部不一樣的列的下標。

udaf merge函式:

在merge函式中只需要將傳過來的下標陣列取交集就可以了

這樣最終返回的就是全部不相同的列下標陣列

計算過程演進:

如下為**:

class chckudaf extends userdefinedaggregatefunction 

override def update(buffer: mutableaggregationbuffer, input: row): unit =

} override def merge(buffer1: mutableaggregationbuffer, buffer2: row): unit =

} override def evaluate(buffer: row): any = buffer.getmap[int, string](0).keyset.toarray

}

快算24的解法

time limit 1000ms memory limit 65536k total submissions 4646 accepted 2851 description 給定4個不大於10的正整數 範圍1 10 要求在不改變資料先後順序的情況下,採用加減乘除四種運算,找到乙個表示式,使得最後的結...

spark比MapReduce快的原因

park比mapreduce快的原因 spark是基於記憶體的,而mapreduce是基於磁碟的迭代 mapreduce的設設計 中間結果儲存在檔案中,提高了可靠性,減少了記憶體占用。但是犧牲了效能。spark的設計 資料在記憶體中進行交換,要快一些,但是記憶體這個東西,可靠性不如磁碟。所以效能方面...

Spark 為什麼比Hadoop快

spark sql比hadoop hive快,是有一定條件的,而且不是spark sql的引擎比hive的引擎快,相反,hive的hql引擎還比spark sql的引擎更快。color red b 其實,關鍵還是在於spark 本身快。b color color red size large b s...