《SparkSQL核心剖析》 物理計畫篇

2021-09-12 18:52:57 字數 4633 閱讀 3379

物理計畫是將spark sql生成的邏輯運算元樹對映成物理運算元樹,並將邏輯計畫的資訊對映到spark core模型中的rdd、transformation、action的過程。生成物理計畫後,一條sql語句就變成了可以執行的spark任務。

物理計畫的定義在org.apache.spark.sql.catalyst.plans.queryplan中,從定義可以看出,物理計畫是乙個抽象語法樹,樹節點的主要組成部分包括:子樹節點、出現過的表示式、出現過的子查詢;

abstract class queryplan[plantype <: queryplan[plantype]] extends treenode[plantype] = 

// 子查詢

def subqueries: seq[plantype] = {}

}

乙個樣例物理計畫如下所示:

== physical plan ==

*(5) sortmergejoin [x#3l], [y#9l], inner

:- *(2) sort [x#3l asc nulls first], false, 0

: +- exchange hashpartitioning(x#3l, 200)

: +- *(1) project [(id#0l % 2) as x#3l]

: +- *(1) filter isnotnull((id#0l % 2))

: +- *(1) range (0, 5, step=1, splits=8)

+- *(4) sort [y#9l asc nulls first], false, 0

+- exchange hashpartitioning(y#9l, 200)

+- *(3) project [(id#6l % 2) as y#9l]

+- *(3) filter isnotnull((id#6l % 2))

+- *(3) range (0, 5, step=1, splits=8)

上面列印出的是乙個物理計畫的treestring表示,其中(5) sortmergejoin(3) range這些樹節點是spark查詢的表示式,表示式開頭的數字(5)(3)代表dfs遍歷物理計畫表示式樹的順序(見org.apache.spark.sql.catalyst.trees.treenode函式generatetreestring),簡化版的表示式遍歷順序如下:

== physical plan ==

*(5) sortmergejoin

:- *(2) sort

: +- *(1) project

: +- *(1) filter

: +- *(1) range

+- *(4) sort

+- *(3) project

+- *(3) filter

+- *(3) range

將邏輯計畫轉換成物理計畫的抽象類叫做queryplanner,它定義了轉換的框架:首先得到一系列候選物理計畫、然後自底向上替換運算元樹節點的物理計畫、最後化簡物理計畫。

queryplanner源**

abstract

class

queryplanner

[physicalplan <

: treenode[physicalplan]

]else}}

}}} val pruned =

pruneplans

(plans)

assert

(pruned.hasnext, s"no plan for $plan"

) pruned

}

注意planner的最後乙個過程pruneplans,截止到spark 2.4.4,這個方法只是佔位作用,它會原樣返回輸入的全部候選物理計畫,沒有任何剪枝,見org.apache.spark.sql.execution的sparkplanner

override protected def pruneplans

(plans: iterator[sparkplan]

): iterator[sparkplan]

=

批處理

下面以joinselection為例,說明物理計畫生成策略都在做什麼;

(plan: logicalplan)

: seq[sparkplan]

= plan match

}流式計算

patterns目前有4種

分布式系統的資料是有分割槽的,作為乙個通用計算引擎,spark如何建模資料的分割槽,如何利用分割槽資訊優化計算效率是值得研究學習的。下面我們來看一下spark的分割槽體系設計。

分割槽partitioning和分布distribution是兩個緊密相關的抽象。partitioning描述乙個運算元的輸出是如何劃分的,它有兩個主要的屬性,乙個是分割槽數量,另乙個是它是否滿足給定的分布

trait partitioning

}

partitioning的幾種實現策略包括:

具體地說,範圍分割槽表示根據排序表示式計算每一行的排序值,這樣每個分割槽存在乙個min和max, 相同排序值的行儲存在相同的分割槽,相鄰的數值會保持在同乙個分割槽或相鄰的分割槽,會保留一定的資料連續性。雜湊分割槽表示根據雜湊函式計算每一行的雜湊值,這樣相同雜湊值的行儲存在相同的分割槽,並不存在太多連續性,同時良好設計的雜湊函式往往能一定程度避免資料傾斜。廣播分割槽表示資料被廣播到每個節點,輪詢分割槽表示資料平均地分配到每個節點,未知分割槽代表未知,一般用在模式匹配中作為預設分割槽值。

hashpartitioning原始碼

case class

hashpartitioning

(expressions: seq[expression]

, numpartitions: int)

extends expression with partitioning with unevaluable

case clustereddistribution(requiredclustering, requirednumpartitions)

=>

expressions.forall(x =

> requiredclustering.exists(_.semanticequals(x)))

&&(requirednumpartitions.isempty |

| requirednumpartitions.get == numpartitions)

case _ =

> false

}}}def partitionidexpression: expression = pmod(new murmur3hash(expressions)

, literal(numpartitions)

)}

rangepartitioning原始碼

case class

rangepartitioning

(ordering: seq[sortorder]

, numpartitions: int)

extends expression with partitioning with unevaluable }}

}

distribution指定了執行查詢後同乙個表示式下的不同元組將如何分布。目前有兩種物理分布特性,節點間分布(inter-node)和分區內分布(intra-partition)。節點間分布表示資料元組如何在集群的物理機器之間分布,知道節點間分布可以用來做優化,例如優先使用本地聚集,避免不必要的全域性聚集;分區內分布表示乙個分割槽的資料元組的劃分情況。

常見的distribution包括:

可以看到分割槽和分布是成對出現的,它們之間的關聯稱作satisfy, 即某個分割槽方式p滿足某個分布d。

資料庫查詢優化是一項複雜的領域,基礎原理很簡潔,應用到具體的場景又錯綜複雜。簡單來說,sql優化策略可以分成兩大類,基於規則的優化(rule based optimization, rbo)和基於代價的優化(cost based optimization, cbo)。

常見的rbo規則包括連線謂詞下推predicate pushdown常量合併 constant folding列剪枝 column prunning等。這裡不展開介紹,感興趣可以參考這篇部落格。

cbo需要對spark增加一些基礎功能,例如統計資訊收集,代價函式、對operator提供基數估計等,詳見華為貢獻的cbo的issue。

spark sql核心API整理

核心api sparksession spark入口 統一封裝sparkconf,sparkcontext,sqlcontext,配置執行引數,讀取檔案,建立資料,使用sql dataset 統一dataset介面,其中dataframe dataset row 基本實現了類似rdd的所有運算元 c...

Spark SQL工作原理剖析和效能優化

一 工作原理剖析 spark sql 架構中主要有這幾個關鍵的元件 sqlparser sql分析程式 analyser 分析器 optimizer 優化器 sparkplan spark計畫 sparksql大致的執行流程是這樣的 1.sql 語句經過sqlparser 完成sql 語句的語法解析...

sparkStreaming核心剖析

receiver從kafka中接收的資料都是一條一條的資料,那麼接下來,會將這一條一條的資料儲存到currnetbuffer arraybuffer 這時有乙個執行緒 blockintervaltimer 每隔一段時間 可配置 將currentbuffer中所有資料打包,封裝為乙個block 然後將...