有時候我們需要實現乙個公共的模組,需要對多個其他的模組提供服務,最常用的方式就是實現乙個socket server,接受客戶的請求,並返回給客戶結果。
這經常涉及到如果管理多個連線及如何多執行緒的提供服務的問題,常用的方式就是連線池和執行緒池,基本流程如下:
首先伺服器端有乙個監聽執行緒,不斷監聽來自客戶端的連線。
當乙個客戶端連線到監聽執行緒後,便建立了乙個新的連線。
監聽執行緒將新建立的連線放入連線池進行管理,然後繼續監聽新來的連線。
執行緒池中有多個服務執行緒,每個執行緒都監聽乙個任務佇列,乙個建立的連線對應乙個服務任務,當服務執行緒發現有新的任務的時候,便用此連線向客戶端提供服務。
乙個socket server所能夠提供的連線數可配置,如果超過配置的個數則拒絕新的連線。
當服務執行緒完成服務的時候,客戶端關閉連線,服務執行緒關閉連線,空閒並等待處理新的任務。
連線池的監控執行緒清除其中關閉的連線物件,從而可以建立新的連線。
socket的呼叫主要包含以下的步驟:
呼叫比較複雜,我們首先區分兩類socket,一類是listening socket,一類是connected socket.
listening socket由mysocketserver負責,一旦accept,則生成乙個connected socket,又mysocket負責。
mysocket主要實現的方法如下:
int mysocket::write(const char * buf, int length)
left -= ret;
index += ret;
}if(left > 0)
return -1;
return 0;
}int mysocket::read(char * buf, int length)
return index;
}int mysocket::status()
int mysocket::close()
mysocketserver的主要方法實現如下:
int mysocketserver::init(int port)
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(struct sockaddr_in));
serveraddr.sin_addr.s_addr = htonl(inaddr_any);
serveraddr.sin_family = af_inet;
serveraddr.sin_port = htons(port);
if(bind(m_socket, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) == -1)
if(listen(m_socket, somaxconn) == -1)
struct linger lin;
lin.l_onoff = 1;
lin.l_linger = 0;
setsockopt(m_socket, sol_socket, so_linger, (const char *)&lin, sizeof(lin));
m_port = port;
m_inited = true;
return 0;
}mysocket * mysocketserver::accept()
mysocket* socket = new mysocket(sock);
return socket;
}mysocket * mysocketserver::accept(int timeout)}
乙個執行緒池一般有乙個任務佇列,啟動的各個執行緒從任務佇列中競爭任務,得到的執行緒則進行處理:listm_taskqueue;
任務佇列由鎖保護,使得執行緒安全:pthread_mutex_t m_queuemutex
任務佇列需要條件變數來支援生產者消費者模式:pthread_cond_t m_cond
如果任務列表為空,則執行緒等待,等待中的執行緒個數為:m_numwaitthreads
需要乙個列表來維護執行緒池中的執行緒:vectorm_threads
每個執行緒需要乙個執行緒執行函式:
void * __thread_new_proc(void *p)
每個執行緒由mythread類負責,主要函式如下:
int mythread::start()
int mythread::join()
int ret = pthread_join(m_thread, null);
if(ret != 0)
return –1;
void mythread::run()
while (false == m_bstop)
mytask *ptask = m_threadpool->getnexttask();
if (null != ptask)
ptask->process();
執行緒池由mythreadpool負責,主要函式如下:
int mythreadpool::init()
return 0;
}int mythreadpool::start()
ret = pthread_cond_broadcast(&m_cond);
if(ret != 0)
return –1;
return 0;
}void mythreadpool::addtask(mytask *ptask)
mytask * mythreadpool::getnexttask()
ptask = m_taskqueue.front();
m_taskqueue.pop_front();
pthread_mutex_unlock(&m_queuemutex);
return ptask;
}其中每乙個任務的執行由mytask負責,其主要方法如下:
void mytask::process()
//用read從客戶端讀取指令
//對指令進行處理
//用write向客戶端寫入結果
每個連線池儲存乙個鍊錶儲存已經建立的連線:list* m_connections
當然這個鍊錶也需要鎖來進行多執行緒保護:pthread_mutex_t m_connectionmutex;
此處乙個myconnection也是乙個mytask,由乙個執行緒來負責。
執行緒池也作為連線池的成員變數:mythreadpool * m_threadpool
連線池由類myconnectionpool負責,其主要函式如下:
void myconnectionpool::addconnection(myconnection * pconn)
myconnectionpool也要啟動乙個背後的執行緒,來管理這些連線,移除結束的連線和錯誤的連線。
void myconnectionpool::managepool()
else if (conn->iserror())
else
}pthread_mutex_unlock(&m_connectionmutex);
監聽執行緒需要有乙個mysocketserver來監聽客戶端的連線,每當形成乙個新的連線,檢視是否超過設定的最大連線數,如果超過則關閉連線,如果未超過設定的最大連線數,則形成乙個新的myconnection,將其加入連線池和執行緒池。
mysocketserver *pserver = new mysocketserver(port);
myconnectionpool *ppool = new myconnectionpool();
while (!stopflag)
mysocket * sock = pserver->acceptconnection(5);
if(sock != null)
if(m_connections.size > maxconnectionsize)
sock.close();
mytask *ptask = new myconnection();
ppool->addconnection(ptask);
摘自:
白話淺說TCP UDP面向連線,面向無連線的區別
tcp是面向連線的 udp是面向無連線的 就是這種關係了 tcp transmission control protocol,傳輸控制協議 udp user datagram protocol,使用者資料報協議 當ip包通過路由將資料傳輸到目的地時,會根據tcp或udp包頭中的源埠和目的埠資訊,請求...
面向連線與無連線
面向連線與面向無連線是兩種方法,在網路中用於相關網路協議的制定 例如tcp是面向連線的,而udp是面向無連線的 在應用中,它們代表著資料通訊的兩種不同的傳輸資料技術。本文主要指協議與服務。面向連線協議是tcp ip協議族的重要組成部分,面向連線依賴傳送方和接收方之間的顯示通訊和阻塞以管理雙方的資料傳...
面向連線的協議 2
面向連線的協議 伺服器端 由圖我們可以看出,伺服器與客戶的區別在於 伺服器必須進行套介面繫結。因為如果伺服器沒有位址,客戶就無法進行連線。s3 的作用在於告訴核心,在某個套介面上監聽並接收請求。伺服器需要監聽連線。下面,我們介紹一下有關的函式,int listen int sockfd,int ba...