国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

wordpress建站教程視頻百度指數(shù)需求圖譜

wordpress建站教程視頻,百度指數(shù)需求圖譜,wordpress不顯示主題,北京做網(wǎng)站哪家便宜👏作者簡介:大家好,我是愛吃芝士的土豆倪,24屆校招生Java選手,很高興認(rèn)識大家📕系列專欄:Spring源碼、JUC源碼、Kafka原理🔥如果感覺博主的文章還不錯的話,請&#x1f44…
  • 👏作者簡介:大家好,我是愛吃芝士的土豆倪,24屆校招生Java選手,很高興認(rèn)識大家
  • 📕系列專欄:Spring源碼、JUC源碼、Kafka原理
  • 🔥如果感覺博主的文章還不錯的話,請👍三連支持👍一下博主哦
  • 🍂博主正在努力完成2023計劃中:源碼溯源,一探究竟
  • 📝聯(lián)系方式:nhs19990716,加我進(jìn)群,大家一起學(xué)習(xí),一起進(jìn)步,一起對抗互聯(lián)網(wǎng)寒冬👀

文章目錄

  • Kafka系統(tǒng)基本架構(gòu)
    • broker 服務(wù)器
    • 生產(chǎn)者 producer
    • 消費者 consumer
    • 主題 topic 和分區(qū) partition
      • topic
      • partition 分區(qū)
    • 分區(qū)副本 replica
    • 分區(qū) leader
    • 分區(qū) follower
    • 消息偏移量 offset
    • ISR 同步副本列表
  • kafka 的數(shù)據(jù)存儲結(jié)構(gòu)
    • 物理存儲目錄結(jié)構(gòu)
    • 消息 message 存儲結(jié)構(gòu)
  • kafka 關(guān)鍵原理加強(qiáng)
    • 日志分段切分條件
    • controller 控制器
      • controller 的職責(zé)
      • 分區(qū)的負(fù)載分布
      • 分區(qū) Leader 的選舉機(jī)制
    • 生產(chǎn)者原理解析
      • 生產(chǎn)者工作流程
      • 重要的生產(chǎn)者參數(shù)
        • acks
      • Producer 往 Broker 發(fā)送消息應(yīng)答機(jī)制
        • acks = 0:
        • acks = 1:
        • acks = all:
      • 重要的生產(chǎn)者參數(shù)
        • max.request.size
        • retries 和 retry.backoff.ms
        • compression.type
        • batch.size
        • linger.ms
        • enable.idempotence
        • partitioner.class

Kafka系統(tǒng)基本架構(gòu)

在這里插入圖片描述

自我推導(dǎo)設(shè)計:

  • kafka 是用來存數(shù)據(jù)
  • 現(xiàn)實世界數(shù)據(jù)有分類,所以存儲系統(tǒng)也應(yīng)有數(shù)據(jù)分類管理功能,如 mysql 的表;kafka 有 topic;
  • 如一個 topic 的數(shù)據(jù)全部交給一臺 server 存儲和管理,則讀寫吞吐量有限
  • 所以,一個 topic 的數(shù)據(jù)應(yīng)該可以分成多個部分(partition)分別交給多臺 server存儲和管理;
  • 如一臺 server 宕機(jī),這臺 server 負(fù)責(zé)的 partition 將不可用,所以,一個 partition 應(yīng)有多個副本;
  • 一個 partition 有多個副本,則副本間的數(shù)據(jù)一致性難以保證,因此要有一個 leader 統(tǒng)領(lǐng)讀寫;
  • 一個 leader 萬一掛掉,則該 partition 又不可用,因此還要有 leader 的動態(tài)選舉機(jī)制;
  • 集群有哪些 topic,topic 有哪幾個分區(qū),server 在線情況,等等元信息和狀態(tài)信息需要在集群內(nèi)部及客戶端之間共享,則引入了 zookeeper;
  • 客戶端在讀取數(shù)據(jù)時,往往需要知道自己所讀取到的位置,因而要引入消息偏移量維護(hù)機(jī)制;

broker 服務(wù)器

一臺 kafka 服務(wù)器就是一個 broker。一個 kafka 集群由多個 broker 組成。

生產(chǎn)者 producer

消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端。

消費者 consumer

consumer :消費者,從 kafka broker 取消息的客戶端。

consumer group:消費組,單個或多個 consumer 可以組成一個消費組;

消費組是用來實現(xiàn)消息的廣播(發(fā)給所有的 consumer)和單播(發(fā)給任意一個 consumer)的手段

在這里插入圖片描述

消費者可以對消費到的消息位置(消息偏移量)進(jìn)行記錄;
老版本是記錄在 zookeeper 中;新版本是記錄在 kafka 中一個內(nèi)置的 topic 中(__consumer_offsets)

主題 topic 和分區(qū) partition

topic

Kafka 中存儲數(shù)據(jù)的邏輯分類;你可以理解為數(shù)據(jù)庫中“表”的概念;

比如,將 app 端日志、微信小程序端日志、業(yè)務(wù)庫訂單表數(shù)據(jù)分別放入不同的 topic

