join操作是非常常見的資料處理操作,spark作為乙個統一的大資料處理引擎,提供了非常豐富的join場景。本文分享將介紹spark所提供的5種join策略,希望對你有所幫助。本文主要包括以下內容:
參與join的資料集的大小會直接影響join操作的執行效率。同樣,也會影響join機制的選擇和join的執行效率。
join的條件會涉及字段之間的邏輯比較。根據join的條件,join可分為兩大類:等值連線和非等值連線。等值連線會涉及乙個或多個需要同時滿足的相等條件。在兩個輸入資料集的屬性之間應用每個等值條件。當使用其他運算子(運算連線符不為**=**)時,稱之為非等值連線。
在輸入資料集的記錄之間應用連線條件之後,join型別會影響join操作的結果。主要有以下幾種join型別:
spark提供了5種join機制來執行具體的join操作。該5種join機制如下所示:
簡介當要join的表資料量比較大時,可以選擇shuffle hash join。這樣可以將大表進行按照join的key進行重分割槽,保證每個相同的join key都傳送到同乙個分割槽中。如下圖示:
如上圖所示:shuffle hash join的基本步驟主要有以下兩點:
條件與特點簡介
也稱之為map端join。當有一張表較小時,我們通常選擇broadcast hash join,這樣可以避免shuffle帶來的開銷,從而提高效能。比如事實表與維表進行join時,由於維表的資料通常會很小,所以可以使用broadcast hash join將維表進行broadcast。這樣可以避免資料的shuffle(在spark中shuffle操作是很耗時的),從而提高join的效率。在進行 broadcast join 之前,spark 需要把處於 executor 端的資料先傳送到 driver 端,然後 driver 端再把資料廣播到 executor 端。如果我們需要廣播的資料比較多,會造成 driver 端出現 oom。具體如下圖示:
broadcast hash join主要包括兩個階段:
條件與特點
longmetric(
"datasize"
)+= datasize
if(datasize >=(8l
<<30)
) gb"
)}
簡介
該join機制是spark預設的,可以通過引數spark.sql.join.prefersortmergejoin進行配置,預設是true,即優先使用sort merge join。一般在兩張大表進行join時,使用該方式。sort merge join可以減少集群中的資料傳輸,該方式不會先載入所有資料的到記憶體,然後進行hashjoin,但是在join之前需要對join key進行排序。具體圖示:
sort merge join主要包括三個階段:
條件與特點簡介
如果 spark 中兩張參與 join 的表沒指定join key(on 條件)那麼會產生 cartesian product join,這個 join 得到的結果其實就是兩張行數的乘積。
條件簡介
該方式是在沒有合適的join機制可供選擇時,最終會選擇該種join策略。優先順序為:broadcast hash join > sort merge join > shuffle hash join > cartesian join > broadcast nested loop join.
在cartesian 與broadcast nested loop join之間,如果是內連線,或者非等值連線,則優先選擇broadcast nested loop策略,當時非等值連線並且一張表可以被廣播時,會選擇cartesian join。
條件與特點
有join提示(hints)的情況,按照下面的順序
沒有join提示(hints)的情況,則逐個對照下面的規則有join提示(hints),按照下面的順序
沒有join提示(hints),則逐個對照下面的規則
object joinselection extends strategy
with predicatehelper
with joinselectionhelper
}def createshufflehashjoin(onlylookingathint:
boolean)=
}def createsortmergejoin()=
else
}def createcartesianproduct()=
else
}def createjoinwithouthint()=
else
}.orelse(createsortmergejoin())
.orelse(createcartesianproduct())
.getorelse
} createbroadcasthashjoin(
true
).orelse
.orelse(createshufflehashjoin(
true))
.orelse
.getorelse(createjoinwithouthint())
if(canbuildleft(jointype)
) buildleft else buildright
}def createbroadcastnljoin(buildleft:
boolean
, buildright:
boolean)=
else
if(buildleft)
else
if(buildright)
else
maybebuildside.map
}def createcartesianproduct()=
else
}def createjoinwithouthint()=
} createbroadcastnljoin(hinttobroadcastleft(hint)
, hinttobroadcastright(hint)
).orelse
.getorelse(createjoinwithouthint())
case _ => nil}}
本文主要介紹了spark提供的5種join策略,並對三種比較重要的join策略進行了圖示解析。首先對影響join的因素進行了梳理,然後介紹了5種spark的join策略,並對每種join策略的具體含義和觸發條件進行了闡述,最後給出了join策略選擇對應的原始碼片段。希望本文能夠對你有所幫助。 Spark的五種Join策略
join操作是非常常見的資料處理操作,spark作為乙個統一的大資料處理引擎,提供了非常豐富的join場景。本文分享將介紹spark所提供的5種join策略,希望對你有所幫助。本文主要包括以下內容 參與join的資料集的大小會直接影響join操作的執行效率。同樣,也會影響join機制的選擇和join...
Spark (十) Spark 的種型別Join
join是sql語句中的常用操作,良好的表結構能夠將資料分散在不同的表中,使其符合某種正規化,減少表冗餘 更新容錯等。而建立表和表之間關係的最佳方式就是join操作。sparksql作為大資料領域的sql實現,自然也對join操作做了不少優化,今天主要看一下在sparksql中對於join,常見的3...
Spark和Hive處理資料傾斜的兩種解決方案
比如處理80tb的資料,partition數量為15000,理論上平均每個節點是5 6g的資料,但是實際上根據key value在儲存時,很有可能因為某個key的數量特別多,導致資料傾斜。這樣就會出現超過物理記憶體限制的報錯。偶爾重試可能會通過,但是會比較不穩定 目前我們這邊的兩種解決辦法是 1.如...