【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程

这篇具有很好参考价值的文章主要介绍了【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

一个测试环境的kafka集群,Topic有360+,Partition有2000+,部署在虚拟机上,由于多方面原因,要求迁移至k8s容器内(全量迁移),正好可以拿来练一下手。本文主要记录对MM1和MM2的实际操作过程,以及使用过程中遇到的问题及解决方案。

环境

source集群:kafka-2.6.0、2个broker、虚拟机

target集群:kafka-2.6.0、3个broker、k8s

工具:MM1(kafka-mirror-maker.sh)、MM2(connect-mirror-maker.sh)

需求:Topic名称不能改变、数据完整

条件:target集群需要开启自动创建Topic:auto.create.topics.enable=true

工具选型

本质上MM1是Kafka的消费者和生产者结合体,可以有效地将数据从源群集移动到目标群集,但没有提供太多其他功能。

并且在MM1多年的使用过程中发现了以下局限性:

  1. 静态的黑名单和白名单
  2. Topic信息不能同步,所有Topic同步到目标端都只有一个Partition
  3. 必须通过手动配置来解决active-active场景下的循环同步问题(MM2为解决这个问题,也做了体验很不好的改动)
  4. rebalance导致的性能问题
  5. 缺乏监控手段
  6. 无法保证Exactly Once
  7. 无法提供容灾恢复
  8. 无法同步Topic列表,只能同步有数据的Topic

MM2是基于kafka connect框架开发的。与其它的kafka connecet一样MM2有source connector和sink connetor组成,可以支持同步以下数据:

  1. 完整的Topic列表
  2. Topic配置
  3. ACL信息(如果有)
  4. consumer group和offset(kafka2.7.0之后版本才行)
  5. 其他功能:
    • 支持循环同步检测
    • 多集群自定义同步(同一个任务中,可以多集群同步:A->B、B->C、B->D)
    • 提供可监控Metrics
    • 可通过配置保证Exactly Once

实操

秉着实操前先演练的原则,我自己搭建了一个和目标集群相同配置的集群,用于验证不同工具的操作结果。有足够把握之后,再对目标集群实际操作。

MM1

执行 --help 查看参数选项:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
Option                                   Description
------                                   -----------
--abort.on.send.failure <String: Stop    Configure the mirror maker to exit on
  the entire mirror maker when a send      a failed send. (default: true)
  failure occurs>
--consumer.config <String: config file>  Embedded consumer config for consuming
                                           from the source cluster.
--consumer.rebalance.listener <String:   The consumer rebalance listener to use
  A custom rebalance listener of type      for mirror maker consumer.
  ConsumerRebalanceListener>
--help                                   Print usage information.
--message.handler <String: A custom      Message handler which will process
  message handler of type                  every record in-between consumer and
  MirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom message
  Arguments passed to message handler      handler for mirror maker.
  constructor.>
--new.consumer                           DEPRECATED Use new consumer in mirror
                                           maker (this is the default so this
                                           option will be removed in a future
                                           version).
--num.streams <Integer: Number of        Number of consumption streams.
  threads>                                 (default: 1)
--offset.commit.interval.ms <Integer:    Offset commit interval in ms.
  offset commit interval in                (default: 60000)
  millisecond>
--producer.config <String: config file>  Embedded producer config.
--rebalance.listener.args <String:       Arguments used by custom rebalance
  Arguments passed to custom rebalance     listener for mirror maker consumer.
  listener constructor as a string.>
--version                                Display Kafka version.
--whitelist <String: Java regex          Whitelist of topics to mirror.
  (String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#         

核心参数就两个:消费者和生产者的配置文件:

consumer.properties:(消费source集群)

bootstrap.servers=source:9092
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=mm1-consumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";

producer.properties:(发送消息至目标集群)

bootstrap.servers= target:29092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
acks=-1
linger.ms=10
batch.size=10000
retries=3

执行脚本:

./kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "projects.*"

MM1比较简单,只要两个配置文件没问题,sasl配置正确,基本就OK了,适合简单的数据同步,比如指定topic进行同步。

MM2

有四种运行MM2的方法:

  • As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
  • As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
  • As a standalone Connect worker.(作为独立的Connect工作者)
  • In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)

本文介绍第一种和第三种:作为专用的MirrorMaker群集、作为独立的Connect工作者,第二种需要搭建connect集群,操作比较复杂。

以MM2集群运行

这种模式是最简单的,只需要提供一个配置文件即可,配置文件定制化程度比较高,根据业务需求配置即可

老样子,执行 --help 看看使用说明:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./connect-mirror-maker.sh --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties

MirrorMaker 2.0 driver

positional arguments:
  mm2.properties         MM2 configuration file.

optional arguments:
  -h, --help             show this help message and exit
  --clusters CLUSTER [CLUSTER ...]
                         Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#  

可以看到,参数简单了许多,核心参数就一个配置文件。

mm2.properties:

name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2

# 定义集群别名
clusters = event-center, event-center-new