partition 分區(qū)

topic 中數(shù)據(jù)的具體管理單元;

  • 每個 partition 由一個 kafka broker 服務(wù)器管理;
  • 每個 topic 可以劃分為多個 partition,分布到多個 broker 上管理;
  • 每個 partition 都可以有多個副本;

分區(qū)對于 kafka 集群的好處是:實現(xiàn) topic 數(shù)據(jù)的負(fù)載均衡。提高寫入、讀出的并發(fā)度,提高吞吐量。

分區(qū)副本 replica

每個 topic 的每個 partition 都可以配置多個副本(replica),以提高數(shù)據(jù)的可靠性;

每個 partition 的所有副本中,必有一個 leader 副本,其他的就是 follower 副本(observer 副本);follower
定期找 leader 同步最新的數(shù)據(jù);對外提供服務(wù)只有 leader;

分區(qū) leader

partition replica 中的一個角色,在一個 partition 的多個副本中,會存在一個副本角色為 leader;

producer 和 consumer 只能跟 leader 交互(讀寫數(shù)據(jù))。

分區(qū) follower

partition replica 中的一個角色,它通過心跳通信不斷從 leader 中拉取、復(fù)制數(shù)據(jù)(只負(fù)責(zé)備份)。

如果 leader 所在節(jié)點宕機(jī),follower 中會選舉出新的 leader;

消息偏移量 offset

partition 中每條消息都會被分配一個遞增 id(offset);通過 offset 可以快速定位到消息的存儲位置;
kafka 只保證按一個 partition 中的消息的順序,不保證一個 topic 的整體(多個 partition 間)的順序。

在這里插入圖片描述

因為broker將數(shù)據(jù)寫入分區(qū)存儲文件時,永遠(yuǎn)都是追加,所以kafka把自己的數(shù)據(jù)存儲文件稱之為log。

ISR 同步副本列表

ISR 概念:(同步副本)。每個分區(qū)的 leader 會維護(hù)一個 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 編 號 , 只 有 跟 得 上 Leader 的 follower 副 本 才 能 加 入 到 ISR 里 面 , 這 個 是 通 過replica.lag.time.max.ms =10000(默認(rèn)值)參數(shù)配置的,只有 ISR 里的成員才有被選為 leader 的可能

在這里插入圖片描述

踢出 ISR 和重新加入 ISR 的條件:

  • 踢出 ISR 的條件: 由 replica.lag.time.max.ms =10000 決定,如上圖;
  • 重新加入 ISR 的條件: OSR 副本的 LEO(log end offset)追上 leader 的 LEO;

kafka 的數(shù)據(jù)存儲結(jié)構(gòu)

在這里插入圖片描述

物理存儲目錄結(jié)構(gòu)

  • 存儲目錄 名稱規(guī)范: topic 名稱-分區(qū)號

在這里插入圖片描述

注:“t1"即為一個 topic 的名稱;

而“t1-0 / t1-1"則表明這個目錄是 t1 這個 topic 的哪個 partition;

  • 數(shù)據(jù)文件 名稱規(guī)范:

生產(chǎn)者生產(chǎn)的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機(jī)制 ;

  1. 每個 partition 的數(shù)據(jù)將分為多個 segment 存儲
  2. 每個 segment 對應(yīng)兩個文件:“.index"文件和“.log"文件。

在這里插入圖片描述

index 和 log 文件以當(dāng)前 segment 的第一條消息的 offset。

在這里插入圖片描述

index 索引文件中的數(shù)據(jù)為: 消息 offset -> log 文件中該消息的物理偏移量位置;

Kafka 中的索引文件以稀疏索引( sparse index )的方式構(gòu)造消息的索引,它并不保證每個消息在索引文件中都有對應(yīng)的索引;每當(dāng)寫入一定量(由 broker 端參數(shù) log.index.interval.bytes 指定,默認(rèn)值為 4096 ,即 4KB )的消息時,偏移量索引文件和時間戳索引文件分別增加一個偏移量索引項和時間戳索引項,增大或減小 log.index.interval.bytes 的值,對應(yīng)地可以縮小或增加索引項的密度;

查詢指定偏移量時,使用二分查找法來快速定位偏移量的位置。

消息 message 存儲結(jié)構(gòu)

在客戶端編程代碼中,消息的封裝類有兩種:ProducerRecordConsumerRecord

簡單來說,kafka 中的每個 massage 由一對 key-value 構(gòu)成:

Kafka 中的 message 格式經(jīng)歷了 3 個版本的變化了:v0 、 v1 、 v2

在這里插入圖片描述

