小語種網(wǎng)站寧德seo培訓
RocketMQ 延遲消息
RocketMQ 消費者啟動流程
什么是延遲消息
RocketMQ 延遲消息是指,生產者發(fā)送消息給消費者消息,消費者需要等待一段時間后才能消費到。
使用場景
用戶下單之后,15分鐘未支付,對支付賬單進行提醒或者關單處理。
RocketMQ 開源版本的消息不支持任意時間精度,只支持5s 10s 1m
等等。
Broker 如何處理延遲消息
消息投遞如下:
- 生產者發(fā)送一個延遲消息到一個
topic
中 - Broker 判斷是個延遲消息后,將消息暫存
- Broker 通過延遲服務, 先檢查消息是否過期,如果到期將消息投遞到目標
topic
- 消費者消費
topic
中的投遞延遲消息。
開源RocketMQ 的消息不支持任意精度,默認支持 18個 level:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker 在啟動的時候,會創(chuàng)建一個內部 topic:“SCHEDULE_TOPIC_XXXX” 根據(jù)延遲 level 數(shù)量,創(chuàng)建對應數(shù)量的 隊列。 也就是說 18 level 對應了18 個隊列。
具體可以在 代碼TopicConfigManager.java 中 看到:
private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
要注意的是,Broker 一般是集群模式
部署,也就是說,每個Broker 都會有18個隊列。
TopicConfigManager#TopicConfigManager(BrokerController brokerController)
生產者消息延遲發(fā)送
代碼示例如下:
Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設置延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);
Broker 存儲延遲消息
上一篇文章已經(jīng)談到,Broker
收到消費者消息后,會進行消息存儲,然后再轉發(fā)到消費隊列(ConsumerQueue),然后再推給消費者。
其實一旦消息轉發(fā)到
存儲延遲消息的流程也類似
- 確定延遲消息投遞到topic 哪個隊列。存儲生產者寫入的消息時,將消息轉發(fā)到 ConsumeQueue 中,消費者就能消費到。 延遲消息不能立即消息到,于是將 topic 名稱修改為
SCHEDULE_TOPIC_XXX
,并根據(jù)延遲消息級別,確定投遞到哪個隊列上。同時還會將原來消息要發(fā)送到的目標 topic 和隊列記錄投遞到哪個隊列。
代碼在CommitLog#asyncPutMessage 中
設置延遲消息的投遞隊列信息代碼如下:
// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {// 如果設置的級別超過了最大級別,重置延遲級別if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
// 計算延遲消息應該投遞到 SCHEDULE_TOPIC_XXXX 到哪個隊列。topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 記錄原始 topic ,queueid,方便后期投遞到目標 topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新消息投遞目標為 SCHEDULE_TOPIC_XXX,queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}
消息轉發(fā)
消息轉發(fā)過程其實中會對延遲消息做一些特殊處理
CommitLog中的消息轉發(fā)到CosumeQueue中是異步進行的。在轉發(fā)過程中,會對延遲消息進行特殊處理,主要是計算這條延遲消息需要在什么時候進行投遞。