11 flink-sql 中基于 mysql-cdc 连接 mysql-pxc 集群无法获取增量数据问题

这篇具有很好参考价值的文章主要介绍了11 flink-sql 中基于 mysql-cdc 连接 mysql-pxc 集群无法获取增量数据问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

问题是来自于 群友, 2024.03.29, 也是花了一些时间 来排查这个问题 

大致的问题是用 mysql-cdc 连接了一个 mysql-pxc 集群, 然后创建了一个 test_user 表 

使用 "select * from test_user" 获取数据表的数据, 可以拿到 查询时的快照, 但是 无法获取到后续对于 test_user 表的增量操作的数据, 比如 新增的数据 

正常连接一个普通的 单点 mysql 是可以正常进行数据同步的 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

flink-sql 中基于 mysql-cdc 的 select * from test_user 的具体实现 也是衍生自此问题 

然后 可以 先了解一下这个 查询的过程 

问题的排查思路

当然 这里介绍的仅仅是一个思路, 一个排查方向, 未必能够解决问题 

1. 查看 正常同步, 异常同步 的 mysql 的 bin-log 相关配置, 然后更新 异常同步 mysql 的这边的配置, 查看是否能够正常工作 

2. 日志级别的排查, 查看 taskmanager 这边的日志, 查看一下 BinaryLogClinet, BinlogReader 等等相关日志, 切换日志级别为 DEBUG, 查看 是否有具体的信息, 结合日志信息 和 代码进行一个抽象的分析 

3. 如果能够访问数据库的话, 可以本地部署一个简单的 flink-1.13.6, 然后 本地模拟, 来调试 taskmanager, 当然 在这里这个问题, 我们是不具备条件的, 别人的机器 

4. 使用 mysqlbinlog 等等工具, 向 mysql 集群这边发送请求, 获取 binlog 相关信息, 查看是否正常 

处理过程

1. 查看 正常同步, 异常同步 的 mysql 的 bin-log 相关配置, 然后更新 异常同步 mysql 的这边的配置, 查看是否能够正常工作 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

2. 日志级别的排查, 查看 taskmanager 这边的日志, 查看一下 BinaryLogClinet, BinlogReader 等等相关日志, 切换日志级别为 DEBUG, 查看 是否有具体的信息, 结合日志信息 和 代码进行一个抽象的分析 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

INFO 级别的日志信息如下, 可以得出的大致的现象是 "看这个情况有点像 task_manager 触发连接 mysql binlog 服务这边怎么出问题了, 然后 blc-keepalive 线程, 每隔一分钟 在重试 "


2024-04-01 09:19:32,482 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-04-01 09:19:32,492 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Connected to MySQL binlog at 172.16.5.58:3306, starting at binlog file 'bin.000051', pos=387473470, skipping 0 events plus 0 rows
2024-04-01 09:19:32,493 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Stopped reading binlog after 0 events, no new offset was recorded
2024-04-01 09:19:52,622 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-04-01 09:19:52,635 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Connected to MySQL binlog at 172.16.5.58:3306, starting at binlog file 'bin.000051', pos=382848711, skipping 0 events plus 0 rows
2024-04-01 09:19:52,636 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Stopped reading binlog after 0 events, no new offset was recorded
2024-04-01 09:20:32,494 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-04-01 09:20:32,504 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Connected to MySQL binlog at 172.16.5.58:3306, starting at binlog file 'bin.000051', pos=387473470, skipping 0 events plus 0 rows
2024-04-01 09:20:32,504 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Stopped reading binlog after 0 events, no new offset was recorded
2024-04-01 09:20:52,636 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-04-01 09:20:52,647 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Connected to MySQL binlog at 172.16.5.58:3306, starting at binlog file 'bin.000051', pos=382848711, skipping 0 events plus 0 rows
2024-04-01 09:20:52,647 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Stopped reading binlog after 0 events, no new offset was recorded
2024-04-01 09:21:32,506 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-04-01 09:21:32,515 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Connected to MySQL binlog at 172.16.5.58:3306, starting at binlog file 'bin.000051', pos=387473470, skipping 0 events plus 0 rows
2024-04-01 09:21:32,515 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Stopped reading binlog after 0 events, no new offset was recorded
2024-04-01 09:21:52,648 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2024-04-01 09:21:52,660 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Connected to MySQL binlog at 172.16.5.58:3306, starting at binlog file 'bin.000051', pos=382848711, skipping 0 events plus 0 rows
2024-04-01 09:21:52,660 INFO  io.debezium.connector.mysql.BinlogReader                     [] - Stopped reading binlog after 0 events, no new offset was recorded

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

日志修改为 DEBUG 级别, 可以看到 和 mysql 这边建立了 获取 binlog 的链接之后, 马上就断开了 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

日志映射到代码这边的话 

"Connected to MySQL binlog at 172.16.5.58:3306, starting at binlog file 'bin.000061', pos=161824289, skipping 0 events plus 0 rows" 是在 onConnect 中进行输出的 

