storm中負責topo分配的工作由nimbus負責,具體**在nimbus.clj中。
對於乙個新topo的分配來說,主要經歷兩個階段:
1. 邏輯分配階段
這裡又會涉及到兩個概念executor和task,簡單講對於乙個具體的component來說,task就是component在執行時的例項個數,即component使靜態的class**,task是執行時的具體object物件,task的個數即是component在runtime是被例項化的物件個數,
而executor可以理解為執行緒的概念,乙個component對應的executor個數就是component執行時所獨佔的執行緒數,舉例來講,某個component的task個數是6,executor個數是2,則執行時component就有6個例項執行在2個執行緒中,乙個執行緒負責執行3其中3個
task,預設情況下一般會將task個數配置為executor的個數,即每乙個執行緒只負責執行乙個component的例項化物件。
具體可以看官方的解釋:
邏輯階段所作的工作就是計算topology中所有的component的executor個數,task個數,然後將所有的task分配到executor中。
2. 物理分配階段
executor代表的是執行緒,具體要落地執行還需要依附於程序,因此物理分配階段做的工作就是將所有的executor分配到worker slot程序中(乙個slot代表乙個jvm虛擬機器)。
由於在邏輯分配階段,task就是按照topo進行了排序,即相同component所屬的task排列在一起,而在物理分配階段slot資源也是按照埠進行了排序,即相同埠的slot排在了一起,
而具體分配演算法是將排好序的task一次輪序分配到排好序的slot中,因此同乙個component所屬的不同task會盡可能的分到不同機器的相同埠上的slot中,實現了整個topo的負載均衡,
這樣分配的好處是防止同乙個component的所有task都分配到同一臺機器上,造成整個集群負載不均。
具體負責topo分配的函式為
(declare mk-assignments)
具體**就不展開了,其中核心計算邏輯對應的幾個方法呼叫鏈為
1. compute-new-topology->executor->node+port
計算topology的每乙個component對應的slots資源
2. compute-topology->executors
計算topology中每乙個component分配到的executor
3. compute-executors
將task分配到executor中
4. storm-task-info
計算taskid->componentid的對應關係
5. defaultscheduler.-schedule
將executor分配到slot中
最後要說明的一點是乙個component對應的task個數是如何獲取的。
在呼叫mk-assignments之前,還呼叫了乙個叫做normalize-topology的函式,稱為topo的規範化。規範化的主要工作就是設定component的task個數,即component執行時的instance個數。
(defn normalize-topology [storm-conf ^stormtopology topology]
(let [ret (.deepcopy topology)]
(doseq [[_ component] (all-components ret)]
(.set_json_conf
(.get_common component)
(->>
(merge (component-conf component))
to-json )))
ret ))
計算component對應task個數,這裡起作用的引數有三個
1. topology-max-task-parallelis
2. topology-tasks
3. parallelism-hint (該component對應的executor個數)
具體演算法為:
1. 如果設定了topology-max-task-parallelism和topology-tasks,則獲取兩者中較小者
2. 如果沒有設定topology-tasks,但是設定了topology-max-task-parallelism和parallelism-hint,則獲取兩者中較小者
3. 如果沒有設定topology-max-task-parallelism,則取topology-tasks,若兩者都沒有設定,則取parallelism-hint
簡單可以總結為:可以為task單獨設值的引數為topology-max-task-parallelis和topology-tasks,並且以其中較小者為準,如果這兩個引數均沒有設定,則將task個數設定為parallelism-hint即該component的執行時併發executor的個數。
(defn- component-parallelism [storm-conf component]
(let [storm-conf (merge storm-conf (component-conf component))
num-tasks (or (storm-conf topology-tasks) (num-start-executors component))
max-parallelism (storm-conf topology-max-task-parallelism)
](if max-parallelism
(min max-parallelism num-tasks)
num-tasks)))
storm中fieldsGroup的機制
說實話storm功能非常強大,但是參考資料是在是太少了,有些只能自己摸索,專案中用到了fieldsgroup,所以就研究一下。fieldsgroup的機制是把declar中暴露定義的fields中的字段進行hash,然後分到不同的bolt中,開始理解有誤,所以一直跑不通。囧 直接上 在第乙個spou...
storm中的ack fail機制
概念 storm的ack fail機制也就是storm的可靠訊息處理機制,通俗來講就是給spout發出的每個tuple帶上乙個messageid,然後這個spout下面的每乙個bolt 都會給他返回乙個完成情況,只有當每乙個bolt都返回了正確的結果,整個傳送過程才算成功,任何乙個bolt處理不成功...
Storm 中的ack機制
一.ack原理 storm中有個特殊的task名叫acker,他們負責跟蹤spout發出的每乙個tuple的tuple樹 因為乙個tuple通過spout發出了,經過每乙個bolt處理後,會生成乙個新的tuple傳送出去 當acker 框架自啟動的task 發現乙個tuple樹已經處理完成了,它會傳...