Debezium同步Mysql数据到Kafka

这篇具有很好参考价值的文章主要介绍了Debezium同步Mysql数据到Kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Debezium

环境

Kafka:3.3.2

mysql-connector:1.8.1

部署

(0)前提是安装好mysql,开启binlog

(1)下载kafka

1) tar -zxvf kafka_2.12-3.3.2.tgz -C /opt/software/
2) mkdir /opt/software/kafka_2.12-3.3.2/plugin

(2)下载mysql-connector插件

1)tar -zxvf debezium-connector-mysql-1.8.1.Final-plugin.tar.gz -C /opt/software/kafka_2.12-3.3.2/plugin

(3)编辑配置文件

1)vim connect-distributed.properties
2)尾部追加此配置-》plugin.path=/opt/software/kafka_2.12-3.3.2/plugin

(4)启动kafka自带的zk

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zkLog.log 2>&1 &

(5)启动kafka

nohup bin/kafka-server-start.sh config/server.properties > kafkaLog.log 2>&1 & 

(6)启动connect

bin/connect-distributed.sh config/connect-distributed.properties

(7)调用api

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "Test2022@", "database.server.id": "184055", "database.server.name": "mysql1", "database.include.list": "test", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

注意:当成功调用api,创建此连接器后会有如下主题产生:dbhistory.inventory、mysql1、mysql1.test.people、mysql1.test.student(因为是监控的是test整库,整库下只有people和student两张表,使所以会有此主题),其中dbhistory.inventory、mysql1主题是存储ddl变更记录

【include.schema.changes】改变的是mysql1主题是否会有ddl,默认是true,如果改为false后,此主题不会有ddl过来,但是一旦又改为false后,会把历史没收集的ddl都收集过来,然后继续收集后续的ddl。

【database.history.kafka.topic】无论include.schema.changes是否为true,都会收集对应的ddl,这个主题仅供内部使用,消费者不能使用。

(8)测试

访问http://192.168.31.100:8083/connectors进行测试,如果出现[“inventory-connector”]则成功。

运维

(1)如果要更新配置,则使用此api进行更新【PUT /connectors/{name}/config】eg:

curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/inventory-connector/config  -d '{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "Test2022@", "database.server.id": "184055", "database.server.name": "mysql1", "database.include.list": "test", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.inventory", "include.schema.changes": "true", "database.history.store.only.captured.tables.ddl": "true" }'

(2)动态加表

​ 如果刚开始table.include.list只配置了部分表,后续运行过程中需要动态添加新表,此时通过上述更新接口进行更新table.include.list配置即可,此时会对表进行全量+增量的读取。

(3)路由管理

​ 默认情况下,一个表会创建一个主题。如果表太多,主题就会非常多。此时需要把一批表路由到同一个主题。在连接器配置中加入下几个配置,此时会把全部的表投递到同一个dbz-unique-topic-test主题下。

"transforms": "Reroute", 
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "(.*)", 
"transforms.Reroute.topic.replacement": "dbz-unique-topic-test"

(4)ddl与dml数据的topic合并

​ 通过上述的路由管理,把transforms.Reroute.topic.replacement配置设置成和database.server.name相同的配置。

其他

连接器配置文档

Apache Kafka

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pWCPUTnT-1675678476438)(C:\Users\hp\AppData\Roaming\Typora\typora-user-images\image-20230206143356470.png)]# Debezium

环境

Kafka:3.3.2

mysql-connector:1.8.1

部署

(0)前提是安装好mysql,开启binlog

(1)下载kafka

1) tar -zxvf kafka_2.12-3.3.2.tgz -C /opt/software/
2) mkdir /opt/software/kafka_2.12-3.3.2/plugin

(2)下载mysql-connector插件

1)tar -zxvf debezium-connector-mysql-1.8.1.Final-plugin.tar.gz -C /opt/software/kafka_2.12-3.3.2/plugin

(3)编辑配置文件

1)vim connect-distributed.properties
2)尾部追加此配置-》plugin.path=/opt/software/kafka_2.12-3.3.2/plugin

(4)启动kafka自带的zk

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zkLog.log 2>&1 &

(5)启动kafka

nohup bin/kafka-server-start.sh config/server.properties > kafkaLog.log 2>&1 & 

(6)启动connect

bin/connect-distributed.sh config/connect-distributed.properties

(7)调用api

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "Test2022@", "database.server.id": "184055", "database.server.name": "mysql1", "database.include.list": "test", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

