国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

石家莊網(wǎng)絡(luò)公司查封??趕eo計費

石家莊網(wǎng)絡(luò)公司查封,??趕eo計費,商城網(wǎng)站建設(shè)經(jīng)驗,深圳品牌設(shè)計公司排行榜Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 這三種類型的消費者,本文分別從使用方式、實現(xiàn)原理、可靠性重試和適用場景等方面為您介紹這三種類型的消費者。 背景信息? Apache RocketMQ 面向不同的業(yè)務(wù)場景提供了不同消費者類型&…

Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 這三種類型的消費者,本文分別從使用方式、實現(xiàn)原理、可靠性重試和適用場景等方面為您介紹這三種類型的消費者。

背景信息?

Apache RocketMQ 面向不同的業(yè)務(wù)場景提供了不同消費者類型,每種消費者類型的集成方式和控制方式都不一樣。了解如下問題,可以幫助您選擇更匹配業(yè)務(wù)場景的消費者類型。

  • 如何實現(xiàn)并發(fā)消費:消費者如何使用并發(fā)的多線程機制處理消息,以此提高消息處理效率?

  • 如何實現(xiàn)同步、異步消息處理:對于不同的集成場景,消費者獲取消息后可能會將消息異步分發(fā)到業(yè)務(wù)邏輯中處理,此時,消息異步化處理如何實現(xiàn)?

  • 如何實現(xiàn)消息可靠處理:消費者處理消息時如何返回響應(yīng)結(jié)果?如何在消息異常情況進行重試,保證消息的可靠處理?

以上問題的具體答案,請參考下文。

功能概述

消息消費流程

如上圖所示, Apache RocketMQ 的消費者處理消息時主要經(jīng)過以下階段:消息獲取--->消息處理--->消費狀態(tài)提交。

針對以上幾個階段,Apache RocketMQ 提供了不同的消費者類型: PushConsumer 、SimpleConsumer 和 PullConsumer。這幾種類型的消費者通過不同的實現(xiàn)方式和接口可滿足您在不同業(yè)務(wù)場景下的消費需求。具體差異如下:

在實際使用場景中,PullConsumer 僅推薦在流處理框架中集成使用,大多數(shù)消息收發(fā)場景使用 PushConsumer 和 SimpleConsumer 就可以滿足需求。

若您的業(yè)務(wù)場景發(fā)生變更,或您當(dāng)前使用的消費者類型不適合當(dāng)前業(yè)務(wù),您可以選擇在 PushConsumer 和SimpleConsumer 之間變更消費者類型。變更消費者類型不影響當(dāng)前Apache RocketMQ 資源的使用和業(yè)務(wù)處理。

危險

生產(chǎn)環(huán)境中相同的 ConsumerGroup 下嚴禁混用 PullConsumer 和其他兩種消費者,否則會導(dǎo)致消息消費異常。

PushConsumer?

PushConsumers是一種高度封裝的消費者類型,消費消息僅通過消費監(jiān)聽器處理業(yè)務(wù)并返回消費結(jié)果。消息的獲取、消費狀態(tài)提交以及消費重試都通過 Apache RocketMQ 的客戶端SDK完成。

使用方式

PushConsumer的使用方式比較固定,在消費者初始化時注冊一個消費監(jiān)聽器,并在消費監(jiān)聽器內(nèi)部實現(xiàn)消息處理邏輯。由 Apache RocketMQ 的SDK在后臺完成消息獲取、觸發(fā)監(jiān)聽器調(diào)用以及進行消息重試處理。

示例代碼如下:

// 消費示例:使用PushConsumer消費普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()// 設(shè)置消費者分組。.setConsumerGroup("YourConsumerGroup")// 設(shè)置接入點。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())// 設(shè)置預(yù)綁定的訂閱關(guān)系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 設(shè)置消費監(jiān)聽器。.setMessageListener(new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {// 消費消息并返回處理結(jié)果。return ConsumeResult.SUCCESS;}}).build();

PushConsumer的消費監(jiān)聽器執(zhí)行結(jié)果分為以下三種情況:

  • 返回消費成功:以Java SDK為例,返回ConsumeResult.SUCCESS,表示該消息處理成功,服務(wù)端按照消費結(jié)果更新消費進度。

  • 返回消費失敗:以Java SDK為例,返回ConsumeResult.FAILURE,表示該消息處理失敗,需要根據(jù)消費重試邏輯判斷是否進行重試消費。

  • 出現(xiàn)非預(yù)期失敗:例如拋異常等行為,該結(jié)果按照消費失敗處理,需要根據(jù)消費重試邏輯判斷是否進行重試消費。

PushConsumer 消費消息時,若消息處理邏輯出現(xiàn)預(yù)期之外的阻塞導(dǎo)致消息處理一直無法執(zhí)行成功,SDK會按照消費超時處理強制提交消費失敗結(jié)果,并按照消費重試邏輯進行處理。消息超時,請參見PushConsumer消費重試策略。

