SparkSQL大資料實戰 揭開Join的神秘面紗

2021-09-25 13:25:07 字數 3848 閱讀 3790

join操作是資料庫和大資料計算中的高階特性,大多數場景都需要進行複雜的join操作,本文從原理層面介紹了sparksql支援的常見join演算法及其適用場景。

join是資料庫查詢永遠繞不開的話題,傳統查詢sql技術總體可以分為簡單操作(過濾操作-where、排序操作-limit等),聚合操作-groupby以及join操作等。其中join操作是最複雜、代價最大的操作型別,也是olap場景中使用相對較多的操作。因此很有必要對其進行深入研究。

另外,從業務層面來講,使用者在數倉建設的時候也會涉及join使用的問題。通常情況下,資料倉儲中的表一般會分為「低層次表」和「高層次表」。

所謂「低層次表」,就是資料來源匯入數倉之後直接生成的表,單表列值較少,一般可以明顯歸為維度表或事實表,表和表之間大多存在外健依賴,所以查詢起來會遇到大量join運算,查詢效率很差。而「高層次表」是在「低層次表」的基礎上加工轉換而來,通常做法是使用sql語句將需要join的表預先進行合併形成「寬表」,在寬表上的查詢不需要執行大量join,效率很高。但寬表缺點是資料會有大量冗餘,且相對生成較滯後,查詢結果可能並不及時。

為了獲得時效性更高的查詢結果,大多數場景都需要進行複雜的join操作。join操作之所以複雜,主要是通常情況下其時間空間複雜度高,且有很多演算法,在不同場景下需要選擇特定演算法才能獲得最好的優化效果。本文將介紹sparksql所支援的幾種常見的join演算法及其適用場景。

當前sparksql支援三種join演算法:shuffle hash join、broadcast hash join以及sort merge join。其中前兩者歸根到底都屬於hash join,只不過在hash join之前需要先shuffle還是先broadcast。其實,hash join演算法來自於傳統資料庫,而shuffle和broadcast是大資料的皮(分布式),兩者一結合就成了大資料的演算法了。因此可以說,大資料的根就是傳統資料庫。既然hash join是「核心」,那就刨出來看看,看完把「皮」再分析一下。

先來看看這樣一條sql語句:select * from order,item where item.id = order.i_id,很簡單乙個join節點,參與join的兩張表是item和order,join key分別是item.id以及order.i_id。現在假設這個join採用的是hash join演算法,整個過程會經歷三步:

確定build table以及probe table:這個概念比較重要,build table使用join key構建hash table,而probe table使用join key進行探測,探測成功就可以join在一起。通常情況下,小表會作為build table,大表作為probe table。此事例中item為build table,order為probe table。

構建hash table:依次讀取build table(item)的資料,對於每一行資料根據join key(item.id)進行hash,hash到對應的bucket,生成hash table中的一條記錄。資料快取在記憶體中,如果記憶體放不下需要dump到外存。

探測:再依次掃瞄probe table(order)的資料,使用相同的hash函式對映hash table中的記錄,對映成功之後再檢查join條件(item.id = order.i_id),如果匹配成功就可以將兩者join在一起。

hash join效能如何?很顯然,hash join基本都只掃瞄兩表一次,可以認為o(a+b),較之最極端的笛卡爾集運算a*b,不知甩了多少條街。

為什麼build table選擇小表?道理很簡單,因為構建的hash table最好能全部載入在記憶體,效率最高;這也決定了hash join演算法只適合至少乙個小表的join場景,對於兩個大表的join場景並不適用。

上文說過,hash join是傳統資料庫中的單機join演算法,在分布式環境下需要經過一定的分布式改造,就是盡可能利用分布式計算資源進行並行化計算,提高總體效率。hash join分布式改造一般有兩種經典方案:

broadcast hash join:將其中一張小表廣播分發到另一張大表所在的分割槽節點上,分別併發地與其上的分割槽記錄進行hash join。broadcast適用於小表很小,可以直接廣播的場景。

shuffler hash join:一旦小表資料量較大,此時就不再適合進行廣播分發。這種情況下,可以根據join key相同必然分割槽相同的原理,將兩張表分別按照join key進行重新組織分割槽,這樣就可以將join分而治之,劃分為很多小join,充分利用集群資源並行化。

下面分別進行詳細講解。

如下圖所示,broadcast hash join可以分為兩步:

broadcast階段:將小表廣播分發到大表所在的所有主機。廣播演算法可以有很多,最簡單的是先發給driver,driver再統一分發給所有executor;要不就是基於bittorrent的torrentbroadcast。

hash join階段:在每個executor上執行單機版hash join,小表對映,大表試探。

3.sparksql規定broadcast hash join執行的基本條件為被廣播小表必須小於引數spark.sql.autobroadcastjointhreshold,預設為10m。

在大資料條件下如果一張表很小,執行join操作最優的選擇無疑是broadcast hash join,效率最高。但是一旦小表資料量增大,廣播所需記憶體、頻寬等資源必然就會太大,broadcast hash join就不再是最優方案。此時可以按照join key進行分割槽,根據key相同必然分割槽相同的原理,就可以將大表join分而治之,劃分為很多小表的join,充分利用集群資源並行化。如下圖所示,shuffle hash join也可以分為兩步:

shuffle階段:分別將兩個表按照join key進行分割槽,將相同join key的記錄重分布到同一節點,兩張表的資料會被重分布到集群中所有節點。這個過程稱為shuffle。

hash join階段:每個分割槽節點上的資料單獨執行單機hash join演算法。

看到這裡,可以初步總結出來如果兩張小表join可以直接使用單機版hash join;如果一張大表join一張極小表,可以選擇broadcast hash join演算法;而如果是一張大表join一張小表,則可以選擇shuffle hash join演算法;那如果是兩張大表進行join呢?

sparksql對兩張大表join採用了全新的演算法-sort-merge join,如下圖所示,整個過程分為三個步驟:

shuffle階段:將兩張大表根據join key進行重新分割槽,兩張表資料會分布到整個集群,以便分布式並行處理。

sort階段:對單個分割槽節點的兩表資料,分別進行排序。

merge階段:對排好序的兩張分割槽表資料執行join操作。join操作很簡單,分別遍歷兩個有序序列,碰到相同join key就merge輸出,否則取更小一邊。如下圖所示:

經過上文的分析,很明顯可以得出來這幾種join的代價關係:cost(broadcast hash join) < cost(shuffle hash join) < cost(sort merge join),資料倉儲設計時最好避免大表與大表的join查詢,sparksql也可以根據記憶體資源、頻寬資源適量將引數spark.sql.autobroadcastjointhreshold調大,讓更多join實際執行為broadcast hash join。

SparkSQL大資料實戰 揭開Join的神秘面紗

join操作是資料庫和大資料計算中的高階特性,大多數場景都需要進行複雜的join操作,本文從原理層面介紹了sparksql支援的常見join演算法及其適用場景。join是資料庫查詢永遠繞不開的話題,傳統查詢sql技術總體可以分為簡單操作 過濾操作 where 排序操作 limit等 聚合操作 gro...

大資料實戰 Hive 技巧實戰

select address from test where dt 20210218 union all select address from test where dt 20210218 address m a m a m b m b m b m b select address from te...

大資料實戰 pyspark使用

解壓 tar xvf python 3.6.3.tgz原始碼編譯安裝python3 yum install zlib devel bzip2 devel openssl devel ncurses devel sqlite devel readline devel tk devel gcc make...