safetimer的使用
ceph中的事件都是整合自context
類,事件處理方法是finish(),這是乙個純虛函式,由不同的事件完成自己的處理方法。
ceph的定時器原理是開啟執行緒,迴圈檢查是否有事件需要執行,如果沒有就掛起,等待觸發條件到來。主要用於集群中的某些特定任務,比如心跳機制,心跳機制就是在finish()中每隔一段事件ping一次。
safetimer中是迴圈檢查需要執行的事件,所以維護了兩個map,乙個是包含時間和事件資訊的schedule,其中的事件按照時間排序,safetimer會檢視schedule中的第乙個任務是否到期,如果到了就呼叫callback函式執行,如果沒有就直接退出迴圈,是任務按時間執行的依據;另乙個是events,主要用來檢查任務是否正確新增或取消。
名稱型別
keyvalue
schedule
multimap
utime_t
context
events
mapcontext
schedule 的迭代器
該類是從thread類繼承,擁有乙個safetimer類成員變數和entry()成員函式,safetimer中的成員thread為此型別,承擔了定時器迴圈檢查是否有事件觸發需要執行相應程式的任務。
class
safetimerthread
:public thread
void
*entry
() override
};
safetimer類中有若干成員函式,包括
名稱作用
引數init()
定時器初始化
none
shutdown()
關閉定時器
none
timer_thread()
定時器的迴圈執行緒
none
add_event_after()
一段時間後新增事件
seconds ,callback
add_event_at()
在某個時間點新增事件
when, callback
cancel_event()
取消某個事件
callback
cancel_all_events()
取消所有事件
none
dump()
caller
init()
初始化並建立定時器的執行緒
void safetimer::
init()
shutdown()
關閉定時器,join()
函式應該會使得日誌中先顯示timer_thread awake
然後才是timer_thread exiting
。
void safetimer::
shutdown()
}
為什麼執行緒都結束了還要lock一下?
timer_thread()
這裡是safetimer執行緒執行的核心部分,配合流程圖分析。該執行緒只要定時器不停止就一直存在,迴圈檢查,有事件需要執行就去呼叫相應的callback函式執行;沒有的時候就處於掛起狀態,直達有新的事件新增進來,或者時間到了需要執行任務,該執行緒將會被變數cond
的signal喚醒
void safetimer::
timer_thread()
// recheck stopping if we dropped the lockif(
!safe_callbacks && stopping)
break
;ldout
(cct,20)
<<
"timer_thread going to sleep"
<< dendl;
if(schedule.
empty()
) cond.
wait
(lock)
;//這裡執行緒會進入等待佇列,掛起不執行後面的程式?直到由任務新增進來需要執行
else
cond.
waituntil
(lock, schedule.
begin()
->first)
;、而如果是執行事件還未到,那就掛起到時間條件滿足
ldout
(cct,20)
<<
"timer_thread awake"
<< dendl;
}ldout
(cct,10)
<<
"timer_thread exiting"
<< dendl;
lock.
unlock()
;}
add_event_after
,add_event_at
,cancel_event
,cancel_all_events
作用就是schedule和events中的事件新增和刪除,保證事件都能夠執行到。
safetimer在ceph中常用於定時的任務,比如monitor節點對集群的監控,osd之間的心跳機制等等。下面從ceph的osd.cc中的原始碼來分析safetimer的使用。
通常safetimer定義之後,會呼叫safetimer的呼叫函式,確定其將響應的事件cct
,然後會呼叫init()
函式初始化並啟動定時器。如osd.cc中
agent_timer
(osd-
>client_messenger-
>cct, agent_timer_lock);
agent_timer.
init()
;
如果需要持續不斷執行某一任務,那麼就需要不斷地呼叫add_event_after()
函式來為schedule新增事件,如
void osdservice::
agent_entry()
uint64_t level = agent_queue.
rbegin()
->first;
set& top = agent_queue.
rbegin()
->second;
dout(10
)<<
__func__
<<
" tiers "
<< agent_queue.
size()
<<
", top is "
<< level
<<
" with pgs "
<< top.
size()
<<
", ops "
<< agent_ops <<
"/"<< cct-
>_conf-
>osd_agent_max_ops
<<
(agent_active ?
" active"
:" not active"
)<< dendl;
dout(20
)<<
__func__
<<
" oids "
<< agent_oids << dendl;
int max = cct-
>_conf-
>osd_agent_max_ops - agent_ops;
int agent_flush_quota = max;if(
!flush_mode_high_count)
agent_flush_quota = cct-
>_conf-
>osd_agent_max_low_ops - agent_ops;
if(agent_flush_quota <=
0|| top.
empty()
||!agent_active)if(
!agent_valid_iterator || agent_queue_pos == top.
end())
pgref pg =
*agent_queue_pos;
dout(10
)<<
"high_count "
<< flush_mode_high_count
<<
" agent_ops "
<< agent_ops
<<
" flush_quota "
<< agent_flush_quota << dendl;
agent_lock.
unlock()
;if(!pg-
>
agent_work
(max, agent_flush_quota)
) agent_lock.
lock()
;}agent_lock.
unlock()
;dout(10
)<<
__func__
<<
" finish"
<< dendl;
}
如果不再需要定時器,那麼直接shutdown()
就可以
agent_timer.
shutdown()
;
Ceph中的序列化
yuandong 2015.05.11 作為主要和磁碟 網路打交道的分布式儲存系統,序列化是最基礎的功能之一,今天我們來看一下ceph中序列化的設計與實現。1 ceph序列化的方式 序列化 ceph稱之為encode 的目的是將資料結構表示為二進位製流的方式,以便通過網路傳輸或儲存在磁碟等儲存介質上...
在ceph中 pool PG OSD的關係
pool是儲存物件的邏輯分割槽,它規定了資料冗餘的型別和對應的副本分布策略 支援兩種型別 副本 replicated 和 糾刪碼 erasure code 目前我們公司內部使用的pool都是副本型別 3副本 pg placement group 是乙個放置策略組,它是物件的集合,該集合裡的所有物件都...
部署CEPH 一 (CEPH的環境準備)
實驗拓撲圖 配置ceph節點 二 為node1節點儲存各台主機的金鑰 三 為node1節點生成公鑰及金鑰 四 實現node1節點遠端各台主機免密登陸 包含node1自己 node1操作 五 為node6節點 客戶端 配置時間伺服器 node6操作 六 為node1 5配置為ntp伺服器 node6 ...