Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

这篇具有很好参考价值的文章主要介绍了Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

环境说明:

flink 1.15.2

Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production

mysql 版本:5.7

windows11 IDEA 本地运行

先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation

1. Oracle 开启 log archiving

(1).启用 log archiving
        a:以DBA用户连接数据库 
             sqlplus / as sysdba
        b:启用 log archiving (会重启数据库)
             alter system set db_recovery_file_dest_size = 10G;
             alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
             shutdown immediate;
             startup mount;
             alter database archivelog;
             alter database open;
        c:检查 log archiving 是否开启  -- Should now "Database log mode: Archive Mode"
             archive log list;

    (2).注:必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。下面演示了如何在表/数据库级别上配置它。
        为一个特定的表启用补充日志记录:修改表目录。客户添加补充日志数据(所有)列;
            ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

        —为数据库启用补充日志修改数据库添加补充日志数据;
            ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

    (3).创建具有权限的Oracle用户
        a:创建表空间
            sqlplus / as sysdba
              CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
              exit;
          b:创建用户并赋权  flinkuser  flinkpw 
            sqlplus / as sysdba
              CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
              GRANT CREATE SESSION TO flinkuser;
              GRANT SET CONTAINER TO flinkuser;
              GRANT SELECT ON V_$DATABASE to flinkuser;
              GRANT FLASHBACK ANY TABLE TO flinkuser;
              GRANT SELECT ANY TABLE TO flinkuser;
              GRANT SELECT_CATALOG_ROLE TO flinkuser;
              GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
              GRANT SELECT ANY TRANSACTION TO flinkuser;
              GRANT LOGMINING TO flinkuser;
            
              GRANT CREATE TABLE TO flinkuser;
              GRANT LOCK ANY TABLE TO flinkuser;
              GRANT ALTER ANY TABLE TO flinkuser;
              GRANT CREATE SEQUENCE TO flinkuser;
            
              GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
              GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
            
              GRANT SELECT ON V_$LOG TO flinkuser;
              GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
              GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
              GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
              GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
              GRANT SELECT ON V_$LOGFILE TO flinkuser;
              GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
              GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
              exit;

2. Oracle 建表,并配置补充日志

CREATE TABLE "USER_INFO" (    
ID NUMBER, 
USERNAME VARCHAR2(255), 
PASSWORD VARCHAR2(255), 
PRIMARY KEY (ID));

ALTER TABLE USER_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

3. Mysql 建表