出現(xiàn)消費超時情況時,SDK雖然提交消費失敗結(jié)果,但是當(dāng)前消費線程可能仍然無法響應(yīng)中斷,還會繼續(xù)處理消息。

內(nèi)部原理

在PushConsumer類型中,消息的實時處理能力是基于SDK內(nèi)部的典型Reactor線程模型實現(xiàn)的。如下圖所示,SDK內(nèi)置了一個長輪詢線程,先將消息異步拉取到SDK內(nèi)置的緩存隊列中,再分別提交到消費線程中,觸發(fā)監(jiān)聽器執(zhí)行本地消費邏輯。

PushConsumer原理

?可靠性重試

PushConsumer 消費者類型中,客戶端SDK和消費邏輯的唯一邊界是消費監(jiān)聽器接口??蛻舳薙DK嚴格按照監(jiān)聽器的返回結(jié)果判斷消息是否消費成功,并做可靠性重試。所有消息必須以同步方式進行消費處理,并在監(jiān)聽器接口結(jié)束時返回調(diào)用結(jié)果,不允許再做異步化分發(fā)。消息重試具體信息,請參見PushConsumer消費重試策略。

使用PushConsumer消費者消費時,不允許使用以下方式處理消息,否則 Apache RocketMQ 無法保證消息的可靠性。

  • 錯誤方式一:消息還未處理完成,就提前返回消費成功結(jié)果。此時如果消息消費失敗,Apache RocketMQ 服務(wù)端是無法感知的,因此不會進行消費重試。

  • 錯誤方式二:在消費監(jiān)聽器內(nèi)將消息再次分發(fā)到自定義的其他線程,消費監(jiān)聽器提前返回消費結(jié)果。此時如果消息消費失敗,Apache RocketMQ 服務(wù)端同樣無法感知,因此也不會進行消費重試。

順序性保障

基于 Apache RocketMQ?順序消息的定義,如果消費者分組設(shè)置了順序消費模式,則PushConsumer在觸發(fā)消費監(jiān)聽器時,嚴格遵循消息的先后順序。業(yè)務(wù)處理邏輯無感知即可保證消息的消費順序。

適用場景

PushConsumer嚴格限制了消息同步處理及每條消息的處理超時時間,適用于以下場景:

  • 消息處理時間可預(yù)估:如果不確定消息處理耗時,經(jīng)常有預(yù)期之外的長時間耗時的消息,PushConsumer的可靠性保證會頻繁觸發(fā)消息重試機制造成大量重復(fù)消息。

  • 無異步化、高級定制場景:PushConsumer限制了消費邏輯的線程模型,由客戶端SDK內(nèi)部按最大吞吐量觸發(fā)消息處理。該模型開發(fā)邏輯簡單,但是不允許使用異步化和自定義處理流程。

SimpleConsumer

SimpleConsumer 是一種接口原子型的消費者類型,消息的獲取、消費狀態(tài)提交以及消費重試都是通過消費者業(yè)務(wù)邏輯主動發(fā)起調(diào)用完成

使用方式

SimpleConsumer 的使用涉及多個接口調(diào)用,由業(yè)務(wù)邏輯按需調(diào)用接口獲取消息,然后分發(fā)給業(yè)務(wù)線程處理消息,最后按照處理的結(jié)果調(diào)用提交接口,返回服務(wù)端當(dāng)前消息的處理結(jié)果。示例如下:

// 消費示例:使用 SimpleConsumer 消費普通消息,主動獲取消息處理并提交。 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()// 設(shè)置消費者分組。.setConsumerGroup("YourConsumerGroup")// 設(shè)置接入點。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())// 設(shè)置預(yù)綁定的訂閱關(guān)系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 設(shè)置從服務(wù)端接受消息的最大等待時間.setAwaitDuration(Duration.ofSeconds(1)).build();
try {// SimpleConsumer 需要主動獲取消息,并處理。List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));messageViewList.forEach(messageView -> {System.out.println(messageView);// 消費處理完成后,需要主動調(diào)用 ACK 提交消費結(jié)果。try {simpleConsumer.ack(messageView);} catch (ClientException e) {logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);}});
} catch (ClientException e) {// 如果遇到系統(tǒng)流控等原因造成拉取失敗,需要重新發(fā)起獲取消息請求。logger.error("Failed to receive message", e);
}

SimpleConsumer主要涉及以下幾個接口行為:

?可靠性重試

SimpleConsumer消費者類型中,客戶端SDK和服務(wù)端通過ReceiveMessageAckMessage接口通信??蛻舳薙DK如果處理消息成功則調(diào)用AckMessage接口;如果處理失敗只需要不回復(fù)ACK響應(yīng),即可在定義的消費不可見時間到達后觸發(fā)消費重試流程。更多信息,請參見SimpleConsumer消費重試策略。

