Flink CDC 最佳实践(以 MySQL 为例)

这篇具有很好参考价值的文章主要介绍了Flink CDC 最佳实践(以 MySQL 为例)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 准备工作

1.1 确认 MySQL binlog 模式

确认 MySQL 数据库的 binlog 模式是否为 ROW。可以在 MySQL 命令行中执行以下语句确认:

SHOW GLOBAL VARIABLES LIKE 'binlog_format';

如果返回结果中的 Value 字段为 ROW,则说明 binlog 模式为 ROW

1.2 下载并安装 Flink

下载并安装 Flink,可以参考官方文档进行安装。

2. 配置 Flink CDC

2.1 配置 MySQL 数据库连接信息

在 Flink 的配置文件 flink-conf.yaml 中添加 MySQL 数据库连接信息,例如:

# MySQL connection configuration
mysql.server-id: 12345
mysql.hostname: localhost
mysql.port: 3306
mysql.username: root
mysql.password: 123456
mysql.database-name: test

2.2 配置 CDC Job

在 Flink 的 CDC Job 配置文件 mysql-cdc.properties 中添加以下配置:

# Flink CDC Job Configuration
name: mysql-cdc-job
flink.parallelism: 1
flink.checkpoint.interval: 60000
flink.checkpoint.mode: EXACTLY_ONCE

# MySQL CDC Source Configuration
debezium.transforms: unwrap
debezium.transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
database.hostname: localhost
database.port: 3306
database.user: root
database.password: 123456
database.history.kafka.bootstrap.servers: localhost:9092
database.history.kafka.topic: mysql-cdc-history
database.server.id: 12345
database.server.name: test
database.whitelist: test.user

其中,name 为 CDC Job 的名称,flink.parallelism 为 Flink 的并行度,flink.checkpoint.interval 为 Flink 的 Checkpoint 时间间隔,flink.checkpoint.mode 为 Checkpoint 模式,此处设置为 EXACTLY_ONCE

debezium.transforms 为 Debezium 转换器的名称,此处设置为 unwrapdatabase.hostnamedatabase.portdatabase.userdatabase.password 分别为 MySQL 数据库的连接信息。database.history.kafka.bootstrap.servers 为 Kafka 的地址信息,database.history.kafka.topic 为 CDC 历史数据记录的 Kafka Topic。database.server.id 为 MySQL 的 Server ID,database.server.name 为 CDC Source 的名称,database.whitelist 为需要进行同步的 MySQL 表的名称。

步骤一:创建 MySQL 数据库

首先,需要在本地或云端创建 MySQL 数据库,并添加一个具有读写权限的用户。下面是一个创建名为 test_db 的数据库以及名为 flink_cdc_user 的用户的示例 SQL 代码:

CREATE DATABASE test_db;

CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'password';

GRANT ALL PRIVILEGES ON test_db.* TO 'flink_cdc_user'@'%';

步骤二:启动 Flink 集群

启动一个 Flink 集群以便运行 CDC 应用程序。可以使用 Flink 自带的 bin/start-cluster.sh 脚本启动 Flink 集群。确保 Flink 集群在运行时已经包含了 Kafka 和 MySQL 的依赖项。

步骤三:创建 MySQL 表和 CDC 表

在 MySQL 中,首先需要创建需要进行 CDC 的表和 CDC 表。CDC 表是一个系统表,它存储了需要捕获的更改数据。可以通过以下代码创建一个名为 test_table 的表以及与之关联的 CDC 表

