原生實現rabbitmq
為什麼要使用rabbitmq訊息佇列
我們模擬這樣乙個場景,假如你在網上購物**買了東西,你支付過程5s、傳送簡訊通知5s、郵件通知等時間就更長了,你整個支的過程我們模擬這樣乙個場景,假如你在網上購物**買了東西,你支付過程5s、傳送簡訊通知5s、郵件通知等時間就更長了,你整個支付的過程就和很長了,因為**是耦合在一起,但是如果你使用訊息中間系統的話,我們支付過程就很短了,因為訊息通知都是非同步的,由訊息系實現。實現原生rabbitmq流程
建立連線工廠–>給工廠設定連線屬性(位址,埠、賬號密碼等)–>由工廠得到連線生產者 :–>通過連線得到通道–>由通道設定交換器->通道發布訊息
//建立連線工廠,設定工廠屬性
connectionfactory factory = new connectionfactory();
factory.sethost("127.0.0.1");
factory.setusername("guest");
factory.setpassword("guest");
//宣告路由鍵和交換器
private final static string exchange_name = "direct_cc_confirm_1";
//路由鍵
private final static string route_key = "error";
connection connection = factory.newconnection();
channel channel = connection.createchannel();
//通過通道宣告乙個交換器 第乙個引數時交換器的名字 第二個引數時交換器的種類
channel.exchangedeclare(exchange_name, builtinexchangetype.direct);
//通道發布訊息
channel.basicpublish(exchange_name,route_key,null,message.getbytes());
//關閉連線
channel.close();
connection.close();
消費者:
// 在通道宣告交換器前步驟相同,在以後就是不同了
channel.queuebind(queuename,exchange_name,routingkey);
system.out.println("waiting message.......");
// 建立佇列消費者 設定乙個***監聽消費訊息
final consumer consumerb = new defaultconsumer(channel)
};//消費者自動確認:autoack引數為true
channel.basicconsume(queuename, true, consumerb);
}
以上只有乙個生產者和消費者只有乙個路由鍵,但是如果生產者由很多,消費者只有乙個那就會只接受路由鍵匹配的生產者傳送的訊息生產者:
//定義一組路由鍵
stringroutingkeys = ;
for(int i=0;i<3;i++)
消費者:
//只匹配生產者由error路由鍵傳送的訊息
string routingkey = "error";
//7.佇列通過路由鍵繫結到交換器上
channel.queuebind(queuename,exchange_name,routingkey);
system.out.println("waiting message.......");
//8.設定乙個***監聽消費訊息
consumer consumerb = new defaultconsumer(channel)
};
//9.自動確認:autoack引數為flase
channel.basicconsume(queuename,flase,consumerb);
}
//消費者如果要接受所有資訊的話
//5.宣告隨機佇列
string queuename = channel.queuedeclare().getqueue();
//6.定義一組路由鍵消費所有日誌
stringroutingkeys = ;
//7.佇列通過路由鍵繫結到交換器上
for(string routingkey:routingkeys)
system.out.println("waiting message.......");
//8.設定乙個***監聽消費訊息
consumer consumera = new defaultconsumer(channel)
};channel.basicconsume(queuename,flase,consumera);
rabbbitmq自動確認機制
自動確認機制分為同步確認和非同步確認,就是在傳送方發了訊息,消費者返回ack,才確認這個訊息為傳送成功,否則就是傳送失敗,unacked.同步實現:
非同步實現channel.confirmselect();
//發布訊息的交換器上
for(int i=0;i<2;i++)
}
//9.消費者自動確認:autoack引數為true//將通道設定為傳送方確認
channel.confirmselect();
//通道被關閉*** 用於rabbitmq伺服器斷線重連
/*** 生產者非同步確認監聽
* 引數deliverytag代表了當前channel唯一的投遞
* 引數multiple:false
* */
channel.addconfirmlistener(new confirmlistener()
//rabbitmq伺服器由於自己內部出現故障沒有收到訊息
public void handlenack(long deliverytag, boolean multiple)
throws ioexception
});//生產者非同步返回監聽 這裡和發布訊息時的mandatory引數有關
//引數mandatory:mandatory=true,投遞訊息時無法找到乙個合適的佇列,把訊息返回給生產者,mandatory=false 丟棄訊息(預設)
channel.addreturnlistener(new returnlistener()
});//後面是進行發布操作
channel.basicconsume(queuename, true, consumerb);
原生ajax實現
方法 描述open method,url,async 規定請求的型別 url 以及是否非同步處理請求。method 請求的型別 get 或 post。url 檔案在伺服器上的位置。async true 非同步 或 false 同步 如果不寫預設非同步 send string 將請求傳送到伺服器。st...
rabbitmq實現延遲佇列
延遲佇列應用場景 使用者生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。使用者註冊成功之後,需要過一段時間比如一周後校驗使用者的使用情況,如果發現使用者活躍度較低,則傳送郵件或者簡訊來提醒使用者使用。延遲重試。比如消費者從佇列裡消費訊息時失敗了,但是想要延遲一段...
RabbitMQ實現延時佇列
rabbitmq實現延時佇列一般有兩種形式 第一種方式 利用兩個特性 time to live ttl dead letter exchanges dlx a訊息佇列過期 傳送給b佇列 第二種方式 利用rabbitmq的外掛程式x delay message rabbitmq可以針對佇列設定x ex...