【大数据】Flink CDC 的概览和使用

这篇具有很好参考价值的文章主要介绍了【大数据】Flink CDC 的概览和使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.什么是 CDC

CDCChange Data Capture数据变更抓取)是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

CDC 可以捕获数据库中的以下类型的数据变化:

  • ✅ 插入(Insert):当新数据被插入到数据库表中时。
  • ✅ 更新(Update):当数据库表中的现有数据被修改时。
  • ✅ 删除(Delete):当数据从数据库表中被删除时。

2.什么是 Flink CDC

Flink CDC 是一个开源的数据库变更日志捕获和处理框架,它可以实时地从各种数据库(如 MySQL、PostgreSQL、Oracle、MongoDB 等)中捕获数据变更并将其转换为流式数据。Flink CDC 可以帮助实时应用程序实时地处理和分析这些流数据,从而实现 数据同步数据管道实时分析实时应用 等功能。

本质上是一系列的 Flink Source Connector 集合,用于来获取数据库的实时变更,底层基于 Debezium 实现。

🚀 https://github.com/ververica/flink-cdc-connectors

3.Flink CDC 前生今世

3.1 Flink CDC 1.x

Flink CDC 1.x 开启了 Flink 在 CDC 上的实践之路,Flink CDC 1.x 第一次引入了 Debezium 框架,利用 Debezium 已有的能力将数据库实时变更接入到 Flink 流计算框架中,利用 Flink 丰富的生态对数据进行加工处理,满足不同的业务需求,在功能层面上而言,Flink CDC 1.x 只能说是可以用,但不能生产上用,为什么:

  • 1.x 版本全增量切换时会对表加锁,在同步过程中有段时间业务会处于暂停状态。
  • 各方面功能还不够完善,比如自动加表、DDL 事件传递等。

【大数据】Flink CDC 的概览和使用,# Flink,大数据,flink,CDC,Flink CDC,数据集成,实时同步

总体而言 Flink CDC 1.x 只能说是一个比较有趣的小玩具,还不具备大规模商业盈利的价值。

【大数据】Flink CDC 的概览和使用,# Flink,大数据,flink,CDC,Flink CDC,数据集成,实时同步

3.2 Flink CDC 2.x

2.x 版本中,Flink CDC 引入了 Netfix DBLog 中的无锁算法,彻底解决了全增量切换上业务停滞的问题,同时得益于 FLIP-27 对 Flink Source API 的重构,Flink CDC 也基于 FLIP-27 升级到了新的框架设计,至此,Flink CDC 被大规模公司使用并投入到生产中。

【大数据】Flink CDC 的概览和使用,# Flink,大数据,flink,CDC,Flink CDC,数据集成,实时同步

3.3 Flink CDC 3.x

近期,Flink CDC 发布了全新的 3.0 版本,并宣布捐赠回 Flink 主项目,在新的 3.0 版本中,Flink CDC 对于接口和架构上做了很大的升级和调整,对于整体项目的定位也从之前的 Flink Source Connector 转变为了 Data Integration Engine,未来将与 SeaTunnelDataXChunjun 等一系列老牌数据集成项目同台竞技,让我们拭目以待。

【大数据】Flink CDC 的概览和使用,# Flink,大数据,flink,CDC,Flink CDC,数据集成,实时同步

4.Flink CDC 使用

在本地启动一个 MySQL 的 Docker 环境。

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4

创建表:

create database cdc_test;
use cdc_test;

create table cdc_table (
    id int primary key auto_increment,
    name varchar(1000),
    age int
);

在 IDEA 中新建一个Java 项目。

导入依赖:

<flink-cdc.version>2.4.2</flink-cdc.version>
<flink.version>1.16.3</flink.version>
<logback.version>1.2.7</logback.version>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink-cdc.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${logback.version}</version>
</dependency>

编写代码:

public class FlinkCDCApplication {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.enableCheckpointing(60000L);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("cdc_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
            .tableList("cdc_test.cdc_table") // set captured table
            .username("root")
            .password("debezium")
            .includeSchemaChanges(true)
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC")
            .print();
        env.execute();
    }
}