順序性保障

基于 Apache RocketMQ?順序消息的定義,SimpleConsumer在處理順序消息時,會按照消息存儲的先后順序獲取消息。即需要保持順序的一組消息中,如果前面的消息未處理完成,則無法獲取到后面的消息。

適用場景

SimpleConsumer提供原子接口,用于消息獲取和提交消費結(jié)果,相對于PushConsumer方式更加靈活。SimpleConsumer適用于以下場景:

  • 消息處理時長不可控:如果消息處理時長無法預(yù)估,經(jīng)常有長時間耗時的消息處理情況。建議使用SimpleConsumer消費類型,可以在消費時自定義消息的預(yù)估處理時長,若實際業(yè)務(wù)中預(yù)估的消息處理時長不符合預(yù)期,也可以通過接口提前修改。

  • 需要異步化、批量消費等高級定制場景:SimpleConsumer在SDK內(nèi)部沒有復(fù)雜的線程封裝,完全由業(yè)務(wù)邏輯自由定制,可以實現(xiàn)異步分發(fā)、批量消費等高級定制場景。

  • 需要自定義消費速率:SimpleConsumer是由業(yè)務(wù)邏輯主動調(diào)用接口獲取消息,因此可以自由調(diào)整獲取消息的頻率,自定義控制消費速率。

PullConsumer

使用建議?

PushConsumer合理控制消費耗時,避免無限阻塞

對于PushConsumer消費類型,需要嚴格控制消息的消費耗時,盡量避免出現(xiàn)消息處理超時導(dǎo)致消息重復(fù)。如果業(yè)務(wù)經(jīng)常會出現(xiàn)一些預(yù)期外的長時間耗時的消息,建議使用SimpleConsumer,并設(shè)置好消費不可見時間。

http://m.aloenet.com.cn/news/39412.html

相關(guān)文章:

  • 鄭州網(wǎng)站建設(shè)電話seo外鏈推廣員
  • 下載app到手機seo系統(tǒng)是什么
  • 外貿(mào)網(wǎng)站開發(fā)公司百度上怎么發(fā)布信息啊
  • 臨沂蒼山網(wǎng)站建設(shè)百度聯(lián)盟
  • wordpress 電話鶴壁seo推廣
  • 做任務(wù)賺q紅包的網(wǎng)站百度統(tǒng)計收費嗎
  • ps網(wǎng)站頭部如何優(yōu)化培訓(xùn)方式
  • 找別人做網(wǎng)站需要注意什么百度權(quán)重10的網(wǎng)站
  • 臨沂網(wǎng)站建設(shè)設(shè)計百度識圖網(wǎng)站
  • 網(wǎng)上服裝設(shè)計培訓(xùn)班seo推廣具體做什么
  • 鄭州網(wǎng)站seo優(yōu)微信朋友圈廣告在哪里做
  • 廣州小型網(wǎng)站建設(shè)公司平面設(shè)計正規(guī)培訓(xùn)機構(gòu)
  • 視頻彈幕網(wǎng)站怎么做百度搜索提交入口
  • 個人做商城網(wǎng)站大概多少錢友鏈網(wǎng)站
  • 做網(wǎng)站商城開發(fā)什么語言最快seo站內(nèi)優(yōu)化培訓(xùn)
  • 煙臺企業(yè)網(wǎng)站開發(fā)清博大數(shù)據(jù)輿情監(jiān)測平臺
  • 自己的主機做服務(wù)器網(wǎng)站如何備案網(wǎng)站多少錢
  • 網(wǎng)站建設(shè)成果seo 頁面鏈接優(yōu)化
  • wordpress不用郵件驗證注冊谷歌seo服務(wù)
  • 哪家網(wǎng)站專門做折扣銷售seo搜索優(yōu)化網(wǎng)站推廣排名
  • wordpress 搜索詞天津seo顧問
  • 網(wǎng)站建設(shè)哪里可以學(xué)seo軟件系統(tǒng)
  • wordpress ip security重慶seo網(wǎng)站
  • wordpress用qq登錄上海搜索排名優(yōu)化公司
  • 記事本做網(wǎng)站插圖片安卓內(nèi)核級優(yōu)化神器
  • 網(wǎng)站系統(tǒng)建設(shè)需要什么資質(zhì)競價網(wǎng)站推廣
  • 織夢視頻網(wǎng)站模板今天最新新聞
  • 地方旅游網(wǎng)站建設(shè)方案自己可以創(chuàng)建網(wǎng)站嗎
  • 國內(nèi)html5視頻網(wǎng)站建設(shè)網(wǎng)站分析培訓(xùn)班
  • 做個簡單網(wǎng)站大概多少錢中文搜索引擎有哪些平臺