各個字段的含義介紹如下:

  • crc:占用 4 個字節(jié),主要用于校驗消息的內(nèi)容;(循環(huán)冗余校驗碼)
  • magic:這個占用 1 個字節(jié),主要用于標(biāo)識 Kafka 版本。Kafka 0.10.x magic 默認(rèn)值為 1;
  • attributes:占用 1 個字節(jié),這里面存儲了消息壓縮使用的編碼以及 Timestamp 類型。目前 Kafka 支持 gzip、snappy 以及 lz4(0.8.2 引入) 三種壓縮格式;[0,1,2]三位 it 表示壓縮類型。[3]位表示時間錯類型(0,create time;1,append time),[4,5,6,7]位保留;
  • key length:占用 4 個字節(jié)。主要標(biāo)識 Key 的內(nèi)容的長度;
  • key:占用 N 個字節(jié),存儲的是 key 的具體內(nèi)容;(相當(dāng)于讀多少位之后這個就是key,知道key有多長就能讀對key)
  • value length:占用 4 個字節(jié)。主要標(biāo)識 value的內(nèi)容的長度;
  • value:value 即是消息的真實內(nèi)容,在 Kafka 中這個也叫做 payload。(知道value有多長就能讀對value)

通過這些設(shè)置,在發(fā)送的時候,如果網(wǎng)絡(luò)中出現(xiàn)了波動,那么我們就能夠知道消息是有問題的。

在這里插入圖片描述

kafka 關(guān)鍵原理加強(qiáng)

日志分段切分條件

(1)當(dāng)前日志分段文件的大小超過了 broker 端參數(shù) log.segment.bytes 配置的值log.segment.bytes 參數(shù)的默認(rèn)值為 1073741824,即 1GB

(2)當(dāng)前日志分段中消息的最小時間戳與當(dāng)前系統(tǒng)的時間戳的差值大于 log.roll.ms 或 log.roll.hours參數(shù)配置的值。如果同時配置了 log.roll.ms 和 log.roll.hours 參數(shù),那么 log.roll.ms 的優(yōu)先級高默認(rèn)情況下,只配置了 log.roll.hours 參數(shù),其值為 168,即 7 天。

(3)偏移量索引文件或時間戳索引文件的大小達(dá)到 broker 端參數(shù) log.index.size.max.bytes 配置的值。log.index.size .max.bytes 的默認(rèn)值為 10485760,即 10MB

(4)追加的消息的偏移量與當(dāng)前日志分段的起始偏移量之間的差值大于 Integer.MAX_VALUE, 即要追加的消息的偏移量不能轉(zhuǎn)變?yōu)橄鄬ζ屏?#xff08;offset - baseOffset > Integer.MAX_VALUE)。

controller 控制器

Controller 簡單來說,就是 kafka 集群的狀態(tài)管理者

在 Kafka 集群中會有一個或者多個 broker,其中有一個 broker 會被選舉為控制器(Kafka Controller),它負(fù)責(zé)維護(hù)整個集群中所有分區(qū)和副本的狀態(tài)及分區(qū) leader 的選舉。

當(dāng)某個分區(qū)的 leader 副本出現(xiàn)故障時,由控制器負(fù)責(zé)為該分區(qū)選舉新的 leader 副本。當(dāng)檢測到某個分區(qū)的 ISR 集合發(fā)生變化時,由控制器負(fù)責(zé)通知所有 broker 更新其元數(shù)據(jù)信息。當(dāng)使用 kafka-topics.sh腳本為某個 topic 增加分區(qū)數(shù)量時,同樣還是由控制器負(fù)責(zé)分區(qū)的重新分配。

Kafka 中的控制器選舉的工作依賴于 Zookeeper,成功競選為控制器的 broker 會在 Zookeeper 中創(chuàng)建/controller 這個臨時(EPHEMERAL)節(jié)點,此臨時節(jié)點的內(nèi)容參考如下:

{"version":1,"brokerid":0,"timestamp":"1529210278988"}

其中 version 在目前版本中固定為 1,brokerid 表示成為控制器的 broker 的 id 編號,timestamp 表示競選成為控制器時的時間戳。

在任意時刻,集群中有且僅有一個控制器。每個 broker 啟動的時候會去嘗試去讀取 zookeeper 上的/controller 節(jié)點的 brokerid 的值,如果讀取到 brokerid 的值不為-1,則表示已經(jīng)有其它 broker 節(jié)點成功競選為控制器,所以當(dāng)前 broker 就會放棄競選;如果 Zookeeper 中不存在/controller 這個節(jié)點,或者這個節(jié)點中的數(shù)據(jù)異常,那么就會嘗試去創(chuàng)建/controller 這個節(jié)點,當(dāng)前 broker 去創(chuàng)建節(jié)點的時候,也有可能其他 broker 同時去嘗試創(chuàng)建這個節(jié)點,只有創(chuàng)建成功的那個 broker 才會成為控制器,而創(chuàng)建失敗的 broker 則表示競選失敗。每個 broker 都會在內(nèi)存中保存當(dāng)前控制器的 brokerid 值,這個值可以標(biāo)識為 activeControllerId。

controller 競選機(jī)制:簡單說,先來先上!

controller 的職責(zé)

監(jiān)聽 partition 相關(guān)變化

對 Zookeeper 中的/admin/reassign_partitions 節(jié)點注冊 PartitionReassignmentListener,用來處理分區(qū)重分配的動作。對 Zookeeper 中的/isr_change_notification 節(jié)點注冊 IsrChangeNotificetionListener,用來處理ISR 集合變更的動作。對 Zookeeper 中的/admin/preferred-replica-election 節(jié)點添加PreferredReplicaElectionListener,用來處理優(yōu)先副本選舉。

