前端系統推送大批量資料進入我方系統進行處理, 為了減輕我方系統的壓力, 並且充分發揮伺服器的效能, 提高處理效率, 於是使用 rabbit 做了限流處理, 同時有多執行緒執行多個消費者處理任務, 來提高效率
rabbit配置類, 其餘的基礎配置配置都維護在配置檔案或者配置中心
/***
* rabbit配置類
* @author yanqiang.jiang
* @version 1.0
* @date 2019/08/26
**/@configuration
@slf4j
public
class
rabbitconfig
/** * 單一消費者
** @return ******rabbitlistenercontaine***ctory
*/@bean
(name =
"singlelistenercontainer"
)public ******rabbitlistenercontaine***ctory listenercontainer()
/** * 多個消費者
** @return ******rabbitlistenercontaine***ctory
*/@bean
(name =
"multilistenercontainer"
)public ******rabbitlistenercontaine***ctory multilistenercontainer()
/** * rabbittemplate 配置
** @return rabbittemplate
*/@bean
public rabbittemplate rabbittemplate()
),ack({}),cause({})"
, correlationdata, ack, cause);}
);rabbittemplate.
setreturncallback
((message message,
int replycode, string replytext, string exchange, string routingkey)
->
),route({}),replycode({}),replytext({}),message:{}"
, exchange, routingkey, replycode, replytext, message);}
);return rabbittemplate;
}}
此處注意點:
factory.setmessageconverter(new jackson2jsonmessageconverter());
傳送的訊息將會使用它來序列化
factory.setacknowledgemode(acknowledgemode.manual);
必須開啟手動確認模式
factory.setconcurrentconsumers(10);
和factory.setmaxconcurrentconsumers(15);
這個表示消費者的數量, 也就是消費多執行緒執行的執行緒數目.
factory.setprefetchcount(5);
每次取的訊息的數目, 數目大效率高, 但是順序越得不到保證
/***
* rabbit 生產者
* @author yanqiang.jiang
* @version 1.0
* @date 2019/08/26
**/@component
@slf4j
public
class
accoflowhsproducer
,發布訊息:{}"
,"testqueue"
, batchnum)
;// 第乙個引數為剛剛定義的佇列名稱
this
.rabbittemplate.
convertandsend
("testqueue"
, messagebuilder.
withbody
(batchnum.
getbytes
(standardcharsets.utf_8)).
build()
);}}
/***
* rabbit 消費者
* @author yanqiang.jiang
* @version 1.0
* @date 2019/08/26
**/@component
@slf4j
public
class
accoflowhsconsumer
收到訊息"
, channel.
getchannelnumber()
);patransactiontask task = json.
parseobject
(msg.
getbody()
, patransactiontask.
class);
log.
info
("**事務落地通用消費者{}解析訊息:{}"
, channel.
getchannelnumber()
, task.
getplancontrolid()
);// 這裡最好新增重複執行判斷
// 處理**事務
runner.
excutetransaction
(task);}
catch
(exception e)
出錯", channel.
getchannelnumber()
);e.
printstacktrace()
;}channel.
basicack
(msg.
getmessageproperties()
.getdeliverytag()
,true);
log.
info
("**事務落地通用消費者{}確認消"
, channel.
getchannelnumber()
);}}
注意:
channel.basicack(msg.getmessageproperties().getdeliverytag(), true);
處理完畢後一定要確認訊息, 不然不會繼續處理下個訊息. 同時考慮異常的情況,也要手工確認
採取手動確認後處理完成後才會確認,這裡處理時間可能比較長, 這個時候訊息超時server會向消費者再次傳送訊息, 所以這裡建議添防重複處理。防止重複消費訊息。
SpringBoot使用訊息中介軟體RabbitMQ
首先在docker中安裝rabbitmq,pull 帶有web介面的 docker pull rabbitmq 3 management5672為客戶端,15672為web介面埠 docker run d p5672 5672 p15672 15672 name rabbitmq01 映象id簡要介...
R A B 大數問題
給定兩個整數a和b,其表示形式是 從個位開始,每三位數用逗號 隔開。現在請計算a b的結果,並以正常形式輸出。input 輸入包含多組資料資料,每組資料佔一行,由兩個整數a和b組成 10 9 a,b 10 9 output 請計算a b的結果,並以正常形式輸出,每組資料佔一行。sample inpu...
CentOS中利用Docker安裝RabbitMQ
centos中利用docker安裝rabbitmq 1 拉取映象 帶管理平台 docker pull rabbitmq 3.7.7 management 2 啟動容器 docker run d restart always name test rabbit e rabbitmq default us...