物理計畫是將spark sql生成的邏輯運算元樹對映成物理運算元樹,並將邏輯計畫的資訊對映到spark core模型中的rdd、transformation、action的過程。生成物理計畫後,一條sql語句就變成了可以執行的spark任務。
物理計畫的定義在org.apache.spark.sql.catalyst.plans.queryplan
中,從定義可以看出,物理計畫是乙個抽象語法樹,樹節點的主要組成部分包括:子樹節點、出現過的表示式、出現過的子查詢;
abstract class queryplan[plantype <: queryplan[plantype]] extends treenode[plantype] =
// 子查詢
def subqueries: seq[plantype] = {}
}
乙個樣例物理計畫如下所示:
== physical plan ==
*(5) sortmergejoin [x#3l], [y#9l], inner
:- *(2) sort [x#3l asc nulls first], false, 0
: +- exchange hashpartitioning(x#3l, 200)
: +- *(1) project [(id#0l % 2) as x#3l]
: +- *(1) filter isnotnull((id#0l % 2))
: +- *(1) range (0, 5, step=1, splits=8)
+- *(4) sort [y#9l asc nulls first], false, 0
+- exchange hashpartitioning(y#9l, 200)
+- *(3) project [(id#6l % 2) as y#9l]
+- *(3) filter isnotnull((id#6l % 2))
+- *(3) range (0, 5, step=1, splits=8)
上面列印出的是乙個物理計畫的treestring表示,其中(5) sortmergejoin
和(3) range
這些樹節點是spark查詢的表示式,表示式開頭的數字(5)
和(3)
代表dfs遍歷物理計畫表示式樹的順序(見org.apache.spark.sql.catalyst.trees.treenode
函式generatetreestring
),簡化版的表示式遍歷順序如下:
== physical plan ==
*(5) sortmergejoin
:- *(2) sort
: +- *(1) project
: +- *(1) filter
: +- *(1) range
+- *(4) sort
+- *(3) project
+- *(3) filter
+- *(3) range
將邏輯計畫轉換成物理計畫的抽象類叫做queryplanner,它定義了轉換的框架:首先得到一系列候選物理計畫、然後自底向上替換運算元樹節點的物理計畫、最後化簡物理計畫。
queryplanner源**
abstract
class
queryplanner
[physicalplan <
: treenode[physicalplan]
]else}}
}}} val pruned =
pruneplans
(plans)
assert
(pruned.hasnext, s"no plan for $plan"
) pruned
}
注意planner的最後乙個過程pruneplans
,截止到spark 2.4.4,這個方法只是佔位作用,它會原樣返回輸入的全部候選物理計畫,沒有任何剪枝,見org.apache.spark.sql.execution
的sparkplanner
override protected def pruneplans
(plans: iterator[sparkplan]
): iterator[sparkplan]
=
批處理
下面以joinselection為例,說明物理計畫生成策略都在做什麼;
(plan: logicalplan)
: seq[sparkplan]
= plan match
}流式計算
patterns目前有4種
分布式系統的資料是有分割槽的,作為乙個通用計算引擎,spark如何建模資料的分割槽,如何利用分割槽資訊優化計算效率是值得研究學習的。下面我們來看一下spark的分割槽體系設計。
分割槽partitioning和分布distribution是兩個緊密相關的抽象。partitioning描述乙個運算元的輸出是如何劃分的,它有兩個主要的屬性,乙個是分割槽數量,另乙個是它是否滿足給定的分布。
trait partitioning
}
partitioning的幾種實現策略包括:
具體地說,範圍分割槽表示根據排序表示式計算每一行的排序值,這樣每個分割槽存在乙個min和max, 相同排序值的行儲存在相同的分割槽,相鄰的數值會保持在同乙個分割槽或相鄰的分割槽,會保留一定的資料連續性。雜湊分割槽表示根據雜湊函式計算每一行的雜湊值,這樣相同雜湊值的行儲存在相同的分割槽,並不存在太多連續性,同時良好設計的雜湊函式往往能一定程度避免資料傾斜。廣播分割槽表示資料被廣播到每個節點,輪詢分割槽表示資料平均地分配到每個節點,未知分割槽代表未知,一般用在模式匹配中作為預設分割槽值。
hashpartitioning原始碼
case class
hashpartitioning
(expressions: seq[expression]
, numpartitions: int)
extends expression with partitioning with unevaluable
case clustereddistribution(requiredclustering, requirednumpartitions)
=>
expressions.forall(x =
> requiredclustering.exists(_.semanticequals(x)))
&&(requirednumpartitions.isempty |
| requirednumpartitions.get == numpartitions)
case _ =
> false
}}}def partitionidexpression: expression = pmod(new murmur3hash(expressions)
, literal(numpartitions)
)}
rangepartitioning原始碼
case class
rangepartitioning
(ordering: seq[sortorder]
, numpartitions: int)
extends expression with partitioning with unevaluable }}
}
distribution指定了執行查詢後同乙個表示式下的不同元組將如何分布。目前有兩種物理分布特性,節點間分布(inter-node)和分區內分布(intra-partition)。節點間分布表示資料元組如何在集群的物理機器之間分布,知道節點間分布可以用來做優化,例如優先使用本地聚集,避免不必要的全域性聚集;分區內分布表示乙個分割槽的資料元組的劃分情況。
常見的distribution包括:
可以看到分割槽和分布是成對出現的,它們之間的關聯稱作satisfy, 即某個分割槽方式p滿足某個分布d。
資料庫查詢優化是一項複雜的領域,基礎原理很簡潔,應用到具體的場景又錯綜複雜。簡單來說,sql優化策略可以分成兩大類,基於規則的優化(rule based optimization, rbo)和基於代價的優化(cost based optimization, cbo)。
常見的rbo規則包括連線謂詞下推predicate pushdown、常量合併 constant folding、列剪枝 column prunning等。這裡不展開介紹,感興趣可以參考這篇部落格。
cbo需要對spark增加一些基礎功能,例如統計資訊收集,代價函式、對operator提供基數估計等,詳見華為貢獻的cbo的issue。
spark sql核心API整理
核心api sparksession spark入口 統一封裝sparkconf,sparkcontext,sqlcontext,配置執行引數,讀取檔案,建立資料,使用sql dataset 統一dataset介面,其中dataframe dataset row 基本實現了類似rdd的所有運算元 c...
Spark SQL工作原理剖析和效能優化
一 工作原理剖析 spark sql 架構中主要有這幾個關鍵的元件 sqlparser sql分析程式 analyser 分析器 optimizer 優化器 sparkplan spark計畫 sparksql大致的執行流程是這樣的 1.sql 語句經過sqlparser 完成sql 語句的語法解析...
sparkStreaming核心剖析
receiver從kafka中接收的資料都是一條一條的資料,那麼接下來,會將這一條一條的資料儲存到currnetbuffer arraybuffer 這時有乙個執行緒 blockintervaltimer 每隔一段時間 可配置 將currentbuffer中所有資料打包,封裝為乙個block 然後將...