Apache Storm 官方文件 FAQ

2021-09-23 23:21:22 字數 3793 閱讀 9039

worker 的完整數量是由 supervisor 配置的。每個 supervisor 會分配到一定數量的 jvm slot,你在拓撲中設定的 worker number 就是以這個 slot 數量為依據進行分配的。

不建議為每個拓撲在每台機器上分配超過乙個 worker。

假如有乙個執行於三颱 8 核伺服器節點的拓撲,它的並行度為24,每個 bolt 在每台機器上分配有 8 個 executor(即每個 cpu 核心分配乙個)。這種場景下,使用三個 worker (每個 worker 分配 8 個executor)相對於使用更多的 worker (比如使用 24 個 worker,為每個 executor 分別分配乙個)有三點好處:

首先,在 worker 內部將資料流重新分割槽到不同的 executor 的操作(比如 shuffle 或者 group-by)就不會產生觸發到傳輸 buffer 緩衝區,tuple 會直接從傳送端轉儲到接收端的 buffer 區。這一點有很大的優勢。相反,如果目標 executor 是在同一臺機器的不同 worker 程序內,tuple 就需要經歷「傳送 -> worker 傳輸佇列 -> 本地 socket 埠 -> 接收端 worker -> 接收端 executor」這樣乙個漫長的過程。雖然這個過程並不會產生網路級傳輸,但是在同一臺機器的不同程序間的傳輸損耗也是很可觀的。

其次,三個大的聚合器帶來的大的快取空間比 24 個小聚合器帶來的小快取空間要有用得多。因為這回降低資料傾斜造成的影響,同時提高 lru 的效能。

最後,更少的 worker 可以有效地降低控制流的頻繁變動。

trident 拓撲可以設計成條件路徑(if-else)的工作流形式嗎?例如,bolt0 在接收 spout 的資料流時,可以根據輸入 tuple 的值來選擇將資料流傳送到 bolt1 或者 bolt2,而不是同時向兩個 bolt 傳送。

trident 的 「each」 運算子可以返回乙個資料流物件,你可以將該物件儲存在某個變數中,然後你可以對同乙個資料流執行多個 each 操作來分解該資料流,如下述**所示:

stream s = topology.each(...).groupby(...).aggregate(...) 

stream branch1 = s.each(..., filtera)

stream branch2 = s.each(..., filterb)

你可以使用 join、merge 或者 multireduce 來聯結各個資料流。

到目前為止,trident 暫時不支援輸出多個資料流。(詳見 storm-68)

trident spout 實際上是通過 storm 的 bolt 執行的。masterbatchcoordinator(mbc)封裝了 trident 拓撲的 spout,它負責整合 trident 中的 batch,這一點對於你所使用的任何型別的 spout 而言都是一樣的。trident 的 batch 就是在 mbc 向各個 spout-coordinator 分發種子 tuple 的過程中生成的。spout-coordinator bolt 知道你所定義的 spout 是如何互相協作的 —— 實際上,在使用 kafka 的情況下,各個 spout 就是通過 spout-coordinator 來獲取 pull 訊息所需要的 partition 和 offset 資訊的。

由於在 trident 中 mbc 才是實際執行的 spout,乙個 batch 中的所有 tuple 都是 mbc 生成的 tuple 樹的節點。也就是說,storm 的 「max spout pending」 引數實際上定義的是可以併發執行的 batch 數量。mbc 在滿足以下兩個條件下會傳送出乙個新的 batch:首先,掛起的 tuple 數需要小於 「max pending」 引數;其次,距離上乙個 batch 的傳送已經過去了至少乙個trident batch interval 的間隔時間。

是的,storm 中有乙個可選的 「spout 等待策略」,預設配置是 sleep 一段指定的配置時間。

你知道 486 時代的計算機上面為什麼有個 trubo button 嗎?這個引數的作用和這個按鈕有點像。

實際上,trident batch interval 有兩個用處。首先,它可以用於減緩 spout 從遠端資料來源獲取資料的速度,但這不會影響資料處理的效率。例如,對於乙個從給定的 s3 儲存區中讀取批量上傳檔案並按行傳送資料的 spout,我們就不希望它經常觸發 s3 的閾值,因為檔案要隔幾分鐘才會上傳一次,而且每個 batch 也需要花費一定的時間來執行。

