最新版Flink CDC MySQL同步MySQL(一)

这篇具有很好参考价值的文章主要介绍了最新版Flink CDC MySQL同步MySQL(一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.概述

Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。
最新版Flink CDC MySQL同步MySQL(一),flink,flink,mysql,大数据

2.支持的连接器

连接器 数据库 驱动
mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.4
mysql-cdc MySQL: 5.6, 5.7, 8.0.x、RDS MySQL: 5.6, 5.7, 8.0.x、PolarDB MySQL: 5.6, 5.7, 8.0.x、Aurora MySQL: 5.6, 5.7, 8.0.x、MariaDB: 10.x、PolarDB X: 2.0.1 JDBC Driver: 8.0.28
oceanbase-cdc OceanBase CE: 3.1.x, 4.x、OceanBase EE: 2.x, 3.x, 4.x OceanBase Driver: 2.4.x
oracle-cdc Oracle: 11, 12, 19, 21 Oracle Driver: 19.3.0.0
postgres-cdc PostgreSQL: 9.6, 10, 11, 12, 13, 14 JDBC Driver: 42.5.1
sqlserver-cdc Sqlserver: 2012, 2014, 2016, 2017, 2019 JDBC Driver: 9.4.1.jre8
tidb-cdc TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 JDBC Driver: 8.0.27
db2-cdc Db2: 11.5 Db2 Driver: 11.5.0.0
vitess-cdc Vitess: 8.0.x, 9.0.x MySql JDBC Driver: 8.0.26

3.支持的 Flink 版本

下表显示了 Flink CDC Connectors 与 Flink ®的版本对应关系:

Flink CDC版本_ Flink 版本_
1.0.0 1.11.*
1.1.0 1.11.*
1.2.0 1.12.*
1.3.0 1.12.*
1.4.0 1.13.*
2.0.* 1.13.*
2.1.* 1.13.*
2.2.* 1.13.*、1.14.*
2.3.* 1.13.*、1.14.*、1.15.*、1.16.0
2.4.* 1.13.*、1.14*、1.15.*、1.16.*、1.17.0

4.特征

支持读取数据库快照,即使出现故障也能继续读取binlog,并进行Exactly-once处理。

DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。

Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改。

5.表/SQL API 的用法

我们需要几个步骤来使用提供的连接器设置 Flink 集群。

首先我们安装了 1.17+ 版本的 Flink 集群(java 8+)。

注意: 如果需要安装Flink请查看笔者对应的博客 flink高可用集群搭建(Standalone模式)
本文用到的jar包flink-connector-jdbc-3.1.1-1.17.jar和flink-sql-connector-mysql-cdc-2.2.1.jar

下载 连接器 SQL jar (或自行构建)。

将下载的jar包放在FLINK_HOME/lib/.

重启Flink集群。

注意:目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的

6.使用 Flink CDC 对 MySQL 进行流式 ETL

本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。

假设我们将产品数据存储在MySQL中,同步到另外一个MySQL中

在下面的章节中,我们将介绍如何使用 Flink Mysql CDC 来实现它。本教程中的所有练习均在 Flink SQL CLI 中进行,整个过程使用标准 SQL 语法,无需任何 Java/Scala 代码,也无需安装 IDE。

架构概述如下:
最新版Flink CDC MySQL同步MySQL(一),flink,flink,mysql,大数据

7.环境准备

需要准备安装好的MySQL数据库,具体MySQL数据怎么安装请查看笔者的博客Ubuntu数据库安装(mysql)

注意: 如果是其他操作系统请查看其他博客对应的数据库安装教程

8.在 Flink SQL CLI 中使用 Flink DDL 创建表

使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。
最新版Flink CDC MySQL同步MySQL(一),flink,flink,mysql,大数据首先,每 3 秒启用一次检查点

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

编辑源数据库Flink Sql代码,如下所示:

CREATE TABLE products (
 id INT NOT NULL,
 name STRING,
 description STRING,
 PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc', #引入的CDC jar包驱动,没有引入会报错提示需要引入
 'hostname' = '192.168.50.163',#源数据库连接host地址,可以根据自己的具体设置,此处为笔者本机的
 'port' = '3306', #源数据库端口
 'username' = 'root',#源数据库账号
 'password' = '*****',#源数据库密码
 'database-name' = 'mydb',#源数据库
 'table-name' = 'products'#源数据库表
);

在Flink SQL 执行以下语句创建从相应数据库表捕获更改数据的表

-- Flink SQL
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.50.163',
    'port' = '3306',
    'username' = 'root',
    'password' = '****',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );

