CAT的Server消費訊息(一)

2021-08-28 02:39:31 字數 4297 閱讀 3765

1. 初始化realtimeconsumer實時消費,使用容器自帶的初始化前置處理initializable,初始化periodmanager,時間間隔為乙個小時,也就是以小時為維度進行統計資料,並且設定統計類和訊息分析器。

public void initialize() throws initializationexception
設定區間管理器相鄰維度的相容時間為三分鐘,也就是精度容許時間,在區間策略中儲存區間長度和相應的容錯長度。

public periodmanager(long duration, messageanalyzermanager analyzermanager,

serverstatisticmanager serverstatemanager, logger logger)

執行periodmanager#init,獲取當前時間段的開始時間,第一次執行肯定是當前時間段的開始時間。第二次執行並且在同時段的57分鐘內,則會設定m_lastendtime為第一次的開始時間,並且返回0,代表本時段沒有結束。第三次執行並且在同時段的第58分鐘,這時候就取得是下乙個小時的開始時間,設定m_laststarttime也一樣。以後當同時段的前57分鐘執行的時候都返回的是0,也就意味著是同時段,不需要開始下一時段,其他的情況和上面類似迴圈執行。

public void init()
public long next(long now) 

// prepare next period ahead

if (now - m_laststarttime >= m_duration - m_aheadtime)

// last period is over

if (now - m_lastendtime >= m_duration + m_extratime)

return 0;

}

開始乙個時段週期,獲取該時段的結束時間,初始化週期period,設定開始結束時間,訊息分析器,統計類以及日誌屬性。

private void startperiod(long starttime)
建立處理各個訊息型別的執行緒任務periodtask,包含各個型別的訊息分析器,訊息佇列以及時段的開始時間。

public period(long starttime, long endtime, messageanalyzermanager analyzermanager,

serverstatisticmanager serverstatemanager, logger logger)

analyzertasks.add(task);

}} }

2. 初始化defaultmessageanalyzermanager,使用容器擴充套件的初始化函式initialize,從容器中查詢所有實現了messageanalyzer介面的實現類,根據每個類的id放置到map中,經過一些篩選最後儲存相應的分析器名。

public void initialize() throws initializationexception 

m_analyzernames = new arraylist(map.keyset());

collections.sort(m_analyzernames, new comparator() else if (state.equals(str2))

if (top.equals(str1)) else if (top.equals(str2))

return str1.compareto(str2);

}});

m_analyzernames.remove("matrix");

m_analyzernames.remove("dependency");

}

在defaultmessageanalyzermanager#getanalyzer中,首先刪除當前時段往前第二個時段的分析器等資料,private map>> m_analyzers = new hashmap>>();最外層是時段,然後是分析器名,最後是分析器集合,查詢集合中所有符合名字條件的分析器,然後進行初始化,儲存開始時間,時段區間值,以及容許時間,最後把分析器放入集合map中儲存起來。

public void initialize(long starttime, long duration, long extratime)
3. 先用transactionanalyzer舉例說明,初始化報告管理類defaultreportmanager,預載入當前時段的分析報告

protected void loadreports()
這裡的m_name在載入components-cat-consumer.xml檔案中指定transaction,

public maploadhourlyreports(long starttime, storagepolicy policy, int index) 

try

m_reportdelegate.afterload(reports);

t.setstatus(message.success);

} catch (throwable e) finally

} return reports;

}

初始化報告塊管理器defaultreportbucketmanager,獲取hdfs的儲存路徑"target/bucket"

public void initialize() throws initializationexception
public reportbucket getreportbucket(long timestamp, string name, int index) throws ioexception
獲取hdfs根目錄,建立本時段的檔案,儲存路徑"///report-"對應timestamp, index, name,在記憶體中儲存索引檔案裡面的資料。索引檔案裡面儲存的主要是資料檔案裡面的下標,根據下標的找出訊息的長度,再通過長度找出後面相應的資料。

public void initialize(string name, date timestamp, int index) throws ioexception 

final file dir = datafile.getparentfile();

if (!dir.exists() && !dir.mkdirs())

m_logicalpath = logicalpath;

m_writedatafile = new bufferedoutputstream(new fileoutputstream(datafile, true), 8192);

m_writeindexfile = new bufferedoutputstream(new fileoutputstream(indexfile, true), 8192);

m_writedatafilelength = datafile.length();

m_readdatafile = new randomaccessfile(datafile, "r");

}

從報告塊中找出對應的報告資料,並且解析成transactionreport,最後儲存在記憶體快取中。呼叫載入的後置處理。

public string findbyid(string id) throws ioexception  catch (exception e)  finally 

} return null;

}

這個過程就是先從hdfs中載入當前時段的相應資料報告,放入記憶體中實時查詢,最後銷毀該檔案m_bucketmanager.closebucket(bucket);釋放localreportbucket類中儲存的檔案以及清空對應的記憶體資料。以上基本就是方法defaultmessageanalyzermanager#getanalyzer的全部內容。

4. 每個時段裡面包含不同訊息型別分析器的不同子任務,也就是map> m_tasks,最後把該時段加進時段管理器的集合m_periods中,執行該時段的start方法。給每個子任務設定下標並且放進守護執行緒池組"cat-realtimeconsumer"

public void start() 

} }

執行periodmanager守護執行緒。threads.forgroup("cat").start(m_periodmanager);

CAT的Server初始化

1.server初始化從web.xml檔案開始,作為乙個war包專案,首先需要初始化servlet,首先是catservlet專門初始化cat相關的server程式,比如接受客戶端傳過來的資料等等,另乙個servlet為mvc專門提供資料查詢介面的普通的mvc功能,功能相當於縮小版的springmv...

訊息佇列 保證訊息消費的冪等

昨天業務反饋了乙個問題,乙個使用者的月流水賬單重複了,拿到userid,開始定位問題之路。檢視資料庫記錄,如下圖,使用者月流水資料確實重複了 taskid同乙個批次,每個月資料都有二條 1.首先,看外部資料 商是否重複推送業務資料給我,我程式中是會設定攔截重複訊息 2.檢視訊息接收,以及訊息推送到m...

訊息消費要注意的細節

rocketmqmessagelistener consumergroup shop 消費者分組 topic order topic 要消費的主題 consumemode consumemode.concurrently,消費模式 無序和有序 messagemodel messagemodel.cl...