CREATE TABLE test_db.test_table (
  id INT PRIMARY KEY,
  name VARCHAR(30),
  age INT,
  email VARCHAR(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE test_db.test_table_cdc (
  `database` VARCHAR(100),
  `table` VARCHAR(100),
  `type` VARCHAR(10),
  `ts` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  `before` JSON,
  `after` JSON
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

步骤四:编写 Flink CDC 应用程序

接下来,需要编写一个 Flink CDC 应用程序,以将 MySQL 表更改推送到 Kafka 主题中。可以使用 Flink 的 flink-connector-jdbc 库和 flink-connector-kafka 库来实现此目的。

以下是一个基本的 Flink CDC 应用程序的代码示例:

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test-group");

    JdbcSource<RowData> source = JdbcSource.<RowData>builder()
            .setDrivername("com.mysql.jdbc.Driver")
            .setDBUrl("jdbc:mysql://localhost:3306/test_db")
            .setUsername("flink_cdc_user")
            .setPassword("password")
            .setQuery("SELECT id, name, age, email FROM test_table")
            .setRowTypeInfo(Types.ROW(Types.INT, Types.STRING, Types.INT, Types.STRING))
            .setFetchSize(1000)
            .build();

    DataStream<RowData> stream = env.addSource(source);

以下是一个简单的示例运行及结果:

$ bin/flink run -c com.example.MyCDCJob ./my-cdc-job.jar --database.server=mysql.example.com --database.port=3306 --database.name=mydb --database.username=myuser --database.password=mypassword --table.name=mytable --debezium.plugin.name=mysql --debezium.plugin.property.version=1.3.1.Final
[INFO] Starting CDC process for table: mytable.
[INFO] Initializing CDC source...
[INFO] CDC source successfully initialized.
[INFO] Starting CDC source...
[INFO] CDC source successfully started.
[INFO] Adding CDC source to Flink job topology...
[INFO] CDC source successfully added to Flink job topology.
[INFO] Starting Flink job...
[INFO] Flink job started successfully.
[INFO] Change data for table: mytable.
[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 25}.
[INFO] Record key: {"id": 2}, record value: {"id": 2, "name": "Bob", "age": 30}.
[INFO] Record key: {"id": 3}, record value: {"id": 3, "name": "Charlie", "age": 35}.
[INFO] Change data for table: mytable.
[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 27}.

可以看到,当有数据变更时,Flink CDC Job 会输出变更的表名、记录的主键以及变更的数据。例如,在这个示例中,有一行记录的年龄字段从25变成了27。文章来源地址https://www.toymoban.com/news/detail-467823.html

到了这里,关于Flink CDC 最佳实践(以 MySQL 为例)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink进阶篇-CDC 原理、实践和优化&采集到Doris中

    基于doris官方用doris构建实时仓库的思路,从flinkcdc到doris实时数仓的实践。 原文  Apache Flink X Apache Doris 构建极速易用的实时数仓架构 (qq.com) CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。

    2023年04月08日
    浏览(47)
  • Flink CDC 实时mysql到mysql

    CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。 mysqlcdc需要mysql开启binlog,找到my.cnf,在 [mysqld] 中加入如下信息 [mysqld]

    2024年02月12日
    浏览(47)
  • CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi

    继上一篇 《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》 讨论了一种典型的 CDC 集成方案后,本文,我们改用 Flink CDC 完成同样的 CDC 数据入湖任务。与上一个方案有所不同的是:借助现有的 Flink 环境,我们可以直接使用 Flink CDC 从源头数据库接入数据,

    2024年02月22日
    浏览(48)
  • Flink CDC MySQL同步MySQL错误记录

    0、相关Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者从mvnrepository.com下载 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    浏览(53)
  • flink mysql cdc调试问题记录

    最近需要用到flink cdc作为数据流处理框架,在demo运行中发现一些问题,特此记录问题和解决过程。 Caused by: java.lang.IllegalArgumentException: Can\\\'t find any matched tables, please check your configured database-name: [localdb] and table-name: [flink_cdc_message] at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.

    2023年04月17日
    浏览(55)
  • 最新版Flink CDC MySQL同步MySQL(一)_flink 连接mysql(1)

    下载 连接器 SQL jar (或 自行构建 )。 将下载的jar包放在FLINK_HOME/lib/. 重启Flink集群。 注意 :目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的 6.使用 Flink CDC 对 MySQL 进行流式 ETL 本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。 假设我们将产品数

    2024年04月26日
    浏览(48)
  • 最新版Flink CDC MySQL同步MySQL(一)

    Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。 连接器 数据库 驱动 mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.4 mysql-cdc MySQL: 5.6, 5.

    2024年02月13日
    浏览(66)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(104)
  • flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法

    flink1.13.6 + flink mysql cdc 1.4.0 flink 1.16.0 + flink mysql cdc 2.3.0 flink 1.16.0 + flink mysql cdc 2.4.0 flink 1.16.0 + flink postgresql cdc 2.3.0 flink 1.13.6 + flink mysql cdc 2.3.0 : 没有报错,没有数据,估计是兼容有问题 1、调整chunck大小 : scan.incremental.snapshot.chunk.size 2、设置cdc模式:scan.startup.mode【initial(

    2024年02月16日
    浏览(33)
  • 云计算 - 以阿里云为例,企业上云策略全览与最佳实践

    云采用框架(Cloud Adoption Framework,简称CAF)为企业上云提供策略和技术的指导原则和最佳实践,帮助企业上好云、用好云、管好云,并成功实现业务目标。本云采用框架是基于服务大量企业客户的经验总结,将企业云采用分为四个阶段,并详细探讨企业应在每个阶段采取的业

    2024年03月09日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包