監(jiān)聽 topic 增減變化

對 Zookeeper 中的/brokers/topics 節(jié)點添加 TopicChangeListener,用來處理 topic 增減的變化;對 Zookeeper 中的/admin/delete_topics 節(jié)點添加 TopicDeletionListener,用來處理刪除 topic 的動作

監(jiān)聽 broker 相關(guān)的變化

對 Zookeeper 中的/brokers/ids/節(jié)點添加 BrokerChangeListener,用來處理 broker 增減的變化。

更新集群的元數(shù)據(jù)信息

從 Zookeeper 中讀取獲取當(dāng)前所有與 topic、partition 以及 broker 有關(guān)的信息并進(jìn)行相應(yīng)的管理。對各topic 所對應(yīng)的Zookeeper 中的/brokers/topics/[topic]節(jié)點添加 PartitionModificationsListener,用來監(jiān)聽 topic 中的分區(qū)分配變化。并將最新信息同步給其他所有 broker

啟動并管理分區(qū)狀態(tài)機(jī)和副本狀態(tài)機(jī)。

如果參數(shù) auto.leader.rebalance.enable 設(shè)置為 true,則還會開啟一個名為“auto-leader-rebalance-task”
的定時任務(wù)來負(fù)責(zé)維護(hù)分區(qū)的 leader

分區(qū)的負(fù)載分布

topic中的每個分區(qū)的每個副本及其leader副本,在集群中的眾多broker中,如何分布的。

有兩大策略:

策略1:kafka自動分布

策略2:topic的創(chuàng)建者手動指定

客戶端請求創(chuàng)建一個 topic 時,每一個分區(qū)副本在 broker 上的分配,是由集群 controller 來決定;其分布策略源碼如下:

private def assignReplicasToBrokersRackUnaware(nPartitions: Int,replicationFactor: Int,brokerList: Seq[Int],fixedStartIndex: Int,startPartitionId: Int): Map[Int, Seq[Int]] = {val ret = mutable.Map[Int, Seq[Int]]()val brokerArray = brokerList.toArrayval startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)var currentPartitionId = math.max(0, startPartitionId)var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)for (_ <- 0 until nPartitions) {if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)) {nextReplicaShift += 1}val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.lengthval replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))for (j <- 0 until replicationFactor - 1) {replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))}ret.put(currentPartitionId, replicaBuffer)currentPartitionId += 1}ret
}private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex:
Int, nBrokers: Int): Int = {val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)(firstReplicaIndex + shift) % nBrokers
}
副本因子不能大于 Broker 的個數(shù);partition_0 的第 1 個副本(leader 副本)放置位置是隨機(jī)從 brokerList 選擇的;其他分區(qū)的第 1 個副本(leader)放置位置相對于 paritition_0 分區(qū)依次往后移(也就是如果我們有 5 個 Broker,5 個分區(qū),假設(shè) partition0 分區(qū)放在 broker4 上,那么 partition1 將會放在 broker5上;patition2 將會放在 broker1 上;partition3 在 broker2,依次類);各分區(qū)剩余的副本相對于分區(qū)前一個副本偏移隨機(jī)數(shù) nextReplicaShift

分區(qū) Leader 的選舉機(jī)制

分區(qū) leader 副本的選舉由控制器 controller 負(fù)責(zé)具體實施

當(dāng)創(chuàng)建分區(qū)(創(chuàng)建主題或增加分區(qū)都有創(chuàng)建分區(qū)的動作)或 Leader 下線(此時分區(qū)需要選舉一個新的 leader 上線來對外提供服務(wù))的時候都需要執(zhí)行 leader 的選舉動作。

選舉策略:按照 AR 集合中副本的順序查找第一個存活的副本,并且這個副本在 ISR 集合中;

一個分區(qū)的 AR 集合在 partition 分配的時候就被指定,并且只要不發(fā)生重分配的情況,集合內(nèi)部副本的順序是保持不變的,而分區(qū)的 ISR 集合中副本的順序可能會改變

生產(chǎn)者原理解析

生產(chǎn)者工作流程

在這里插入圖片描述

一個生產(chǎn)者客戶端由兩個線程協(xié)調(diào)運行,這兩個線程分別為主線程和 Sender 線程 。

在主線程中由 kafkaProducer 創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator, 也稱為消息收集器)中。

Sender 線程負(fù)責(zé)從 RecordAccumulator 獲取消息并將其發(fā)送到 Kafka 中;

RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。RecordAccumulator 緩存的大小可以通過生產(chǎn)者客戶端參數(shù) buffer.memory 配置,默認(rèn)值為 33554432B ,即 32M。如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,則會導(dǎo)致生產(chǎn)者空間不足,這個時候 KafkaProducer.send()方法調(diào)用要么被阻塞,要么拋出異常,這個取決于參數(shù)max.block.ms 的配置,此參數(shù)的默認(rèn)值為 60000,即 60 秒。