# 设置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允许同步topic的正则
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*

# MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

# 自定义参数
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=60
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 远端创建新topic的replication数量设置
replication.factor=3

需要注意的是:replication.policy.class 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。

官方也给出了解释:

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作。

针对如何自定义策略及使用方法,见我的另一篇文章:

为了保证脚本后台运行,写一个脚本包装一下:

run-mm2.sh:

#!/bin/bash

exec ./connect-mirror-maker.sh MM2.properties >log/mm2.log 2>&1 &

之后执行脚本即可。

以Standalone模式运行

这种模式会麻烦点,需要提供一个kafka,作为worker节点来同步数据,使用的脚本为:connect-standalone.sh

–help看看如何使用:

./connect-standalone.sh --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]# 

需要两个配置文件,一个是作为worker的kafka集群信息(worker.properties),另一个是同步数据的配置(connector.properties)

worker.properties:

bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAIN

key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connector.properties:

name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1

# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";

# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=30
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=30
# 连接器消费者预读队列大小
# readahead.queue.capacity=500
# 使用自定义策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3

执行:

./connect-standalone.sh worker.properties connector.properties

这种方式做一个简单的介绍,我最后采用的是上一种方式,比较简单直接

验证

验证:

  • 消息数量 OK

    使用kafka-tool工具连接上两个集群进行比对

  • Topic数量 OK

    • source:
    ./kafka-topics.sh --bootstrap-server source:9193 --command-config command.properties --list > topics-source.txt 
    
    • sink
    ./kafka-topics.sh --bootstrap-server sink:29092 --command-config command.properties --list > topics-sink.txt 
    
    • command.properties示例:
    security.protocol = SASL_PLAINTEXT
    sasl.mechanism = PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
    
  • 新消息是否同步 OK

  • 新Topic是否同步 OK

  • Consumer是否同步 NO

./kafka-consumer-groups.sh --bootstrap-server source:9193 --command-config command.properties --list > consumer-source.txt 

​ 如果需要同步consumer,需要使用官方提供的工具:RemoteClusterUtils

  • consumer offset是否同步 NO

  • ACL是否同步 OK
    通过kafka-acls.sh或者客户端工具kafka-tool可以查看

附录

MM2配置表

property default value description
name required name of the connector, e.g. “us-west->us-east”
topics empty string regex of topics to replicate, e.g. “topic1|topic2|topic3”. Comma-separated lists are also supported.
topics.blacklist “..internal, ..replica, __consumer_offsets” or similar topics to exclude from replication
groups empty string regex of groups to replicate, e.g. “.*”
groups.blacklist empty string groups to exclude from replication
source.cluster.alias required name of the cluster being replicated
target.cluster.alias required name of the downstream Kafka cluster
source.cluster.bootstrap.servers required upstream cluster to replicate
target.cluster.bootstrap.servers required downstream cluster
sync.topic.configs.enabled true whether or not to monitor source cluster for configuration changes
sync.topic.acls.enabled true whether to monitor source cluster ACLs for changes
emit.heartbeats.enabled true connector should periodically emit heartbeats
emit.heartbeats.interval.seconds 5 (seconds) frequency of heartbeats
emit.checkpoints.enabled true connector should periodically emit consumer offset information
emit.checkpoints.interval.seconds 5 (seconds) frequency of checkpoints
refresh.topics.enabled true connector should periodically check for new topics
refresh.topics.interval.seconds 5 (seconds) frequency to check source cluster for new topics
refresh.groups.enabled true connector should periodically check for new consumer groups
refresh.groups.interval.seconds 5 (seconds) frequency to check source cluster for new consumer groups
readahead.queue.capacity 500 (records) number of records to let consumer get ahead of producer
replication.policy.class org.apache.kafka.connect.mirror.DefaultReplicationPolicy use LegacyReplicationPolicy to mimic legacy MirrorMaker
heartbeats.topic.retention.ms 1 day used when creating heartbeat topics for the first time
checkpoints.topic.retention.ms 1 day used when creating checkpoint topics for the first time
offset.syncs.topic.retention.ms max long used when creating offset sync topic for the first time
replication.factor 2 used when creating remote topics

其他

参考:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%253A+MirrorMaker+2.0

https://www.reddit.com/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new

https://dev.to/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf

https://learn.microsoft.com/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide

TODO

后续验证发现一个问题:
从旧集群生产消息,会复制3份到新集群文章来源地址https://www.toymoban.com/news/detail-677549.html

