乙個topology包含一或多個spout bolt,spout負責在資料來源獲得資料並傳送給bolt,每個bolt負責做完處理後發給下乙個bolt。通常topology的建立是由topologybuilder來建立的,該元件會記錄包含哪些spout bolt,並做相應驗證:各元件是否有id衝突,校驗方法如下:
private void validateunusedid(string id)
if (_spouts.containskey(id))
if (_statespouts.containskey(id))
}
topologybuilder會儲存各個元件到相應的資料結構中,資料結構如下:
public class topologybuilder
元件配置資訊存放方法如下
private void initcommon(string id, icomponent component, number parallelism)
map conf = component.getcomponentconfiguration();
if (conf != null)
common.set_json_conf(utils.to_json(conf));
_commons.put(id, common);
}
資訊儲存好後,在topology階段builder會根據這些資訊建立乙個stormtopology例項,然後由stormsubmitter.submittopology進行提交,該階段分兩步:1、上傳jar檔案 2、提交作業
public static void submittopology(string name, map stormconf,
stormtopology topology, submitoptions opts)
throws alreadyaliveexception, invalidtopologyexception
stormconf = new hashmap(stormconf);
stormconf.putall(utils.readcommandlineopts());
map conf = utils.readstormconfig();
conf.putall(stormconf);
putuserinfo(conf, stormconf);
try else
//上傳jar檔案,下面會詳細解釋這個方法
submitjar(conf);
try else
} finally
} log.info("finished submitting topology: " + name);
} catch (invalidtopologyexception e)
}
jar檔案上傳包含兩部分,jar檔案本身和其依賴的庫檔案都會被傳到服務端,預設上傳buf大小為512k,可以通過nimbus.thrift.max_buffer_size來調整buf大小,服務端儲存的目錄結構如下:
[hongmin.lhm@rt2l02045 ~]$tree /home/hongmin.lhm/jstorm_data/nimbus/inbox/
/home/hongmin.lhm/jstorm_data/nimbus/inbox/
`-- 7c1b7d1e-9134-4ed8-b664-836271b49bd3
`-- stormjar-7c1b7d1e-9134-4ed8-b664-836271b49bd3.jar
private static void submitjar(map conf)
if (localjar != null)
submittedjar = submitjar(conf, localjar,
uploadlocation, client);
} else
} catch (exception e) finally
} else
}
JStorm之Topology提交服務端
topology提交前會先判斷集群中是否存在同名作業,如果存在在提交失敗,如果沒有則會增加集群提交次數submittedcount,每次提交成功,該變數都會加1,然後會為該作業分配乙個id,生成規則如下 public static string topologynametoid string top...
JStorm之Supervisor啟動流程
4 分配新的任務 該元件主要包含 心跳執行緒 supervisor事件接受執行緒 處理執行緒,一旦事件接受到則會進入任務分配環節,主要邏輯 如下 public static void main string args public void run catch exception e while s...
Jstorm排程規則
任務排程演算法以worker為維度 排程過程中正在進行的排程動作不會對已發生的排程動作產生影響 排程過程中使用者可以自定義 usedefined assignment,和使用已有的old assignment,這兩者的優先順序是 usedefined assignment old assignmen...