另乙個用處是限制啟動期間或者突發資料負載情況下內部訊息佇列的負載壓力。如果 spout 突然活躍起來,並向系統中擠入了 10 個 batch 的記錄,那麼可能會有從 batch7 開始的大量不緊急的 tuple 堵塞住傳輸緩衝區,並且阻塞了從 batch3 中的 tuple(甚至可能包含 batch3 中的部分舊 tuple)的 commit 過程#。對於這種情況,我們的解決方法就是將 trident batch interval 設定為正常的端到端處理時延的一半左右 —— 也就是說如果需要花費 600 ms 的時間處理乙個 batch,那麼就可以每 300 ms 處理乙個 batch。

注意,這個 300 ms 僅僅是乙個上限值,而不是額外增加的延時時間,如果你的 batch 需要花費 258 ms 來執行,那麼 trident 就只會延時等待 42 ms。

trident 本身不會對 batch 進行限制。不過如果使用 kafka 的相關 spout,那麼就可以使用 max fetch bytes 大小除以 平均 record 大小來計算每個子 batch 分割槽的有效 record 大小。

trident 的 batch 在某種意義上是一種過載的設施。batch 大小與 partition 的數量均受限於或者是可以用於定義#:

事務安全單元(一段時間內存在風險的 tuple);

相對於每個 partition,乙個用於視窗資料流分析的有效視窗機制;

相對於每個 partition,使用 partitionquery,partitionpersist 等命令時能夠同時進行的查詢運算元量;

相對於每個 partition,spout 能夠同時分配的 record 數量。

不能在 batch 生成之後更改 batch 的大小,不過可以通過 shuffle 操作以及修改並行度的方式來改變 partition 的數量。

對於帶有固定時間戳的 records,如果需要對他們執行計數、求均值或者聚合操作,並將結果整合到離散的時間桶(time bucket)中,trident 是乙個很好的具有可擴充套件性的解決方案。

這種情況下可以寫乙個each函式來將時間戳置入乙個時間桶中:如果桶的大小是以「小時」為單位的,那麼時間戳2013-08-08 12:34:56就會被匹配到2013-08-08 12:00:00桶中,其他的 12 時到 13 時之間的時間也一樣。然後可以使用persistentaggregate來對時間桶分組。persistentaggregate會使用乙個基於資料儲存的本地 cachemap。這些包含有大量 records 的 group 會使用高效的批量讀取/寫入方式對資料儲存區進行操作,所以並不會對資料儲存區進行大量的讀操作;只要你的資料傳送足夠快捷,trident 就可以高效地使用記憶體與網路。即使某台伺服器宕機了一天,需要重新快速地傳送一整天的資料,舊有的結果也可以靜默地獲取到並進行更新,並且這並不會影響當前結果的計算過程。

很遺憾,你不會知道什麼時候所有的 event 都已經採集到了 —— 這是乙個認識論問題,而不是乙個分布式系統的問題。你可以:

# 此處譯文可能不夠準確,有疑問的讀者請參考原文對應內容。

Apache Storm 官方文件 本地模式

本地模式是一種在本地程序中模擬 storm 集群的工作模式,對於開發和測試拓撲很有幫助。在本地模式下執行拓撲與在集群模式下執行拓撲的方式很相似。建立乙個程序內的 集群 只需要使用localcluster類即可,例如 import backtype.storm.localcluster localcl...

Apache Storm 官方文件中文版

本專案是 apache storm 官方文件的中文翻譯版,致力於為有實時流計算專案需求和對 apache storm 感興趣的同學提供有價值的中文資料,希望能夠對大家的工作和學習有所幫助。說明 如果沒有特殊宣告,本專案文件中所述 storm 版本均為 0.9.x 版本。原文資料 官方 trident...

Apache Storm 官方文件中文版

本專案是 apache storm 官方文件的中文翻譯版,致力於為有實時流計算專案需求和對 apache storm 感興趣的同學提供有價值的中文資料,希望能夠對大家的工作和學習有所幫助。說明 如果沒有特殊宣告,本專案文件中所述 storm 版本均為 0.9.x 版本。原文資料 官方 trident...