對於
flume
來說主要有兩個
channel
:memory
,file
;對於線上環境主要以
filechannel
為主,因此這裡主要討論它的實現: 在
filechannel
裡主要由乙個
wal的
log和乙個記憶體佇列組成:
filechannel
的queue
主要又以下幾個部分組成:
privatefinaleventqueuebackingstore
backingstore;
privatefinal
inflighttakes;
privatefinal
inflightputs;
其中backingstore
代表了queue
在持久化存在,使用了記憶體對映檔案的方式;每次對
queue
的讀寫操作都記錄在
backingstore
的overwritemap
(update in place
)中,當進行
checkpoint
的時候合併到
elementsbuffer
並持久化到磁碟;所有未提交的正在讀寫資料都分別儲存在
inflight
結構中,當
checkpoint
時一併進行持久化,為回滾時使用; 在
inflight
中儲存了
transactionid->fileid
以及transactionid->eventptr
的對映,具體儲存在
backingstore
裡的則是
eventptr
(fileid,offset);
checkpoint file
的檔案結構如下:
file header
:1029 bytes
eventptr; 在
file header裡前8
個位元組儲存了版本號,接下來
24個位元組是
sequeuece no.(
類似rdbms
的scn)
,接下來
4個位元組儲存了
checkpoint
的狀態;
作為wal的
log主要儲存了(
transactionid,sequenceno,event),
每次讀寫都先在
log裡寫入
event
,對於寫操作會拿到
eventptr
放入queue
中;而commit
和rollback
操作在log
中的記錄形式是
(transactionid,sequenceno
,op=);
這兩個結構主要是體現在
filebackedtransaction
中如下:
filebackedtransaction extendsbasictransactionsemantics
......
linkedblockingdeque
takelist;
linkedblockingdeque
putlist;
longtransactionid;
log
log;
flumeeventqueue
queue
: eventqueuebackingstorefile
其中queue = log.getflumeeventqueue();
首先看put/take path
以及commit:
1. doput(eventevent)->
queue.addwithoutcommit(ptr, transactionid)
log.put(transactionid, event)->
synchronized logfile.writer.put(bytebufferbuffer)
putlist.offer(ptr)
2. dotake()->
flumeeventpointer ptr =
queue
.removehead(
transactionid);
takelist
.offer(ptr),
log.take(
transactionid
, ptr);
->
synchronizedlogfile.writer.take(bytebuffer buffer)
event event =
log.get(ptr);
3. docommit()->
if(puts > 0) }
elseif(takes > 0) }
從上面的**可以看出,對於每乙個
put/take
都會記錄一條
oplog
到log裡,
當commit
的時候會對
log進行
sync
到磁碟持久化,同時會把
event
指標存放到
queue
上;這裡的
log就類似於
mysql
裡的binlog(binlog_format=statement)
,而這裡的
queue
存放的是指向
event
的指標;
簡例:filechannel如下,對filechannel put了2個訊息,a,b;則在log,queue裡的儲存狀態如下,log裡儲存了(transactionid,sequenceno,event),queue則儲存了eventptr;
queue:ptr->a,ptr->b
wal log:(1,1,put a),(1,2,put b),(1,3,commit)
當例項crash
時,通過
log來恢復
queue
的狀態,類似
rdbms
一樣,replay
是很耗時的操作,因此會定期對
queue
進行checkpoint:
log在初始化的時候會啟動乙個排程執行緒
workerexecutor,
由排程執行緒定期(
checkpoint interval
)排程乙個
backgroupworkder
來進行非強制性
checkpoint;
log.writecheckpoint(boolean force):trylockexclusive->
synchronized queue.checkpoint->
backingstore
.begincheckpoint();//
檢查是否
checkpoint
正在進行
;同時進行標記
checkpoint
開始,並同步
mmap file;
inflightputs
.serializeandwrite();//
inflighttakes
.serializeandwrite();//
將inflightputs/takes
序列化並寫到相應檔案
backingstore.checkpoint();->
setlogwriteorderid(writeorderoracle.next());
writecheckpointmetadata();
//copy from overwritemap toelementsbuffer(mmap)
//標記checkpoint
結束,並同步檔案
簡例:接上例,在
a,b提交後,這時進行了一次
checkpoint
(儲存在磁碟上的
checkpoint則是2
個指標ptr->a,ptr->b
),此時
scn=4
;之後,又完成了乙個
take transaction ,ptr to a
也同時被刪除;如果這時
flume crash
,queue
從checkpoint
中重建,並且取得
checkpoint scn=4,
則replay
這之後的
log進行
crash recovery
;在恢復後,立刻執行一次
checkpoint.
queue:ptr->b
wal log:(1,1,put a),(1,2,put b),(1,3,commit),(2,5,take a),(2,6,commit)
ConcurrentHashMap原理解析
什麼是concurrenthashmap?眾所周知,hashmap是一種非常高效的資料結構,但是依舊有它的缺陷。那就是在併發插入資料時,有可能會出現帶環鍊錶,讓下一次的讀操作出現死迴圈。於是為了避免hashmap的執行緒安全問題,concurrenthashmap應運而生。concurrenthas...
ConcurrentHashMap原理解析
concurrenthashmap是jdk提供的乙個執行緒安全的集合類,它內部的結構原理和我們常用的hashmap基本是一致,那我們可以先來認識一下hashmap,這樣基本上也能大致明白concurrenthashmap了。hashmap與concurrenthashmap都是用來存放一種鍵值對形式...
理解爬蟲原理
本次作業 於 1.簡單說明爬蟲原理 請求 並提取資料的自動化流程 2.理解爬蟲開發過程 1 簡要說明瀏覽器工作原理 web瀏覽器提交請求後,通過http協議傳送給web伺服器。web伺服器接到後,進行事務處理,處理結果又通過http傳回給web瀏覽器,從而在web瀏覽器上顯示出所請求 的頁面。2 使...