寫這個模型需要提前了解selector以及channel,之前記錄過filechannel,除此之外還有以下幾種channel:
serversocketchannel:用於監聽新的tcp連線的通道,負責讀取&響應,通常用於服務端的實現。
socketchannel:用於發起tcp連線,讀寫網路中的資料,通常用於客戶端的實現。
datagramchannel:上述兩個通道基於tcp傳輸協議,而這個通道則基於udp,用於讀寫網路中的資料。
filechannel:從檔案讀取資料。
本篇重點放在serversocketchannel和socketchannel上,大部分客戶端/服務端為了保證資料準確性,都是基於tcp傳輸協議實現的,由於使用selector註冊必須要求被註冊的channel是非阻塞模式的,因此filechannel由於沒有非阻塞模式(無法設定configureblocking(false)),沒辦法和註冊到selector。
selector是個通道註冊器(用法會在程式裡標註),是實現reactor模型的關鍵,多個通道均可以註冊到selector,selector負責監聽每個channel的幾個事件:連線就緒、寫就緒、讀就緒,當某個channel註冊感興趣就緒事件到selector時,若發生興趣事件就緒,則selector.select()方法不再阻塞,返回興趣事件集合(可能包含多個channel的),然後按照事件不同進行分發處理。
selector返回對應的就緒事件,封裝為selectionkey,每個channel對應乙個selectionkey,這個物件還可以通過attach方法附著處理類(handler、acceptor等)。
先來看個簡單使用selector做處理的服務端實現,可以簡單對selector和selectionkey的用法做個了解:
public static void main(string args) throws ioexception else if (key.isreadable())
system.out.println(string.format("收到來自 %s 的訊息: %s",
socketchannel.getremoteaddress(),
new string(buffer.array())));
}keys.remove(key);}}
}
上面是乙個簡單的例子,接下來,就利用選擇器、通道來實現reactor單執行緒模型。
實現服務端,服務端負責接收客戶端的連線,接收客戶端的請求資料以及響應客戶端。
圖1
通過上圖,我們需要實現的模組有reactor、acceptor、handler,下面來逐個編寫:
該模組內部包含兩個核心方法,select和dispatch,該模組負責監聽就緒事件和對事件的分發處理:
public class reactor implements runnable
@override
public void run()
selected.clear();
}} catch (ioexception e)
}void dispatch(selectionkey k) }}
細節已標註。
這個模組只負責處理連線就緒事件,有了這個事件就可以拿到客戶單的socketchannel,就可以繼續完成接下來的讀寫任務了:
public class acceptor implements runnable
@override
public void run()
} catch (ioexception e) }}
細節已標註。
這個模組負責接下來的讀寫操作:
public class handler implements runnable
@override
public void run()
} catch (ioexception e) catch (ioexception e2) }}
private void read() throws ioexception else }}
void send() throws ioexception
//沒斷開連線,則再次切換到讀
status = read;
selectionkey.interestops(selectionkey.op_read);}}
}
細節已標註。
關鍵模組已實現,下面來啟動服務端:
new thread(new reactor(2333)).start();
接下來同樣利用selector編寫客戶端,客戶端需要做的事情就是傳送訊息到服務端,等待服務端響應,然後再次傳送訊息,發夠10條訊息斷開連線:
public class nioclient implements runnable catch (ioexception e)
}@override
public void run()
selected.clear();
}} catch (ioexception e)
}void dispatch(selectionkey k) }}
細節已標註。
public class connector implements runnable
@override
public void run()
} catch (ioexception e) }}
細節已標註。
public class handler implements runnable
@override
public void run()
} catch (ioexception e) catch (ioexception e2) }}
void send() throws ioexception else }}
private void read() throws ioexception }}
細節已標註。
下面啟動客戶端去連線之前的服務端:
new thread(new nioclient("127.0.0.1", 2333)).start();
new thread(new nioclient("127.0.0.1", 2333)).start();
上面模擬了兩個客戶端同時連到服務端,執行結果如下:
服務端執行結果:
圖2
客戶端執行結果如下:
圖3
單執行緒reactor模型有個致命的缺點,通過上述例子可以看出,整個執行流程都是線性的,客戶端請求→服務端讀取→服務端響應→客戶端收到響應→客戶端再次傳送請求,那麼在這個鏈路中,如果handler中某個位置存在效能瓶頸,比如我們可以改造下服務端的send方法:
try catch (interruptedexception e)
int count = socketchannel.write(sendbuffer);
在響應客戶端之前睡眠2s,當做是效能瓶頸點,同樣的再次開兩個客戶端同時訪問服務端,每個客戶端傳送10條訊息,會發現,程式直接執行了40s,這是大多數情況下不願意看到的,因此,就有了多執行緒reactor模式,跟bio為了提高效能將讀操作放到乙個獨立執行緒處理一樣,reactor這樣做,也是為了解決上面提到的效能問題,只不過nio比bio做非同步有個最大的優勢就是nio不會阻塞乙個執行緒,類似read這種操作狀態都是由selector負責監聽的,不像bio裡都是阻塞的,只要被非同步出去,那麼一定是非阻塞的業務**(除非是人為將**搞成阻塞),而bio由於read本身阻塞,因此會阻塞掉整個執行緒,這也是同樣是非同步為什麼nio可以更加高效的原因之一。
那麼單執行緒reactor適用於什麼情況呢?適用於那種程式複雜度很低的系統,例如redis,其大部分操作都是非常高效的,很多命令的時間複雜度直接為o(1),這種情況下適合這種簡單的reactor模型實現服務端。
**
簡單執行緒池實現
執行緒池可以處理多執行緒問題,只要將任務放到任務佇列中,執行緒池中的執行緒就會從佇列中取任務,以預設的優先順序開始執行,如果你的任務數大於正在工作的執行緒數,則執行緒池將會建立一根新的執行緒來輔助工作,但是永遠都不會超過執行緒池中線程的最大值。執行緒池的結構 pragma once include ...
簡單執行緒池實現
1.用於執行大量相對短暫的任務 2.當任務增加的時候能夠動態的增加執行緒池中線程的數量值到達乙個閾值 3.當任務執行完畢的時候,能夠動態的銷毀執行緒池中的執行緒 4.該執行緒池的實現本質上也是生產者與消費者模型的應用。生產者執行緒向任務佇列新增任務,一旦佇列有任務到來,如果有等待 執行緒就喚醒來執行...
簡單執行緒池實現
1.用於執行大量相對短暫的任務 2.當任務增加的時候能夠動態的增加執行緒池中線程的數量值到達乙個閾值 3.當任務執行完畢的時候,能夠動態的銷毀執行緒池中的執行緒 4.該執行緒池的實現本質上也是生產者與消費者模型的應用。生產者執行緒向任務佇列新增任務,一旦佇列有任務到來,如果有等待執行緒就喚醒來執行任...