假如我們想要呼叫遠端的乙個方法或函式並等待執行結果,也就是我們通常說的遠端過程呼叫(remote procedure call)。怎麼辦?
今天我們就用rabbitmq來實現乙個簡單的rpc系統:客戶端傳送乙個請求訊息,服務端以乙個響應訊息回應。為了能夠接收到響應,客戶端在傳送訊息的同時傳送乙個**佇列用來告訴服務端響應訊息傳送到哪個佇列裡面。也就是說每個訊息乙個**佇列,在此基礎上我們變下,將**佇列定義成類的屬性,這個每個客戶端乙個佇列,同乙個客戶端的請求共用乙個佇列。那麼接下來有個問題,怎麼知道這個佇列裡面的響應訊息是屬於哪個佇列的呢?
我們會用到關聯標識(correlationid),每個請求我們都會生成乙個唯一的值作為correlationid,這樣每次有響應訊息來的時候,我們就去看correlationid來確定到底是哪個請求的響應訊息,將請求和響應關聯起來。如果收到乙個不知道的correlationid,就可以確定不是這個客戶端的請求的響應,可以直接丟棄掉。
一、工作模型
客戶端傳送啟動後,會建立獨特的**佇列。對於乙個請求傳送配置了兩個屬性的訊息:乙個是**佇列(圖中的replay_to),乙個是correlation。 這個請求會傳送到rpc_queue佇列,然後到達服務端處理。
服務端等待rpc_queue
佇列的請求。當有請求到來時,它就會開始幹活並將結果通過傳送訊息來返回,該返回訊息傳送到replyto
指定的佇列。
客戶端將等待**佇列返回資料。當返回的訊息到達時,它將檢查correlation id
屬性。如果該屬性值和請求匹配,就將響應返回給程式。
二、**實現
接下來看**實現:
客戶端
public傳送請求的時候,它是生產者;接受響應的時候,它是消費者。class
rpcclient
//真正的處理邏輯
public string call(string msg) throws
ioexception, interruptedexception
}});
return
blockqueue.take();
}//關閉連線
public
void close() throws
ioexception
public
static
void main(string args) throws
ioexception, timeoutexception, interruptedexception
}
服務端
public接受請求的時候,它是消費者;傳送響應的時候,它是生產者。class
rpcserver
public
static
void main(string args) throws
interruptedexception
catch
(exception ex)
finally}};
//5.消費訊息(處理任務)
channel.basicconsume(queue_name, false
, consumer);
} catch
(ioexception e)
catch
(timeoutexception e) }}
執行服務端,開始等待請求
然後執行客戶端,控制台log:
服務端(多了一條列印):三、小插曲剛開始我在寫demo的時候,client中沒有用到阻塞佇列final blockingqueueblockqueue = new arrayblockingqueue(1);,而是直接這樣寫:****** rpc server waiting for
client request ......
*** will response to rpc client :3客戶端:
**** rpc client reciver response :[3]
@override期望能列印出結果來,但是執行後發現並沒有列印:**** rpc client reciver response :[" + msg + "]的值。public
void
handledelivery(string consumertag, envelope envelope,
com.rabbitmq.client.amqp.basicproperties properties,
byte body) throws
ioexception
}
原因是handledelivery()這個方法是在子執行緒中執行的,這個子執行緒執行的時候,主線程會繼續往後執行直到執行了client.close();方法而結束了。
由於主線程終止了,導致沒有列印出結果。加了阻塞佇列之後將主線程阻塞不執行close()方法,問題就解決了。
RabbitMQ入門學習系列 七 遠端呼叫RPC
生產者和消費者啟動以後,都有乙個接收事件,消費者是接收事件是處理呼叫方法以後等待生產者的返回,生產者的接收事件是處理接收生產者傳送的訊息,進行處理。消費者傳送的時候要在 佇列中加入乙個標識,標明是哪個方法進行的呼叫 生產者接收到消費以後,如果發現有訊息標識 把訊息標識繼續返回去,這樣消費者可以保證接...
RabbitMq 效能調優筆記
訂閱端每隔500ms呼叫一次amqp consume message介面函式從socket上獲取資料,正常情況下,伺服器每次會推送幾百條訊息,而且推送的頻率會比較高 導致訂閱端的本機socket緩衝區會很快存滿,導致很多訊息無法進行快取,而被丟掉 發布訊息條數 呼叫amqp comsume mess...
RabbitMq 效能調優記錄
訂閱端每隔500ms呼叫一次amqp consume message介面函式從socket上獲取資料,正常情況下,伺服器每次會推送幾百條訊息,而且推送的頻率會比較高 導致訂閱端的本機socket緩衝區會很快存滿,導致很多訊息無法進行快取,而被丟掉 發布訊息條數 呼叫amqp comsume mess...