"Stopped reading binlog after 0 events, no new offset was recorded" 是在 onConnect 中进行输出的 

然后 requestBinaryLogStream 是向 mysql 这边发送 dump binlog 命令 

然后正常的话这个线程 会阻塞在 listenForEventPackets, 进而 处理 mysql 服务器这边传递过来的各个 binlog 事件, 进而反馈给 flink sql-client, 实现了 数据的实时同步更新 

那么问题可能就在 是向 mysql 这边发送 dump binlog 命令 哪里出现问题了 

因此, 我们使用 mysqlbinlog 工具来进行模拟一下 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

3. 使用 mysqlbinlog 等等工具, 向 mysql 集群这边发送请求, 获取 binlog 相关信息, 查看是否正常

可以看到的是 flink 所在的机器, 去访问 mysql-pxc 集群 确实是出现了问题 

至于 具体的问题, 就得拿到具体的环境才能够进行排查了, 这里就不在深究下去了 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

然后 问题是可以第一次通过查询全量拿到 test_user 的所有数据

我们再 flink 所在的机器使用 mysql 客户端进行访问试试, 是可以正常登录, 正常执行查询的 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

问问 通义千问 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

-- 新增于 2024.04.09

后面运维人员开了一个 mysql-pxc 集群的端口映射出来, 新端口 3307 映射到 mysql-pxc 集群 

但是 mysql-pxc 集群的 binlog 配置貌似存在问题, 然后 pxc0, pxc1, pxc2 机器的 binlog 信息不同步 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

最终问题顺利解决 

topped reading binlog after 0 events, no new offset was recorded,19 flink,flink,mysql,cdc

完 文章来源地址https://www.toymoban.com/news/detail-852377.html

到了这里,关于11 flink-sql 中基于 mysql-cdc 连接 mysql-pxc 集群无法获取增量数据问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于 Flink SQL CDC 数据处理的终极武器

    来源互联网多篇文章总结 业务系统经常会遇到需要更新数据到多个存储的需求。例如:一个订单系统刚刚开始只需要写入数据库即可完成业务使用。某天 BI 团队期望对数据库做全文索引,于是我们同时要写多一份数据到 ES 中,改造后一段时间,又有需求需要写入到 Redis 缓存

    2024年02月16日
    浏览(36)
  • Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理

    flink-cdc官网:Oracle CDC Connector — CDC Connectors for Apache Flink® documentation Flink环境依赖: (3)启用日志归档 (4)检查是否启用了日志归档 (5)创建具有权限的 Oracle 用户 (5.1)。创建表空间 (5.2)。创建用户并授予权限 Flink SQL 客户端连接器测试: 创建 Oracle 链接器 返回内容 以上代

    2024年02月11日
    浏览(44)
  • Flink-SQL 写入PostgreSQL 问题汇总

    ​ 错误信息 问题原因 解决 确定主键属性是否有空值,若为空则确定是否可作为主键属性(若正确则将其置为默认值);若不符合业务情况,则重新定义主属性集合,确保不出现空值 ​ 错误信息 错误原因 解决 原因:postgreSQL包含database,schema,table三级结构,在生产环境未

    2024年02月04日
    浏览(39)
  • Flink-SQL——时态表(Temporal Table)

    这里我们需要注意一下的是虽然我们介绍的是Flink 的 Temporal Table 但是这个概念最早是在数据库中提出的 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作

    2024年01月16日
    浏览(53)
  • flink-sql读写hive-1.16

    本文档内容基于 flink-1.16.x ,其他版本的整理,请查看本人博客的 flink 专栏其他文章。 Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样也是一个数据管理平台,可用于发现,定义,和演化数据。 Flink 与 Hive 的集成包

    2024年02月16日
    浏览(63)
  • Flink-SQL——动态表 (Dynamic Table)

    SQL 和关系代数在设计时并未考虑流数据。因此,在关系代数(和 SQL)之间几乎没有概念上的差异。 本文会讨论这种差异,并介绍 Flink 如何在无界数据集上实现与数据库引擎在有界数据上的处理具有相同的语义。 下表比较了传统的关系代数和流处理与输入数据、执行和输出结果

    2024年01月17日
    浏览(47)
  • 【flink-sql实战】flink 主键声明与upsert功能实战

    主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的 某个(些)列 是唯一的 并且不包含 Null 值 。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。 主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,

    2024年02月03日
    浏览(42)
  • flink-sql对kafka数据进行清洗过滤

    今天这篇blog主要记录使用flink-sql对kafka中的数据进行过滤。 以前对kafka数据进行实时处理时都是使用java来进行flink开发,需要创建一个工程,并且打成jar包再提交,流程固定但对于简单任务来说还是比较繁琐的。 今天我们要对logstash采集到kafka中的数据进行过滤筛选,将筛选

    2024年02月16日
    浏览(40)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(104)
  • Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(60)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包