本文**摘錄的時候,將一些與本流程無關的內容去掉了,如有需要請看原始碼。
如果大家對dubbo rpc原理原理感興趣,可以看我之前寫過的另外一篇部落格《
dubbo rpc源
碼解讀》。
併發情況下,dubbo的rpc模型如下圖所示:
如圖所示,consumer端可能同時有多個執行緒呼叫provider的服務,此時provider會啟動多個執行緒來分別處理這些併發呼叫,處理完以後將資料返回。在這種多執行緒環境中,dubbo是如何做到consumer thread a不會拿到其他執行緒的請求結果?
其實這是乙個非常普遍的問題,例如我們的伺服器與mq伺服器、資料庫、快取等等第三方伺服器通訊時,都需要確保併發環境下資料不會錯亂,並且要盡可能降低伺服器的效能損耗。
lâ â
**dubbo rpc多執行緒通訊原理。
lâ â
寫乙個簡單的示例,實現多執行緒資料交換
如上圖所示,消費端多執行緒並行消費服務的場景,主要流程如下
dubboinvoker#doinvoke
方法中,在exchangeclient#request(inv, timeout)呼叫時,返回乙個defaultfuture物件,接著會呼叫defaultfuture.get()方法(等待返回結果)。
對於consumer端而言,伺服器會為每乙個請求建立乙個執行緒,因為rpc操作是乙個慢動作,為了節省資源,當執行緒傳送rpc請求後,需要讓當前執行緒釋放資源、進入等待佇列,當獲取到返回結果以後,再喚醒這個執行緒。
rpc請求的過程為:每乙個rpc請求都有乙個唯一的id,rpc請求的時候,會將此id也傳送給provider;provider處理完請求後會將此id和返回結果一同返回給consumer;consumer收到返回資訊以後解析出id,然後從futures中找到相對應的defaultfuture,並通過defaultfuture.done#signal()喚醒之前等待執行緒。
下面根據原始碼詳細討論一下多執行緒情況下rpc請求的細節,即dubbo多執行緒模型的實現。
1) defaultfuture#field
這裡列出了與多執行緒相關的幾個重要的屬性
private final lock lock = new reentrantlock();
private final condition done = lock.newcondition();
private static final mapfutures = new concurrenthashmap();
2)
defaultfuture#
建構函式
建立好defaultfuture物件以後,將defaultfuture存入了futures中。其實每一次請求,多會生成乙個唯一的id,即對於每個伺服器而言,id唯一。
public defaultfuture(channel channel, request request, int timeout)
3) defaultfuture#get
主要邏輯是:獲取鎖,呼叫
await
方法,此時當前執行緒進入等待佇列,此執行緒會有兩種結果過:要麼超時,要麼被喚醒;如果被喚醒,則返回
rpc的結果。
public object get(int timeout) throws remotingexception
if (! isdone())
}} catch (interruptedexception e) finally
if (! isdone())
}return returnfromresponse();
}
4) defaultfuture#received
收到返回結果時,呼叫此方法。首先從
futures
中根據id
獲取defaultfuture
,如果不存在,列印一條日誌;如果存在則通過
signal
釋放乙個喚醒訊號,將執行緒從等待佇列中喚醒。
public static void received(channel channel, response response) else
} finally
}private void doreceived(response res)
} finally
if (callback != null)
}
5) defaultfuture#received
以下**是用來從
futures
清理rpc
請求超時的
defaultfuture
private static class remotinginvocationtimeoutscan implements runnable
if (system.currenttimemillis() - future.getstarttimestamp() > future.gettimeout())
}thread.sleep(30);
} catch (throwable e) }}
}static
6) headerexchangehandler#handlerequest
建立response
物件時,帶上請求id。
response handlerequest(exchangechannel channel, request req) throws remotingexception
// find handler by message class.
object msg = req.getdata();
try catch (throwable e)
return res;
}
根據dubbo
的實現方案,我寫了乙個簡單的示例,以方便理解,見中concurrentrwtester**
多執行緒 多執行緒原理
我們首先要知道什麼是多執行緒,說白了就是多個執行緒,執行緒是什麼呢,其實就是程序執行的途徑,那麼說道這裡我們又引入了乙個新的名字,就是程序,那麼我們來看看什麼是程序,其實我們自己也能看到,啟動電腦的任務管理器,我們就可以看到程序選項,裡面是我們電腦所有的程序,我們會發現有很多的程序.簡單地說就是程序...
執行緒通訊,多執行緒
多執行緒 thread handler thread處理一些複雜的業務邏輯 耗時的事情 handler在主線程中接收訊息的乙個物件 mhandler.sendmessage msg 傳送乙個訊息物件 mhandler.sendemptymessage what 傳送空訊息,只有what沒有obj m...
多執行緒 執行緒通訊
總結 今天小鹹兒來講解乙個好玩的事,那就是執行緒之間該如何通訊,執行緒通訊之後又會出現什麼問題?先來一張導圖來看看執行緒通訊的分布?疑問 如果想要執行緒按照使用者自定義的順序執行的話,那該如何操作呢?思考 如果能夠讓執行緒等待先執行的執行緒執行完,再執行不就能達到效果了嗎!果然出現問題之後,就會有對...