主線程中發(fā)送過來的消息都會被迫加到 RecordAccumulator 的某個雙端隊列( Deque )中,RecordAccumulator 內(nèi)部為每個分區(qū)都維護(hù)了一個雙端隊列,即 Deque消息寫入緩存時,追加到雙端隊列的尾部;

Sender 讀取消息時,從雙端隊列的頭部讀取。注意:ProducerBatch 是指一個消息批次;與此同時,會將較小的 ProducerBatch 湊成一個較大 ProducerBatch ,也可以減少網(wǎng)絡(luò)請求的次數(shù)以提升整體的吞吐量。

ProducerBatch 大小和 batch.size 參數(shù)也有著密切的關(guān)系。當(dāng)一條消息(ProducerRecord ) 流入RecordAccumulator 時,會先尋找與消息分區(qū)所對應(yīng)的雙端隊列(如果沒有則新建),再從這個雙端隊列的尾部獲取一個 ProducerBatch (如果沒有則新建),查看 ProducerBatch 中是否還可以寫入這個 ProducerRecord,如果可以寫入,如果不可以則需要創(chuàng)建一個新的 Producer Batch。在新建ProducerBatch 時評估這條消息的大小是否超過 batch.size 參數(shù)大小,如果不超過,那么就以 batch.size參數(shù)的大小來創(chuàng)建 ProducerBatch。

如果生產(chǎn)者客戶端需要向很多分區(qū)發(fā)送消息, 則可以將 buffer.memory 參數(shù)適當(dāng)調(diào)大以增加整體的吞吐量。

Sender 從 RecordAccumulator 獲取緩存的消息之后,會進(jìn)一步將<分區(qū),Deque>的形式轉(zhuǎn)變成<Node,List>的形式,其中 Node 表示 Kafka 集群 broker 節(jié)點。

對于網(wǎng)絡(luò)連接來說,生產(chǎn)者客戶端是與具體 broker 節(jié)點建立的連接,也就是向具體的 broker 節(jié)點發(fā)送消息,而并不關(guān)心消息屬于哪一個分區(qū);而對于 KafkaProducer 的應(yīng)用邏輯而言,我們只關(guān)注向哪個分區(qū)中發(fā)送哪些消息,所以在這里需要做一個應(yīng)用邏輯層面到網(wǎng)絡(luò) I/O 層面的轉(zhuǎn)換。

在轉(zhuǎn)換成<Node,List>的形式之后, Sender 會進(jìn)一步封裝成<Node,Request> 的形式,這樣就可以將 Request 請求發(fā)往各個 Node 了,這里的 Request 是 Kafka 各種協(xié)議請求;

但是還是存在發(fā)送失敗的情況,所以請求在從 sender 線程發(fā)往 Kafka 之前還會保存到 InFlightRequests 中,InFlightRequests 保存對象的具體形式為 Map<NodeId,Deque>,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒有收到服務(wù)端響應(yīng)的請(NodeId是一個String類型,表示節(jié)點的id編號)。與此同時,InFlightRequests 還提供了許多管理類的方法,并且通過配置參數(shù)還可以限制每個連接(也就是客戶端與 Node 之間的連接)最多緩存的請求數(shù)。這個配置參數(shù)為 max.in.flight.request.per. connection , 默認(rèn)值為 5,即每個連接最多只能緩存 5 個未響應(yīng)的請求,超過該數(shù)值之后就不能再向這個連接發(fā)送更多的請求了,除非有緩存的請求收到了響應(yīng)( Response )。

在上圖中還可以看到,當(dāng)發(fā)送成功了之后,會將Response返回給InFlightRequests 中,然后從隊列里面將對應(yīng)的Request移除,說明這個請求就再也不需要存著了。保存的意義其實也就是為了可以重試,當(dāng)把Response返回的時候,如果失敗了,那么其實是可以重試的,并不是無窮的,而是有一個參數(shù)的上限。

通過比較 Deque 的 size 這個參數(shù)的大小來判斷對應(yīng)的 Node 中是否己經(jīng)堆積了很多未響應(yīng)的消息,如果真是此,那么說明這個 Node 節(jié)點負(fù)載較大或網(wǎng)絡(luò)連接有問題,再繼續(xù)發(fā)送請求會增大請求超時的可能。

但是可能存在一個有序性的問題。

假如代碼中:

send(a);

send(b);

有沒有一種可能,到達(dá)服務(wù)端broker的log數(shù)據(jù)文件中的時候,文件中的數(shù)據(jù)長這樣?

b

a

比如 a先失敗了,進(jìn)緩存, b成功了,a又發(fā)送成功了。也就是producer內(nèi)部的重試機(jī)制,有可能造成數(shù)據(jù)在服務(wù)端存儲的亂序。

這個流程不像我們代碼里自己設(shè)置,如果發(fā)送失敗了去重試,那樣其實就變成同步了,只有一條成功了才繼續(xù)發(fā)嚇一條。

重要的生產(chǎn)者參數(shù)

