Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

这篇具有很好参考价值的文章主要介绍了Flink CDC 基于mysql binlog 实时同步mysql表(无主键)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

环境说明:

flink 1.15.2

mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据

windows11 IDEA 本地运行

具体前提设置,请看这篇,包含 binlog 设置、Maven......

Flink CDC 基于mysql binlog 实时同步mysql表_彩虹豆的博客-CSDN博客

经过不懈努力,终于从阿里help页面找到了支持无主键同步的参数:

MySQL_实时计算 Flink版-阿里云帮助中心

Flink CDC 基于mysql binlog 实时同步mysql表(无主键),mysql,flink,数据库

 然后就开始一顿模式,各种参数调试,终于达到了目的,无主键表实时同步,只不过在sink表关联目标表时,要指定几个字段为主键,这样就不会有重复的覆盖情况了,多给几个字段作为主键,不就避免重复冲突了嘛。比如id+date+local等,具体看表字段。

demo如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class MysqlToMysqlNonePrimaryKey {

    public static void main(String[] args) {
        //1.获取stream的执行环境
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        //2.创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv, settings);

        String sourceTable = "CREATE TABLE mysql_cdc_source (" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING\n" +
                ") WITH (\n" +
                "'connector' = 'mysql-cdc',\n" +
                "'hostname' = 'localhost',\n" +
                "'port' = '3306',\n" +
                "'username' = 'root',\n" +
                "'password' = 'root',\n" +
                "'database-name' = 'test_cdc',\n" +
                "'debezium.snapshot.mode' = 'initial',\n" +
                "'scan.incremental.snapshot.enabled' = 'false',\n" +
                //如果开启增量快照,必须设置主键。
                //默认开启增量快照。增量快照是一种读取全量数据快照的新机制。与旧的快照读取相比,增量快照有很多优点,包括:
                //读取全量数据时,Source可以是并行读取。
                //读取全量数据时,Source支持chunk粒度的检查点。
                //读取全量数据时,Source不需要获取全局读锁(FLUSH TABLES WITH read lock)。
                //如果您希望Source支持并发读取,每个并发的Reader需要有一个唯一的服务器ID,因此server-id必须是5400-6400这样的范围,并且范围必须大于等于并发数。
                "'scan.incremental.snapshot.chunk.key-column' = 'id' ,\n" +
                //可以指定某一列作为快照阶段切分分片的切分列。无主键表必填,选择的列必须是非空类型(NOT NULL)。
                //有主键的表为选填,仅支持从主键中选择一列。
                "  'table-name' = 'user'\n" +
                ")";
        tEnv.executeSql(sourceTable);
//        tEnv.executeSql("select * from mysql_cdc_source").print();
        String sinkTable = "CREATE TABLE mysql_cdc_sink (" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING\n" +
                "  ,PRIMARY KEY (id,username,password) NOT ENFORCED\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "'url' = 'jdbc:mysql://localhost:3306/" + "test_cdc" + "?rewriteBatchedStatements=true',\n" +
                "'username' = 'root',\n" +
                "'password' = 'root',\n" +
                "'table-name' = 'user_new'\n" +
                ")";
        tEnv.executeSql(sinkTable);
        tEnv.executeSql("insert into mysql_cdc_sink select id,username,password from mysql_cdc_source");
    }
}

由于无主键, debezium.snapshot.mode' = 'initial',这个参数会导致,程序运行几次,源表数据就会同步几次到目标表,并不会去重,如果想一直这个参数运行,需要在插入前先清空表,但是如果是数据量大的,推荐还是先用这个参数同步历史数据,完成后,再改为 schema_only,启动程序,然后把上面一个程序干掉。

上面设置的主键是三个字段,id、username、password,这三个字段不能为null,如果有数据为null,程序在启动的时候,就会报错,虽然没有打印到控制台上,但是可以看到控制台程序结束了,不是一直在运行,并且数据也是同步不过去的。所以挑选主键字段时一定要确定此字段一定不为null,如果为null的话,就需要能接受转换处理,比如:varchar 类型 将null值转换为空字符串

insert into mysql_cdc_sink select case when id is null then 0 else id end,case when username is null then '' else username  end,case when password is null then '' else password end from mysql_cdc_source

具体如何处理,还看业务需求。不过,在数据同步时,尽量要做到不对数据做任何变动。如果是可以加入清洗,那就随便玩。

具体数据变化时同步的情况还需自行探索。文章来源地址https://www.toymoban.com/news/detail-718830.html

到了这里,关于Flink CDC 基于mysql binlog 实时同步mysql表(无主键)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(39)
  • Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

    环境说明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地运行 先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation 1. Oracle 开启 log archiving (1).启用 log archiving         a:以DBA用户连接数据库    

    2024年02月11日
    浏览(34)
  • 基于 Flink CDC 的实时同步系统

    摘要: 本文整理自科杰科技大数据架构师张军,在 FFA 2022 数据集成专场的分享。本篇内容主要分为四个部分: 功能概述 架构设计 技术挑战 生产实践 Tips: 点击 「阅读原文」 查看原文视频演讲 ppt 科杰科技是专门做大数据服务的供应商,目前的客户包括能源、金融、证券等

    2024年02月05日
    浏览(36)
  • 使用Flink CDC将Mysql中的数据实时同步到ES

    最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间…… 我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过

    2024年02月11日
    浏览(42)
  • Flink CDC2.4 整库实时同步MySql 到Doris

            Flink 1.15.4          目前有很多工具都支持无代码实现Mysql - Doris 的实时同步         如:SlectDB 已发布的功能包                 Dinky SeaTunnel TIS 等等          不过好多要么不支持表结构变动,要不不支持多sink,我们的业务必须支持对表结构的实时级变动

    2024年02月11日
    浏览(43)
  • 基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

    操作系统:ubuntu-22.04,运行于wsl 2【 注意,请务必使用wsl 2 ;wsl 1会出现各种各样的问题】 软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳过此步 (1)pg安装 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出现的问题 sudo -u postgres psql 报错: psql: err

    2024年02月11日
    浏览(27)
  • 基于Flink SQL CDC Mysql to Mysql数据同步

    Flink CDC有两种方式同步数据库: 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步; 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。 本方案使用FlinkSQL方法,同步两表中的数据。 其中Flink应用可以部署在具有公网IP的服务

    2023年04月11日
    浏览(66)
  • 基于大数据平台(XSailboat)的计算管道实现MySQL数据源的CDC同步--flink CDC

    笔者在先前的一篇文档《数据标签设计 – 大数据平台(XSailboat)的数据标签模块》 提到了关于数据标签的模块,现已实现并应用于项目中。在项目中遇到这样一种情形: 如果打标信息和业务数据是在一个数据库实例中,那么只需要连接两张表进行查询即可。但是数据标签作为

    2024年01月17日
    浏览(46)
  • Flink CDC获取mysql 主从分库,分库分表的binlog

    Flink CDC可以获取MySQL主从分库,分库分表的binlog,但是需要注意以下几点: Flink CDC需要配置MySQL的binlog模式为row,以及开启GTID(全局事务标识符),以便正确地识别和处理binlog事件 Flink CDC需要配置MySQL的主从复制关系,以及指定主库或从库的地址,以便正确地连接和读取bin

    2024年02月11日
    浏览(31)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包