Flink cdc3.0同步实例(动态变更表结构、分库分表同步)

这篇具有很好参考价值的文章主要介绍了Flink cdc3.0同步实例(动态变更表结构、分库分表同步)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

在FLink cdc 2.x的版本,各企业做了许多类似的基础功能改造工作(B站 2022年企业flink cdc实践分享 )。
最近Flink CDC 3.0发布,schema 变更自动同步、整库同步、分库分表等增强功能使 Flink CDC 3.0 在更复杂的数据集成与用户业务场景中发挥作用:用户无需在数据源发生 schema 变更时手动介入,大大降低用户的运维成本;只需对同步任务进行简单配置即可将多表、多库同步至下游,并进行合并等逻辑,显著降低用户的开发难度与入门门槛。Flink CDC 3.0 正式发布。
我们今天基于 Flink CDC 3.0 同步 MySQL 到 Doris ,来体验下新上的整库同步、表结构变更同步和分库分表同步的功能。

准备

flink环境

准备 Flink Standalone 集群,下载最新版本 Flink 1.18.0 ,解压后得到 flink-1.18.0 目录。并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint,方便后续观察数据变更。

execution.checkpointing.interval: 3000

使用下面的命令启动 Flink 集群。

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库
多次执行 start-cluster.sh 可以拉起多个 TaskManager,保证Total Task Slots >= 2, 不然提交任务会有资源不足异常,比如我这里执行了3次。 或者是修改 conf/flink-conf.yaml 资源配置。

docker构建mysql、doris环境

如果有安装这两个组件,就可以免去docker,接下来的教程将以 docker-compose 的方式准备所需要的组件。
由于 Doris 的运行需要内存映射支持,需在宿主机执行如下命令:

sysctl -w vm.max_map_count=2000000

docker 镜像启动,使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:
  doris:
    image: yagagagaga/doris-standalone
    ports:
      - "8030:8030"
      - "8040:8040"
      - "9030:9030"
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

该 Docker Compose 中包含的容器有:

MySQL: 包含商品信息的数据库 app_db

Doris: 存储从 MySQL 中根据规则映射过来的结果表

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8030/ 来查看 Doris 是否运行正常。

数据准备

进入 MySQL 容器, 或者通过客户端工具连接到mysql

docker-compose exec mysql mysql -uroot -p123456

创建数据库 app_db 和表 orders,products 并插入数据

-- 创建数据库
CREATE DATABASE app_db;

USE app_db;

-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- 插入数据
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- 创建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- 插入数据
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- 创建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

Doris 暂时不支持自动创建数据库,需要先创建写入表对应的数据库。
进入 Doris Web UI。http://localhost:8030/,默认的用户名为 root,默认密码为空。
通过 Web UI 创建 app_db 数据库

create database if not exists app_db;

flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库

通过 FlinkCDC cli 提交任务

下载下面列出的二进制压缩包,并解压得到目录 flink-cdc-3.0.0
flink-cdc-3.0.0-bin.tar.gz flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。

下载下面列出的 connector 包,并且移动到 lib 目录下

  • MySQL pipeline connector 3.0.0
  • Apache Doris pipeline connector 3.0.0
    flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库

整库同步

编写任务配置 yaml 文件,下面给出了一个整库同步的示例文件 mysql-to-doris.yaml:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

其中:
source 中的 tables: app_db.\.* 通过正则匹配同步 app_db 下的所有表。
sink 添加 table.create.properties.replication_num 参数是由于 Docker 镜像中只有一个 Doris BE 节点。

最后,通过命令行提交任务到 Flink Standalone cluster

bash bin/flink-cdc.sh conf/mysql-to-doris.yaml

提交成功后,返回信息如:
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库
在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris 的任务正在运行。job id对应上面的cb049fe4a2112510a77ee46e197054a6
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库
打开 Doris 的 Web UI,可以看到数据表已经被创建出来,数据能成功写入。
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库

同步变更

接下来,修改 MySQL 数据库中表的数据,Doris 中显示的订单数据也将实时更新:

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
DELETE FROM app_db.orders WHERE id=2;
-- 区别于官方再新增一条数据
INSERT INTO app_db.orders VALUES (4, 200, 200.00);

也可以拆开每执行一步,刷新一次 Doris Web UI,可以看到 Doris 中显示的 orders 数据将实时更新,如下所示:
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库
同样的,去修改 shipments, products 表,也能在 Doris 中实时看到同步变更的结果。

路由变更

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。
下面提供一个配置文件conf/mysql-to-doris-route.yaml说明:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db1.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  benodes: 127.0.0.1:8040
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

route:
  - source-table: app_db1.orders\.*
    sink-table: app_db1.ods_orders

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

通过上面的 route 配置,使用正则表达式,可以将诸如 app_db1.order_01、app_db1.order_02 的表汇总到 app_db1.ods_orders 中。从而实现分库分表同步的功能。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。
另外官方文档里的写法存在一个问题。
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库
正则表达式前面加上’\‘转义,app_db1.orders\.*,否则会抛出异常:java.util.regex.PatternSyntaxException: Dangling meta character ‘*’ near index 0 *
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库
目前已在git提了issue,后面应该会处理这里的问题。
我们在mysql和doris分别创建数据库app_db1, 然后初始化mysql

