企業(yè)電話認(rèn)證百度seo教程視頻
說明:在RabbitMQ消息傳遞過程中,有以下問題:
-
消息沒發(fā)到交換機(jī)
-
消息沒發(fā)到隊列
-
MQ宕機(jī),消息在隊列中丟失
-
消息者接收到消息后,未能正常消費(程序報錯),此時消息已在隊列中移除
針對以上問題,提供以下解決方案:
-
消息確認(rèn):確認(rèn)消息是否發(fā)送到交換機(jī)、隊列;
-
消息持久化:持久化消息,以防MQ宕機(jī)造成消息丟失;
-
消費者消息確認(rèn):確認(rèn)消費者已正確消費消息,才把消息從隊列中刪除;
消息確認(rèn)
可以使用Rabbit MQ提供的publisher confirm機(jī)制來避免消息發(fā)送到MQ過程丟失。具體實現(xiàn)是,publisher-confirm(發(fā)送者確定)、publisher-return(發(fā)送者回執(zhí)),前者判斷消息到交換機(jī)、后者判斷交換機(jī)到隊列
publisher-confirm(發(fā)送者確定)
-
消息成功投遞到交換機(jī),返回ack;
-
消息未投遞到交換機(jī),返回nack;
publisher-return(發(fā)送者回執(zhí))
- 消息投遞到交換機(jī),但沒有到隊列,返回ack,即失敗原因;
在生產(chǎn)者端添加配置
spring:rabbitmq:# rabbitMQ相關(guān)配置host: 118.178.228.175port: 5672username: rootpassword: 123456virtual-host: /# 開啟生產(chǎn)者確認(rèn),correlated為異步,simple為同步publisher-confirm-type: correlated# 開啟publish-return功能,基于callback機(jī)制publisher-returns: true# 開啟消息路由失敗的策略,true是調(diào)用returnCallback方法,false是丟棄消息template:mandatory: true
publisher-return(發(fā)送者回執(zhí))代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 發(fā)送者回執(zhí)實現(xiàn)*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 獲取RabbitTemplate對象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 設(shè)置ReturnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 回執(zhí)信息* @param message 信息對象* @param replyCode 回執(zhí)碼* @param replyText 回執(zhí)內(nèi)容* @param exchange 交換機(jī)* @param routingKey 路由鍵值*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息發(fā)送隊列失敗=====replyCode{},replyText{},exchange{},routingKey{},message{}",replyCode,replyText,exchange,routingKey,message);}});}
}
publisher-confirm(發(fā)送者確定)代碼
@Testpublic void sendExceptionMessage() {// 路由鍵值String routingKey = "exception";// 消息String message = "This is a exception message";// 給消息設(shè)置一個唯一IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 編寫confirmCallBack回調(diào)函數(shù)correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {if (confirm.isAck()) {// 消息發(fā)送交換機(jī)成功log.debug("消息送達(dá)至交換機(jī)成功");} else {// 消息發(fā)送交換機(jī)失敗,打印消息log.error("消息未能送達(dá)至交換機(jī),ID{},原因{}", correlationData.getId(), confirm.getReason());}}}, new FailureCallback() {// 消息發(fā)送交換機(jī)異常@Overridepublic void onFailure(Throwable ex) {log.error("消息發(fā)送交換機(jī)異常,ID:{},原因{}", correlationData.getId(), ex.getMessage());}});rabbitTemplate.convertAndSend("amq.direct", routingKey, message, correlationData);}
測試,設(shè)置一個不存在的routingKey,被發(fā)送者確認(rèn)(publisher-confirm)捕獲到;
// 路由鍵值
String routingKey = "null";
設(shè)置一個不存在的路由,被發(fā)送者回執(zhí)(publisher-return)捕獲到;
rabbitTemplate.convertAndSend("null", routingKey, message, correlationData);
消息持久化
消息持久化,是指把消息保存到磁盤中,在RabbitMQ宕機(jī)或者關(guān)機(jī)時,重啟后,消息仍可以保存下來。消息依賴于交換機(jī)、隊列,因此持久化消息,同時也需要持久化交換機(jī)、隊列。
創(chuàng)建一個持久化的交換機(jī)、隊列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息持久化*/
@Configuration
public class DurableConfig {/*** 交換機(jī)持久化* @return*/@Beanpublic DirectExchange directExchange(){// 三個參數(shù)分別是:交換機(jī)名、是否持久化、沒有隊列與之綁定時是否自動刪除return new DirectExchange("durable.direct",true,false);}/*** 隊列持久化* @return*/@Beanpublic Queue durableQueue(){return QueueBuilder.durable("durable.queue").build();}/*** 交換機(jī)與隊列綁定* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(durableQueue()).to(directExchange()).with("durable");}}
發(fā)送一個持久化的消息
/*** 發(fā)送持久化消息*/@Testpublic void sendDurableMessage() {String routingKey = "durable";CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());Message message = MessageBuilder.withBody("This is a durable message".getBytes(StandardCharsets.UTF_8))// 設(shè)置該消息未持久化消息.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend("durable.direct", routingKey, message, correlationData);}
打開RabbitMQ管理平臺,可以看到"delivery_mode: 2",表示該消息是持久化消息
(源碼:MessageDeliveryMode類)
實際上,交換機(jī)、隊列默認(rèn)就是持久化的(durable: true),所以不用特意設(shè)置;
消費者消息確認(rèn)
介紹
消費者消息確認(rèn),是為了確保消費者已經(jīng)消費了消息,才讓MQ把該消息刪除;
可通過在消費者的配置文件中增加下面這行配置實現(xiàn),備選項有以下三個:
-
none:關(guān)閉ack,表示不做處理,消息發(fā)給消費者之后就立即被刪除;
-
auto:自動ack,表示由Spring檢測代碼是否出現(xiàn)異常,出現(xiàn)異常則保留消息,沒有異常則刪除消息;
-
manual:手動ack,可根據(jù)業(yè)務(wù)手動編寫代碼,返回ack;
spring:rabbitmq:listener:simple:# 設(shè)置消息確認(rèn)模式acknowledge-mode: none
測試:none
可編寫代碼測試,下面是生產(chǎn)者代碼,發(fā)送消息
/*** 發(fā)送普通消息*/@Testpublic void sendNoneMessage() {String directName = "none.direct";String routingKey = "none";String message = "This is a test message";rabbitTemplate.convertAndSend(directName, routingKey, message);}
消費者代碼有問題,未能正常消費消息
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "none.queue"),exchange = @Exchange(name = "none.direct",type = ExchangeTypes.DIRECT),key = {"none"}))public void getNoneMessage(String normalMessage){System.out.println(1/0);System.out.println("normalMessage = " + normalMessage);}
測試結(jié)果,程序報錯,消息也沒能保留下來
測試:auto
更改設(shè)置為:auto,重試
但是消息未被刪除
這種情況,在實際開發(fā)中是不能允許,可以通過更改消費失敗的重試機(jī)制解決。
消費失敗重試機(jī)制
方法一:設(shè)置retry
因為消息被消費失敗,消息會一直循環(huán)重試,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力,這種情況可以通過在消費者端添加以下配置,限制失敗重試的條件來解決:
spring:rabbitmq:listener:simple:retry:# 開啟消費者失敗重試enabled: true# 初次失敗等待時長為1秒initial-interval: 1000# 失敗的等待時長倍數(shù),即后一次等待的時間是前一次等待時間的多少倍multiplier: 1# 最多重試次數(shù)max-attempts: 3# true 無狀態(tài) false 有狀態(tài) 如果業(yè)務(wù)中包含事務(wù) 改為falsestateless: true
開啟后,控制臺可以發(fā)現(xiàn),信息不回一直循環(huán)打印,而是打印數(shù)條后停止,日志信息中有提示“Retry Policy Exhausted”(重試策略已用盡)
這種通過配置的方式,并不會重試數(shù)次后仍保留消息,而是重試數(shù)次仍失敗,隨即丟棄消息,消息丟失,這在實際開發(fā)中也是不能被允許的。
方法二:路由存儲消息
因此,可以通過下面這個方法,把消費失敗的消息,通過交換機(jī)路由到另外的隊列中存儲起來,等業(yè)務(wù)代碼被修復(fù),再路由回來消費。
代碼如下
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 錯誤消息隊列*/
@Configuration
public class ErrorMessageQueueConfig {/*** 創(chuàng)建一個交換機(jī),用于路由消費失敗的消息* @return*/@Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}/*** 創(chuàng)建一個隊列,用于存儲消費失敗的消息* @return*/@Beanpublic Queue errorQueue(){return new Queue("error.queue");}/*** 綁定* @return*/@Beanpublic Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/*** 路由,當(dāng)消費失敗時,把消費失敗的消息路由到此隊列中,路由key為"error"* @param rabbitTemplate* @return*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}
可以看到,消息消費失敗后并沒有被丟失,而是路由到錯誤隊列中存儲了起來。因為錯誤隊列沒有設(shè)置RabbitListener,所以可以存儲消息,等帶代碼問題被排查出來后,可以再針對該隊列設(shè)置監(jiān)聽方法,消費這部分錯誤的消息。
另外,值得一提的是,消費者這邊的控制臺會報一個警告,提示路由密鑰錯誤。我們可以理解,在RabbitMQ底層,會把消費失敗了的消息,統(tǒng)一路由到一個地方去,而我們這種手動把消費失敗的消息路由到自定義的隊列中的方式,打破了這種“默認(rèn)的規(guī)則”,所以報了一個這樣的警告。這種警告是在可控范圍內(nèi)的。
總結(jié)
RabbitMQ發(fā)送消息,為了確保消息的可靠性,保證消息能被交換機(jī)、隊列收到,消息能被正常消費,而不會因消費失敗而丟失,提供了對應(yīng)的一系列方法,并且最后還提供了兩種消費失敗重試方法,優(yōu)化了消費過程,非常Nice。