flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法

这篇具有很好参考价值的文章主要介绍了flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flink 与cdc 版本使用搭配:

flink1.13.6 + flink mysql cdc 1.4.0
flink 1.16.0 + flink mysql cdc 2.3.0
flink 1.16.0 + flink mysql cdc 2.4.0
flink 1.16.0 + flink postgresql cdc 2.3.0

flink 1.13.6 + flink mysql cdc 2.3.0 : 没有报错,没有数据,估计是兼容有问题

flink cdc

参数说明

1、调整chunck大小 : scan.incremental.snapshot.chunk.size
2、设置cdc模式:scan.startup.mode【initial(默认)、latest-offset】
3、支持chunk key 列设置,默认是第一个字段:scan.incremental.snapshot.chunk.key-column
官网:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html

原理分析

1、cdc mysql 全量快照阶段split sql :SELECT * FROM cdc_db.tablename WHERE id >= ? AND NOT (id = ?) AND id <= ?;
备注:id 是主键id

(DBLog)无锁算法论文

链接地址:https://arxiv.org/pdf/2010.12597.pdf , 对此算法感兴趣的可以看这位大佬的分享:https://zhuanlan.zhihu.com/p/600303844

论文部分摘要理解:

  • 全量阶段:
    1、flink cdc 任务启动后按设置的chunk size切分数据,sql如下:
    (sql:SELECT * FROM cdc_db.tablename WHERE id >= ? AND NOT (id = ?) AND id <= ?; )
    2、同时会启动读取binlog任务,读取chunk对应的binlog,通过binlog对 select chunk的数据做合并操作,此操作是合并在期间执行了update、delete操作,保证insert-only

  • 增量阶段:
    1、不断追加数据
    flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法,flink,flink,mysql,大数据
    flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法,flink,flink,mysql,大数据

mysql cdc

cdc api 动态加表

1、启动任务,复制checkpoint路径
flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法,flink,flink,mysql,大数据2、新增监听的表到tableList(可以使用同一个jar包,在外部传参动态加表)
3、从checkpoint初重启任务即可

flink cdc sql 性能压测

1、cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s

flink cdc api 性能压测

1、cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s

PostgreSqlCDC

执行更新语句,会出现 2 种情况

1、若更新字段包含(部分)主键字段,会先发送一条删除之前主键的记录。op = d , after = null ; 然后再发送一条新主键记录,op = c,且before = null 。
2、若仅更新非主键主键,只会发送一条记录,op = u , before = null。

主体代码如下:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 必须开启 checkpoint ,因为Flink Postgres CDC 只会在 checkpoint 完成的时候更新 Postgres slot 中的 LSN,否则磁盘使用率会一直很高
        env.enableCheckpointing(1000);

        //监听 postgresql wal 日志
        DebeziumSourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname(host)
                .port(port)
                .username(userName)
                .password(passWord)
                .database(dbName)
                .tableList(tableList)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .slotName(slotName)
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        dataStreamSource.print(">>>").setParallelism(1);

        env.execute();

cdc sink to kafka

AT_LEAST_ONCE 模型要配置 acks = 1

报错

mysql时区错误,The server time zone value ‘EDT’ is unrecognized or represents

登录mysql并查询当前时区:show variables like “%time_zone%”;
执行以下命令修改时区:

set global time_zone = '+8:00'; ##修改mysql全局时区为北京时间,即我们所在的东8区
set time_zone = '+8:00'; ##修改当前会话时区
flush privileges; #立即生效

java.lang.NoClassDefFoundError: io/debezium/connector/mysql/MySqlConnectorConfig

缺包,引入 debezium-connector-mysql-1.6.4.Final.jar包会报Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig,看 flink cdc 社区群反馈可能是

Cannot discover a connector using option: ‘connector’=‘mysql-cdc’

删除pom.xml文件flink-connector-mysql-cdc依赖报下的provided

Could not instantiate the executor. Make sure a planner module is on the classpath

包冲突原因。注释或删除jar:flink-table-planner-loader-1.16.0.jar

(source 算子 )The TaskExecutor is shutting down.

加大心跳间隔时间,默认是30s,‘heartbeat.interval’ = ‘60s’文章来源地址https://www.toymoban.com/news/detail-604123.html

到了这里,关于flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink系列之:Flink CDC深入了解MySQL CDC连接器

    增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括: (1)在快照读取期间,Source 支持并发读取 (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运

    2024年02月02日
    浏览(47)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(102)
  • 最新版Flink CDC MySQL同步MySQL(一)

    Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。 连接器 数据库 驱动 mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.4 mysql-cdc MySQL: 5.6, 5.

    2024年02月13日
    浏览(64)
  • CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi

    继上一篇 《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》 讨论了一种典型的 CDC 集成方案后,本文,我们改用 Flink CDC 完成同样的 CDC 数据入湖任务。与上一个方案有所不同的是:借助现有的 Flink 环境,我们可以直接使用 Flink CDC 从源头数据库接入数据,

    2024年02月22日
    浏览(47)
  • Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(58)
  • flink mysql cdc调试问题记录

    最近需要用到flink cdc作为数据流处理框架,在demo运行中发现一些问题,特此记录问题和解决过程。 Caused by: java.lang.IllegalArgumentException: Can\\\'t find any matched tables, please check your configured database-name: [localdb] and table-name: [flink_cdc_message] at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.

    2023年04月17日
    浏览(53)
  • 基于Flink SQL CDC Mysql to Mysql数据同步

    Flink CDC有两种方式同步数据库: 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步; 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。 本方案使用FlinkSQL方法,同步两表中的数据。 其中Flink应用可以部署在具有公网IP的服务

    2023年04月11日
    浏览(74)
  • Flink CDC 最佳实践(以 MySQL 为例)

    1.1 确认 MySQL binlog 模式 确认 MySQL 数据库的 binlog 模式是否为 ROW 。可以在 MySQL 命令行中执行以下语句确认: 如果返回结果中的 Value 字段为 ROW ,则说明 binlog 模式为 ROW 。 1.2 下载并安装 Flink 下载并安装 Flink,可以参考官方文档进行安装。 2.1 配置 MySQL 数据库连接信息 在 F

    2024年02月07日
    浏览(38)
  • Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 具体前提设置,请看这篇,包含 binlog 设置、Maven...... Flink CDC 基于mysql binlog 实时同步mysql表_彩虹豆的博客-CSDN博客 经过不懈努力,终于从阿里help页面找到了支

    2024年02月08日
    浏览(46)
  • Doris通过Flink CDC接入MySQL实战

    1. 创建MySQL库表,写入demo数据 登录测试MySQL 创建MySQL库表,写入demo数据 注意:MySQL需要开通bin-log log_bin=mysql_bin binlog-format=Row server-id=1 2. 创建Doris库表 创建Doris表 3. 启动Flink 启动flink 创建Flink 任务: 输入如下地址,查看flink任务 http://localhost:8081/#/job/running 数据验证:启动后可

    2023年04月10日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包