-- 创建表orders_01
CREATE TABLE `orders_01` (
  `id` int NOT NULL,
  `price` decimal(10,2) NOT NULL,
  `amount` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- 创建表orders_02
CREATE TABLE `orders_02` (
  `id` int NOT NULL,
  `price` decimal(10,2) NOT NULL,
  `amount` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

启动新的job。
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库
然后在orders_01,orders_02分别插入数据


INSERT INTO `orders_01` (`id`, `price`) VALUES (11, 4.00);
INSERT INTO `orders_02` (`id`, `price`) VALUES (12, 100.00);

在doris里验证,数据都写入了app_db1.ods_orders
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库

路由表结构不一致无法同步

看Schema Evolution 设计原理,Flink CDC 3.0 在作业拓扑中引入了 SchemaRegistry,结合 SchemaOperator 协调并控制作业拓扑中的 schema 变更事件处理。当上游数据源发生 schema 变更时,SchemaRegistry 会控制 SchemaOperator 以暂停数据流,并将流水线中的数据从 sink 全部刷出以保证 schema 一致性。当 schema 变更事件在外部系统处理成功后,SchemaOperator 恢复数据流,完成本次 schema 变更的处理。
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库
所以考虑只修改orders_01,再插入数据看doris同步的变化。

-- 添加sku字段
ALTER TABLE app_db1.orders_01 ADD sku varchar(32) NULL;
-- 向orders_01插入id=13
INSERT INTO `orders_01` VALUES (13, 4.00, 8.00, 'apple01');
-- 向orders_02插入id=14
INSERT INTO `orders_02` VALUES (14, 1.00, 1.00);

可以看到doris中的app_db1.orders表结构发生了变化,但是orders_02的id=14这条数据没有正常写入。flink异常提示:java.lang.IllegalStateException: Column size does not match the data size
flinkcdc 3.0整库同步,大数据,flink,flink,大数据,cdc,动态变更,分表分库
而当修改orders_02的表结构,也会有异常:Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: status of AddColumnEvent is already existed。并且之后写入的数据无法正常同步。

结尾

flink cdc的功能越来越强,也再尝试解决用户的使用痛点。不过放到生产环境使用还需要建立在更多的实践测试之上。文章来源地址https://www.toymoban.com/news/detail-775011.html

到了这里,关于Flink cdc3.0同步实例(动态变更表结构、分库分表同步)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

    最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka。 监听数据变化,进行异步通知,做系统内异步任务。 架构方案(懒得写了,看图吧): -- 创建pg 高线数据同步用

    2024年02月02日
    浏览(44)
  • 实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据

    目录 前言: 1、springboot引入依赖: 2、yml配置文件 3、创建SQL server CDC变更数据监听器 4、反序列化数据,转为变更JSON对象 5、CDC 数据实体类 6、自定义ApplicationContextUtil 7、自定义sink 交由spring管理,处理变更数据         我的场景是从SQL Server数据库获取指定表的增量数据,查

    2024年02月10日
    浏览(87)
  • Flink CDC数据同步

    一、什么是FLink Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 接下来,我们来介绍一下 Flink 架构中的重要方面。 任何类型的数据都可以形成一种事

    2024年02月08日
    浏览(43)
  • Flink CDC整库同步

    背景 项目需要能够捕获外部数据源的数据变更,实时同步到目标数据库中,自动更新数据,实现源数据库和目标数据库所有表的数据同步更新,本文以mysql - greenplumn场景记录实现方案。 实现 1.引入依赖 2.创建FlinkCDCSource 创建FlinkCDC连接器,设置数据源的连接信息,日志捕获的

    2024年04月14日
    浏览(50)
  • Flink CDC整库同步(多表异构同步)

    flinkcdc单表同步比较简单,按照官方案例基本都能成功,多表异构同步、整库同步这块一直想尝试一下,社区说使用API可以做到,但是一直没能白嫖到可行方案(代码),然后自己动手尝试了下,咳咳,无奈技术太菜,java各种语法都搞的不是太明白,时间跨度蛮久,中间遇到

    2024年02月11日
    浏览(41)
  • 基于 Flink CDC 的实时同步系统

    摘要: 本文整理自科杰科技大数据架构师张军,在 FFA 2022 数据集成专场的分享。本篇内容主要分为四个部分: 功能概述 架构设计 技术挑战 生产实践 Tips: 点击 「阅读原文」 查看原文视频演讲 ppt 科杰科技是专门做大数据服务的供应商,目前的客户包括能源、金融、证券等

    2024年02月05日
    浏览(44)
  • flink oracle cdc实时同步(超详细)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文参照官方文档来记录Oracle CDC 的配置。 在本文开始前,需要先安装Oracle,有兴趣的同学可以参考博主之前写的《docker下安装oracle11g(一次安装成功)》。 如果要做oracle的实时同步

    2024年02月12日
    浏览(43)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(50)
  • Flink CDC MySQL同步MySQL错误记录

    0、相关Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者从mvnrepository.com下载 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    浏览(51)
  • Flink CDC实现一个Job同步多个表

    直接使用Flink CDC SQL的写法,一个Job只能同步一个表的数据,至于原因,在此不再赘述。 直接上代码吧 将SourceRecord类转化为自定义的JsonRecord类型 JsonRecord类定义如下: 其中fieldValue为字段map序列化后的字符串 第三步,编写main函数 通过OutputTag实现分流

    2024年02月12日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包