Table of Contents
- Background
- Environment
- Tool Selection
- Hands-On
- MM1
- MM2
- Running as a dedicated MM2 cluster
- Running in standalone mode
- Verification
- Appendix
- MM2 configuration reference
- Miscellaneous
Background
A test-environment Kafka cluster with 360+ topics and 2000+ partitions, deployed on virtual machines, needed to be migrated in full into Kubernetes containers for a number of reasons, which made it a good opportunity for hands-on practice. This post records the actual operation of both MM1 and MM2, along with the problems encountered and how they were solved.
Environment
Source cluster: Kafka 2.6.0, 2 brokers, on virtual machines
Target cluster: Kafka 2.6.0, 3 brokers, on Kubernetes
Tools: MM1 (kafka-mirror-maker.sh), MM2 (connect-mirror-maker.sh)
Requirements: topic names must not change; data must be complete
Precondition: the target cluster must have automatic topic creation enabled: auto.create.topics.enable=true
Tool Selection
MM1 is essentially a Kafka consumer and producer bolted together. It can move data from a source cluster to a target cluster effectively, but offers little beyond that (a conceptual sketch of its core loop follows the list below).
Years of MM1 usage have surfaced the following limitations:
- Static blacklists and whitelists
- Topic metadata is not synced; every topic replicated to the target ends up with only one partition
- Loop prevention in active-active setups must be handled with manual configuration (MM2 solves this, though with a change that hurts the user experience)
- Performance problems caused by rebalances
- No monitoring facilities
- No exactly-once guarantee
- No disaster-recovery capability
- Cannot sync the topic list itself, only topics that contain data
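To make the "consumer plus producer" point concrete, here is a minimal conceptual sketch of MM1's core loop. This is an illustration only, not Kafka's actual implementation: error handling, SASL settings, and graceful shutdown are omitted, and the addresses and group id are placeholders taken from the configs later in this post.

```java
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MiniMirror {
    public static void main(String[] args) {
        // Consumer against the source cluster
        Properties src = new Properties();
        src.put("bootstrap.servers", "source:9092");
        src.put("group.id", "mini-mirror");
        src.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        src.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Producer against the target cluster
        Properties dst = new Properties();
        dst.put("bootstrap.servers", "target:29092");
        dst.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        dst.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(src);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(dst)) {
            consumer.subscribe(Pattern.compile("projects.*")); // the "whitelist"
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    // Forward to the same topic name on the target; consumer
                    // offsets are committed automatically (enable.auto.commit default)
                    producer.send(new ProducerRecord<>(r.topic(), r.key(), r.value()));
                }
            }
        }
    }
}
```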
MM2 is built on the Kafka Connect framework. Like any other Kafka Connect setup, MM2 consists of source connectors and sink connectors, and it can sync the following:
- The complete topic list
- Topic configurations
- ACLs (if any)
- Consumer groups and offsets (only on Kafka 2.7.0 and later)
- Additional features:
- Detection of replication loops
- Custom multi-cluster replication (a single job can replicate across clusters: A->B, B->C, B->D)
- Metrics for monitoring
- Exactly-once delivery, achievable through configuration
- …
Hands-On
Following the principle of rehearsing before operating, I built a cluster with the same configuration as the target and used it to validate what each tool actually does. Only once I was confident did I touch the real target cluster.
MM1
Run --help to see the options:
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
Option                                   Description
------                                   -----------
--abort.on.send.failure <String: Stop    Configure the mirror maker to exit on
  the entire mirror maker when a send      a failed send. (default: true)
  failure occurs>
--consumer.config <String: config file>  Embedded consumer config for consuming
                                           from the source cluster.
--consumer.rebalance.listener <String:   The consumer rebalance listener to use
  A custom rebalance listener of type      for mirror maker consumer.
  ConsumerRebalanceListener>
--help                                   Print usage information.
--message.handler <String: A custom      Message handler which will process
  message handler of type                  every record in-between consumer and
  MirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom message
  Arguments passed to message handler      handler for mirror maker.
  constructor.>
--new.consumer                           DEPRECATED Use new consumer in mirror
                                           maker (this is the default so this
                                           option will be removed in a future
                                           version).
--num.streams <Integer: Number of        Number of consumption streams.
  threads>                                 (default: 1)
--offset.commit.interval.ms <Integer:    Offset commit interval in ms.
  offset commit interval in                (default: 60000)
  millisecond>
--producer.config <String: config file>  Embedded producer config.
--rebalance.listener.args <String:       Arguments used by custom rebalance
  Arguments passed to custom rebalance     listener for mirror maker consumer.
  listener constructor as a string.>
--version                                Display Kafka version.
--whitelist <String: Java regex          Whitelist of topics to mirror.
  (String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
The two core parameters are the consumer and producer configuration files.
consumer.properties (consumes from the source cluster):
bootstrap.servers=source:9092
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=mm1-consumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
producer.properties (produces to the target cluster):
bootstrap.servers= target:29092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
acks=-1
linger.ms=10
batch.size=10000
retries=3
Run the script:
./kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "projects.*"
MM1 is fairly simple: as long as the two config files are correct and the SASL settings are right, it basically just works. It suits simple replication jobs, such as mirroring a specified set of topics.
MM2
There are four ways to run MM2:
- As a dedicated MirrorMaker cluster
- As a Connector in a distributed Connect cluster
- As a standalone Connect worker
- In legacy mode, using the existing MirrorMaker scripts
This post covers the first and third options (dedicated MirrorMaker cluster and standalone Connect worker); the second requires standing up a Connect cluster, which is considerably more work.
Running as a dedicated MM2 cluster
This is the simplest mode: all it needs is a single configuration file. The file is highly customizable, so tailor it to your requirements.
As before, run --help for usage:
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./connect-mirror-maker.sh --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]]
                            mm2.properties

MirrorMaker 2.0 driver

positional arguments:
  mm2.properties         MM2 configuration file.

optional arguments:
  -h, --help             show this help message and exit
  --clusters CLUSTER [CLUSTER ...]
                         Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
As you can see, there are far fewer parameters; the one core argument is a configuration file.
mm2.properties:
name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2

# Define the cluster aliases
clusters = event-center, event-center-new

# Broker address list of the event-center cluster
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";

# Broker address list of the event-center-new cluster
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";

# Enable replication from event-center to event-center-new
event-center->event-center-new.enabled = true
# Regex of topics to replicate
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*

# Replication factor of the internal topics MM2 uses for its own bookkeeping
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

# Custom settings
# Whether to sync topic configurations from the source
sync.topic.configs.enabled=true
# Whether to sync ACLs from the source
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# Whether the connector emits heartbeats
emit.heartbeats.enabled=true
# Heartbeat interval
emit.heartbeats.interval.seconds=5
# Whether to emit checkpoints
emit.checkpoints.enabled=true
# Whether to refresh the topic list
refresh.topics.enabled=true
# Refresh interval
refresh.topics.interval.seconds=60
# Whether to refresh consumer group ids
refresh.groups.enabled=true
# Refresh interval
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# Replication factor of topics newly created on the remote cluster
replication.factor=3
Note that replication.policy.class defaults to DefaultReplicationPolicy. That policy prefixes every topic replicated to the target cluster with the source cluster's alias: if the source alias is A and the topic is bi-log, the topic lands on the target as A.bi-log. Why? To avoid infinite loops when replication runs in both directions.
The official explanation:
This is the default behavior in MirrorMaker 2.0 to avoid rewriting data in complex mirror topologies. Customizing this requires care in the design of replication flows and topic management to avoid data loss. It can be done by providing a custom replication policy class via "replication.policy.class".
For how to define and use a custom policy, see my separate article on the subject.
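For reference, here is a minimal sketch of an identity-style policy against the org.apache.kafka.connect.mirror.ReplicationPolicy interface shipped with Kafka 2.6. It is an illustration, not the exact code from that article; the class and package names are chosen only so they match the replication.policy.class value in the config above. Be aware that dropping the alias prefix disables MM2's replication-loop detection, so a policy like this is only safe for one-way migration.

```java
package org.apache.kafka.connect.mirror;

// Sketch of a ReplicationPolicy that keeps topic names identical on the
// target cluster. Without the "<alias>." prefix MM2 cannot detect replication
// loops, so use this only for one-way, source-to-target migration.
public class CustomReplicationPolicy implements ReplicationPolicy {

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic; // no alias prefix; keep the original topic name
    }

    @Override
    public String topicSource(String topic) {
        // With identical names we cannot tell which cluster a topic came from.
        return null;
    }

    @Override
    public String upstreamTopic(String topic) {
        // No prefix to strip; returning null makes the default originalTopic()
        // treat every topic as local, avoiding infinite recursion.
        return null;
    }
}
```

Package the class into a jar and place it on the MM2 classpath (for example under Kafka's libs/ directory) before starting connect-mirror-maker.sh.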
To keep the process running in the background, wrap it in a small script:
run-mm2.sh:
#!/bin/bash
exec ./connect-mirror-maker.sh mm2.properties > log/mm2.log 2>&1 &
Then just run the script.
Running in standalone mode
This mode is a bit more involved: you have to supply a Kafka cluster to back the Connect worker that does the syncing, driven by the connect-standalone.sh script.
Run --help for usage:
./connect-standalone.sh --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
It takes two configuration files: one describing the Kafka cluster that backs the worker (worker.properties), the other describing the replication job (connector.properties).
worker.properties:
bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAIN
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.properties:
name = MirrorSourceConnector
topics = projects.*
groups = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1

# source
# With the default policy, this alias is prefixed to every replicated topic; use with care
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";

# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";

# Whether to sync topic configurations from the source
sync.topic.configs.enabled=true
# Whether to sync ACLs from the source
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# Whether the connector emits heartbeats
emit.heartbeats.enabled=true
# Heartbeat interval
emit.heartbeats.interval.seconds=5
# Whether to emit checkpoints
emit.checkpoints.enabled=true
# Whether to refresh the topic list
refresh.topics.enabled=true
# Refresh interval
refresh.topics.interval.seconds=30
# Whether to refresh consumer group ids
refresh.groups.enabled=true
# Refresh interval
refresh.groups.interval.seconds=30
# Capacity of the connector consumer's read-ahead queue
# readahead.queue.capacity=500
# Use the custom policy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3
Run:
./connect-standalone.sh worker.properties connector.properties
This mode is only briefly introduced here; in the end I went with the previous approach, which is simpler and more direct.
Verification
Items checked:
- Message count: OK
  Compared the two clusters using the kafka-tool client.
- Topic count: OK
  - source:
    ./kafka-topics.sh --bootstrap-server source:9193 --command-config command.properties --list > topics-source.txt
  - sink:
    ./kafka-topics.sh --bootstrap-server sink:29092 --command-config command.properties --list > topics-sink.txt
  - command.properties example:
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
- New messages replicated: OK
- New topics replicated: OK
- Consumer groups replicated: NO
  ./kafka-consumer-groups.sh --bootstrap-server source:9193 --command-config command.properties --list > consumer-source.txt
  To migrate consumers, use the official RemoteClusterUtils utility; see the sketch after this list.
- Consumer offsets replicated: NO
- ACLs replicated: OK
  Check with kafka-acls.sh or a client tool such as kafka-tool.
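Below is a minimal, hypothetical sketch of the RemoteClusterUtils approach; the group name is a placeholder and the addresses come from the configs above. RemoteClusterUtils.translateOffsets reads MM2's checkpoint topic on the target cluster and maps a group's source-cluster offsets to target-cluster offsets, which can then be committed on the target so consumers resume where they left off. It requires checkpoints to be enabled (emit.checkpoints.enabled=true).

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class OffsetMigrator {
    public static void main(String[] args) throws Exception {
        // Connection settings for the *target* cluster, where MM2 writes its
        // checkpoint topic; SASL settings mirror producer.properties above.
        Map<String, Object> mm2Props = new HashMap<>();
        mm2Props.put("bootstrap.servers", "target:29092");
        mm2Props.put("security.protocol", "SASL_PLAINTEXT");
        mm2Props.put("sasl.mechanism", "PLAIN");
        mm2Props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"Admin\" password=\"hMOPbmZE\";");
        // If a custom replication policy is in use, pass it here as well,
        // e.g. mm2Props.put("replication.policy.class", "...");

        // Translate the group's committed offsets from the source cluster
        // ("event-center" is the source alias from the MM2 config) into
        // equivalent offsets on the target cluster.
        Map<TopicPartition, OffsetAndMetadata> translated =
            RemoteClusterUtils.translateOffsets(
                mm2Props, "event-center", "my-consumer-group", Duration.ofSeconds(30));

        // Commit the translated offsets on the target under the same group id
        // so consumers pointed at the new cluster resume correctly.
        Map<String, Object> consumerProps = new HashMap<>(mm2Props);
        consumerProps.put("group.id", "my-consumer-group");
        consumerProps.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.commitSync(translated);
        }
    }
}
```

Run this once per consumer group, after the group has stopped consuming from the source and before it starts against the target.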
Appendix
MM2 configuration reference
| property | default value | description |
|---|---|---|
| name | required | name of the connector, e.g. "us-west->us-east" |
| topics | empty string | regex of topics to replicate, e.g. "topic1\|topic2\|topic3". Comma-separated lists are also supported. |
| topics.blacklist | ".*\.internal, .*\.replica, __consumer_offsets" or similar | topics to exclude from replication |
| groups | empty string | regex of groups to replicate, e.g. ".*" |
| groups.blacklist | empty string | groups to exclude from replication |
| source.cluster.alias | required | name of the cluster being replicated |
| target.cluster.alias | required | name of the downstream Kafka cluster |
| source.cluster.bootstrap.servers | required | upstream cluster to replicate |
| target.cluster.bootstrap.servers | required | downstream cluster |
| sync.topic.configs.enabled | true | whether or not to monitor source cluster for configuration changes |
| sync.topic.acls.enabled | true | whether to monitor source cluster ACLs for changes |
| emit.heartbeats.enabled | true | connector should periodically emit heartbeats |
| emit.heartbeats.interval.seconds | 5 (seconds) | frequency of heartbeats |
| emit.checkpoints.enabled | true | connector should periodically emit consumer offset information |
| emit.checkpoints.interval.seconds | 5 (seconds) | frequency of checkpoints |
| refresh.topics.enabled | true | connector should periodically check for new topics |
| refresh.topics.interval.seconds | 5 (seconds) | frequency to check source cluster for new topics |
| refresh.groups.enabled | true | connector should periodically check for new consumer groups |
| refresh.groups.interval.seconds | 5 (seconds) | frequency to check source cluster for new consumer groups |
| readahead.queue.capacity | 500 (records) | number of records to let consumer get ahead of producer |
| replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | use LegacyReplicationPolicy to mimic legacy MirrorMaker |
| heartbeats.topic.retention.ms | 1 day | used when creating heartbeat topics for the first time |
| checkpoints.topic.retention.ms | 1 day | used when creating checkpoint topics for the first time |
| offset.syncs.topic.retention.ms | max long | used when creating offset sync topic for the first time |
| replication.factor | 2 | used when creating remote topics |
Miscellaneous
References:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
https://www.reddit.com/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new
https://dev.to/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf
https://learn.microsoft.com/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide