wordpress主題下新建頁面網(wǎng)站seo站外優(yōu)化
SpringBoot教程(十五) | SpringBoot集成RabbitMq(消息丟失、消息重復(fù)、消息順序、消息順序)
- RabbitMQ常見問題解決方案
- 問題一:消息丟失的解決方案
- (1)生成者丟失消息
- 丟失的情景
- 解決方案1:發(fā)送方確認(rèn)機(jī)制(推薦,最常用)
- 解決方案2:事務(wù)(不推薦,因?yàn)樾阅懿?#xff09;
- (2)MQ丟失消息
- 丟失的情景
- 解決方案:開啟RabbitMQ的持久化+開啟鏡像隊(duì)列
- (3)消費(fèi)者丟失消息
- 丟失的情景 1
- 解決方案:無需解決
- 丟失的情景 2
- 擴(kuò)展:重試機(jī)制
- 解決方案:消費(fèi)者方確認(rèn)機(jī)制(推薦,最常用)
- 問題二:消息重復(fù)的解決方案
- 什么時(shí)候會(huì)重復(fù)消費(fèi)
- 如何解決
- 問題三:保證消息順序的解決方案
- 單一隊(duì)列和單一消費(fèi)者模式(RabbitMQ)
- 問題四:消息堆積的解決方案
- 消息堆積原因
- 預(yù)防措施
- 已出事故的解決措施
RabbitMQ常見問題解決方案
問題一:消息丟失的解決方案
首先明確一條消息的傳送流程:生產(chǎn)者->MQ->消費(fèi)者
所以這三個(gè)節(jié)點(diǎn)都可能丟失數(shù)據(jù)
(1)生成者丟失消息
丟失的情景
發(fā)送消息過程中出現(xiàn)網(wǎng)絡(luò)問題:生產(chǎn)者以為發(fā)送成功,但RabbitMQ server沒有收到
解決方案1:發(fā)送方確認(rèn)機(jī)制(推薦,最常用)
發(fā)送方確認(rèn)機(jī)制最大的好處在于它是異步的,等信道返回ark確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息(不會(huì)堵塞其他消息的發(fā)送)
(一)修改application.properties配置
# 確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
spring.rabbitmq.publisher-confirms=true #舊版本
spring.rabbitmq.publisher-confirm-type=correlated #新版本
# 確認(rèn)消息已發(fā)送到隊(duì)列(Queue)
spring.rabbitmq.publisher-returns=true
springBoot 2.2.0.RELEASE版本之前 是使用 spring.rabbitmq.publisher-confirms=true
在2.2.0及之后 使用spring.rabbitmq.publisher-confirm-type=correlated 屬性配置代替
(二)新建配置文件RabbitTemplate
對(duì)于 發(fā)送確認(rèn) 寫法有多種方式,以下的是其中的一種方式
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//setMandatory設(shè)置表示:消息在沒有被隊(duì)列接收時(shí)是否應(yīng)該被退回給生產(chǎn)者(true:退回;false:丟棄)。//通常與yml配置文件中的publisher-returns配合一起使用,若不配置該項(xiàng),setReutrnCallback將不會(huì)有消息返回rabbitTemplate.setMandatory(true);//幫助生產(chǎn)者判斷 確認(rèn)消息是否成功發(fā)送到RabbitMQ//ack 為true表示已發(fā)送成功 false表示發(fā)送失敗rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {System.out.println("ConfirmCallback: "+"相關(guān)數(shù)據(jù):"+correlationData);System.out.println("ConfirmCallback: "+"確認(rèn)情況:"+ack);System.out.println("ConfirmCallback: "+"原因:"+cause);});//當(dāng)消息無法 放到隊(duì)列里面時(shí) 返回的提醒rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("ReturnCallback: "+"消息:"+message);System.out.println("ReturnCallback: "+"回應(yīng)碼:"+replyCode);System.out.println("ReturnCallback: "+"回應(yīng)信息:"+replyText);System.out.println("ReturnCallback: "+"交換機(jī):"+exchange);System.out.println("ReturnCallback: "+"路由鍵:"+routingKey);});return rabbitTemplate;}
}
解決方案2:事務(wù)(不推薦,因?yàn)樾阅懿?#xff09;
RabbitMQ提供的事務(wù)功能,在生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟RabbitMQ事務(wù)
(2)MQ丟失消息
丟失的情景
RabbitMQ服務(wù)端接收到消息后由于服務(wù)器宕機(jī)或重啟等原因(消息默認(rèn)存在內(nèi)存中)導(dǎo)致消息丟失;
解決方案:開啟RabbitMQ的持久化+開啟鏡像隊(duì)列
RabbitMQ的持久化分為三個(gè)部分:交換器的持久化、隊(duì)列的持久化、消息的持久化
三者 都 持久化 才能保證 RabbitMQ服務(wù)重啟之后,消息才能存在且能發(fā)出去
交換機(jī)持久化
交換機(jī)持久化描述的是當(dāng)這個(gè)交換機(jī)上沒有注冊(cè)隊(duì)列時(shí),這個(gè)交換機(jī)是否刪除。
如果要打開持久化的話也很簡單 (上面列子都是有體現(xiàn)的)
//定義直接交換機(jī)
@Bean
public DirectExchange directExchange() {//第一個(gè)參數(shù):定義交換機(jī)的名稱,第二個(gè)參數(shù):是否持久化,第三個(gè)參數(shù):是否自動(dòng)刪除return new DirectExchange("directExchange", true, false);
}
隊(duì)列持久化
隊(duì)列持久化描述的是當(dāng)這個(gè)隊(duì)列沒有消費(fèi)者在監(jiān)聽時(shí),是否進(jìn)行刪除。
持久化做法:
//定義隊(duì)列
@Bean
public Queue directQueue() {//第一個(gè)參數(shù):隊(duì)列的名稱,第二個(gè)參數(shù):是否持久化return new Queue("directQueue", true);
}
消息持久化
關(guān)鍵配置 持久化(MessageDeliveryMode.PERSISTENT)
@Test
public void testDurableMessage() {// 1.準(zhǔn)備消息Message message = MessageBuilder.withBody("hello, rabbitmq".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 2.消息ID,封裝到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.發(fā)送消息rabbitTemplate.convertAndSend("simple.queue", message, correlationData);log.info("發(fā)送消息成功");
}
(3)消費(fèi)者丟失消息
丟失的情景 1
RabbitMQ服務(wù)端向消費(fèi)者發(fā)送完消息之后,網(wǎng)絡(luò)斷了,消息并沒有到達(dá)消費(fèi)者
解決方案:無需解決
無需解決。因?yàn)榇饲榫跋路?wù)端收不到確認(rèn)消息,會(huì)再次發(fā)送的。
丟失的情景 2
啟用了重試機(jī)制,重試指定次數(shù)之后,還沒成功,但消息被確認(rèn)。
擴(kuò)展:重試機(jī)制
重試機(jī)制的三大前提
- 重試模式已啟用:通過配置 spring.rabbitmq.listener.simple.retry.enabled=true 來啟用重試模式。
- 拋出了異常:在 @RabbitListener 標(biāo)注的方法中拋出了異常,通常是 RuntimeException 或 Error。
Spring AMQP 會(huì)捕獲這些異常并根據(jù)配置的重試策略來重試消息。- 未達(dá)到最大重試次數(shù):消息的重試次數(shù)尚未達(dá)到配置的最大值(spring.rabbitmq.listener.simple.retry.maxAttempts)。
配置以下即可實(shí)現(xiàn)重試操作
# 是否支持重試
spring.rabbitmq.listener.simple.retry.enabled=true
# 重試次數(shù)(默認(rèn)3次)
spring.rabbitmq.listener.simple.retry.max-attempts=5
解決方案:消費(fèi)者方確認(rèn)機(jī)制(推薦,最常用)
改成手動(dòng)后就 可以實(shí)現(xiàn) “先操作業(yè)務(wù)邏輯(數(shù)據(jù)庫操作)后,再手動(dòng)從隊(duì)列上刪除這個(gè)消息” 的動(dòng)作
其中“從隊(duì)列上刪除這個(gè)消息“這個(gè)動(dòng)作體現(xiàn)就是 使用 channel.basicAck 去完成的。
切記改成手動(dòng)后,這個(gè)channel.basicAck方法一定要寫。
(一)修改application.properties配置
# 設(shè)置消費(fèi)端手動(dòng) ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
(二)修改Service接收信息項(xiàng)
當(dāng)消息在進(jìn)入 emailProcess、smsProcess(被@RabbitListener注解) 方法時(shí)就已經(jīng)被視為“接收到了”,但是需要 你 執(zhí)行 channel.basicAck(手動(dòng)確認(rèn))才能讓這個(gè)消息從隊(duì)列上刪除。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class DirectReceiver {@RabbitHandler@RabbitListener(queues = "emailQueue") //監(jiān)聽的隊(duì)列名稱public void emailProcess(Channel channel, Message message) throws IOException {try{System.out.println(new String(message.getBody(),"UTF-8"));//TODO 具體業(yè)務(wù).......//你使用手動(dòng)消息確認(rèn)模式時(shí),basicAck 一定要執(zhí)行,不然會(huì)導(dǎo)致會(huì)保留在隊(duì)列中,無法被消費(fèi)//第1個(gè)參數(shù)表示消息投遞序號(hào)//第2個(gè)參數(shù)false只確認(rèn)當(dāng)前一個(gè)消息收到(大多數(shù)情況下都設(shè)置為false),true確認(rèn)所有consumer獲得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {//若是消息沒有成功接收,第二個(gè)參數(shù)設(shè)置為true的話,代表重新放回隊(duì)列中,false則為丟棄,在此也可以做成放置死信隊(duì)列的操作channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}}
確認(rèn)和拒絕消息:
- basicAck: 這個(gè)方法用于確認(rèn)消息已被成功處理。
第一個(gè)參數(shù)是消息的delivery tag(用于標(biāo)識(shí)消息),
第二個(gè)參數(shù)指定是否批量確認(rèn)(false
表示只確認(rèn)當(dāng)前消息)。- basicReject: 這個(gè)方法用于拒絕消息。
第一個(gè)參數(shù)同樣是delivery tag,
第二個(gè)參數(shù)指定是否將消息重新放回隊(duì)列(false
表示不重新放回,即丟棄消息)。
方法解釋:
- emailProcess: 這個(gè)方法監(jiān)聽
emailQueue
隊(duì)列。
當(dāng)隊(duì)列中有消息時(shí),它會(huì)打印出消息的內(nèi)容,并嘗試確認(rèn)消息。
如果處理過程中發(fā)生異常,它會(huì)拒絕消息,但不會(huì)重新放回隊(duì)列(第二個(gè)參數(shù)為false
)。
問題二:消息重復(fù)的解決方案
什么時(shí)候會(huì)重復(fù)消費(fèi)
1.自動(dòng)提交模式時(shí)
消費(fèi)者收到消息后,要自動(dòng)提交,但提交后,網(wǎng)絡(luò)出故障,RabbitMQ服務(wù)器沒收到提交消息,那么此消息會(huì)被重新放入隊(duì)列,會(huì)再次發(fā)給消費(fèi)者。
2.手動(dòng)提交模式時(shí)
情景1:網(wǎng)絡(luò)故障問題,同上。
情景2:接收到消息并處理結(jié)束了,此時(shí)消費(fèi)者掛了,沒有手動(dòng)提交消息。
總體來說就是:網(wǎng)絡(luò)不可達(dá)、消費(fèi)端宕機(jī)。
如何解決
消費(fèi)端處理消息的業(yè)務(wù)邏輯保持冪等性
比如你拿個(gè)數(shù)據(jù)要寫庫,先根據(jù)主鍵查一下,如果這數(shù)據(jù)有了,就別插入了,update 一下。
比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
問題三:保證消息順序的解決方案
單一隊(duì)列和單一消費(fèi)者模式(RabbitMQ)
在RabbitMQ中,可以確保一個(gè)隊(duì)列只被一個(gè)消費(fèi)者消費(fèi),這樣可以保證消息按照發(fā)送的順序被處理。
因?yàn)殛?duì)列本身就是一個(gè)先進(jìn)先出的結(jié)構(gòu)。
適用場景:RabbitMQ用戶且對(duì)消息順序有嚴(yán)格要求的場景。
優(yōu)點(diǎn):實(shí)現(xiàn)簡單,易于管理。
缺點(diǎn):可能成為性能瓶頸,在處理大量消息時(shí)需要考慮擴(kuò)展性。
問題四:消息堆積的解決方案
消息堆積原因
消息堆積即消息沒及時(shí)被消費(fèi),是生產(chǎn)者生產(chǎn)消息速度快于消費(fèi)者消費(fèi)的速度導(dǎo)致的。
消費(fèi)者消費(fèi)慢可能是因?yàn)?#xff1a;本身邏輯耗費(fèi)時(shí)間較長、阻塞了。
預(yù)防措施
生產(chǎn)者
1.減少發(fā)布頻率
3.考慮使用隊(duì)列最大長度限制
消費(fèi)者
1.優(yōu)化代碼
已出事故的解決措施
情況1:堆積的消息還需要使用
方案1:簡單修復(fù)
修復(fù) 消費(fèi)者(consumer)的問題,讓他恢復(fù)消費(fèi)速度,然后等待幾個(gè)小時(shí)消費(fèi)完畢
方案2:復(fù)雜修復(fù)
單隊(duì)列消費(fèi)轉(zhuǎn)變?yōu)槎嚓?duì)列并行消費(fèi)
也是需要先 修復(fù) 消費(fèi)者(consumer)的問題,再進(jìn)行下面的步驟
步驟 1: 隊(duì)列和路由設(shè)置
1.創(chuàng)建新隊(duì)列:在RabbitMQ中創(chuàng)建10個(gè)新隊(duì)列,每個(gè)隊(duì)列分配一個(gè)獨(dú)特的名稱。
2. 設(shè)置交換機(jī):定義一個(gè)直連型(Direct)交換機(jī)。
3. 綁定路由鍵:將每個(gè)新隊(duì)列通過唯一的路由鍵綁定到直連型交換機(jī)上。
偽代碼例子:
// 假設(shè)這是配置類的一部分
@Bean
Queue queue1() { return new Queue("queue1", false);
}
@Bean
Queue queue2() { return new Queue("queue2", false);
}
// 以此類推,為其他9個(gè)隊(duì)列創(chuàng)建Bean
.........
@Bean
DirectExchange exchange() { return new DirectExchange("myExchange");
}
@Bean
Binding binding1(Queue queue1, DirectExchange exchange) { return BindingBuilder.bind(queue1).to(exchange).with("routingKey1");
}
@Bean
Binding binding2(Queue queue2, DirectExchange exchange) { return BindingBuilder.bind(queue2).to(exchange).with("routingKey2");
}
// 以此類推,為其他隊(duì)列和路由鍵創(chuàng)建綁定
......
步驟 2: 消息分發(fā)
1.接收堆積數(shù)據(jù):現(xiàn)有消費(fèi)者(或分發(fā)者)接收從發(fā)送者處堆積的數(shù)據(jù)。
2.分發(fā)到新隊(duì)列:實(shí)現(xiàn)分發(fā)邏輯,將接收到的消息根據(jù)路由鍵分發(fā)到相應(yīng)的10個(gè)新隊(duì)列中。
偽代碼例子:
@RabbitListener(queues = "oldQueue")
public void emailProcess(Message message, Channel channel) throws IOException { try { // 生成1-10之間的順序數(shù) SequentialRandom sequentialRandom = new SequentialRandom()String key = sequentialRandom.getNextSequentialRandom();// 重新發(fā)送消息到交換機(jī),交換機(jī)將根據(jù)routingKey將消息路由到正確的隊(duì)列 rabbitTemplate.convertAndSend("myExchange", "routingKey"+key, new String(message.getBody(),"UTF-8")); // 確認(rèn)原始隊(duì)列中的消息(如果您想要的話) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 處理異常,可能包括記錄日志、發(fā)送警報(bào)等 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); }
}
public class SequentialRandom { private int currentIndex = 1; // 初始索引為1 /** * 獲取下一個(gè)順序數(shù)* @return 下一個(gè)數(shù)字,從1到10循環(huán) */ public int getNextSequentialRandom() { int next = currentIndex; currentIndex = (currentIndex % 10) + 1; // 使用模運(yùn)算實(shí)現(xiàn)循環(huán),并更新索引 return next; }
}
步驟 3: 并行消費(fèi)
1.開發(fā)新消費(fèi)端:編寫新的消費(fèi)端程序,該程序能夠監(jiān)并處理來自10個(gè)新隊(duì)列的消息。
2. 部署并啟動(dòng):將新消費(fèi)端程序部署到服務(wù)器,并啟動(dòng)它以開始并行消費(fèi)。
偽代碼例子:
@Component
public class ParallelConsumer { @RabbitListener(queues = {"queue1"}) public void receiveMessage1(Message message) { // 處理消息 } @RabbitListener(queues = {"queue2"}) public void receiveMessage2(Message message) { // 處理消息 } // ... @RabbitListener(queues = {"queue10"}) public void receiveMessage3(Message message) { // 處理消息 }
}
情況2:堆積的消息不需要使用
刪除消息即可。(可以在RabbitMQ控制臺(tái)刪除,或者使用命令)。