如何基於RabbitMQ實現優先順序佇列

2021-06-26 18:52:29 字數 2039 閱讀 6397

由於種種原因,rabbitmq到目前為止,官方還沒有實現優先順序佇列,只實現了consumer的優先順序處理。

但是,迫於種種原因,應用層面上又需要優先順序佇列,因此需求來了:如何為rabbitmq加入優先順序佇列特性。

查詢資料後,得知rabbitmq雖然官方沒有支援此特性,但是社群已經有相關優先順序佇列外掛程式了,並且這個外掛程式被列在rabbitmq官方**中了。

位址如下:

外掛程式如何安裝?

進入rabbitmq安裝目錄,進入plugins目錄,將上面這個ez檔案拷貝到plugins目錄中,然後執行命令來enable這個外掛程式

centos下,預設路徑在:/usr/lib/rabbitmq/lib/rabbitmq_server-3.3.4/plugins(版本號可能會變化)

windows下,預設路徑在:c:\program files\rabbitmq server\rabbitmq_server-3.3.4\plugins(版本號可能會變化)

把ez檔案拷貝過去,然後執行列舉外掛程式列表命令:

找到這個優先順序佇列外掛程式名為:rabbitmq_priority_queue

執行:rabbitmq-plugins enable rabbitmq_priority_queue

ok,重新啟動rabbitmq-server服務。

這樣,server端的配置算完成了。

%26nbsp;

下面看看客戶端類庫的編寫:

我們先要定義優先順序列舉,繼承自byte,因為rabbitmq的c#客戶端優先順序是用byte來傳遞的:

先定義3個級別的優先順序:低、中、高(其實可以定義很多級別,只是為了簡化,因此只定義了3個級別)

有2個地方需要改動:

申明佇列時需要加入自定義的屬性

傳送訊息到rabbitmq時,設定自定義屬性

%26nbsp;

internal

static idictionary%26lt;string, object%26gt;queuearguments

}

%26nbsp;%26nbsp;

channel.queuedeclare("

queuename

", true, false, false, queuearguments);//queuearguments就是上面定義的這個dictionary

%26nbsp;%26nbsp;

var headers =channel.createbasicproperties();

headers.priority = (byte

)msg.priority;//在這裡把繼承自byte的列舉轉換成byte

channel.basicpublish(

"exchange

", "

route

", headers, serializerutility.serialize2bytes(msg));

%26nbsp;%26nbsp;

在裝了優先順序佇列外掛程式的rabbitmq-server例項中,所有的durable佇列必須用如上的方式,設定x-max-priority屬性,否則rabbitmq-server服務會crash

%26nbsp;

RabbitMQ 基於RPC實現

對於使用rabbitmq執行command的情況,有時候需要有返回值資訊。此時相當於client發布乙個command後,並偵聽返回結果的queue,server接收並處理,將處理結果發布到client偵聽的queue中。簡單實現如下 1.client端 private static void rp...

如何基於RabbitMQ實現優先順序佇列

由於種種原因,rabbitmq到目前為止,官方還沒有實現優先順序佇列,只實現了consumer的優先順序處理。但是,迫於種種原因,應用層面上又需要優先順序佇列,因此需求來了 如何為rabbitmq加入優先順序佇列特性。查詢資料後,得知rabbitmq雖然官方沒有支援此特性,但是社群已經有相關優先順序...

如何基於RabbitMQ實現優先順序佇列

由於種種原因,rabbitmq到目前為止,官方還沒有實現優先順序佇列,只實現了consumer的優先順序處理。但是,迫於種種原因,應用層面上又需要優先順序佇列,因此需求來了 如何為rabbitmq加入優先順序佇列特性。查詢資料後,得知rabbitmq雖然官方沒有支援此特性,但是社群已經有相關優先順序...