長沙做電商網(wǎng)站設(shè)計友情鏈接檢測工具
文章目錄
- 前言
- kafka2.0與3.x對比
- 準(zhǔn)備工作
- JDK安裝
- kafka安裝
- 服務(wù)器增加hosts
- 修改Kraft協(xié)議配置文件
- 格式化存儲目錄
- 啟動集群
- 停止集群
- 測試Kafka集群
- 創(chuàng)建topic
- 查看topic列表
- 查看消息詳情
- 生產(chǎn)消息
- 消費消息
- 查看消費者組
- 查看消費者組列表
前言
相信很多同學(xué)都用過Kafka2.0吧,其中需要zookepper集群來做元數(shù)據(jù)管理和集群選舉,大大增加了運維成本,而且也很是影響Kafka性能。言歸正傳今天我們就分享一期Kafka3.x Kraft模式集群搭建,簡直不要太爽。
kafka2.0與3.x對比
上圖中黑色代表broker(消息代理服務(wù)),褐色/藍(lán)色代表Controller(集群控制器服務(wù))
左圖(kafka2.0):一個集群所有節(jié)點都是broker角色,kafka從三個broker中選舉出來一個Controller控制器,控制器將集群元數(shù)據(jù)信息(比如主題分類、消費進(jìn)度等)保存到zookeeper,用于集群各節(jié)點之間分布式交互。
右圖(kafka3.0):假設(shè)一個集群有四個broker,指定三個作為Conreoller角色(藍(lán)色),從三個Controller中選舉出來一個Controller作為主控制器(褐色),其他的2個備用。zookeeper不再被需要!相關(guān)的元數(shù)據(jù)信息以kafka日志的形式存在(即:以消息隊列消息的形式存在)。
controller通信端口:9093, 作用與zk的2181端口類似 。
這樣做的好處有以下幾個:
- Kafka 不再依賴外部框架,而是能夠獨立運行;
- controller 管理集群時,不再需要從 zookeeper 中先讀取數(shù)據(jù),集群性能上升;
- 由于不依賴 zookeeper,集群擴展時不再受到 zookeeper 讀寫能力限制;
在搭建kafka3.0集群之前, 我們需要先做好kafka實例角色規(guī)劃。(三個broker, 需要通過主動配置指定三個作為Controller, Controller需要奇數(shù)個, 這一點和zk是一樣的)
主機名稱 | ip | 角色 | node.id |
---|---|---|---|
senfel-test | 192.168.112.10 | broker,controller | 1 |
senfel-test2 | 192.168.112.130 | broker,controller | 2 |
senfel-test3 | 192.168.112.129 | broker,controller | 3 |
準(zhǔn)備工作
JDK安裝
kafka3.x不再支持JDK8,依然可用,建議安裝JDK11或JDK17。
新建kafka持久化日志數(shù)據(jù)mkdir -p /data/kafka;并保證安裝kafka的用戶具有該目錄的讀寫權(quán)限。
各個機器節(jié)點執(zhí)行:
安裝jdk(kafka3.x不再支持JDK8,目前暫時還可以用,建議安裝JDK11或JDK17, 這里安裝jdk11)
下載安裝jdk省略
kafka安裝
下載kafka
cd /
mkdir tools
cd tools
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
tar -xf kafka_2.12-3.5.2.tgz
chown -R kafka:kafka kafka_2.12-3.5.2*
mkdir -p /data/kafka
chown -R kafka:kafka /data/kafka
服務(wù)器增加hosts
vim /etc/hosts,各個節(jié)點,添加如下內(nèi)容:
192.168.112.10 data-vm1
192.168.112.130 data-vm2
192.168.112.129 data-vm3
修改Kraft協(xié)議配置文件
在kafka3.x版本中,使用Kraft協(xié)議代替zookeeper進(jìn)行集群的Controller選舉,所以要針對它進(jìn)行配置。
vim /tools/kafka_2.12-3.5.2/config/kraft/server.properties
具體配置參數(shù)如下:
# data-vm1節(jié)點
node.id=1
process.roles=broker,controller
listeners=PLAINTEXT://data-vm1:9092,CONTROLLER://data-vm1:9093
advertised.listeners=PLAINTEXT://:9092
controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
log.dirs=/data/kafka/
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# data-vm2節(jié)點
node.id=2
process.roles=broker,controller
listeners=PLAINTEXT://data-vm2:9092,CONTROLLER://data-vm2:9093
advertised.listeners=PLAINTEXT://:9092
controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
log.dirs=/data/kafka/
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
# data-vm3節(jié)點
node.id=3
process.roles=broker,controller
listeners=PLAINTEXT://data-vm3:9092,CONTROLLER://data-vm3:9093
advertised.listeners=PLAINTEXT://:9092
controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
log.dirs=/data/kafka/
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
node.id:這將作為集群中的節(jié)點 ID,唯一標(biāo)識,按照我們事先規(guī)劃好的(上文),在不同的服務(wù)器上這個值不同。其實就是kafka2.0中的broker.id,只是在3.0版本中kafka實例不再只擔(dān)任broker角色,也有可能是controller角色,所以改名叫做node節(jié)點。
process.roles:一個節(jié)點可以充當(dāng)broker或controller或兩者兼而有之。按照我們事先規(guī)劃好的(上文),在不同的服務(wù)器上這個值不同。多個角色用逗號分開。
listeners: broker將使用9092端口,而kraft controller控制器將使用9093端口。
advertised.listeners: 這里指定kafka通過代理暴漏的地址,如果都是局域網(wǎng)使用,就配置PLAINTEXT://:9092即可。
controller.quorum.voters:這個配置用于指定controller主控選舉的投票節(jié)點,所有process.roles包含controller角色的規(guī)劃節(jié)點都要參與,即:zimug1、zimug2、zimug3。其配置格式為:node.id1@host1:9093,node.id2@host2:9093
log.dirs:kafka 將存儲數(shù)據(jù)的日志目錄,在準(zhǔn)備工作中創(chuàng)建好的目錄。
num_partitions: 分區(qū),一般根據(jù)broker數(shù)量進(jìn)行確定
offset.topic.replication.factor: 副本數(shù),一般根據(jù)broker數(shù)量進(jìn)行確定
所有kafka節(jié)點都要按照上文中的節(jié)點規(guī)劃進(jìn)行配置,完成config/kraft/server.properties配置文件的修改。
格式化存儲目錄
生成一個唯一的集群ID(在一臺kafka服務(wù)器上執(zhí)行一次即可),這一個步驟是在安裝kafka2.0版本的時候不存在的。
sudo /tools/kafka_2.12-3.5.2/bin/kafka-storage.sh random-uuid
MsBU_ZELT2G0E0oPFX6Gxw
使用生成的集群ID+配置文件格式化存儲目錄log.dirs,
所以這一步確認(rèn)配置及路徑確實存在,
并且kafka用戶有訪問權(quán)限(檢查準(zhǔn)備工作是否做對)
主機服務(wù)器執(zhí)行命令:
sudo /tools/kafka_2.12-3.5.2/bin/kafka-storage.sh format
-t MsBU_ZELT2G0E0oPFX6Gxw
-c /tools/kafka_2.12-3.5.2/config/kraft/server.properties
格式化操作完成之后,log.dirs?目錄下多出一個Meta.properties文件?,存儲了當(dāng)前的kafka節(jié)點的id(node.id),當(dāng)前節(jié)點屬于哪個集群(cluster.id)
[root@senfel-test tools]# cd /data/kafka/
[root@senfel-test kafka]# ll
總用量 8
-rw-r–r–. 1 root root 249 6月 4 15:46 bootstrap.checkpoint
-rw-r–r–. 1 root root 86 6月 4 15:46 meta.properties
[root@senfel-test kafka]# cat meta.properties
#Tue Jun 04 15:46:15 CST 2024
cluster.id=MsBU_ZELT2G0E0oPFX6Gxw
version=1
node.id=1
[root@senfel-test kafka]#
senfel-test2:
sudo /tools/kafka_2.12-3.5.2/bin/kafka-storage.sh format
-t MsBU_ZELT2G0E0oPFX6Gxw
-c /tools/kafka_2.12-3.5.2/config/kraft/server.properties
[root@senfel-test2 tools]# cd /data/kafka/
[root@senfel-test2 kafka]# ll
總用量 8
-rw-r–r–. 1 root root 249 6月 4 15:49 bootstrap.checkpoint
-rw-r–r–. 1 root root 86 6月 4 15:49 meta.properties
[root@senfel-test2 kafka]# cat meta.properties
#Tue Jun 04 15:49:55 CST 2024
cluster.id=N2TANgN1TDyZLAbpN36IxA
version=1
node.id=2
[root@senfel-test2 kafka]#
senfel-test3
sudo /tools/kafka_2.12-3.5.2/bin/kafka-storage.sh format
-t MsBU_ZELT2G0E0oPFX6Gxw
-c /tools/kafka_2.12-3.5.2/config/kraft/server.properties
[root@senfel-test3 /]# cd /data/kafka/
[root@senfel-test3 kafka]# ll
總用量 8
-rw-r–r–. 1 root root 249 6月 4 15:52 bootstrap.checkpoint
-rw-r–r–. 1 root root 86 6月 4 15:52 meta.properties
[root@senfel-test3 kafka]# cat meta.properties
#Tue Jun 04 15:52:11 CST 2024
cluster.id=KI0RD9FcQnaXNXeq49Gjuw
version=1
node.id=3
[root@senfel-test3 kafka]#
啟動集群
有防火墻記得放行:
[root@senfel-test bin]# vim /etc/sysconfig/iptables
-A INPUT -p tcp -m state --state NEW -m tcp --dport 9092 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 9093 -j ACCEPT
[root@senfel-test bin]# systemctl restart iptables
啟動命令:
/tools/kafka_2.12-3.5.2/bin/kafka-server-start.sh
/tools/kafka_2.12-3.5.2/config/kraft/server.properties
后臺運行:
nohup /tools/kafka_2.12-3.5.2/bin/kafka-server-start.sh
/tools/kafka_2.12-3.5.2/config/kraft/server.properties 2>&1 &
腳本:
vim kafka-start.sh
#!/bin/bash
kafkaServers='data-vm1 data-vm2 data-vm3'
#啟動所有的kafka
for kafka in $kafkaServers
dossh -T $kafka <<EOFnohup /tools/kafka_2.12-3.5.2/bin/kafka-server-start.sh /tools/kafka_2.12-3.5.2/config/kraft/server.properties 1>/dev/null 2>&1 &
EOF
echo 從節(jié)點 $kafka 啟動kafka3.0...[ done ]
sleep 5
done
chmod +x kafka-start.sh
sh kafka-start.sh
停止集群
一鍵停止kafka集群各節(jié)點的腳本,與啟動腳本的使用方式及原理是一樣的。
停止命令:
sudo /tools/kafka_2.12-3.5.2/bin/kafka-server-stop.sh
執(zhí)行腳本:
#!/bin/bash
kafkaServers='data-vm1 data-vm2 data-vm3'
#停止所有的kafka
for kafka in $kafkaServers
dossh -T $kafka <<EOFcd /tools/kafka_2.12-3.5.2bin/kafka-server-stop.sh
EOF
echo 從節(jié)點 $kafka 停止kafka...[ done ]
sleep 5
done
測試Kafka集群
創(chuàng)建topic
[root@senfel-test2 logs]# /tools/kafka_2.12-3.5.2/bin/kafka-topics.sh
–create
–topic senfel-test-topic
–bootstrap-server data-vm1:9092
Created topic senfel-test-topic.
查看topic列表
[root@senfel-test2 logs]# /tools/kafka_2.12-3.5.2/bin/kafka-topics.sh
–list
–bootstrap-server data-vm1:9092
senfel-test-topic
[root@senfel-test2 logs]# /tools/kafka_2.12-3.5.2/bin/kafka-topics.sh --list --bootstrap-server data-vm2:9092
senfel-test-topic
[root@senfel-test2 logs]# /tools/kafka_2.12-3.5.2/bin/kafka-topics.sh --list --bootstrap-server data-vm3:9092
senfel-test-topic
[root@senfel-test2 logs]#
查看消息詳情
[root@senfel-test2 logs]# /tools/kafka_2.12-3.5.2/bin/kafka-topics.sh
–describe
–topic senfel-test-topic
–bootstrap-server data-vm1:9092
Topic: senfel-test-topic TopicId: h2Cz-yq2RYeMY3ylHuk3iw PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: senfel-test-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: senfel-test-topic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: senfel-test-topic Partition: 2 Leader: 3 Replicas: 3 Isr: 3
生產(chǎn)消息
[root@senfel-test2 logs]# /tools/kafka_2.12-3.5.2/bin/kafka-console-producer.sh
–topic senfel-test-topic
–bootstrap-server data-vm1:9092
消費消息
[root@senfel-test3 logs]# /tools/kafka_2.12-3.5.2/bin/kafka-console-consumer.sh
–topic senfel-test-topic
–from-beginning
–bootstrap-server data-vm2:9092
–group my-group
查看消費者組
#檢查消費者postition
[root@senfel-test kafka]# sudo /tools/kafka_2.12-3.5.2/bin/kafka-consumer-groups.sh
–bootstrap-server data-vm2:9092
–describe
–all-groups
sudo /tools/kafka_2.12-3.5.2/bin/kafka-consumer-groups.sh
–bootstrap-server data-vm2:9092
–describe
–group my-group-new
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group senfel-test-topic 0 2 2 0 console-consumer-490ae3b3-e944-49be-a6e7-c4bae3696a0f /192.168.112.129 console-consumer
my-group senfel-test-topic 1 4 4 0 console-consumer-490ae3b3-e944-49be-a6e7-c4bae3696a0f /192.168.112.129 console-consumer
my-group senfel-test-topic 2 0 0 0 console-consumer-490ae3b3-e944-49be-a6e7-c4bae3696a0f /192.168.112.129 console-consumer
[root@senfel-test kafka]#
查看消費者組列表
[root@senfel-test kafka]# /tools/kafka_2.12-3.5.2/bin/kafka-consumer-groups.sh
–bootstrap-server data-vm1:9092
–list
my-group