添加日志配置:文章来源地址https://www.toymoban.com/news/detail-821495.html

<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<configuration>
       <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
             <pattern>%d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n</pattern>
          </encoder>
       </appender>

       <root level="INFO">
          <appender-ref ref="STDOUT" />
       </root>
</configuration>

5.Debezium 标准 CDC Event 格式详解

{
    "before": null,
    "after": {
        "id": 1,
        "name": "xing.yu",
        "age": 26,
        "new_column": "dewu"
    },
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1702723640000,
        "snapshot": "false",
        "db": "cdc_test",
        "sequence": null,
        "table": "cdc_table",
        "server_id": 223344,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 2394,
        "row": 0,
        "thread": 39,
        "query": null
    },
    "op": "c",
    "ts_ms": 1702723640483,
    "transaction": null
}
{
    // 表数据更新前的值,update/delete
    "before": {},
    // 表数据更新后的值,create/update
    "after": {},
    // 元数据信息
    "source": {},
    // 操作类型 c/d/u
    "op": "",
    // 记录解析时间
    "ts_ms": "",
    "transaction": ""
}

到了这里,关于【大数据】Flink CDC 的概览和使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【大数据】Flink CDC 的概览和使用

    CDC ( Change Data Capture , 数据变更抓取 )是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

    2024年01月24日
    浏览(49)
  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(43)
  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(48)
  • Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架

    Flink CDC 是基于数据库日志 CDC(Change Data Capture)技术的实时数据集成框架,支持了全增量一体化、无锁读取、并行读取、表结构变更自动同步、分布式架构等高级特性。配合 Flink 优秀的管道能力和丰富的上下游生态,Flink CDC 可以高效实现海量数据的实时集成。Flink CDC 社区发

    2024年02月04日
    浏览(52)
  • 基于 Flink CDC 的实时同步系统

    摘要: 本文整理自科杰科技大数据架构师张军,在 FFA 2022 数据集成专场的分享。本篇内容主要分为四个部分: 功能概述 架构设计 技术挑战 生产实践 Tips: 点击 「阅读原文」 查看原文视频演讲 ppt 科杰科技是专门做大数据服务的供应商,目前的客户包括能源、金融、证券等

    2024年02月05日
    浏览(44)
  • flink oracle cdc实时同步(超详细)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文参照官方文档来记录Oracle CDC 的配置。 在本文开始前,需要先安装Oracle,有兴趣的同学可以参考博主之前写的《docker下安装oracle11g(一次安装成功)》。 如果要做oracle的实时同步

    2024年02月12日
    浏览(44)
  • flink sqlserver cdc实时同步(含sqlserver安装配置等)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/sqlserver-cdc.md 如果要使用flink cdc做sqlserver的实时同步,需要满足以下条件: 需要安装SQLServer(需要支持CDC的功能,SQLServer 2008之后的版本都支持) ; 需要开启SQL Server代理; 启用CDC功能。 ok,接下来

    2024年02月08日
    浏览(42)
  • flink postgresql cdc实时同步(含pg安装配置等)

    类型 版本/描述 docker 20.10.9 Postgresql 10.6 初始化账号密码:postgres/postgres 普通用户:test1/test123 数据库:test_db flink 1.13.6 step1 : 拉取 PostgreSQL 10.6 版本的镜像: step2 :创建并启动 PostgreSQL 容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为 postgre

    2024年02月11日
    浏览(45)
  • Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(58)
  • 使用flink sqlserver cdc 同步数据到StarRocks

    前沿: flink cdc功能越发强大,支持的数据源也越多,本篇介绍使用flink cdc实现: sqlserver-》(using flink cdc)-〉flink -》(using flink starrocks connector)-〉starrocks整个流程 1.sqlserver 环境准备(得使用sqlserver 16以下版本,flink cdc当前只支持16以下sqlserver版本) 我这个使用的是docker环

    2024年02月10日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包