用凡科做的網(wǎng)站保存不了sem專員
目錄
- 確保消息的可靠性
- RabbitMQ 消息發(fā)送可靠性
- 分析
- 解決方案
- 開啟事務機制
- 發(fā)送方確認機制
- 單條消息處理
- 消息批量處理
- 失敗重試
- 自帶重試機制
- 業(yè)務重試
- RabbitMQ 消息消費可靠性
- 如何保證消息在隊列
- RabbitMQ 的消息消費,整體上來說有兩種不同的思路:
- 確保消費成功兩種思路
- 消息確認
- 自動確認
- 手動確認
- 推模式手動確認
- 拉模式手動確認
- 消息拒絕
- 總結:如何保證消息的可靠性。
- 冪等性問題
- 背景
- 解決思路
- 代碼
確保消息的可靠性
先確定消息可能在哪些位置丟失—不同的位置可以有不同的解決方案
- 發(fā)送過程
- 從生產(chǎn)者到交換機
- 從交換機到隊列
- 消費過程
- 消息在隊列中
- 消費者消費
RabbitMQ 消息發(fā)送可靠性
分析
-
目標
- 消息成功到達 Exchange
- 消息成功到達 Queue
-
如果能確認這兩步,那么我們就可以認為消息發(fā)送成功了。
-
如果這兩步中任一步驟出現(xiàn)問題,那么消息就沒有成功送達,此時我們可能要通過重試等方式去重新發(fā)送消息,多次重試之后,如果消息還是不能到達,則可能就需要人工介入了。
-
經(jīng)過上面的分析,我們可以確認,要確保消息成功發(fā)送,我們只需要做好三件事就可以了:
- 確認消息到達 Exchange。
- 確認消息到達 Queue。
- 開啟定時任務,定時投遞那些發(fā)送失敗的消息
解決方案
-
如何確保消息成功到達 RabbitMQ?RabbitMQ 給出了兩種方案:
- 開啟事務機制
- 發(fā)送方確認機制
-
這是兩種不同的方案,不可以同時開啟,只能選擇其中之一,如果兩者同時開啟,則會報如下錯誤
開啟事務機制
-
事務管理器
@Configuration public class RabbitConfig {@Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);} }
-
消息生產(chǎn)者:添加事務注解并設置通信信道為事務模式
@Service public class MqService {@Resourceprivate RabbitTemplate rabbitTemplate;@Transactional //標記事務public void send() {rabbitTemplate.setChannelTransacted(true);//開啟事務模式rabbitTemplate.convertAndSend("mq_exchange_name","mq_queue_name","hello rabbitmq!".getBytes());int i = 1 / 0;//運行時必然拋出異常,我們可以嘗試運行該方法,發(fā)現(xiàn)消息并未發(fā)送成功} }
當我們開啟事務模式之后,RabbitMQ 生產(chǎn)者發(fā)送消息會多出四個步驟:
- 客戶端發(fā)出請求,將信道設置為事務模式。
- 服務端給出回復,同意將信道設置為事務模式。
- 客戶端發(fā)送消息。
- 客戶端提交事務。
- 服務端給出響應,確認事務提交。
上面的步驟,除了第三步是本來就有的,其他幾個步驟都是平白無故多出來的。所以大家看到,事務模式其實效率有點低,這并非一個最佳解決方案。我們可以想想,什么項目會用到消息中間件?一般來說都是一些高并發(fā)的項目,這個時候并發(fā)性能尤為重要。
所以,RabbitMQ 還提供了發(fā)送方確認機制(publisher confirm)來確保消息發(fā)送成功,這種方式,性能要遠遠高于事務模式
發(fā)送方確認機制
單條消息處理
-
配置文件:開啟消息發(fā)送方確認機制
server:port: 8888 spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到達交換器的確認回調(diào)publisher-returns: true #配置消息到達隊列的回調(diào) # publisher-confirm-type有三個值 : # none:表示禁用發(fā)布確認模式,默認即此。 # correlated:表示成功發(fā)布消息到交換器后會觸發(fā)的回調(diào)方法。 # simple:類似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的調(diào)用。
-
開啟兩個監(jiān)聽
/*** @author: zjl* @datetime: 2024/5/9* @desc:* 定義配置類,實現(xiàn) RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 兩個接口,* 這兩個接口,前者的回調(diào)用來確定消息到達交換器,后者則會在消息路由到隊列失敗時被調(diào)用。* * 定義 initRabbitTemplate 方法并添加 @PostConstruct 注解,* 在該方法中為 rabbitTemplate 分別配置這兩個 Callback。*/ @Configuration @Slf4j public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {public static final String MQ_EXCHANGE_NAME = "mq_exchange_name";public static final String MQ_QUEUE_NAME = "mq_queue_name";@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic Queue queue() {return new Queue(MQ_QUEUE_NAME);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(MQ_EXCHANGE_NAME);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(MQ_QUEUE_NAME);}@PostConstructpublic void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("{}:消息成功到達交換器",correlationData.getId());}else{log.error("{}:消息發(fā)送失敗", correlationData.getId());}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.error("{}:消息未成功路由到隊列",message.getMessageProperties().getMessageId());} }
-
測試
首先嘗試將消息發(fā)送到一個不存在的交換機中@RestController public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;;@RequestMapping("/send")public String send() {rabbitTemplate.convertAndSend("RabbitConfig.MQ_EXCHANGE_NAME", RabbitConfig.MQ_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));return "send success";} }
給定一個真實存在的交換器,但是給一個不存在的隊列
@RestController public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;;@RequestMapping("/send")public String send() {//rabbitTemplate.convertAndSend("RabbitConfig.MQ_EXCHANGE_NAME", RabbitConfig.MQ_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));rabbitTemplate.convertAndSend(RabbitConfig.MQ_EXCHANGE_NAME,"RabbitConfig.MQ_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));return "send success";} }
可以看到,消息雖然成功達到交換器了,但是沒有成功路由到隊列(因為隊列不存在)
消息批量處理
- 如果是消息批量處理,那么發(fā)送成功的回調(diào)監(jiān)聽是一樣的,這里不再贅述。
- 這就是 publisher-confirm 模式。相比于事務,這種模式下的消息吞吐量會得到極大的提升
失敗重試
- 失敗重試分兩種情況,一種是壓根沒找到 MQ 導致的失敗重試,另一種是找到 MQ 了,但是消息發(fā)送失敗了
自帶重試機制
- 前面所說的事務機制和發(fā)送方確認機制,都是發(fā)送方確認消息發(fā)送成功的辦法。
- 如果發(fā)送方一開始就連不上 MQ,那么 Spring Boot 中也有相應的重試機制,但是這個重試機制就和 MQ 本身沒有關系了,這是利用 Spring 中的 retry 機制來完成的
-
配置
server:port: 8888 spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到達交換器的確認回調(diào)publisher-returns: true #配置消息到達隊列的回調(diào)template: retry:enabled: true # 開啟重試機制initial-interval: 1000ms # 重試起始間隔時間max-attempts: 10 # 最大重試次數(shù)max-interval: 10000ms # 最大重試間隔時間multiplier: 2 # 間隔時間乘數(shù)。(這里配置間隔時間乘數(shù)為 2,則第一次間隔時間 1 秒,第二次重試間隔時間 2 秒,第三次 4 秒,以此類推)
-
再次啟動 Spring Boot 項目,然后關掉 MQ,此時嘗試發(fā)送消息,就會發(fā)送失敗,進而導致自動重試
業(yè)務重試
- 業(yè)務重試主要是針對消息沒有到達交換機的情況
- 如果消息沒有成功到達交換機,此時就會觸發(fā)消息發(fā)送失敗回調(diào),我們可以利用起來這個回調(diào)
- 下面說一下整體思路
-
準備數(shù)據(jù)庫表
DROP TABLE IF EXISTS `service_msg_mq_info`; CREATE TABLE `service_msg_mq_info` (`msgid` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`empid` int(11) NULL DEFAULT NULL,`status` int(11) NULL DEFAULT NULL,`routekey` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`count` int(11) NULL DEFAULT NULL,`trytime` datetime NULL DEFAULT NULL,`createtime` datetime NULL DEFAULT NULL,`updatetime` datetime NULL DEFAULT NULL,PRIMARY KEY (`msgid`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
status:表示消息的狀態(tài),有三個取值,0,1,2 分別表示消息發(fā)送中、消息發(fā)送成功以及消息發(fā)送失敗。
tryTime:表示消息的第一次重試時間(消息發(fā)出去之后,在 tryTime 這個時間點還未顯示發(fā)送成功,此時就可以開始重試了)。
count:表示消息重試次數(shù)。
-
每次發(fā)送消息的時候,就往數(shù)據(jù)庫中添加一條記錄
-
在消息發(fā)送的時候,我們就往該表中保存一條消息發(fā)送記錄,并設置狀態(tài) status 為 0,tryTime 為 1 分鐘之后
-
在消息發(fā)送的時候,我們就往該表中保存一條消息發(fā)送記錄,并設置狀態(tài) status 為 0,tryTime 為 1 分鐘之后
-
另外開啟一個定時任務,定時任務每隔 10s 就去數(shù)據(jù)庫中撈一次消息,專門去撈那些 status 為 0 并且已經(jīng)過了 tryTime 時間記錄,把這些消息拎出來后,首先判斷其重試次數(shù)是否已超過 3 次,如果超過 3 次,則修改該條消息的 status 為 2,表示這條消息發(fā)送失敗,并且不再重試。對于重試次數(shù)沒有超過 3 次的記錄,則重新去發(fā)送消息,并且為其 count 的值+1
當然這種思路有兩個弊端:
- 去數(shù)據(jù)庫走一遭,可能拖慢 MQ 的 Qos,不過有的時候我們并不需要 MQ 有很高的 Qos,所以這個應用時要看具體情況。
- 按照上面的思路,可能會出現(xiàn)同一條消息重復發(fā)送的情況,不過這都不是事,我們在消息消費時,解決好冪等性問題就行了。
當然,大家也要注意,消息是否要確保 100% 發(fā)送成功,也要看具體情況。
RabbitMQ 消息消費可靠性
如何保證消息在隊列
- 隊列持久化—》創(chuàng)建的時候設置持久化
- 搭建rabbitmq集群–保證高可用
RabbitMQ 的消息消費,整體上來說有兩種不同的思路:
-
推(push):MQ 主動將消息推送給消費者,這種方式需要消費者設置一個緩沖區(qū)去緩存消息,對于消費者而言,內(nèi)存中總是有一堆需要處理的消息,所以這種方式的效率比較高,這也是目前大多數(shù)應用采用的消費方式。這種方式通過 @RabbitListener 注解去標記消費者,如以下代碼,當監(jiān)聽的隊列中有消息時,就會觸發(fā)該方法
@Component public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME)public void handle(String msg) {System.out.println("msg = " + msg);} }
-
拉(pull):消費者主動從 MQ 拉取消息,這種方式效率并不是很高,不過有的時候如果服務端需要批量拉取消息,倒是可以采用這種方式
@Test public void test01() throws UnsupportedEncodingException {Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);System.out.println("o = " + new String(((byte[]) o),"UTF-8")); }
- 調(diào)用 receiveAndConvert 方法,方法參數(shù)為隊列名稱,
- 方法執(zhí)行完成后,會從 MQ 上拉取一條消息下來,如果該方法返回值為 null,表示該隊列上沒有消息了。
- receiveAndConvert 方法有一個重載方法,可以在重載方法中傳入一個等待超時時間,例如 3 秒。
- 此時,假設隊列中沒有消息了,則 receiveAndConvert 方法會阻塞 3 秒,3 秒內(nèi)如果隊列中有了新消息就返回,3 秒后如果隊列中還是沒有新消息,就返回 null,這個等待超時時間要是不設置的話,默認為 0
-
這是消息兩種不同的消費模式
-
如果需要從消息隊列中持續(xù)獲得消息,就可以使用推模式;
-
如果只是單純的消費一條消息,則使用拉模式即可。
-
切忌將拉模式放到一個死循環(huán)中,變相的訂閱消息,這會嚴重影響 RabbitMQ 的性能
確保消費成功兩種思路
- 為了保證消息能夠可靠的到達消息消費者,RabbitMQ 中提供了消息消費確認機制。
- 當消費者去消費消息的時候,可以通過指定 autoAck 參數(shù)來表示消息消費的確認方式
- 當 autoAck 為 false 的時候,此時即使消費者已經(jīng)收到消息了,RabbitMQ 也不會立馬將消息移除,而是等待消費者顯式的回復確認信號后,才會將消息打上刪除標記,然后再刪除。
- 當 autoAck 為 true 的時候,此時消息消費者就會自動把發(fā)送出去的消息設置為確認,然后將消息移除(從內(nèi)存或者磁盤中),即使這些消息并沒有到達消費者。
- 屬性解釋
- Ready 表示待消費的消息數(shù)量。
- Unacked 表示已經(jīng)發(fā)送給消費者但是還沒收到消費者 ack 的消息數(shù)量。
- 當我們將 autoAck 設置為 false 的時候,對于 RabbitMQ 而言,消費分成了兩個部分:
- 待消費的消息
- 已經(jīng)投遞給消費者,但是還沒有被消費者確認的消息
- 換句話說,當設置 autoAck 為 false 的時候,消費者就變得非常從容了,它將有足夠的時間去處理這條消息,當消息正常處理完成后,再手動 ack,此時 RabbitMQ 才會認為這條消息消費成功了。
- 如果 RabbitMQ 一直沒有收到客戶端的反饋,并且此時客戶端也已經(jīng)斷開連接了,那么 RabbitMQ 就會將剛剛的消息重新放回隊列中,等待下一次被消費。
綜上所述,確保消息被成功消費,無非就是手動 Ack 或者自動 Ack,無他。當然,無論這兩種中的哪一種,最終都有可能導致消息被重復消費,所以一般來說我們還需要在處理消息時,解決冪等性問題。
消息確認
自動確認
- 在 Spring Boot 中,默認情況下,消息消費就是自動確認的
- 通過 @Componet 注解將當前類注入到 Spring 容器中,然后通過 @RabbitListener 注解來標記一個消息消費方法
- 默認情況下,消息消費方法自帶事務,即如果該方法在執(zhí)行過程中拋出異常,那么被消費的消息會重新回到隊列中等待下一次被消費
- 如果該方法正常執(zhí)行完沒有拋出異常,則這條消息就算是被消費了
@Component public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME)public void receive1(String msg) {System.out.println("msg = " + msg);int i = 1 / 0;} }
手動確認
-
配置:修改為手動確認模式
server:port: 8888 spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到達交換器的確認回調(diào)publisher-returns: true #配置消息到達隊列的回調(diào)template:retry:enabled: trueinitial-interval: 1000msmax-attempts: 10max-interval: 10000msmultiplier: 2listener:simple:acknowledge-mode: manual
推模式手動確認
- 將消費者要做的事情放到一個 try…catch 代碼塊中。
- 如果消息正常消費成功,則執(zhí)行 basicAck 完成確認。
- 如果消息消費失敗,則執(zhí)行 basicNack 方法,告訴 RabbitMQ 消息消費失敗。
@RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME) public void receive1(Message message,Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消息消費的代碼寫到這里String s = new String(message.getBody());System.out.println("s = " + s);//消費完成后,手動 ackchannel.basicAck(deliveryTag, false);} catch (Exception e) {//手動 nacktry {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}} }
- 這里涉及到兩個方法:
- basicAck:這個是手動確認消息已經(jīng)成功消費,該方法有兩個參數(shù):
- 第一個參數(shù)表示消息的 id;
- 第二個參數(shù) multiple 如果為 false,表示僅確認當前消息消費成功,如果為 true,則表示當前消息之前所有未被當前消費者確認的消息都消費成功。
- basicNack:這個是告訴 RabbitMQ 當前消息未被成功消費,該方法有三個參數(shù):
- 第一個參數(shù)表示消息的 id;
- 第二個參數(shù) multiple 如果為 false,表示僅拒絕當前消息的消費,如果為 true,則表示拒絕當前消息之前所有未被當前消費者確認的消息;
- 第三個參數(shù) requeue 含義和前面所說的一樣,被拒絕的消息是否重新入隊。
- 當 basicNack 中最后一個參數(shù)設置為 false 的時候,還涉及到一個死信隊列的問題
- basicAck:這個是手動確認消息已經(jīng)成功消費,該方法有兩個參數(shù):
拉模式手動確認
- 拉模式手動 ack 比較麻煩一些,在 Spring 中封裝的 RabbitTemplate 中并未找到對應的方法,所以我們得用原生的辦法
- 這里涉及到的 basicAck 和 basicNack 方法跟前面的一樣
public void receive2() {Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);long deliveryTag = 0L;try {GetResponse getResponse = channel.basicGet(RabbitConfig.MQ_QUEUE_NAME, false);deliveryTag = getResponse.getEnvelope().getDeliveryTag();System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));channel.basicAck(deliveryTag, false);} catch (IOException e) {try {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}
}
消息拒絕
- 當客戶端收到消息時,可以選擇消費這條消息,也可以選擇拒絕這條消息
@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)public void receive2(Channel channel, Message message) {//獲取消息編號long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//拒絕消息channel.basicReject(deliveryTag, true);} catch (IOException e) {e.printStackTrace();}}
}
-
消費者收到消息之后,可以選擇拒絕消費該條消息,拒絕的步驟分兩步
- 獲取消息編號 deliveryTag。
- 調(diào)用 basicReject 方法拒絕消息。
-
調(diào)用 basicReject 方法時,第二個參數(shù)是 requeue,即是否重新入隊。
-
如果第二個參數(shù)為 true,則這條被拒絕的消息會重新進入到消息隊列中,等待下一次被消費;
-
如果第二個參數(shù)為 false,則這條被拒絕的消息就會被丟掉,不會有新的消費者去消費它了。
-
需要注意的是,basicReject 方法一次只能拒絕一條消息
總結:如何保證消息的可靠性。
- 設置confirm和returning機制
- 設置隊列和交互機的持久化
- 搭建rabbitMQ服務集群
- 消費者改為手動確認機制。
冪等性問題
背景
- 消費者在消費完一條消息后,向 RabbitMQ 發(fā)送一個 ack 確認,
- 此時由于網(wǎng)絡斷開或者其他原因?qū)е?RabbitMQ 并沒有收到這個 ack,
- 那么此時 RabbitMQ 并不會將該條消息刪除
- 當重新建立起連接后,消費者還是會再次收到該條消息,這就造成了消息的重復消費。
- 同時,由于類似的原因,消息在發(fā)送的時候,同一條消息也可能會發(fā)送兩次
解決思路
- 采用 Redis,在消費者消費消息之前,現(xiàn)將消息的 id 放到 Redis 中,存儲方式如下:
- id-0(正在執(zhí)行業(yè)務)
- id-1(執(zhí)行業(yè)務成功)
- 如果 ack 失敗,在 RabbitMQ 將消息交給其他的消費者時,先執(zhí)行 setnx,如果 key 已經(jīng)存在(說明之前有人消費過該消息),獲取他的值,如果是 0,當前消費者就什么都不做,如果是 1,直接 ack。
- 極端情況:第一個消費者在執(zhí)行業(yè)務時,出現(xiàn)了死鎖,在 setnx 的基礎上,再給 key 設置一個生存時間。生產(chǎn)者,發(fā)送消息時,指定 messageId
代碼
-
添加redis依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency>]
-
添加redis配置
redis:host: localhostport: 6379password: 123456timeout: 3000msdatabase: 0
-
配置類
@Configuration @Slf4j public class RabbitConfig{public final static String DIRECTNAME = "mq-direct";@Beanpublic Queue queue() {return new Queue("hello-queue");}@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECTNAME, true, false);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with("direct");} }
-
生產(chǎn)者
@RestController public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;;@RequestMapping("/send")public String send() {//攜帶信息發(fā)送CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.DIRECTNAME,"direct","message",messageId);return "send success";} }
-
消費者
package cn.smbms.consumer;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component;import javax.annotation.Resource; import java.io.IOException; import java.util.concurrent.TimeUnit;/*** @author: zjl* @datetime: 2024/5/9* @desc: */ @Component public class DirectReceiver {@Resourceprivate StringRedisTemplate stringRedisTemplate;@RabbitListener(queues = "hello-queue")public void getMassage(String msg, Channel channel, Message message) throws IOException {//1、獲取messageIDString messageID = message.getMessageProperties().getHeader("spring_returned_message_correlation");//2、用redis的setnx()方法放入值 放入成功返回true 放入失敗返回falseif (stringRedisTemplate.opsForValue().setIfAbsent(messageID, "0", 10, TimeUnit.SECONDS)) {//3、消費消息System.out.println("接收到消息:" + msg);//4、設置value值為1stringRedisTemplate.opsForValue().set(messageID, "1",10,TimeUnit.SECONDS);//5、手動ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else {//6、如果放入值失敗 獲取messageID對應的valueString s = stringRedisTemplate.opsForValue().get(messageID);//7、value=0 什么都不做if ("0".equalsIgnoreCase(s)) {return;//8、value=1 手動ack} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}} }