acks

acks 是控制 kafka 服務(wù)端向生產(chǎn)者應(yīng)答消息寫入成功的條件;

生產(chǎn)者根據(jù)得到的確認(rèn)信息,來判斷消息發(fā)送是否成功;

那么涉及到ack的有兩個問題?其參數(shù)起什么作用?ack參數(shù)有哪幾種類型,分別有什么特點。

Producer 往 Broker 發(fā)送消息應(yīng)答機(jī)制

kafka 在 producer 里面提供了消息確認(rèn)機(jī)制。我們可以通過配置來決定消息發(fā)送到對應(yīng)分區(qū)的幾個副 本 才 算 消 息 發(fā) 送 成 功 ???以 在 構(gòu) 造 producer 時 通 過 acks 參 數(shù) 指 定 ( 在 0.8.2.X 前 是 通過 request.required.acks 參數(shù)設(shè)置的)。這個參數(shù)支持以下三種值:

acks = 0:

意味著如果生產(chǎn)者能夠通過網(wǎng)絡(luò)把消息發(fā)送出去,那么就認(rèn)為消息已成功寫入 kafka 。在這種情況下還是有可能發(fā)生錯誤,比如發(fā)送的對象不能被序列化或者網(wǎng)卡發(fā)生故障,但如果是分區(qū)離線或整個集群長時間不可用,那就不會收到任何錯誤。在 acks=0 模式下的運行速度是非??斓?#xff08;這就是為什么很多基準(zhǔn)測試都是基于這個模式),你可以得到驚人的吞吐量和帶寬利用率,不過如果選擇了這種模式,大概率會丟失一些消息。

acks = 1:

意味著 leader 在收到消息并把它寫入到分區(qū)數(shù)據(jù)文件(不一定同步到磁盤上)時會返回確認(rèn)或錯誤響應(yīng)。在這個模式下,如果發(fā)生正常的 leader 選舉,生產(chǎn)者會在選舉時收到一個LeaderNotAvailableException 異常,如果生產(chǎn)者能恰當(dāng)?shù)靥幚磉@個錯誤,它會重試發(fā)送悄息,最終消息會安全到達(dá)新的 leader 那里。不過在這個模式下仍然有可能丟失數(shù)據(jù),比如消息已經(jīng)成功寫入 leader,但在消息被復(fù)制到 follower 副本之前 leader 發(fā)生崩潰。

acks = all:

(這個和 request.required.acks = -1 含義一樣):意味著 leader 在返回確認(rèn)或錯誤響應(yīng)之前,會等待所有同步副本都收到悄息。如果和 min.insync.replicas 參數(shù)結(jié)合起來,就可以決定在返回確認(rèn)前至少有多少個副本能夠收到悄息,生產(chǎn)者會一直重試直到消息被成功提交。不過這也是最慢的做法,因為生產(chǎn)者在繼續(xù)發(fā)送其他消息之前需要等待所有副本都收到當(dāng)前的消息。

根據(jù)實際的應(yīng)用場景,我們設(shè)置不同的 acks,以此保證數(shù)據(jù)的可靠性。

acks含義
0Producer 往集群發(fā)送數(shù)據(jù)不需要等到集群的確認(rèn)信息,不確保消息發(fā)送成功。安全性最低但是效率最高。
1Producer 往集群發(fā)送數(shù)據(jù)只要 leader 成功寫入消息就可以發(fā)送下一條,只確保 Leader 接收成功。
-1 或 allProducer 往集群發(fā)送數(shù)據(jù)需要所有的 ISR Follower 都完成從 Leader 的同步才會發(fā)送下一條,確保Leader 發(fā)送成功和所有的副本都成功接收。安全性最高,但是效率最低。

