Storm中Topology任務排程策略

2021-06-21 07:02:45 字數 2902 閱讀 4390

storm中負責topo分配的工作由nimbus負責,具體**在nimbus.clj中。

對於乙個新topo的分配來說,主要經歷兩個階段:

1. 邏輯分配階段

這裡又會涉及到兩個概念executor和task,簡單講對於乙個具體的component來說,task就是component在執行時的例項個數,即component使靜態的class**,task是執行時的具體object物件,task的個數即是component在runtime是被例項化的物件個數,

而executor可以理解為執行緒的概念,乙個component對應的executor個數就是component執行時所獨佔的執行緒數,舉例來講,某個component的task個數是6,executor個數是2,則執行時component就有6個例項執行在2個執行緒中,乙個執行緒負責執行3其中3個

task,預設情況下一般會將task個數配置為executor的個數,即每乙個執行緒只負責執行乙個component的例項化物件。

具體可以看官方的解釋:

邏輯階段所作的工作就是計算topology中所有的component的executor個數,task個數,然後將所有的task分配到executor中。

2. 物理分配階段

executor代表的是執行緒,具體要落地執行還需要依附於程序,因此物理分配階段做的工作就是將所有的executor分配到worker slot程序中(乙個slot代表乙個jvm虛擬機器)。

由於在邏輯分配階段,task就是按照topo進行了排序,即相同component所屬的task排列在一起,而在物理分配階段slot資源也是按照埠進行了排序,即相同埠的slot排在了一起,

而具體分配演算法是將排好序的task一次輪序分配到排好序的slot中,因此同乙個component所屬的不同task會盡可能的分到不同機器的相同埠上的slot中,實現了整個topo的負載均衡,

這樣分配的好處是防止同乙個component的所有task都分配到同一臺機器上,造成整個集群負載不均。

具體負責topo分配的函式為

(declare mk-assignments)

具體**就不展開了,其中核心計算邏輯對應的幾個方法呼叫鏈為

1. compute-new-topology->executor->node+port

計算topology的每乙個component對應的slots資源

2. compute-topology->executors 

計算topology中每乙個component分配到的executor

3. compute-executors 

將task分配到executor中

4. storm-task-info 

計算taskid->componentid的對應關係

5. defaultscheduler.-schedule 

將executor分配到slot中

最後要說明的一點是乙個component對應的task個數是如何獲取的。

在呼叫mk-assignments之前,還呼叫了乙個叫做normalize-topology的函式,稱為topo的規範化。規範化的主要工作就是設定component的task個數,即component執行時的instance個數。

(defn normalize-topology [storm-conf ^stormtopology topology]

(let [ret (.deepcopy topology)]

(doseq [[_ component] (all-components ret)]

(.set_json_conf

(.get_common component)

(->>

(merge (component-conf component))

to-json )))

ret ))

計算component對應task個數,這裡起作用的引數有三個

1. topology-max-task-parallelis

2. topology-tasks

3. parallelism-hint (該component對應的executor個數)

具體演算法為:

1. 如果設定了topology-max-task-parallelism和topology-tasks,則獲取兩者中較小者

2. 如果沒有設定topology-tasks,但是設定了topology-max-task-parallelism和parallelism-hint,則獲取兩者中較小者

3. 如果沒有設定topology-max-task-parallelism,則取topology-tasks,若兩者都沒有設定,則取parallelism-hint

簡單可以總結為:可以為task單獨設值的引數為topology-max-task-parallelis和topology-tasks,並且以其中較小者為準,如果這兩個引數均沒有設定,則將task個數設定為parallelism-hint即該component的執行時併發executor的個數。

(defn- component-parallelism [storm-conf component]

(let [storm-conf (merge storm-conf (component-conf component))

num-tasks (or (storm-conf topology-tasks) (num-start-executors component))

max-parallelism (storm-conf topology-max-task-parallelism)

](if max-parallelism

(min max-parallelism num-tasks)

num-tasks)))

storm中fieldsGroup的機制

說實話storm功能非常強大,但是參考資料是在是太少了,有些只能自己摸索,專案中用到了fieldsgroup,所以就研究一下。fieldsgroup的機制是把declar中暴露定義的fields中的字段進行hash,然後分到不同的bolt中,開始理解有誤,所以一直跑不通。囧 直接上 在第乙個spou...

storm中的ack fail機制

概念 storm的ack fail機制也就是storm的可靠訊息處理機制,通俗來講就是給spout發出的每個tuple帶上乙個messageid,然後這個spout下面的每乙個bolt 都會給他返回乙個完成情況,只有當每乙個bolt都返回了正確的結果,整個傳送過程才算成功,任何乙個bolt處理不成功...

Storm 中的ack機制

一.ack原理 storm中有個特殊的task名叫acker,他們負責跟蹤spout發出的每乙個tuple的tuple樹 因為乙個tuple通過spout發出了,經過每乙個bolt處理後,會生成乙個新的tuple傳送出去 當acker 框架自啟動的task 發現乙個tuple樹已經處理完成了,它會傳...