RabbitMQ入門 遠端過程呼叫 RPC

2022-03-18 23:59:26 字數 2725 閱讀 6450

假如我們想要呼叫遠端的乙個方法或函式並等待執行結果,也就是我們通常說的遠端過程呼叫(remote procedure call)。怎麼辦?

今天我們就用rabbitmq來實現乙個簡單的rpc系統:客戶端傳送乙個請求訊息,服務端以乙個響應訊息回應。為了能夠接收到響應,客戶端在傳送訊息的同時傳送乙個**佇列用來告訴服務端響應訊息傳送到哪個佇列裡面。也就是說每個訊息乙個**佇列,在此基礎上我們變下,將**佇列定義成類的屬性,這個每個客戶端乙個佇列,同乙個客戶端的請求共用乙個佇列。那麼接下來有個問題,怎麼知道這個佇列裡面的響應訊息是屬於哪個佇列的呢?

我們會用到關聯標識(correlationid),每個請求我們都會生成乙個唯一的值作為correlationid,這樣每次有響應訊息來的時候,我們就去看correlationid來確定到底是哪個請求的響應訊息,將請求和響應關聯起來。如果收到乙個不知道的correlationid,就可以確定不是這個客戶端的請求的響應,可以直接丟棄掉。

一、工作模型

客戶端傳送啟動後,會建立獨特的**佇列。對於乙個請求傳送配置了兩個屬性的訊息:乙個是**佇列(圖中的replay_to),乙個是correlation。 這個請求會傳送到rpc_queue佇列,然後到達服務端處理。

服務端等待rpc_queue佇列的請求。當有請求到來時,它就會開始幹活並將結果通過傳送訊息來返回,該返回訊息傳送到replyto指定的佇列。

客戶端將等待**佇列返回資料。當返回的訊息到達時,它將檢查correlation id屬性。如果該屬性值和請求匹配,就將響應返回給程式。

二、**實現

接下來看**實現:

客戶端

public

class

rpcclient

//真正的處理邏輯

public string call(string msg) throws

ioexception, interruptedexception

}});

return

blockqueue.take();

}//關閉連線

public

void close() throws

ioexception

public

static

void main(string args) throws

ioexception, timeoutexception, interruptedexception

}

傳送請求的時候,它是生產者;接受響應的時候,它是消費者。

服務端

public

class

rpcserver

public

static

void main(string args) throws

interruptedexception

catch

(exception ex)

finally}};

//5.消費訊息(處理任務)

channel.basicconsume(queue_name, false

, consumer);

} catch

(ioexception e)

catch

(timeoutexception e) }}

接受請求的時候,它是消費者;傳送響應的時候,它是生產者。

執行服務端,開始等待請求

然後執行客戶端,控制台log:

服務端(多了一條列印):

****** rpc server waiting for

client request ......

*** will response to rpc client :3客戶端:

**** rpc client reciver response :[3]

三、小插曲剛開始我在寫demo的時候,client中沒有用到阻塞佇列final blockingqueueblockqueue = new arrayblockingqueue(1);,而是直接這樣寫:

@override

public

void

handledelivery(string consumertag, envelope envelope,

com.rabbitmq.client.amqp.basicproperties properties,

byte body) throws

ioexception

}

期望能列印出結果來,但是執行後發現並沒有列印:**** rpc client reciver response :[" + msg + "]的值。

原因是handledelivery()這個方法是在子執行緒中執行的,這個子執行緒執行的時候,主線程會繼續往後執行直到執行了client.close();方法而結束了。

由於主線程終止了,導致沒有列印出結果。加了阻塞佇列之後將主線程阻塞不執行close()方法,問題就解決了。

RabbitMQ入門學習系列 七 遠端呼叫RPC

生產者和消費者啟動以後,都有乙個接收事件,消費者是接收事件是處理呼叫方法以後等待生產者的返回,生產者的接收事件是處理接收生產者傳送的訊息,進行處理。消費者傳送的時候要在 佇列中加入乙個標識,標明是哪個方法進行的呼叫 生產者接收到消費以後,如果發現有訊息標識 把訊息標識繼續返回去,這樣消費者可以保證接...

RabbitMq 效能調優筆記

訂閱端每隔500ms呼叫一次amqp consume message介面函式從socket上獲取資料,正常情況下,伺服器每次會推送幾百條訊息,而且推送的頻率會比較高 導致訂閱端的本機socket緩衝區會很快存滿,導致很多訊息無法進行快取,而被丟掉 發布訊息條數 呼叫amqp comsume mess...

RabbitMq 效能調優記錄

訂閱端每隔500ms呼叫一次amqp consume message介面函式從socket上獲取資料,正常情況下,伺服器每次會推送幾百條訊息,而且推送的頻率會比較高 導致訂閱端的本機socket緩衝區會很快存滿,導致很多訊息無法進行快取,而被丟掉 發布訊息條數 呼叫amqp comsume mess...