到了这里,关于【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka系列之:记录一次Kafka Topic分区扩容,但是下游flink消费者没有自动消费新的分区的解决方法

    生产环境Kafka集群压力大,Topic读写压力大,消费的lag比较大,因此通过扩容Topic的分区,增大Topic的读写性能 理论上下游消费者应该能够自动消费到新的分区,例如flume消费到了新的分区,但是实际情况是存在flink消费者没有消费到新的分区 出现无法消费topic新的分区这种情况

    2024年02月14日
    浏览(34)
  • 【kafka】记一次kafka基于linux的原生命令的使用

    环境是linux,4台机器,版本3.6,kafka安装在node 1 2 3 上,zookeeper安装在node2 3 4上。 安装好kafka,进入bin目录,可以看到有很多sh文件,是我们执行命令的基础。 启动kafka,下面的命令的后面带的配置文件的相对路径 遇到不熟悉的sh文件,直接输入名字并回车,就会提示你可用的

    2024年02月05日
    浏览(33)
  • Kafka系列之:基于Apache Kafka Connect实现端到端topic数据字段级加密的详细方法

    与其他通信工具一样,加密在 Apache Kafka 中很有价值,可以保护数据。 希望通过与 Apache Kafka Connect 集成来加密数据来实现这一目标。 Kafka 可以利用多种安全功能,从身份验证和授权到基于 TLS 的数据进出 Kafka 主题的线上流量加密。尽管这些措施可以保护传输中的数据,但它

    2024年02月13日
    浏览(31)
  • Centos8更换yum阿里云源报错Failed to connect to mirrors.aliyuncs.com port 80: Connection refused]

    2021年12月31日CentOS 8 EOL。按照社区规则,CentOS 8的源地址 http://mirror.centos.org/centos/8/ 内容已移除,目前第三方的镜像站中均已移除CentOS 8的源。阿里云的源 http://mirrors.cloud.aliyuncs.com和http://mirrors.aliyun.com 也无法同步到CentOS 8的源。当您在阿里云上继续使用默认配置的CentOS 8的源会

    2024年02月15日
    浏览(35)
  • CondaHTTPError:HTTP 000 CONNECTION FAILED for url<https://mirrors.tuna.tsinghua.edu.cn/anaco

    主要原因:配置没配对 解决办法: ①把镜像源https改成了http ②删除默认源 ③设置一下 引用:解决方法集合CondaHTTPError:HTTP 000 CONNECTION FAILED for url<https://mirrors.tuna.tsinghua.edu.cn/anaco_condahttperror: http 000 connection failed for url-CSDN博客 

    2024年03月20日
    浏览(33)
  • 解决CondaHTTPError: HTTP 000 CONNECTION FAILED for url <https://mirrors.tuna.tsinghua.edu.cn/anaconda/

    背景: Anaconda 创建新的环境conda create -n name 或者安装package conda install xxx 时总出错:CondaHTTPError: HTTP 000 CONNECTION FAILED for url https://mirrors.tuna.tsinghua.edu.cn/anaconda/... 如下: 最终解决方案如下: 在ubuntu 的home 目录下,ctrl+h,显示所有隐藏文件,打开.condarc文件, 或终端 将该文件内

    2024年02月10日
    浏览(29)
  • 解决CondaHTTPError: HTTP 000 CONNECTION FAILED for url <https://mirrors.tuna.tsinghua.edu.cn/anacond

    目录 解决CondaHTTPError: HTTP 000 CONNECTION FAILED for url https://mirrors.tuna.tsinghua.edu.cn/anaconda错误 1. 检查网络连接 2. 更换Conda镜像源 3. 使用代理 4. 升级Conda版本 5. 重新配置环境 结论 在使用Conda进行Python包管理和环境管理时,有时可能会遇到CondaHTTPError: HTTP 000 CONNECTION FAILED for url ​​

    2024年02月05日
    浏览(31)
  • 【Milvus】记录一次基于milvus-backup做的Milvus备份与恢复

    milvus:v2.2.4 go:1.20.2 darwin/amd64 milvus-backup:v0.2.2 https://github.com/zilliztech/milvus-backup/releases 如果你的milvus是2.2.9版本及以上,可以直接下载最新的版本:git clone https://github.com/zilliztech/milvus-backup.git 默认使用的配置文件在config目录下,如果不需要手动指定,直接修改改文件即可,

    2024年02月16日
    浏览(32)
  • CondaHTTPError: HTTP 000 CONNECTION FAILED for url <http://mirrors.tuna.tsinghua.edu.cn/anaconda/pkg

    要在实验室的服务器上复现一下U-Mamba,要先建立一个虚拟环境umamba 但是报错了: 到网上查找解决方法,首先是要先查看自己的conda的通道配置: 通过查找的方法,首先把镜像源的https改为http,并且删除默认源default,这样配置改为了: 修改完之后再创建虚拟环境,但结果仍

    2024年04月15日
    浏览(33)
  • 【Git】git push --mirror 迁移项目到其他代码仓库,且保留分支与提交记录。

    需要把云效代码仓库的某些项目,迁移到公司内部的代码仓库。且需要保留迁移项目的分支和提交记录。 百度各种方法,并不停本地测试,结合以往gitlab操作经验解决。 执行前需要,提前在其他代码仓库里新建好项目(New Project),得到Rename repository Path。 执行逻辑 1.根据远

    2024年02月12日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包