SpringBoot使用Rabbit多消費者削峰

2021-09-26 21:21:23 字數 3471 閱讀 6898

前端系統推送大批量資料進入我方系統進行處理, 為了減輕我方系統的壓力, 並且充分發揮伺服器的效能, 提高處理效率, 於是使用 rabbit 做了限流處理, 同時有多執行緒執行多個消費者處理任務, 來提高效率

rabbit配置類, 其餘的基礎配置配置都維護在配置檔案或者配置中心

/***

* rabbit配置類

* @author yanqiang.jiang

* @version 1.0

* @date 2019/08/26

**/@configuration

@slf4j

public

class

rabbitconfig

/** * 單一消費者

** @return ******rabbitlistenercontaine***ctory

*/@bean

(name =

"singlelistenercontainer"

)public ******rabbitlistenercontaine***ctory listenercontainer()

/** * 多個消費者

** @return ******rabbitlistenercontaine***ctory

*/@bean

(name =

"multilistenercontainer"

)public ******rabbitlistenercontaine***ctory multilistenercontainer()

/** * rabbittemplate 配置

** @return rabbittemplate

*/@bean

public rabbittemplate rabbittemplate()

),ack({}),cause({})"

, correlationdata, ack, cause);}

);rabbittemplate.

setreturncallback

((message message,

int replycode, string replytext, string exchange, string routingkey)

->

),route({}),replycode({}),replytext({}),message:{}"

, exchange, routingkey, replycode, replytext, message);}

);return rabbittemplate;

}}

此處注意點:

factory.setmessageconverter(new jackson2jsonmessageconverter());傳送的訊息將會使用它來序列化

factory.setacknowledgemode(acknowledgemode.manual);必須開啟手動確認模式

factory.setconcurrentconsumers(10);factory.setmaxconcurrentconsumers(15);這個表示消費者的數量, 也就是消費多執行緒執行的執行緒數目.

factory.setprefetchcount(5);每次取的訊息的數目, 數目大效率高, 但是順序越得不到保證

/***

* rabbit 生產者

* @author yanqiang.jiang

* @version 1.0

* @date 2019/08/26

**/@component

@slf4j

public

class

accoflowhsproducer

,發布訊息:{}"

,"testqueue"

, batchnum)

;// 第乙個引數為剛剛定義的佇列名稱

this

.rabbittemplate.

convertandsend

("testqueue"

, messagebuilder.

withbody

(batchnum.

getbytes

(standardcharsets.utf_8)).

build()

);}}

/***

* rabbit 消費者

* @author yanqiang.jiang

* @version 1.0

* @date 2019/08/26

**/@component

@slf4j

public

class

accoflowhsconsumer

收到訊息"

, channel.

getchannelnumber()

);patransactiontask task = json.

parseobject

(msg.

getbody()

, patransactiontask.

class);

log.

info

("**事務落地通用消費者{}解析訊息:{}"

, channel.

getchannelnumber()

, task.

getplancontrolid()

);// 這裡最好新增重複執行判斷

// 處理**事務

runner.

excutetransaction

(task);}

catch

(exception e)

出錯", channel.

getchannelnumber()

);e.

printstacktrace()

;}channel.

basicack

(msg.

getmessageproperties()

.getdeliverytag()

,true);

log.

info

("**事務落地通用消費者{}確認消"

, channel.

getchannelnumber()

);}}

注意:

channel.basicack(msg.getmessageproperties().getdeliverytag(), true);處理完畢後一定要確認訊息, 不然不會繼續處理下個訊息. 同時考慮異常的情況,也要手工確認

採取手動確認後處理完成後才會確認,這裡處理時間可能比較長, 這個時候訊息超時server會向消費者再次傳送訊息, 所以這裡建議添防重複處理。防止重複消費訊息。

SpringBoot使用訊息中介軟體RabbitMQ

首先在docker中安裝rabbitmq,pull 帶有web介面的 docker pull rabbitmq 3 management5672為客戶端,15672為web介面埠 docker run d p5672 5672 p15672 15672 name rabbitmq01 映象id簡要介...

R A B 大數問題

給定兩個整數a和b,其表示形式是 從個位開始,每三位數用逗號 隔開。現在請計算a b的結果,並以正常形式輸出。input 輸入包含多組資料資料,每組資料佔一行,由兩個整數a和b組成 10 9 a,b 10 9 output 請計算a b的結果,並以正常形式輸出,每組資料佔一行。sample inpu...

CentOS中利用Docker安裝RabbitMQ

centos中利用docker安裝rabbitmq 1 拉取映象 帶管理平台 docker pull rabbitmq 3.7.7 management 2 啟動容器 docker run d restart always name test rabbit e rabbitmq default us...