1. 初始化realtimeconsumer實時消費,使用容器自帶的初始化前置處理initializable,初始化periodmanager,時間間隔為乙個小時,也就是以小時為維度進行統計資料,並且設定統計類和訊息分析器。
public void initialize() throws initializationexception
設定區間管理器相鄰維度的相容時間為三分鐘,也就是精度容許時間,在區間策略中儲存區間長度和相應的容錯長度。
public periodmanager(long duration, messageanalyzermanager analyzermanager,
serverstatisticmanager serverstatemanager, logger logger)
執行periodmanager#init,獲取當前時間段的開始時間,第一次執行肯定是當前時間段的開始時間。第二次執行並且在同時段的57分鐘內,則會設定m_lastendtime為第一次的開始時間,並且返回0,代表本時段沒有結束。第三次執行並且在同時段的第58分鐘,這時候就取得是下乙個小時的開始時間,設定m_laststarttime也一樣。以後當同時段的前57分鐘執行的時候都返回的是0,也就意味著是同時段,不需要開始下一時段,其他的情況和上面類似迴圈執行。
public void init()
public long next(long now)
// prepare next period ahead
if (now - m_laststarttime >= m_duration - m_aheadtime)
// last period is over
if (now - m_lastendtime >= m_duration + m_extratime)
return 0;
}
開始乙個時段週期,獲取該時段的結束時間,初始化週期period,設定開始結束時間,訊息分析器,統計類以及日誌屬性。
private void startperiod(long starttime)
建立處理各個訊息型別的執行緒任務periodtask,包含各個型別的訊息分析器,訊息佇列以及時段的開始時間。
public period(long starttime, long endtime, messageanalyzermanager analyzermanager,
serverstatisticmanager serverstatemanager, logger logger)
analyzertasks.add(task);
}} }
2. 初始化defaultmessageanalyzermanager,使用容器擴充套件的初始化函式initialize,從容器中查詢所有實現了messageanalyzer介面的實現類,根據每個類的id放置到map中,經過一些篩選最後儲存相應的分析器名。
public void initialize() throws initializationexception
m_analyzernames = new arraylist(map.keyset());
collections.sort(m_analyzernames, new comparator() else if (state.equals(str2))
if (top.equals(str1)) else if (top.equals(str2))
return str1.compareto(str2);
}});
m_analyzernames.remove("matrix");
m_analyzernames.remove("dependency");
}
在defaultmessageanalyzermanager#getanalyzer中,首先刪除當前時段往前第二個時段的分析器等資料,private map>> m_analyzers = new hashmap>>();最外層是時段,然後是分析器名,最後是分析器集合,查詢集合中所有符合名字條件的分析器,然後進行初始化,儲存開始時間,時段區間值,以及容許時間,最後把分析器放入集合map中儲存起來。
public void initialize(long starttime, long duration, long extratime)
3. 先用transactionanalyzer舉例說明,初始化報告管理類defaultreportmanager,預載入當前時段的分析報告
protected void loadreports()
這裡的m_name在載入components-cat-consumer.xml檔案中指定transaction,
public maploadhourlyreports(long starttime, storagepolicy policy, int index)
try
m_reportdelegate.afterload(reports);
t.setstatus(message.success);
} catch (throwable e) finally
} return reports;
}
初始化報告塊管理器defaultreportbucketmanager,獲取hdfs的儲存路徑"target/bucket"
public void initialize() throws initializationexception
public reportbucket getreportbucket(long timestamp, string name, int index) throws ioexception
獲取hdfs根目錄,建立本時段的檔案,儲存路徑"///report-"對應timestamp, index, name,在記憶體中儲存索引檔案裡面的資料。索引檔案裡面儲存的主要是資料檔案裡面的下標,根據下標的找出訊息的長度,再通過長度找出後面相應的資料。
public void initialize(string name, date timestamp, int index) throws ioexception
final file dir = datafile.getparentfile();
if (!dir.exists() && !dir.mkdirs())
m_logicalpath = logicalpath;
m_writedatafile = new bufferedoutputstream(new fileoutputstream(datafile, true), 8192);
m_writeindexfile = new bufferedoutputstream(new fileoutputstream(indexfile, true), 8192);
m_writedatafilelength = datafile.length();
m_readdatafile = new randomaccessfile(datafile, "r");
}
從報告塊中找出對應的報告資料,並且解析成transactionreport,最後儲存在記憶體快取中。呼叫載入的後置處理。
public string findbyid(string id) throws ioexception catch (exception e) finally
} return null;
}
這個過程就是先從hdfs中載入當前時段的相應資料報告,放入記憶體中實時查詢,最後銷毀該檔案m_bucketmanager.closebucket(bucket);釋放localreportbucket類中儲存的檔案以及清空對應的記憶體資料。以上基本就是方法defaultmessageanalyzermanager#getanalyzer的全部內容。
4. 每個時段裡面包含不同訊息型別分析器的不同子任務,也就是map> m_tasks,最後把該時段加進時段管理器的集合m_periods中,執行該時段的start方法。給每個子任務設定下標並且放進守護執行緒池組"cat-realtimeconsumer"
public void start()
} }
執行periodmanager守護執行緒。threads.forgroup("cat").start(m_periodmanager); CAT的Server初始化
1.server初始化從web.xml檔案開始,作為乙個war包專案,首先需要初始化servlet,首先是catservlet專門初始化cat相關的server程式,比如接受客戶端傳過來的資料等等,另乙個servlet為mvc專門提供資料查詢介面的普通的mvc功能,功能相當於縮小版的springmv...
訊息佇列 保證訊息消費的冪等
昨天業務反饋了乙個問題,乙個使用者的月流水賬單重複了,拿到userid,開始定位問題之路。檢視資料庫記錄,如下圖,使用者月流水資料確實重複了 taskid同乙個批次,每個月資料都有二條 1.首先,看外部資料 商是否重複推送業務資料給我,我程式中是會設定攔截重複訊息 2.檢視訊息接收,以及訊息推送到m...
訊息消費要注意的細節
rocketmqmessagelistener consumergroup shop 消費者分組 topic order topic 要消費的主題 consumemode consumemode.concurrently,消費模式 無序和有序 messagemodel messagemodel.cl...