编辑目标数据库Flink Sql代码,如下所示:

CREATE TABLE product (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    #引入的jdbc jar包驱动,没有引入会报错提示需要引入 flink-connector-jdbc
    'connector' = 'jdbc',
    #目标数据库连接url地址,可以根据自己的具体设置,此处为笔者本机的。部分高版本的MySQL需要添加useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC
    'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
    #需要访问的数据库驱动
    'driver' = 'com.mysql.cj.jdbc.Driver',
    #目标数据库账号
    'username' = 'root',
    #目标据库密码
    'password' = '***',
    #目标数据库表
    'table-name' = 'product'
  );

在Flink SQL 执行以下语句创建捕获更改数据的表与目标数据库表的映射关系

-- Flink SQL
Flink SQL> CREATE TABLE product (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'product'
  );

9.将源数据表加载到目标MySQL

使用Flink SQL将表product与 表查询products表写入目标MySQL。

-- Flink SQL
Flink SQL> insert into product select * from products;

具体操作步骤如下所示:
最新版Flink CDC MySQL同步MySQL(一),flink,flink,mysql,大数据

这是源数据库,操作添加数据,如下图所示:
最新版Flink CDC MySQL同步MySQL(一),flink,flink,mysql,大数据
目标数据库同步操作如下图
最新版Flink CDC MySQL同步MySQL(一),flink,flink,mysql,大数据

10.flink可视化界面查看Running JOBS

红框勾选为运行的同步任务
最新版Flink CDC MySQL同步MySQL(一),flink,flink,mysql,大数据
至此Flink CDC MySQL同步MySQL第一节讲解完毕,后面会更新其复杂操作文章来源地址https://www.toymoban.com/news/detail-539078.html

到了这里,关于最新版Flink CDC MySQL同步MySQL(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(51)
  • Flink 实现 MySQL CDC 动态同步表结构

    作者:陈少龙,腾讯 CSIG 高级工程师 使用 Flink CDC(Change Data Capture) 实现数据同步被越来越多的人接受。本文介绍了在数据同步过程中,如何将 Schema 的变化实时地从 MySQL 中同步到 Flink 程序中去。 MySQL 存储的数据量大了之后往往会出现查询性能下降的问题,这时候通过 Flin

    2024年02月04日
    浏览(72)
  • 基于Flink SQL CDC Mysql to Mysql数据同步

    Flink CDC有两种方式同步数据库: 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步; 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。 本方案使用FlinkSQL方法,同步两表中的数据。 其中Flink应用可以部署在具有公网IP的服务

    2023年04月11日
    浏览(68)
  • Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 具体前提设置,请看这篇,包含 binlog 设置、Maven...... Flink CDC 基于mysql binlog 实时同步mysql表_彩虹豆的博客-CSDN博客 经过不懈努力,终于从阿里help页面找到了支

    2024年02月08日
    浏览(39)
  • flink-cdc同步mysql数据到elasticsearch

    CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 cdc项目地址:https://github.com/ver

    2024年02月13日
    浏览(70)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(39)
  • Flink cdc同步mysql到starrocks(日期时间格式/时区处理)

    flink 1.15.3(此时最新版本为1.16.1) mysql 5.7+ starrocks 2.5.2 mysql同步表结构 mysql中的timestamp字段是可以正常同步的,但是多了8小时,设置了mysql链接属性也没效果 参考下方的链接有两种方式; 参考资料 https://blog.csdn.net/cloudbigdata/article/details/122935333 https://blog.csdn.net/WuBoooo/article/deta

    2024年02月16日
    浏览(34)
  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(43)
  • Flink DataStream API CDC同步MySQL数据到StarRocks

    Flink:1.16.1 pom文件如下 Java代码 SourceAndSinkInfo 类,用于定义source和sink的IP、端口、账号、密码信息 DataCenterShine实体类,字段与数据库一一对应。 StarRocksPrimary 实体类 FieldInfo注解类,用于标记字段序号、是否为主键、是否为空,后续生成TableSchema需要使用到。 TableName 注解类,

    2024年02月03日
    浏览(68)
  • 使用Flink CDC将Mysql中的数据实时同步到ES

    最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间…… 我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过

    2024年02月11日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包