1.main函式中呼叫thread_init(),初始化setting.num_threads個worker執行緒以及乙個主線程dispatcher_thread。
每個worker執行緒用pipe建立乙個管道,並註冊libevent事件,當管道的讀端可以讀時,就呼叫thread_libevent_process()函式。
thread_libevent_process()做的事情等下再說。
void thread_init(int nthreads, struct event_base *main_base)
//每個執行緒開啟乙個管道
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
//註冊libevent事件。當該執行緒的管道的讀端可以讀時,呼叫thread_libevent_process()
setup_thread(&threads[i]);
}/* create threads after we've done all the libevent setup. */
/* 建立worker執行緒,並讓每個執行緒進入event_base_loop()迴圈。
前面的threads陣列只是建立了nthreads個libevent_thread物件,
並初始化了libevent以及其他資訊。並沒有真正開始乙個新的程序。
在create_worker裡才真正呼叫了pthread_create,
並且每個程序都進入了event_base_loop()
*/ for (i = 0; i < nthreads; i++)
}static void setup_thread(libevent_thread *me)
/*初始化每個執行緒的new_conn_queue成員。
new_conn_queue成員是乙個conn_queue指標,相當於乙個佇列,
記錄分配到該執行緒的,等待new乙個conn物件的那些item的資訊。
每次呼叫thread_libevent_process()時,
就從該佇列中取出乙個item,然後建立乙個conn*/
me->new_conn_queue = malloc(sizeof(struct conn_queue));
cq_init(me->new_conn_queue); }
static void create_worker(void *(*func)(void *), void *arg)
}static void *worker_libevent(void *arg)
2.main函式中,然後呼叫server_sockets,在指定的埠和ip位址上分別建立tcp和udp的監聽。然後
1)如果是tcp,就呼叫conn_new(sfd, conn_listening, ev_read|ev+persist, 1, tcp_transport, main_base),該函式在main_base上建立libevent事件,當sfd可讀時,呼叫event_handler(). 而event_handler()又呼叫drive_machine().
2)如果是udp,就呼叫dispatch_conn_new(sfd, conn_read, ev_read | ev_persist, udp_read_buffer_size, udp_transport),該函式用round-robin的方法找到乙個worker執行緒,然後new乙個cq_item物件,把該item物件放到worker執行緒的new_conn_queue佇列中。然後,通知該worker執行緒。通知的方法是,往該執行緒的寫端寫乙個位元組,這樣,該執行緒的讀端就可讀了。由於之前已經講過,每個worker執行緒註冊了libevent事件,當讀端可讀時,就呼叫thread_libevent_process()。
server_sockets()呼叫server_socket(), server_socket()先在指定埠和ip上建立socket,得到檔案描述符sfd,然後bind,然後
static int server_socket(const char *inte***ce,
int port,
enum network_transport transport,
file *portnumber_file)
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1)
if (!is_udp(transport) && listen(sfd, settings.backlog) == -1)
if (is_udp(transport))
} else
} }
3. 繼續2.1
如果是tcp,把conn的狀態設為conn_listening, 設定libevent事件。當乙個新的tcp connection到達時,進入drive_machine。
static void drive_machine(conn *c)
呼叫dispatch_conn_new,把新的tcp connection的sfd指派到某個worker執行緒上。
4. 繼續2.2
注意server_socket中,當協議是udp時,呼叫dispatch_new_conn,但是是在乙個for迴圈中。就是說,對於乙個udp的描述符,指派了settings.num_threads_per_udp個執行緒來監控該udp描述符。
這叫做「驚群」。當乙個udp連線到達時,所有的libevent都能檢測到該事件,但是只有乙個執行緒呼叫recvfrom能返回資料。其他執行緒都返回失敗。
udp用驚群而tcp不用的原因可能是:對於tcp,主線程(即dispatch_thread)會一直監控是否有新的tcp connection到達。如果到達,就會指派乙個worker thread處理它。而對於udp,如果不用驚群,那麼只有乙個worker 執行緒監控udp 請求。因此,當該執行緒在處理乙個udp請求時,其他的udp請求就不能得到及時處理。
另乙個原因是:
「tcp是定然不能這樣設計的,所以只能accept之後分發到特定執行緒,因為是位元組流.
而udp是包,隨便哪個執行緒處理包1,再換個執行緒處理包2也是沒有問題的.」
就像unix network programming上說的,大多數tcp伺服器是併發的,而udp伺服器是迭代的。
linux上安裝memecached安裝小記!
最近在專案中使用了memcached來控制緩衝,到專案部署階段,memcached在linux上部署產生了困難,耗時半天,在此,整理安裝步驟,希望對大家有所幫助。我安裝時在網上參考多個資料,最有用的是 1.進入 usr lib目錄,使用命令 ls al libevent 檢視是否已安裝,linux系...
執行緒模型 讀寫模型(1)
讀寫模型 讀寫模型是乙個稍微複雜一些的模型。乙份共享資源允許多個讀者同時讀取。但是只要有乙個寫者在寫這份共享資源,任何其他的讀者和寫者都不能訪問這份共享資源。讀寫模型實現起來,不僅需要訊號量機制,還需要額外的讀者計數和寫者計數。public static final object signal ne...
11執行緒 多執行緒模型
一 什麼是執行緒 為什麼要引入執行緒 二 引入執行緒機制後的變化 三 執行緒有的屬性 四 執行緒的實現方式 五 多執行緒模型 1 執行緒引入原因分析 2 執行緒 可以把執行緒理解為輕量級程序。執行緒是基本的cpu執行單元,也是程式執行流的最小單元。引入執行緒後不僅程序可以併發,程序內的執行緒也可以併...