生產(chǎn)者將 acks 設(shè)置為 all,是否就一定不會丟數(shù)據(jù)呢?(-1是代表所有的ISR副本拿到
否!如果在某個時刻 ISR 列表只剩 leader 自己了,那么就算 acks=all,收到這條數(shù)據(jù)還是只有一個節(jié)點;

可以配合另外一個參數(shù)緩解此情況: 最小同步副本數(shù)>=2
BROKER 端參數(shù): min.insync.replicas(默認(rèn) 1)

如果讓min.insync.replicas = 分區(qū)副本總數(shù),那么這個分區(qū)的可用性就會極大程度的降低(經(jīng)常不可用,因為只要有一個副本掉隊,那么整個分區(qū)就不可用了,導(dǎo)致分區(qū)經(jīng)常不能讀寫)

所以綜上所述,kafka中沒有一個盡善盡美的方案,都是在取舍中進(jìn)行的。

但是還存在一個問題,就是如果我的數(shù)據(jù)又不能丟,產(chǎn)生數(shù)據(jù)的速度還特別快,那么一定得配置 ack = -1

但是數(shù)據(jù)寫入速度就跟不上產(chǎn)生得速度,那就會在我的生產(chǎn)端出現(xiàn)數(shù)據(jù)積壓。

解決辦法就是:**加分區(qū),擴(kuò)集群 ** 生產(chǎn)者寫的時候,有的寫0 有的寫1 有的寫2 這樣就增加了吞吐量。也就是加分區(qū)增加并行度,擴(kuò)展了吞吐量。

一般情況 配置 ack = 1,中庸之道,畢竟節(jié)點不可能老掛。

kafka本身,作為流失計算的緩沖用的,流式計算,多半是無法保證百分之一百正確的。

重要的生產(chǎn)者參數(shù)

max.request.size

這個參數(shù)用來限制生產(chǎn)者客戶端能發(fā)送的消息的最大值,默認(rèn)值為 1048576B ,即 lMB 一般情況下,這個默認(rèn)值就可以滿足大多數(shù)的應(yīng)用場景了。

這個參數(shù)還涉及一些其它參數(shù)的聯(lián)動,比如 broker 端(topic 級別參數(shù))的 message.max.bytes 參數(shù)( 默 認(rèn) 1000012), 如 果 配 置 錯 誤 可 能 會 引 起 一 些 不 必 要 的 異 常 ; 比 如 將 broker 端 的message.max.bytes 參數(shù)配置為 10 ,而 max.request.size 參數(shù)配置為 20,那么當(dāng)發(fā)送一條大小為 15B的消息時,生產(chǎn)者客戶端就會報出異常;

retries 和 retry.backoff.ms

retries 參數(shù)用來配置生產(chǎn)者重試的次數(shù),默認(rèn)值為 0,即在發(fā)生異常的時候不進(jìn)行任何重試動作。消息在從生產(chǎn)者發(fā)出到成功寫入服務(wù)器之前可能發(fā)生一些臨時性的異常,比如網(wǎng)絡(luò)抖動、 leader 副本的選舉等,這種異常往往是可以自行恢復(fù)的,生產(chǎn)者可以通過配置 retries 大于 0 的值,以此通過內(nèi)部重試來恢復(fù)而不是一味地將異常拋給生產(chǎn)者的應(yīng)用程序。如果重試達(dá)到設(shè)定的次數(shù),那么生產(chǎn)者就會放棄重試并返回異常。

重試還和另一個參數(shù) retry.backoff.ms 有關(guān),這個參數(shù)的默認(rèn)值為 100,它用來設(shè)定兩次重試之間的時間間隔,避免無效的頻繁重試。

如果將 retries 參數(shù)配置為非零值,并且 max .in.flight.requests.per.connection 參數(shù)配置為大于 1 的值,那可能會出現(xiàn)錯序的現(xiàn)象:如果批次 1 消息寫入失敗,而批次 2 消息寫入成功,那么生產(chǎn)者會重試發(fā)送批次 1 的消息,此時如果批次 1 的消息寫入成功,那么這兩個批次的消息就出現(xiàn)了錯序。對于某些應(yīng)用來說,順序性非常重要 ,比如 MySQL binlog 的傳輸,如果出現(xiàn)錯誤就會造成非常嚴(yán)重的后果;

一般而言,在需要保證消息順序的場合建議把參數(shù) max.in.flight.requests.per.connection 配置為 1 ,而不是把 retries 配置為 0,不過這樣也會影響整體的吞吐。

compression.type

這個參數(shù)用來指定消息的壓縮方式,默認(rèn)值為“none ",即默認(rèn)情況下,消息不會被壓縮。

該參數(shù)還可以配置為 “gzip”,“snappy” 和 “l(fā)z4”。

對消息進(jìn)行壓縮可以極大地減少網(wǎng)絡(luò)傳輸、降低網(wǎng)絡(luò) I/O,從而提高整體的性能 。

**消息壓縮是一種以時間換空間的優(yōu)化方式,如果對時延有一定的要求,則不推薦對消息進(jìn)行壓縮;這個也是一種權(quán)衡 **

batch.size

每個 Batch 要存放 batch.size 大小的數(shù)據(jù)后,才可以發(fā)送出去。比如說 batch.size 默認(rèn)值是 16KB,那么里面湊夠 16KB 的數(shù)據(jù)才會發(fā)送。

理論上來說,提升 batch.size 的大小,可以允許更多的數(shù)據(jù)緩沖在 recordAccumulator 里面,那么一次Request 發(fā)送出去的數(shù)據(jù)量就更多了,這樣吞吐量可能會有所提升。

但是 batch.size 也不能過大,要是數(shù)據(jù)老是緩沖在 Batch 里遲遲不發(fā)送出去,那么發(fā)送消息的延遲就會很高。

一般可以嘗試把這個參數(shù)調(diào)節(jié)大些,利用生產(chǎn)環(huán)境發(fā)消息負(fù)載測試一下。

所以對應(yīng)參數(shù)的設(shè)置需要 最佳實踐 和 均衡問題來實現(xiàn)。

linger.ms

這個參數(shù)用來指定生產(chǎn)者發(fā)送 ProducerBatch 之前等待更多消息( ProducerRecord )加入ProducerBatch 時間,默認(rèn)值為 0。生產(chǎn)者客戶端會在 ProducerBatch 填滿或等待時間超過 linger.ms

增大這個參數(shù)的值會增加消息的延遲,但是同時能提升一定的吞吐量。

在這里插入圖片描述

在這里插入圖片描述

linger.ms = 0

batchsize = 100

那么是不是batchsize 就無效了。

設(shè)置了linger.ms = 0 理論上應(yīng)該馬上走,但是此時 sender線程忙不過來,所以只能阻塞著。

linger.ms = 0 相當(dāng)于一旦有車就立馬接走,沒車就阻塞著。

enable.idempotence

是否開啟冪等性功能,詳見后續(xù)原理加強(qiáng);

int a = 1;

a++; // 非冪等操作

val map = new HashMap()

map.put(“a”,1); // 冪等操作

在 kafka 中,同一條消息,生產(chǎn)者如果多次重試發(fā)送,在服務(wù)器中的結(jié)果如果還是只有一條,這就是具備冪等性;否則,就不具備冪等性!

所以,producer內(nèi)部有重試機(jī)制

1.有可能造成服務(wù)端數(shù)據(jù)的亂序

2.有可能造成服務(wù)端數(shù)據(jù)的重復(fù)

partitioner.class

用來指定分區(qū)器,默認(rèn):org.apache.kafka.internals.DefaultPartitioner

默認(rèn)分區(qū)器的分區(qū)規(guī)則:

  • 如果數(shù)據(jù)中有 key,則按 key 的 murmur hash 值 % topic 分區(qū)總數(shù)得到目標(biāo)分區(qū)
  • 如果數(shù)據(jù)只有 value,則在各個分區(qū)間

自定義 partitioner 需要實現(xiàn) org.apache.kafka.clients.producer.Partitioner 接口

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,Cluster cluster) {// 這里可以根據(jù)自己的業(yè)務(wù)邏輯計算出要發(fā)送到哪個分區(qū)int numPartitions = cluster.partitionCountForTopic(topic);if (key == null) {return 0;}return Math.abs(key.hashCode() % numPartitions);}@Overridepublic void close() {// do nothing}@Overridepublic void configure(Map<String, ?> configs) {// do nothing}
}使用自定義的Partitioner也很簡單,只需要在創(chuàng)建Producer時指定即可。例如:Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.CustomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);
http://m.aloenet.com.cn/news/36014.html

