RabbitMQ訊息的處理

2022-07-27 02:03:10 字數 4562 閱讀 9556

訊息的確認,是指生產者投遞訊息後,如果broker收到訊息,則會給我們生產這乙個應答。

生產者進行接收應答,用來確定這條訊息是否正常的傳送到broker,這種方式也是訊息的可靠性投遞的核心保障。

確認機制流程圖

如何實現confirm確認訊息?

第一步:在channel上開啟確認模式:channel.confirmselect()

第二步:在channel上新增監聽:addconfirmlistener,監聽成功和失敗的返回結果,根據具體的結果對訊息進行重新傳送、或者記錄日誌等後續處理。

消費者

public class consumer 

}}

生產者

public class producer 

@override

public void handleack(long deliverytag, boolean multiple) throws ioexception

});}}

return listener用於處理一些不可路由的訊息我們的訊息生產者,通過指定乙個exchange和routingkey,把訊息送到某乙個佇列中,然後我們的消費者監聽佇列,進行訊息處理操作。

但是在某些情況下,如果我們在傳送訊息的時候,當前的exchange不存在或者指定的路由key路由不到,這個時候我們需要監聽這種不可達的訊息,就要使用return listener。

在基礎api中有乙個關鍵的配置項:

mandatory:如果為true,則監聽會接收到路由不可達的訊息,然後進行後續處理,如果為false,那麼broker端自動刪除該訊息。(預設false)

public class consumer 

}}

public class producer 

});channel.basicpublish(exchange, routingkeyerror, true, null, msg.getbytes());

routingkeyerror, true, null, msg.getbytes());

}}

我們一般在**中編寫while迴圈,進行consumer.nextdelivery方法進行獲取下一條訊息,然後進行消費處理,比較low。

使用自定義的consumer更加的方便,解耦性更強

自定義consumer

public class myconsumer extends defaultconsumer 

@override

public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte body) throws ioexception

}

public class consumer 

}

public class producer 

}}

我們rabbitmq伺服器有上萬條未處理的訊息,我們隨便開啟乙個消費者客戶端,會出現下面情況:巨量的訊息瞬間全部推送過來,但是我們單個客戶端無法同時處理這麼多資料。(導致伺服器崩潰,線上故障)生產端一次推送幾百條資料庫,客戶端只接收一兩條,在高併發的情況下,不能再生產端做限流,只能在消費端處理。

解決方法

rabbitmq提供了一種qos(服務質量保證)功能,在非自動確認訊息的前提下,如果一定資料的訊息(通過基於consumer或者channel設定qos的值)未被確認前,不進行消費新的訊息。

void basicqos(uint prefetchsize,ushort prefetchcount,bool global);
消費端體現,一次最多能處理多少條訊息(基本上為1),限流策略在什麼上應用(channel--true,consumer---false)

prefetchsize和global這兩項,rabbitmq沒有實現,暫不研究

prefetch_count在no_ack=false的情況下生效,在自動應答的情況下兩個值不生效

public class myconsumer extends defaultconsumer 

@override

public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte body) throws ioexception

}

public class consumer 

}

public class producer }}

消費端進行消費的時候,如果由於業務異常我們可以進行日誌的記錄,然後進行補償。

如果由於伺服器宕機等嚴重問題,那麼我們就需要手工進行ack保障消費端成功。

為了對沒有處理成功的訊息,把訊息重新回遞給broker

一般我們在實際應用中,都會關閉重回佇列,也就是設定為false

public class myconsumer extends defaultconsumer 

@override

public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte body) throws ioexception catch (interruptedexception e)

if((integer)properties.getheaders().get("num") == 0) else

}}

public class consumer 

}

public class producer 

}}

ttl是time to live的縮寫,也就是生存時間

rabbitmq支援訊息的過期時間,在訊息傳送時可以進行指定

rabbitmq支援佇列的過期時間,從訊息入佇列開始計算,只要超過了佇列的超過時間配置,那麼訊息會自動的清除

amqp.basicproperties properties = new amqp.basicproperties.builder()

.deliverymode(2)

.contentencoding("utf-8")

.expiration("10000")

.build();

訊息10s過期,ttl是佇列過期時間

dlx,dead-letter-exchange

利用dlx,當訊息在乙個佇列中變成死信之後,它能夠被重新publish到另乙個exchange,這個exchange就是dlx。

訊息變成死信情況

dlx也是乙個正常的exchange,和一般的exchange沒有區別,他能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。

當這個佇列中有死信時,rabbitmq就會自動的將這個訊息重新發布到設定的exchange上去,進而被路由到另乙個佇列。 

可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補rabbitmq3.0以前支援的immediate引數的功能。 

死信佇列設定

首先要設定死信佇列的exchange和queue,並進行繫結,然後我們進行正常宣告交換機,佇列,繫結,只不過我們需要在佇列加上乙個引數:arguments.put("x-dead-letter-exchange","dlx.exchange");

這樣訊息在過期、request、佇列達到最大長度時, 訊息就可以直接路由到死信佇列。 

public class myconsumer extends defaultconsumer 

@override

public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte body) throws ioexception

}

public class consumer 

}

public class producer 

}}

RabbitMQ 訊息廣播

rabbitmq訊息模型的核心理念是 發布者 producer 不會直接傳送任何訊息給佇列。事實上,發布者 producer 甚至不知道訊息是否已經被投遞到佇列。發布者 producer 只需要把訊息傳送給乙個交換機 exchange 交換機非常簡單,它一邊從發布者方接收訊息,一邊把訊息推送到佇列。...

RabbitMQ 廣播訊息

定義 廣播訊息是指生產者產生的訊息將分發給所有訂閱這個訊息的消費者,而普通的模式是 一批訊息可以被多個人共同消費,如consumer1可能消費1,3,5記錄,而consumer2可能消費的是2,4,6這種模組就是共同消費模組 而今天說的是廣播訊息,它是指一些訊息同時被推送到多個訂閱者,而這些訂閱者收...

RabbitMQ 廣播訊息

定義 廣播訊息是指生產者產生的訊息將分發給所有訂閱這個訊息的消費者,而普通的模式是 一批訊息可以被多個人共同消費,如consumer1可能消費1,3,5記錄,而consumer2可能消費的是2,4,6這種模組就是共同消費模組 而今天說的是廣播訊息,它是指一些訊息同時被推送到多個訂閱者,而這些訂閱者收...