注意:当成功调用api,创建此连接器后会有如下主题产生:dbhistory.inventory、mysql1、mysql1.test.people、mysql1.test.student(因为是监控的是test整库,整库下只有people和student两张表,使所以会有此主题),其中dbhistory.inventory、mysql1主题是存储ddl变更记录

【include.schema.changes】改变的是mysql1主题是否会有ddl,默认是true,如果改为false后,此主题不会有ddl过来,但是一旦又改为false后,会把历史没收集的ddl都收集过来,然后继续收集后续的ddl。

【database.history.kafka.topic】无论include.schema.changes是否为true,都会收集对应的ddl,这个主题仅供内部使用,消费者不能使用。

(8)测试

访问http://192.168.31.100:8083/connectors进行测试,如果出现[“inventory-connector”]则成功。

运维

(1)如果要更新配置,则使用此api进行更新【PUT /connectors/{name}/config】eg:

curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/inventory-connector/config  -d '{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "Test2022@", "database.server.id": "184055", "database.server.name": "mysql1", "database.include.list": "test", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.inventory", "include.schema.changes": "true", "database.history.store.only.captured.tables.ddl": "true" }'

(2)动态加表

​ 如果刚开始table.include.list只配置了部分表,后续运行过程中需要动态添加新表,此时通过上述更新接口进行更新table.include.list配置即可,此时会对表进行全量+增量的读取。

(3)路由管理

​ 默认情况下,一个表会创建一个主题。如果表太多,主题就会非常多。此时需要把一批表路由到同一个主题。在连接器配置中加入下几个配置,此时会把全部的表投递到同一个dbz-unique-topic-test主题下。

"transforms": "Reroute", 
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "(.*)", 
"transforms.Reroute.topic.replacement": "dbz-unique-topic-test"

(4)ddl与dml数据的topic合并

​ 通过上述的路由管理,把transforms.Reroute.topic.replacement配置设置成和database.server.name相同的配置。

其他

1、API接口文档

Apache Kafka其中的【REST API】部分文章来源地址https://www.toymoban.com/news/detail-495405.html

到了这里,关于Debezium同步Mysql数据到Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Canal+Kafka实现Mysql数据同步

    canal [kə\\\'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。 canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

    2024年02月12日
    浏览(50)
  • cancel框架同步mysql数据到kafka

    1、下载cancel 2、修改conf文件夹下的canal.properties配置文件 3、修改conf/example文件夹下的instance.properties配置文件 在sql查询show binary logs语句得到binlog日志 4、启动 在bin目录下执行 启动程序 注:MySQL需要创建新用户

    2024年02月15日
    浏览(55)
  • 通过kafka connector实现mysql数据自动同步es

    整体思路: 1、使用 io.debezium.connector.mysql.MySqlConnector 自动同步数据到kafka消息队列 2、通过listener监听消息队列,代码控制数据插入es ps:其实有更简单的方式:在此基础上使用ElasticsearchSinkConnector、ksql,完成数据的转换与自动同步es,全程无需代码控制,后续本地跑通流程后

    2024年02月08日
    浏览(45)
  • 使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

    使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

    2024年02月11日
    浏览(42)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(46)
  • flink- mysql同步数据至starrocks-2.5.0之环境搭建

    一般需要以下几个服务: mysql flink flink-taskmanager flink-jobmanager starrocks starrocks-fe starrocks-be docker-compose.yml 配置文件 启动: docker-compose up -d : 登陆 starrocks 注意事项 mysql开启 bin log mysql 基于cdc ,采用 binlog模式,所以要开启binlog, conf/my.cnf : docker 服务放到同一docker网络中 如果不在同

    2024年02月10日
    浏览(50)
  • Debezium同步之同步部署

    目录 1. Docker 安装 参考网站: 1.1 删除旧版本 1.2 安装 Docker Engine-Community 1.

    2023年04月24日
    浏览(25)
  • Debezium系列之:基于debezium将mysql数据库数据更改流式传输到 Elasticsearch和PostgreSQL数据库

    基于 Debezium 的端到端数据流用例,将数据流式传输到 Elasticsearch 服务器,以利用其出色的功能对我们的数据进行全文搜索。 同时把数据流式传输到 PostgreSQL 数据库,通过 SQL 查询语言来优化对数据的访问。 下面的图表显示了数据如何流经我们的分布式系统。首先,Debezium M

    2024年02月13日
    浏览(63)
  • docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

    🚀 本文提供的指令完全可以按顺序逐一执行,已进行了多次测试。因此如果你是直接按照我本文写的指令一条条执行的,而非自定义修改过,执行应当是没有任何问题的。 🚀 本文讲述:使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elas

    2024年02月02日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包