相關(guān)文章:

  • 類似于眾人幫的做任務(wù)賺傭金網(wǎng)站seo案例分析及解析
  • 給你一個網(wǎng)站你怎么做的嗎網(wǎng)盤網(wǎng)頁版
  • 深圳建設(shè)網(wǎng)站制作公司訊展網(wǎng)站優(yōu)化推廣
  • 網(wǎng)站制作的基本步驟診斷網(wǎng)站seo現(xiàn)狀的方法
  • 蕪湖建設(shè)工程質(zhì)量監(jiān)督站網(wǎng)站福建seo快速排名優(yōu)化
  • 網(wǎng)站建設(shè)用啥技術(shù)新聞頭條 今天
  • 怎么下載wordpress內(nèi)置的主題廣州seo公司如何
  • 源代碼如何做網(wǎng)站百度移動端點贊排名軟件
  • ps6做網(wǎng)站點哪里保存seo最新
  • 成都三合一網(wǎng)站建設(shè)推廣新產(chǎn)品最好的方法
  • 武漢哪里做網(wǎng)站好找個免費的網(wǎng)站
  • 提供網(wǎng)站制作百度風(fēng)云排行榜官網(wǎng)
  • 網(wǎng)站建設(shè)公司該怎么銷售微信推廣方法
  • 上海企業(yè)招聘信息發(fā)布平臺長沙seo優(yōu)化推薦
  • 網(wǎng)站建設(shè)原因分析win7系統(tǒng)優(yōu)化軟件
  • 中國新聞社招聘2023年褲子seo關(guān)鍵詞
  • 中國最近軍事新聞視頻桂林網(wǎng)站優(yōu)化
  • 網(wǎng)站推廣解釋中國有幾個搜索引擎
  • 網(wǎng)站廣告輪播代碼運營是做什么的
  • 邀請注冊推廣賺錢seo深圳優(yōu)化
  • 如何在記事本中做網(wǎng)站鏈接長沙自動seo
  • 黃石建設(shè)信息網(wǎng)站國內(nèi)網(wǎng)絡(luò)銷售平臺有哪些
  • 公司網(wǎng)站建設(shè)開發(fā)濟(jì)南興田德潤優(yōu)惠嗎推廣平臺排行榜app
  • 行業(yè)網(wǎng)站建設(shè)費用百度seo推廣軟件
  • 公司 做網(wǎng)站推廣信息發(fā)布平臺
  • 做網(wǎng)站是什么專業(yè)什么工作百度后臺推廣登錄
  • 做pc端網(wǎng)站要成本么廣告推廣軟件
  • wordpress loading優(yōu)化
  • wordpress手機(jī)版怎么注冊seo站
  • 做設(shè)計排版除了昵圖網(wǎng)還有什么網(wǎng)站中國新冠疫情最新消息