在使用阿里的rocketmq傳送資料時,大資料量,多執行緒情況下,會產生並非網路原因的傳送失敗.故使用以下傳送方式,方法是採用內部靜態型別的單例模式.
@component
public class producerbeansingleton ")
private string producerid;
@value("$")
private string accesskey;
@value("$")
private string secretkey;
@value("$")
private string onsaddr;
private static producer producer;
private static class singletonholder
private producerbeansingleton (){}
public static final producerbeansingleton getinstance()
@postconstruct
public void init()
public producer getproducer()
public mapsendmsg2mq(string topic, string tag, string object,
string key) throws interruptedexception ,tag={},key={},object={}", topic, tag, key,object);
if (!isstartmq)
mapresult = new hashmap<>();
//producer init = init();
message msg = null;
try else
// 非同步傳送訊息, 傳送結果通過 callback 返回給客戶端。
// producer.sendasync(msg, new sendcallback() ,tag={},
key={},msgid={}", topic, tag, key, sendresult.getmessageid());
//// }
//// @override
// public void onexception(onexceptioncontext context) ,tag={},key={},msgid={}", topic, tag, key, context.getmessageid());
// }
// });
mapresultmap=new hashmap<>();
resultmap.put("msgid",sendresult.getmessageid());
return responseinfo.dook("傳送成功!");
} catch (onsclientexception e) ,topic={},tag={},msgid={}", e.getmessage(), topic, tag, msg.getmsgid());
return responseinfo.dofail(e);
} catch (exception e) ", e.getmessage());
return responseinfo.dofail(e);
}}
RocketMQ 訊息傳送
訊息傳送基本流程 1 訊息驗證 驗證主題 topic 訊息體不能為空和大小不能超過4m。2 路由查詢 a 檢視快取,是否有topic的路由資訊。b 如果沒有則到nameserver中獲取路由資訊,如果快取內能找到則獲取相應路由資訊。c 從快取中獲取上一次異常的broker節點資訊,跟獲取到的節點資訊...
RocketMQ訊息順序傳送和消費問題
事故現場分析 由於創新業務產品上線,運營產品想通過一些活動來刺激使用者,採用註冊邀請機制即可獲取積分的相關活動。考慮到後續可能還有其他可能的活動來發放積分,所以設計的時候,採用mq訊息模式發放積分,非同步解耦,並能夠保證資料的最終一致性。考慮到使用者積分計算的時候可能存在併發操作的情況,想到兩種解決...
RocketMQ(04) 傳送順序訊息
如果你的業務上對訊息的傳送和消費順序有較高的需求,那麼在傳送訊息的時候你需要把它們放到同乙個訊息佇列中,因為只有同乙個佇列的訊息才能確保消費的順序性。下面 我們在傳送訊息的時候,呼叫的是需要傳遞messagequeueselector的send 該方法還可以傳遞乙個額外的引數,其對應messageq...