合肥做網(wǎng)站優(yōu)化哪家好建立網(wǎng)站需要什么條件
一、部署單點RocketMQ
Docker 部署 RocketMQ (圖文并茂超詳細)_docker 部署rocketmq-CSDN博客
這個博主講的很好,可食用,替大家實踐了一遍
二、原理篇
為什么使用RocketMQ:
為什么選擇RocketMQ | RocketMQ
關(guān)于一些原理,感覺官網(wǎng)講的也非常透徹
領(lǐng)域模型概述 | RocketMQ
還有一些功能特性:
普通消息 | RocketMQ
本文的實操篇只是講了發(fā)送普通消息
關(guān)于中間件對比,下面我之前有看過一些很好的文章:
Kafka、RabbitMQ、RocketMQ等消息中間件的對比_rabbimq rocket 差異-CSDN博客
rpc和zmq性能對比 rpc mq區(qū)別_mob6454cc70642f的技術(shù)博客_51CTO博客
RabbitMQ,RocketMQ,Kafka--區(qū)別/對比/選型_51CTO博客_rocketmq rabbitmq kafka選型
三、實操篇
先講講原理:
如果你需要不同業(yè)務,就需要不同消費者組,不要想著同一個消費者組可以通過訂閱不同主題達到不同業(yè)務,因為同一個消費者組內(nèi)的功能必須是一致的,可以換個角度想,既然你是一個業(yè)務,一個業(yè)務就是一個主題嘛,你用不同的業(yè)務實現(xiàn),就多添加幾個消費者組,分別訂閱那個主題(業(yè)務),然后通過不同的Tag區(qū)分就行了,而且而且,不要想著說:一個消費者組一個主題通過不同Tag來區(qū)分,雖然我在剛剛學習的時候也這樣子想,結(jié)果踩了一天的坑,看了好多博客好文來理解,在文末也有關(guān)于為什么不能這樣子做。
1、引入依賴
RocketMQ的依賴:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
demo案例的全部依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.bluefoxyu</groupId><artifactId>RocketMQ-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>3.2.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.27</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.16</version></dependency></dependencies></project>
2、啟動自動裝配
2.2.3 版本的RocketMQ 沒有適配 SpringBoot3,只適配SpringBoot2,所以需要自己去配置好自動裝配??梢詤⒖嘉蚁旅孢@篇文章:
Springboot3+自動裝配_springboot3自動裝配-CSDN博客
在項目中的 resources 目錄下創(chuàng)建 META-INF/spring 文件夾,并創(chuàng)建下面這個文件。
org.springframework.boot.autoconfigure.AutoConfiguration.imports
# RocketMQ 2.2.3 version does not adapt to SpringBoot3
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
3、配置application.yml
server:port: 8080spring:profiles:active: devrocketmq:name-server: xxx:9876 # NameServer 地址producer:group: rocketmq-v3-demo-sent-message-group_bluefoxyu # 全局發(fā)送者組定義send-message-timeout: 2000# 發(fā)送消息失敗時的重試次數(shù)。設(shè)置為 1 表示如果發(fā)送失敗,會再重試一次(總共嘗試兩次)。適用于同步發(fā)送消息失敗時的重試次數(shù)。retry-times-when-send-failed: 1# 異步發(fā)送失敗時的重試次數(shù)。設(shè)置為 1 表示在異步發(fā)送失敗時會再嘗試一次。適用于異步發(fā)送消息失敗時的重試次數(shù)。retry-times-when-send-async-failed: 1logging:level:com:bluefoxyu:producer: infoconsumer: infocontroller: info
4、啟動類
相比這個不必多說了。
RocketMQApplication:
package com.bluefoxyu;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@Slf4j
@SpringBootApplication
public class RocketMQApplication {public static void main(String[] args) {SpringApplication.run(RocketMQApplication.class, args);}
}
5、編寫一個統(tǒng)一格式的消息對象
package com.bluefoxyu.message;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serial;
import java.io.Serializable;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GeneralMessageEvent implements Serializable {@Serialprivate static final long serialVersionUID = 1L;private String body;private String keys;}
上述實體類實現(xiàn)了Serializable接口,能夠正常被序列化或者反序列化。
6、生產(chǎn)者
編寫一個生產(chǎn)者,統(tǒng)一做好發(fā)送消息的一個模板,方便簡化接口實現(xiàn)發(fā)送消息的代碼編寫,顯得更加優(yōu)雅一點,說到發(fā)送消息,就需要知道發(fā)送到哪個主題,然后哪些消費者組去消費,然后還有每條消息的唯一標識key,唯一標識可以用uuid生成,也可以用redis生成一個增長的不重復的id,這里使用uuid簡化。
注意:如果你的項目里面只有一個消費者組,只有一個消費業(yè)務,這樣子是不需要傳Tag(過濾標簽)的,但是正常情況都會有多個消息隊列任務,下面提供兩種重載的方法。
code:
package com.bluefoxyu.producer;import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 封裝全體的消息生產(chǎn)者*/
@Slf4j
@Component
@RequiredArgsConstructor
public class GeneralMessageProducer {private final RocketMQTemplate rocketMQTemplate;/*** 發(fā)送普通消息** @param topic 消息發(fā)送主題,用于標識同一類業(yè)務邏輯的消息* @param keys 消息索引鍵,可根據(jù)關(guān)鍵字精確查找某條消息* @param messageSendEvent 普通消息發(fā)送事件,自定義對象,最終都會序列化為字符串* @return 消息發(fā)送 RocketMQ 返回結(jié)果*/public SendResult sendMessage(String topic, String keys, GeneralMessageEvent messageSendEvent) {SendResult sendResult;try {Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS, keys).build();// 2000L 表示發(fā)送消息的超時時間為 2000 毫秒,即 2 秒sendResult = rocketMQTemplate.syncSend(topic,message,2000L);log.info("[普通消息] 消息發(fā)送結(jié)果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);} catch (Throwable ex) {log.error("[普通消息] 消息發(fā)送失敗,消息體:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}/*** 發(fā)送普通消息** @param topic 消息發(fā)送主題,用于標識同一類業(yè)務邏輯的消息* @param tag 消息的過濾標簽,消費者可通過Tag對消息進行過濾,僅接收指定標簽的消息。* @param keys 消息索引鍵,可根據(jù)關(guān)鍵字精確查找某條消息* @param messageSendEvent 普通消息發(fā)送事件,自定義對象,最終都會序列化為字符串* @return 消息發(fā)送 RocketMQ 返回結(jié)果*/public SendResult sendMessage(String topic, String tag, String keys, GeneralMessageEvent messageSendEvent) {SendResult sendResult;try {// 構(gòu)建消息的 destination (主題和標簽)StringBuilder destinationBuilder = StrUtil.builder().append(topic);if (StrUtil.isNotBlank(tag)) {destinationBuilder.append(":").append(tag); // 設(shè)置tag}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag) // 設(shè)置消息的標簽.build();// 2000L 表示發(fā)送消息的超時時間為 2000 毫秒,即 2 秒sendResult = rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息發(fā)送結(jié)果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);} catch (Throwable ex) {log.error("[普通消息] 消息發(fā)送失敗,消息體:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}}
7、定義一個constant
package com.bluefoxyu.constant;/*** RocketMQ 常量類* @author bluefoxyu*/
public class RocketMQConstant {/*** Group 消費者組定義*/public static final String GENERAL_MESSAGE_CONSUMER_GROUP = "general_message_consumer_group";public static final String MESSAGE_CONSUMER_GROUP_A = "message_consumer_group_A";public static final String MESSAGE_CONSUMER_GROUP_B = "message_consumer_group_B";public static final String MESSAGE_CONSUMER_GROUP_C = "message_consumer_group_C";/*** Topic 主題定義*/public static final String MESSAGE_TOPIC_1 = "message_topic_1";public static final String MESSAGE_TOPIC_2 = "message_topic_2";/*** Tag 標簽*/public static final String MESSAGE_TAG_A = "message_tag_A";public static final String MESSAGE_TAG_B = "message_tag_B";public static final String MESSAGE_TAG_C = "message_tag_C";}
8、多/單個消費者訂閱一個主題
1.實現(xiàn)消費者
這里需要實現(xiàn)監(jiān)聽的消息的實體類類型是什么,GeneralMessageEvent 是我們之前封裝的統(tǒng)一消息對象
implements RocketMQListener<GeneralMessageEvent>
在onMessage方法中,通過
JSON.toJSONString(message)
就可以拿到解析好的消息內(nèi)容,也就是我們真正需要發(fā)送的消息,下面我編寫三個消費者來進行消費,不過綁定的都是同一個主題,類似負載均衡的功能,這里只用一個消費者也是一樣的,因為后續(xù)還需要測其他功能,所以這里我寫了三個消費者。
GeneralMessageConsumer1:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer1 implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者GeneralMessageConsumer1] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
GeneralMessageConsumer2:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer2 implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者GeneralMessageConsumer2] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
GeneralMessageConsumer3:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer3 implements RocketMQListener<GeneralMessageEvent> {@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者GeneralMessageConsumer3] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
2.編寫接口發(fā)送消息
發(fā)送消息需要發(fā)送這四要素:
- ?topic 主題?
- ?key 唯一標識
- message 需要發(fā)送的消息
package com.bluefoxyu.controller;import com.bluefoxyu.producer.GeneralMessageProducer;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;@RestController
@RequiredArgsConstructor
public class controller {private final GeneralMessageProducer generalMessageDemoProduce;@PostMapping("/send/topic1/general-messageA")public String sendTopic1GeneralMessageA() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for A").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);// 返回發(fā)送成功的狀態(tài)名return sendResult.getSendStatus().name();}@PostMapping("/send/topic1/general-messageB")public String sendTopic1GeneralMessageB() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for B").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);return sendResult.getSendStatus().name();}@PostMapping("/send/topic1/general-messageC")public String sendTopic1GeneralMessageC() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for C").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);return sendResult.getSendStatus().name();}}
3.接口測試
準備這三個測試接口:
開始分別測試三個接口這里就不一一展示了。
看控制臺:
如結(jié)果消費了三次
9、測試多個消費者分別訂閱不同主題
如果相同消費組的三個消費者組分別訂閱不同主題,會怎么樣呢。修改的代碼如下,
當然,哈哈哈哈哈哈,就是消費不到消息(對于小白的我也被困擾了好久),由于是有問題的,代碼就不粘貼了【狗頭】。如下:
這里參考了一篇大佬的文章:
rocketmq問題匯總-一個consumerGroup只對應一個topic_org.apache.rocketmq.client.exception.mqbrokerexcep-CSDN博客
看完后悟了很多,大概意思就是一個消費者組中的職責應該是一致的,應該都去訂閱相同主題的,如果一個消費者訂閱了兩個主題,那么其他同組的消費者也應該訂閱那兩個主題,參考評論區(qū)這幾大佬的評論:
這個大佬就說的很透徹了:
10、一個消費者訂閱多個主題
在上面說了既然一個消費者可以訂閱多個主題,但是前提條件是同一個消費組中必須訂閱相同主題,那應該怎么實現(xiàn)呢。
直接給代碼:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 對應主題* consumerGroup 指定消費的分組* RocketMQPushConsumerLifecycleListener 基礎(chǔ)改監(jiān)聽器可以監(jiān)聽多個主題*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class GeneralMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以訂閱多個主題,在下面prepareStart就已經(jīng)消費了,onMessage就不會執(zhí)行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消費者GeneralMessageConsumer接收到消息,消息體:{}", JSON.toJSONString(message));System.out.println("General message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消費者GeneralMessageConsumer接收到消息,消費完成:消費主題為:{} , 消費的消息為:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消費失敗,異常消息為:{}",e.getMessage());}}}
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 對應主題* consumerGroup 指定消費的分組* RocketMQPushConsumerLifecycleListener 基礎(chǔ)改監(jiān)聽器可以監(jiān)聽多個主題*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagAMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以訂閱多個主題,在下面prepareStart就已經(jīng)消費了,onMessage就不會執(zhí)行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消費者TagAMessageConsumer接收到消息,消息體:{}", JSON.toJSONString(message));System.out.println("tagA message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消費者TagAMessageConsumer接收到消息,消費完成:消費主題為:{} , 消費的消息為:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消費失敗,異常消息為:{}",e.getMessage());}}
}
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 對應主題* consumerGroup 指定消費的分組* RocketMQPushConsumerLifecycleListener 基礎(chǔ)改監(jiān)聽器可以監(jiān)聽多個主題*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagBMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以訂閱多個主題,在下面prepareStart就已經(jīng)消費了,onMessage就不會執(zhí)行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消費者TagBMessageConsumer接收到消息,消息體:{}", JSON.toJSONString(message));System.out.println("tagB message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消費者TagBMessageConsumer接收到消息,消費完成:消費主題為:{} , 消費的消息為:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消費失敗,異常消息為:{}",e.getMessage());}}
}
分別測試三個接口:
參考這位大佬的博客:
rocketmq (消費者消費同一個消費組,不同的topic)_rocketmq一個消費組消費多個topic-CSDN博客
11、多個消費者組訂閱相同主題
這個業(yè)務經(jīng)常是有的,希望訂閱同一種業(yè)務,但是有不同的實現(xiàn),這時候就需要使用Tag過濾標簽來區(qū)分了。
1、實現(xiàn)消費者
MessageConsumerA:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_A,selectorExpression = MESSAGE_TAG_A
)
public class MessageConsumerA implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者MessageConsumerA] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
MessageConsumerB:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_B,selectorExpression = MESSAGE_TAG_B
)
public class MessageConsumerB implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者MessageConsumerB] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
MessageConsumerC:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_C,selectorExpression = MESSAGE_TAG_C
)
public class MessageConsumerC implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者MessageConsumerC] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
2、編寫接口發(fā)送消息
再controller添加那三個接口
@PostMapping("/send/topic2/messageA")public String sendTopic2MessageA() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for A").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_A,keys,generalMessageEvent);// 返回發(fā)送成功的狀態(tài)名return sendResult.getSendStatus().name();}@PostMapping("/send/topic2/messageB")public String sendTopic3MessageB() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for B").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_B,keys,generalMessageEvent);return sendResult.getSendStatus().name();}@PostMapping("/send/topic2/messageC")public String sendTopic2MessageC() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for C").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_C,keys,generalMessageEvent);return sendResult.getSendStatus().name();}
3、接口測試
消費成功!
也可以參考一個佬的博客:RocketMQ,同一個topic下是否可以通過不同的tag來進行訂閱嗎?_rocketmq一個topic多個tag-CSDN博客
當然,如果訂閱不同主題也是沒問題的,這里就不作演示了。
四、文末大佬好文
最后加上兩個大佬好文,感覺講的都很好:
面試官:RocketMQ同一個消費組內(nèi)的消費者訂閱了不同tag,會有問題嗎?_rocketmq 訂閱多個tag-CSDN博客
面試官:RocketMQ一個消費組內(nèi)訂閱同一個主題不同的TAG為什么會丟消息_為什么rocketmq相同消費組不同tag會有問題-CSDN博客
對于一個消費者組訂閱同一個主題不同tag會丟消息,在前幾天從0到1學習的時候,以為是可以的,但是踩了大坑。