topology提交前會先判斷集群中是否存在同名作業,如果存在在提交失敗,如果沒有則會增加集群提交次數submittedcount,每次提交成功,該變數都會加1,然後會為該作業分配乙個id,生成規則如下:
public static string topologynametoid(string topologyname, int counter)
因此我們從作業id中就可以判斷集群作業成功提交次數、提交時間、還有作業名稱了,如果我們沒有指定acker數量,,storm會自動為我們生成乙個,然後進入作業校驗,對topology本身的校驗比較細緻:
1、元件id是否合法
2、是否存在同名id
3、woker數量是否合法,小於0或null
4、ack數量校驗同worker一樣
public void submittopologywithopts(string topologyname,
string uploadedjarlocation, string jsonconf,
stormtopology topology, submitoptions options)
throws alreadyaliveexception, invalidtopologyexception,
topologyassignexception, texception catch (alreadyaliveexception e) catch (throwable e)
//成功提交次數加1
int counter = data.getsubmittedcount().incrementandget();
string topologyid = common.topologynametoid(topologyname, counter);
try
serializedconf.put(config.topology_id, topologyid);
serializedconf.put(config.topology_name, topologyname);
mapstormconf;
stormconf = nimbusutils.normalizeconf(conf, serializedconf,
topology);
maptotalstormconf = new hashmap(
conf);
totalstormconf.putall(stormconf);
stormtopology normalizedtopology = nimbusutils.normalizetopology(
stormconf, topology, false);
// 校驗id、字段合法性,worker和acker數量合法性
common.validate_basic(normalizedtopology, totalstormconf,
topologyid);
// don't need generate real topology, so skip common.system_topology
// common.system_topology(totalstormconf, topology);
stormclusterstate stormclusterstate = data.getstormclusterstate();
// create /local-dir/nimbus/topologyid/***x files
// copy jar to /local-dir/nimbus/topologyid/stormjar.jar
setupstormcode(conf, topologyid, uploadedjarlocation, stormconf,
normalizedtopology);
// generate taskinfo for every bolt or spout in zk
// 為每個元件建立相應znode,並存放相應資料,資料如下:
/****
**///zk的目錄結構如下:
//[zk: localhost:2181(connected) 20] ls /jstorm/tasks/test-3-1421404402
// [3, 2, 1, 6, 5, 4]
setupzktaskinfo(conf, topologyid, stormclusterstate);
// make assignments for a topology
log.info("submit for " + topologyname + " with conf "
+ serializedconf);
//這裡開始任務分發,任務分發由topologyassign完成,這步僅僅是建立乙個事件物件放入佇列中,然後返回
//真正的任務分發由其他執行緒來操作,所以這裡返回比較快,除非佇列是滿的
// servicehandler中
makeassignment(topologyname, topologyid, options.get_initial_status());
} catch (failedassigntopologyexception e)
}
private void makeassignment(string topologyname, string topologyid,
topologyinitialstatus status) throws failedassigntopologyexception else
}
JStorm之Topology提交客戶端
乙個topology包含一或多個spout bolt,spout負責在資料來源獲得資料並傳送給bolt,每個bolt負責做完處理後發給下乙個bolt。通常topology的建立是由topologybuilder來建立的,該元件會記錄包含哪些spout bolt,並做相應驗證 各元件是否有id衝突,校...
JStorm之Supervisor啟動流程
4 分配新的任務 該元件主要包含 心跳執行緒 supervisor事件接受執行緒 處理執行緒,一旦事件接受到則會進入任務分配環節,主要邏輯 如下 public static void main string args public void run catch exception e while s...
Jstorm排程規則
任務排程演算法以worker為維度 排程過程中正在進行的排程動作不會對已發生的排程動作產生影響 排程過程中使用者可以自定義 usedefined assignment,和使用已有的old assignment,這兩者的優先順序是 usedefined assignment old assignmen...