金華網(wǎng)站建設(shè)網(wǎng)絡(luò)推廣公司排名
1 生產(chǎn)者
生產(chǎn)邏輯
- 配置生產(chǎn)者客戶端參數(shù)及創(chuàng)建相應(yīng)的生產(chǎn)者實(shí)例。
- 構(gòu)建待發(fā)送的消息。
- 發(fā)送消息
- 關(guān)閉實(shí)列
參數(shù)說明
- bootstrap.servers :用來指定生產(chǎn)者客戶端鏈接Kafka集群搜需要的broker地址清單,具體格式 host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)地址中間,號分割,參數(shù)默認(rèn) 空串。這里要注意并不需要配置所有的broker地址,應(yīng)為生產(chǎn)者會在broker中找到其他的broker地址,但是建議配置兩個(gè)以上,當(dāng)其中一個(gè)broker宕機(jī)時(shí)還可以通過另外一個(gè)工作。
- key.serializer和value.serializer:broker端接受的消息必須以字節(jié)數(shù)組的形式存在。
- client.id : 默認(rèn) “” 用來設(shè)置KafkaProducer對應(yīng)的客戶端id
- max.block.ms:默認(rèn)值 60000 用來控制KafkaProducer 中send()方法和partitionsFor()方法的阻塞時(shí)間
- partitioner.class:用來指定分區(qū)器
- enable.idempotence:默認(rèn)值 false 是否開啟冪等性
- interceptor.classes 用來設(shè)置生產(chǎn)者攔截器
- max.in.flight.requests.per.connection:5 限制每個(gè)連接最多緩存的請求數(shù)
- metadata.max.age.ms: 300000 5分鐘 如果在這個(gè)時(shí)間內(nèi)元數(shù)據(jù)沒有更新的話就強(qiáng)制更新。
- transactional.id:null 設(shè)置事務(wù)id 必須唯一
- batch.size 16384(16KB): 生產(chǎn)者客戶端中用于緩存消息的緩沖區(qū)大小。
序列化器(Serializer)
生產(chǎn)者發(fā)送消息到kafka是需要將對象序列化城流才能訪問到kafka,消費(fèi)者需要把流反序列化 才能進(jìn)行 消費(fèi)。
分區(qū)器
消息在通過send()方法發(fā)送到broker的過程中,有可能需要經(jīng)過攔截器、序列化器和分區(qū)器(partitioner)的一系列作用之后才能被真正的發(fā)往broker。攔截器一般不是必須的,而序列化器時(shí)必須的必須的。消息經(jīng)過序列化之后就需要確定它發(fā)送的分區(qū),如果消息ProducerRecord中指定了partition字段,那么就不需要分區(qū)器的作用,因?yàn)閜artition代表的就是所要發(fā)往的分區(qū)。
分區(qū)器時(shí)通過kay來計(jì)算partition的值,分區(qū)器的作用就是為消息分配分區(qū)。
kafka的默認(rèn)分區(qū)器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
生產(chǎn)者攔截器(Interceptor)
生產(chǎn)者攔截器主要用來在消息發(fā)送前做一些準(zhǔn)備工作,如按照規(guī)則過濾不符合條件的消息,修改消息等,也可以用來做一些定制化的需求,kafkaProducer在將消息序列化和計(jì)算分區(qū)之前會調(diào)用攔截器的onSend()方法來對消息進(jìn)行相應(yīng)的定制化
原理分析
- 主線程中由KafkaPartition創(chuàng)建消息
- 通過攔截器
- 通過序列化器
- 通過分區(qū)器
- 到達(dá)消息累加器(RecordAccumulator)主要是用來收集消息方便 Sender可以批量發(fā)送