Dubbo同步呼叫和超時原始碼

2021-10-02 15:17:29 字數 3680 閱讀 7256

同步呼叫是一種阻塞式的呼叫方式,即 consumer 端**一直阻塞等待,直到 provider 端返回為止;

dubbo預設的協議是netty, netty 是nio 非同步通訊機制,那麼服務呼叫是怎麼轉化為同步的呢?

下面看原始碼:

省略一部分呼叫鏈,最終會來到這裡 dubboinvoker

protected result doinvoke(final invocation invocation) throws throwable  else 

try else if (isasync) else

}}

接著我們看同步呼叫部分,(result) currentclient.request(inv, timeout).get();

關於上面這句**,它包含兩個動作:先呼叫currentclient.request方法,通過netty傳送請求資料;然後呼叫其返回值的get方法,來獲取返回值。

1、傳送請求

這一步主要是將請求方法封裝成request物件,通過netty將資料傳送到服務端,然後返回乙個defaultfuture物件。

com.alibaba.dubbo.remoting.exchange.support.header.headerexchangechannel的request方法:

public responsefuture request(object request, int timeout) throws remotingexception 

//封裝請求資訊

request req = new request();

req.setversion("2.0.0");

req.settwoway(true);

req.setdata(request);

//構建defaultfuture物件

defaultfuture future = new defaultfuture(channel, req, timeout);

try catch (remotingexception e)

return future;

}

如上**,邏輯很清晰。關於看它的返回值是乙個defaultfuture物件,我們再看它的構造方法。

public defaultfuture(channel channel, request request, int timeout)
同時類載入的時候會啟動乙個超時掃瞄線程:

static
看看執行緒掃瞄啥:

private static class remotinginvocationtimeoutscan implements runnable 

// 如果future未完成,且超時

if (system.currenttimemillis() - future.getstarttimestamp() > future.gettimeout())

}thread.sleep(30);

} catch (throwable e) }}

}

2、獲取返回值

我們接著看get方法。com.alibaba.dubbo.remoting.exchange.support.defaultfuture#get()

public object get() throws remotingexception
com.alibaba.dubbo.remoting.exchange.support.defaultfuture#get(int)

public object get(int timeout) throws remotingexception 

//判斷 如果操作未完成

if (!isdone())

}} catch (interruptedexception e) finally

if (!isdone())

}//返回資料

return returnfromresponse();

}//獲取返回值response

private object returnfromresponse() throws remotingexception

// 正常返回,返回 result 物件

if (res.getstatus() == response.ok)

// 超時處理

if (res.getstatus() == response.client_timeout || res.getstatus() == response.server_timeout)

throw new remotingexception(channel, res.geterrormessage());

}

其中的isdone()方法為:

public boolean isdone()
我們總結下它的執行流程:

其中等待用的是 done.await(timeout, timeunit.milliseconds);

done為private final condition done;

因此呼叫的是condition的await方法,會釋放鎖和資源,singl方法會將其喚醒。

get方法後面的returnfromresponse()方法可以看到超時處理,也就是之前那個超時掃瞄線程對status進行賦值後,returnfromresponse()方法裡面對超時掃瞄線程賦值的status值進行判斷是否超時,如果超時就丟擲timeoutexception異常

response在**被賦值、await在**被通知?

在netty讀取到網路資料後,其中會呼叫到headerexchangehandler中的方法,我們來看一眼就明白了。

public class headerexchangehandler implements channelhandlerdelegate 

}}

如果response 不為空,並且不是心跳資料,就呼叫defaultfuture.received,在這個方法裡面,主要就是根據返回資訊的id找到對應的future,然後通知。

public static void received(channel channel, response response)     

try else

} finally

}

future.doreceived(response);就很簡單了,它就回答了我們上面的那兩個小問題。賦值response和await通知。

private void doreceived(response res) 

} finally

if (callback != null)

}

非同步呼叫的話用到了rpccontext.

rpccontext是dubbo中的乙個上下文資訊,它是乙個 threadlocal 的臨時狀態記錄器

Dubbo 同步 非同步呼叫的幾種方式

我們知道,dubbo 預設協議採用單一長連線,底層實現是 netty 的 nio 非同步通訊機制 基於這種機制,dubbo 實現了以下幾種呼叫方式 同步呼叫是一種阻塞式的呼叫方式,即 consumer 端 一直阻塞等待,直到 provider 端返回為止 通常,乙個典型的同步呼叫過程如下 consu...

C 委託的同步呼叫和非同步呼叫

委託的invoke方法用來進行同步呼叫。同步呼叫也可以叫阻塞呼叫,它將阻塞當前執行緒,然後執行呼叫,呼叫完畢後再繼續向下進行。同步呼叫的例子 using system using system.threading public delegate int addhandler int a,int b ...

C 委託的同步呼叫和非同步呼叫

委託的invoke方法用來進行同步呼叫。同步呼叫也可以叫阻塞呼叫,它將阻塞當前執行緒,然後執行呼叫,呼叫完畢後再繼續向下進行。同步呼叫的例子 using system using system.threading public delegate int addhandler int a,int b ...