// synchronization flush 同步刷盤
if (flushdisktype.sync_flush == this.defaultmessagestore.getmessagestoreconfig().getflushdisktype())
} else
}// asynchronous flush ②
else else
}}①同步刷盤使用groupcommitservice
②非同步刷盤 且開啟了transientstorepoolenable且不是從伺服器,使用commitlogservice 否則使用flushcommitlogservice刷盤
public synchronized void putrequest(final groupcommitrequest request)
if (hasnotified.compareandset(false, true))
}
public void run() catch (exception e)
}// under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try catch (interruptedexception e)
synchronized (this)
this.docommit();
commitlog.log.info(this.getservicename() + " service end");
}
private void docommit()
}//喚醒等待刷盤完成的阻塞執行緒
req.wakeupcustomer(flushok);
}if (storetimestamp > 0)
this.requestsread.clear();
} else
}}
public boolean flush(final int flushleastpages)
}return result;
}
①根據刷盤指標找到對應的檔案
public int flush(final int flushleastpages) else
} catch (throwable e)
//設定刷盤指標
this.flushedposition.set(value);
this.release();
} else
}//返回刷盤指標位置
return this.getflushedposition();
}
未開啟transientstorepoolenable
public void run()
try else
if (printflushprogress)
long begin = system.currenttimemillis();
//至少滿4頁才刷盤 但是每10秒將會強制刷盤一次,flushphysicqueueleastpages會被設定為0
if (storetimestamp > 0)
long past = system.currenttimemillis() - begin;
if (past > 500) ms", past);
}} catch (throwable e)
}
非同步將writebuffer的資料刷到filechannel
public void run()
try
if (end - begin > 500) ms", end - begin);
}this.waitforrunning(interval);
} catch (throwable e)
}boolean result = false;
for (int i = 0; i < retry_times_over && !result; i++)
commitlog.log.info(this.getservicename() + " service end");
}
public boolean commit(final int commitleastpages)
return result;
}
public int commit(final int commitleastpages)
//是否可以進行commit,至少堆積commitleastpages頁資料 為0的話表示強制commit
if (this.isabletocommit(commitleastpages)) else
}// all dirty data has been committed to filechannel. 檔案已經寫滿且已經全commit,可以把writebuffer歸還給池子裡了
if (writebuffer != null && this.transientstorepool != null && this.filesize == this.committedposition.get())
//返回commit指標
return this.committedposition.get();
}
protected void commit0(final int commitleastpages) catch (throwable e)
}}
RocketMQ訊息刷盤
刷盤策略 commitlog在初始化的時候,會根據配置,啟動兩種不同的刷盤服務。1.broker 同步刷盤 if flushdisktype.sync flush this defaultmessagestore.getmessagestoreconfig getflushdisktype catc...
RocketMQ 主從同步機制
主從同步 ha 高可用 主從同步原理 為了保證系統的高可用,訊息到達主伺服器後,需要將訊息同步到從伺服器。如果主伺服器宕機,消費者可用從從伺服器拉取訊息。大體步驟 1 主伺服器啟動,監聽從伺服器的鏈結。2 從伺服器主動鏈結主伺服器,建立tcp相關鏈結。3 從伺服器主動向主伺服器傳送待拉取訊息偏移量,...
RocketMQ 的訊息傳遞機制及AOP
1,mq中訊息投遞分為兩種,一種是生產者往mq broker種投遞,另一種是broker往消費者投遞 乙個訊息主題對應了多個訊息佇列,所以會產生兩個問題,生成者應該把訊息放入到哪個佇列種,消費者應該從哪個訊息佇列中拉取訊息。因為訊息在系統之間傳遞的時候,跨越網路,訊息的傳播無法保證其有序 生產者投遞...