中國南昌企業(yè)網(wǎng)站制作seo修改器
深入理解Kafka核心設(shè)計(jì)與實(shí)踐原理_03
- 03_消費(fèi)者
- 3.1消費(fèi)者與消費(fèi)者組
- 3.2客戶端開發(fā)
- 3.2.1 必要的參數(shù)配置
- 3.2.2 訂閱主題與分區(qū)
草稿
03_消費(fèi)者
與生產(chǎn)者對應(yīng)的是消費(fèi)者,應(yīng)用程序可以通過KafkaConsumer來訂閱主題,并從訂閱的主題中拉取消息。不過在使用KafkaConsumer消費(fèi)消息之前需要先了解消費(fèi)者和消費(fèi)組的概念,否則無法理解如何使用 KafkaConsumer。本章首先講解消費(fèi)者與消費(fèi)組之間的關(guān)系,進(jìn)而再細(xì)致地講解如何使用KafkaConsumer。
3.1消費(fèi)者與消費(fèi)者組
消費(fèi)者(Consumer)負(fù)責(zé)訂閱Kafka中的主題(Topic),并且從訂閱的主題上拉取消息。與其他一些消息中間件不同的是:在Kafka的消費(fèi)理念中還有一層消費(fèi)組(Consumer Group)的概念,每個(gè)消費(fèi)者都有一個(gè)對應(yīng)的消費(fèi)組。當(dāng)消息發(fā)布到主題后,只會(huì)被投遞給訂閱它的每個(gè)消費(fèi)組中的一個(gè)消費(fèi)者。消費(fèi)者與消費(fèi)組這種模型可以讓整體的消費(fèi)能力具備橫向伸縮性,我們可以增加(或減少)消費(fèi)者的個(gè)數(shù)來提高(或降低)整體的消費(fèi)能力。
對于消息中間件而言,一般有兩種消息投遞模式:點(diǎn)對點(diǎn)(P2P,Point-to-Point)模式和發(fā)布/訂閱(Pub/Sub)模式。
點(diǎn)對點(diǎn)模式是基于隊(duì)列的,消息生產(chǎn)者發(fā)送消息到隊(duì)列,消息消費(fèi)者從隊(duì)列中接收消息。發(fā)布訂閱模式定義了如何向一個(gè)內(nèi)容節(jié)點(diǎn)發(fā)布和訂閱消息,這個(gè)內(nèi)容節(jié)點(diǎn)稱為主題(Topic),主題可以認(rèn)為是消息傳遞的中介,消息發(fā)布者將消息發(fā)布到某個(gè)主題,而消息訂閱者從主題中訂閱消息。主題使得消息的訂閱者和發(fā)布者互相保持獨(dú)立,不需要進(jìn)行接觸即可保證消息的傳遞,發(fā)布/訂閱模式在消息的一對多廣播時(shí)采用。
Kafka 同時(shí)支持兩種消息投遞模式,而這正是得益于消費(fèi)者與消費(fèi)組模型的契合:· 如果所有的消費(fèi)者都隸屬于同一個(gè)消費(fèi)組,那么所有的消息都會(huì)被均衡地投遞給每一個(gè)消費(fèi)者,即每條消息只會(huì)被一個(gè)消費(fèi)者處理,這就相當(dāng)于點(diǎn)對點(diǎn)模式的應(yīng)用?!?如果所有的消費(fèi)者都隸屬于不同的消費(fèi)組,那么所有的消息都會(huì)被廣播給所有的消費(fèi)者,即每條消息會(huì)被所有的消費(fèi)者處理,這就相當(dāng)于發(fā)布/訂閱模式的應(yīng)用。
消費(fèi)組是一個(gè)邏輯上的概念,它將旗下的消費(fèi)者歸為一類,每一個(gè)消費(fèi)者只隸屬于一個(gè)消費(fèi)組。每一個(gè)消費(fèi)組都會(huì)有一個(gè)固定的名稱,消費(fèi)者在進(jìn)行消費(fèi)前需要指定其所屬消費(fèi)組的名稱,這個(gè)可以通過消費(fèi)者客戶端參數(shù)group.id來配置,默認(rèn)值為空字符串。消費(fèi)者并非邏輯上的概念,它是實(shí)際的應(yīng)用實(shí)例,它可以是一個(gè)線程,也可以是一個(gè)進(jìn)程。同一個(gè)消費(fèi)組內(nèi)的消費(fèi)者既可以部署在同一臺(tái)機(jī)器上,也可以部署在不同的機(jī)器上。
3.2客戶端開發(fā)
(1)配置消費(fèi)者客戶端參數(shù)及創(chuàng)建相應(yīng)的消費(fèi)者實(shí)例。
(2)訂閱主題。
(3)拉取消息并消費(fèi)。
(4)提交消費(fèi)位移。
(5)關(guān)閉消費(fèi)者實(shí)例。
3.2.1 必要的參數(shù)配置
- bootstrap.servers:該參數(shù)的釋義和生產(chǎn)者客戶端 KafkaProducer 中的相同,用來 指 定 連 接 Kafka 集 群 所 需 的 broker 地 址 清 單,具 體 內(nèi) 容 形 式 為host1:port1,host2:post,可以設(shè)置一個(gè)或多個(gè)地址,中間用逗號(hào)隔開,此參數(shù)的默認(rèn)值為“”。注意這里并非需要設(shè)置集群中全部的broker地址,消費(fèi)者會(huì)從現(xiàn)有的配置中查找到全部的Kafka集群成員。這里設(shè)置兩個(gè)以上的broker地址信息,當(dāng)其中任意一個(gè)宕機(jī)時(shí),消費(fèi)者仍然可以連接到Kafka集群上。
- group.id:消費(fèi)者隸屬的消費(fèi)組的名稱,默認(rèn)值為“”。如果設(shè)置為空,則會(huì)報(bào)出異常:Exception in thread "main"org.apache.kafka.common.errors.InvalidGroupIdException:The configured groupId is invalid。一般而言,這個(gè)參數(shù)需要設(shè)置成具有一定的業(yè)務(wù)意義的名稱。
- key.deserializer 和 value.deserializer:與生產(chǎn)者客戶端 KafkaProducer中的key.serializer和value.serializer參數(shù)對應(yīng)。消費(fèi)者從broker端獲取的消息格式都是字節(jié)數(shù)組(byte[])類型,所以需要執(zhí)行相應(yīng)的反序列化操作才能還原成原有的對象格式。這兩個(gè)參數(shù)分別用來指定消息中key和value所需反序列化操作的反序列化器,這兩個(gè)參數(shù)無默認(rèn)值。注意這里必須填寫反序列化器類的全限定名,比如示例中的org.apache.kafka.common.serialization.StringDeserializer,單單指定StringDeserializer是錯(cuò)誤的。
注意到代碼清單3-1中的initConfig()方法里還設(shè)置了一個(gè)參數(shù)client.id,這個(gè)參數(shù)用來設(shè)定KafkaConsumer對應(yīng)的客戶端id,默認(rèn)值也為“”。如果客戶端不設(shè)置,則KafkaConsumer會(huì)自動(dòng)生成一個(gè)非空字符串,內(nèi)容形式如“consumer-1”“consumer-2”,即字符串“consumer-”與數(shù)字的拼接。
3.2.2 訂閱主題與分區(qū)
一個(gè)消費(fèi)者可以訂閱一個(gè)或多個(gè)主題,代碼清單3-1中我們使用subscribe()方法訂閱了一個(gè)主題,對于這個(gè)方法而言,既可以以集合的形式訂閱多個(gè)主題,也可以以正則表達(dá)式的形式訂閱特定模式的主題。subscribe的幾個(gè)重載方法如下:
如果消費(fèi)者采用的是正則表達(dá)式的方式(subscribe(Pattern))訂閱,在之后的過程中,如果有人又創(chuàng)建了新的主題,并且主題的名字與正則表達(dá)式相匹配,那么這個(gè)消費(fèi)者就可以消費(fèi)到新添加的主題中的消息。如果應(yīng)用程序需要消費(fèi)多個(gè)主題,并且可以處理不同的類型,那么這種訂閱方式就很有效。
在 subscribe 的重載方法中有一個(gè)參數(shù)類型是ConsumerRebalance-Listener,這個(gè)是用來設(shè)置相應(yīng)的再均衡監(jiān)聽器的。
消費(fèi)者不僅可以通過KafkaConsumer.subscribe()方法訂閱主題,還可以直接訂閱某些主題的特定分區(qū),在KafkaConsumer中還提供了一個(gè)assign()方法來實(shí)現(xiàn)這些功能。
該方法只接受一個(gè)參數(shù)partitions,用來指定需要訂閱的分區(qū)集合。這里補(bǔ)充說明一下TopicPartition類,在Kafka的客戶端中,它用來表示分區(qū)。TopicPartition類只有2個(gè)屬性:topic和partition,分別代表分區(qū)所屬的主題和自身的分區(qū)編號(hào),這個(gè)類可以和我們通常所說的主題—分區(qū)的概念映射
起來。
KafkaConsumer 中的partitionsFor()方法可以用來查詢指定主題的元數(shù)據(jù)信息。