CREATE TABLE user_new (
  id int(11) NOT NULL,
  username varchar(255) DEFAULT NULL,
  password varchar(255) DEFAULT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

4.Maven依赖

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.15.2</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
            <!--            此标签会移除jar包,当需要打包到集群运行时加上此标签-->
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.15.2</version>
            <!--            <scope>provided</scope>-->
            <!--            此标签会移除jar包,当需要打包到集群运行时加上此标签-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oracle-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-oracle-cdc</artifactId>
            <version>2.3.0</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.17.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.9</version>
        </dependency>
    </dependencies>

5.demo如下:


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OracleCdcToMysql {

    public static void main(String[] args) {

        //1.获取stream的执行环境
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        //2.创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

        String sourceTable = "CREATE TABLE oracle_cdc_source " +
                "( ID INT, " +
                "USERNAME STRING, " +
                "PASSWORD STRING, " +
                "PRIMARY KEY(ID) NOT ENFORCED) WITH (\n" +
                "'connector' = 'oracle-cdc',\n" +
                "'hostname' = '1.1.1.1',\n" +
                "'port' = '1521',\n" +
                "'username' = 'flinkcdcuser',\n" +
                "'password' = 'flinkpw',\n" +
                "'database-name' = 'LMDB',\n" +//select name from v$database;
                "'schema-name' = 'TEST',\n" +//select SYS_CONTEXT('USERENV','CURRENT_SCHEMA') CURRENT_SCHEMA from dual;
                "'debezium.snapshot.mode' = 'schema_only',\n" +
                //snapshot.mode = initial 快照包括捕获表的结构和数据。指定此值将用捕获表中数据的完整表示填充主题。
                //snapshot.mode = schema_only 快照只包含捕获表的结构。如果希望连接器仅捕获快照之后发生的更改的数据,请指定此值。
                "'scan.incremental.snapshot.enabled' = 'true',\n" +
                //scan.incremental.snapshot.enabled 增量快照是一种读取表快照的新机制。增量快照与旧的快照机制相比有很多优点,包括:
                // (1)在快照读取期间源可以并行;(2)在快照读取期间源可以在块粒度上执行检查点;(3)在快照读取之前源不需要获取ROW SHARE MODE锁。
                "'scan.incremental.snapshot.chunk.size' = '8096' ,\n" +
                //表快照的块大小(行数),当读取表快照时,捕获的表被分割成多个块。
                "'scan.snapshot.fetch.size' = '1024',\n" +
                //读取表快照时每个轮询的最大读取大小。
                "'connect.max-retries' = '3',\n" +
                //连接器应该重试构建Oracle数据库服务器连接的最大重试次数。
                "'connection.pool.size'= '20',\n" +
                //连接池大小
                "'debezium.log.mining.strategy' = 'online_catalog',\n" +
                //online_catalog -使用数据库的当前数据字典来解析对象id,并且不向在线重做日志中写入任何额外的信息。
                // 这使得LogMiner的挖掘速度大大提高,但代价是无法跟踪DDL的变化。如果捕获的表模式很少或从不更改,那么这是理想的选择。
                "'debezium.log.mining.archive.destination.name' = 'log_archive_dest_1',\n" +
                "'debezium.log.mining.continuous.mine'='true'," +
                "  'table-name' = 'USER_INFO'\n" +
                ")";
        tEnv.executeSql(sourceTable);
//        tEnv.executeSql("select * from oracle_cdc_source").print(); //加上打印后,虽然可以实时看到增删改查记录,但是这些后续操作并不会插入到目标表。如果不加这句打印,则程序无问题
        String sinkTable = "CREATE TABLE mysql_cdc_sink (" +
                "  ID INT,\n" +
                "  USERNAME STRING,\n" +
                "  PASSWORD STRING,\n" +
                "PRIMARY KEY(ID) NOT ENFORCED\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "'url' = 'jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true',\n" +
                "'username' = 'flink_cdc_user',\n" +
                "'password' = 'flink@cdc',\n"+
                "  'table-name' = 'user_new',\n" +
                "  'connection.max-retry-timeout' = '60s'\n" +
                ")";
        tEnv.executeSql(sinkTable);
        tEnv.executeSql("insert into mysql_cdc_sink select ID,USERNAME,PASSWORD from oracle_cdc_source");
    }
}

本地运行控制台是不会输出什么提示的,不像mysql cdc 还可以看到一些查看binlog日志信息。你可以知道程序运行成功与否,Oracle的什么都不会输出。

下图是有打印的,但是只能打印,后续插表动作就失效了。如果不打印,那就是什么都没有。flink 实时同步oracle,flink,oracle,mysql

 下图是mysqlCDC的,可以看到有连接,有读取binlog日志,并且还可以打印,后续插表也正常。

flink 实时同步oracle,flink,oracle,mysql

具体对应数据类型,还需查看官网,最下面有列出所有对应的数据类型。

具体可用参数,可查官网,也可查阿里介绍,毕竟这是阿里大大的。感觉阿里大大的参数类型更全,更多。具体如何使用,还需研究。MySQL_实时计算 Flink版-阿里云帮助中心

6.打包到集群运行--后续再补一篇吧,前面几篇都需要。单独补一篇。文章来源地址https://www.toymoban.com/news/detail-665036.html

到了这里,关于Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(42)
  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(35)
  • Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 具体前提设置,请看这篇,包含 binlog 设置、Maven...... Flink CDC 基于mysql binlog 实时同步mysql表_彩虹豆的博客-CSDN博客 经过不懈努力,终于从阿里help页面找到了支

    2024年02月08日
    浏览(33)
  • 基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

    操作系统:ubuntu-22.04,运行于wsl 2【 注意,请务必使用wsl 2 ;wsl 1会出现各种各样的问题】 软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳过此步 (1)pg安装 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出现的问题 sudo -u postgres psql 报错: psql: err

    2024年02月11日
    浏览(26)
  • Flink CDC 实时抽取 Oracle 数据-排错&调优

    Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracle 的支持。对该版本进行试用并成功实现了对 Oracle 的实时数据捕获以及性能调优,现将试用过程中的一些关键细节进行分享。 Oracle:11.2.0.4.0(RAC 部署) Flink:1.13.1 Hadoop:3.2.1

    2024年01月16日
    浏览(30)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(35)
  • Flink CDC实时同步PG数据库

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git  1、更改配置文件postgresql.conf # 更改wal日志方式为logical wal_level = logical # minimal, replica, or logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots max_replication_slots = 20 # m

    2024年02月13日
    浏览(53)
  • flink postgresql cdc实时同步(含pg安装配置等)

    类型 版本/描述 docker 20.10.9 Postgresql 10.6 初始化账号密码:postgres/postgres 普通用户:test1/test123 数据库:test_db flink 1.13.6 step1 : 拉取 PostgreSQL 10.6 版本的镜像: step2 :创建并启动 PostgreSQL 容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为 postgre

    2024年02月11日
    浏览(33)
  • flink sqlserver cdc实时同步(含sqlserver安装配置等)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/sqlserver-cdc.md 如果要使用flink cdc做sqlserver的实时同步,需要满足以下条件: 需要安装SQLServer(需要支持CDC的功能,SQLServer 2008之后的版本都支持) ; 需要开启SQL Server代理; 启用CDC功能。 ok,接下来

    2024年02月08日
    浏览(32)
  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包