Flink CDC 实时抽取 Oracle 数据-排错&调优

这篇具有很好参考价值的文章主要介绍了Flink CDC 实时抽取 Oracle 数据-排错&调优。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracle 的支持。对该版本进行试用并成功实现了对 Oracle 的实时数据捕获以及性能调优,现将试用过程中的一些关键细节进行分享。

使用环境

Oracle:11.2.0.4.0(RAC 部署)
Flink:1.13.1
Hadoop:3.2.1

问题

1、无法连接数据库

根据官方文档说明,在 Flink SQL CLI 中输入以下语句:

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
    'schema-name'='MY_SCHEMA',
    'table-name'='TEST' );

之后尝试通过 select * from TEST 观察,发现无法正常连接 Oracle,报错如下:

[ERROR] Could not execute SQL statement. Reason:
oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12505, TNS:listener does not currently know of SID given in connect descriptor

从报错信息来看,可能是由于 Flink CDC 误将连接信息中提供的 MY_SERVICE_NAME (Oracle 的服务名) 错认为 SID。于是尝试阅读 Flink CDC 涉及到 Oracle Connector 的源码,发现在 com.ververica.cdc.connectors.oracle.OracleValidator 中,对于 Oracle 连接的代码如下:

public static Connection openConnection(Properties properties) throws SQLException {
    DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
    String hostname = properties.getProperty("database.hostname");
    String port = properties.getProperty("database.port");
    String dbname = properties.getProperty("database.dbname");
    String userName = properties.getProperty("database.user");
    String userpwd = properties.getProperty("database.password");
    return DriverManager.getConnection(
            "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);
}

由上可以看出,在当前版本的 Flink CDC 中,对于 SID 和 Service Name 的连接方式并未做区分,而是直接在代码中写死了 SID 的连接方式 (即 port 和 dbname 中间使用 “ : ” 分隔开)。

从 Oracle 8i 开始,Oracle 已经引入了 Service Name 的概念以支持数据库的集群 (RAC) 部署,一个 Service Name 可作为一个数据库的逻辑概念,统一对该数据库不同的 SID 实例的连接。据此,可以考虑以下两种方式:

  • 在 Flink CDC 的 create table 语句中,将 database-name 由 Service Name 替换成其中一个 SID。该方式能解决连接问题,但无法适应主流的 Oracle 集群部署的真实场景;
  • 对该源码进行修改。具体可在新建工程中,重写 com.ververica.cdc.connectors.oracle.OracleValidator 方法,修改为 Service Name 的连接方式 (即 port 和 dbname 中间使用 “ / ” 分隔开),即:
    “jdbc:oracle:thin:@” + hostname + “:” + port + “/” + dbname, userName, userpwd);

2、无法连接数据库无法找到 Oracle 表

按照上述步骤,再次通过 select * from TEST 观察,发现依然无法正常获取数据,报错如下:

[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test.  Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

观察到错误日志中提到的表是 MY_SERVICE_NAME.MY_SCHEMA.test,为什么数据库名、Schema 名都是大写,而表名是小写?
注意到该错误由 io.debezium 包报出,通过分析该包的源代码 (通过 Flink CDC 的 pom.xml 文件可知,目前使用的是 debezium 1.5.4 版本) 可知,在 io.debezium.relational.Tables 中有如下代码:

private TableId toLowerCaseIfNeeded(TableId tableId) {
    return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}

可见,Debezium 的开发者将 “大小写不敏感” 统一定义为了 “需要将表名转换为小写”。对于 Debezium 支持的 PostgreSQL、Mysql 等确实如此。然而对于 Oracle 数据库,“大小写不敏感” 却意味着在内部元信息存储时,需要将表名转换为大写。
因而 Debezium 在读取到 “大小写不敏感” 的配置后,按照上述代码逻辑,只会因为尝试去读取小写的表名而报错。
由于 Debezium 直到目前最新的稳定版本 1.7.1,以及最新的开发版本 1.8.0 都未修复该问题,我们可以通过以下两种方法绕过该问题:

  • 如需使用 Oracle “大小写不敏感” 的特性,可直接修改源码,将上述 toLowercase 修改为 toUppercase (这也是笔者选择的方法);
  • 如果不愿意修改源码,且无需使用 Oracle “大小写不敏感” 的特性,可以在 create 语句中加上 ‘debezium.database.tablename.case.insensitive’=‘false’,如下示例:create table TEST (A string) WITH ('connector'='oracle-cdc', 'hostname'='10.230.179.125', 'port'='1521', 'username'='myname', 'password'='***', 'database-name'='MY_SERVICE_NAME', 'schema-name'='MY_SCHEMA', 'table-name'='TEST', 'debezium.database.tablename.case.insensitive'='false' );

该方法的弊端是丧失了 Oracle “大小写不敏感” 的特性,在 ‘table-name’ 中必须显式指定大写的表名。

需要注明的是,对于 database.tablename.case.insensitive 参数,Debezium 目前仅对 Oracle 11g 默认设置为 true,对其余 Oracle 版本均默认设置为 false。所以读者如果使用的不是 Oracle 11g 版本,可无需修改该参数,但仍需显式指定大写的表名。

3、数据延迟较大

数据延迟较大,有时需要 3-5 分钟才能捕捉到数据变化。对于该问题,在 Flink CDC FAQ 中已给出了明确的解决方案:在 create 语句中加上如下两个配置项:

'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'

那么为什么要这样做呢?我们依然可以通过分析源码和日志,结合 Oracle Logminer 的工作原理来加深对工具的理解。
对 Logminer 的抽取工作,主要在 Debezium 的 io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource 中 execute 方法进行。为节约篇幅,本文不列出实际的源码,仅提炼出关键过程绘于下面的流程图,有兴趣的读者可以对照该流程图,结合实际源码进行分析:

Flink CDC 实时抽取 Oracle 数据-排错&调优,Flink,Oracle CDC,flink,oracle,大数据
采用 redo_log_catalog 的方式,可以监控数据表的 DDL 信息,且由于 archive logs 被永久保存到磁盘上,可以在数据库宕机后依然正常获取到宕机前的所有 DDL 和 DML 操作。但由于涉及到比 online catalog 更多的信息监控,以及由此带来的频繁的日志切换和日志转储操作,其代价也是惊人的。
一般来说,Flink CDC 所需要监控的表,特别是对于业务系统有重大意义的表,一般不会进行 DDL 操作,仅需要捕捉 DML 操作即可,且对于数据库宕机等极特殊情况,也可使用在数据库恢复后进行全量数据更新的方式保障数据的一致性。因而,online_catalog 的方式足以满足我们的需要。

另外,无论使用 online_catalog,还是默认的 redo_log_catalog,都会存在第 ② 步找到的日志和第 ⑤ 步实际需要的日志不同步的问题,因此,加入 ‘debezium.log.mining.continuous.mine’=‘true’ 参数,将实时搜集日志的工作交给 Oracle 自动完成,即可规避这一问题。
按照这两个参数配置后,数据延迟一般可以从数分钟降至 5 秒钟左右。

4、调节参数继续降低数据延迟

上述流程图的第 ③ 步和第 ⑦ 步,提到了根据配置项来确定 LogMiner 监控时序范围,以及确定休眠时间。下面对该过程进行进一步分析,并对单个表的进一步调优给出一般性的方法论。

通过观察 io.debezium.connector.oracle.logminer.LogMinerHelper 类中的 getEndScn 方法,可了解到 debezium 对监控时序范围和休眠时间的调节原理。为便于读者理解,将该方法用流程图说明如下:
Flink CDC 实时抽取 Oracle 数据-排错&调优,Flink,Oracle CDC,flink,oracle,大数据
从上述的流程图中可以看出,debezium 给出 log.mining.batch.size.* 和 log.mining.sleep.time.* 两组参数,就是为了让每一次 logMiner 运行的步长能够尽可能和数据库自身 SCN 增加的步长一致。由此可见:

  • log.mining.batch.size.* 和 log.mining.sleep.time.* 参数的设定,和数据库整体的表现有关,和单个表的数据变化情况无关;
  • log.mining.batch.size.default 不仅仅是监控时序范围的起始值,还是监控时序范围变化的阈值。所以如果要实现更灵活的监控时序范围调整,可考虑适当减小该参数;
  • 由于每一次确定监控时序范围时,都会根据 topScn 和 currentScn 的大小来调整 sleepTime,所以为了实现休眠时间更灵活的调整,可考虑适当增大 log.mining.sleep.time.increment.ms;
  • log.mining.batch.size.max 不能过小,否则会有监控时序范围永远无法追上数据库当前 SCN 的风险。为此,debezium 在 io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics 中存在以下逻辑:

if (currentBatchSize == batchSizeMax) {
    LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize);
}

如果当前的监控时序范围达到了 log.mining.batch.size.max,那么 debezium 会在日志中给出如上提示。在实际应用中,观察 Flink CDC 产生的 log 是否包含该提示,便可得知 log.mining.batch.size.max 的值是否合理。

5、Debezium Oracle Connector的隐藏参数

事实上从上文中我们已经了解到了两个隐藏参数:

debezium.database.tablename.case.insensitive (见第二节内容)
debezium.log.mining.continuous.mine (见第三节内容)

这两个参数在 Debezium 的官方文档中均未给出实际说明,但实际上可以使用。通过分析源码,现给出 Debezium Oracle Connector 的所有隐藏参数,以及其说明如下:

Flink CDC 实时抽取 Oracle 数据-排错&调优,Flink,Oracle CDC,flink,oracle,大数据
除了上面我们已经用到的两个参数以外,同样值得重点关注的是 log.mining.history.recorder.class 参数。
由于该参数目前默认为 io.debezium.connector.oracle.logminer.NeverHistoryRecorder,是一个空类;
所以我们在分析 Flink CDC 行为时,通过自定义实现 io.debezium.connector.oracle.logminer.HistoryRecorder 接口的类,可在不修改源码的情况下,实现对 Flink CDC 行为的个性化监控。

更多文章请扫码关注公众号,有问题的小伙伴也可以在公众号上提出。
Flink CDC 实时抽取 Oracle 数据-排错&调优,Flink,Oracle CDC,flink,oracle,大数据文章来源地址https://www.toymoban.com/news/detail-793466.html

到了这里,关于Flink CDC 实时抽取 Oracle 数据-排错&调优的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(55)
  • Flink CDC实时同步PG数据库

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git  1、更改配置文件postgresql.conf # 更改wal日志方式为logical wal_level = logical # minimal, replica, or logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots max_replication_slots = 20 # m

    2024年02月13日
    浏览(69)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(104)
  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(44)
  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(53)
  • flink cdc同步Oracle数据库资料到Doris问题集锦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    浏览(46)
  • 使用Flink CDC将Mysql中的数据实时同步到ES

    最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间…… 我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过

    2024年02月11日
    浏览(55)
  • Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架

    Flink CDC 是基于数据库日志 CDC(Change Data Capture)技术的实时数据集成框架,支持了全增量一体化、无锁读取、并行读取、表结构变更自动同步、分布式架构等高级特性。配合 Flink 优秀的管道能力和丰富的上下游生态,Flink CDC 可以高效实现海量数据的实时集成。Flink CDC 社区发

    2024年02月04日
    浏览(54)
  • 使用 Flink CDC 实现 MySQL 数据,表结构实时入 Apache Doris

    现有数据库:mysql 数据:库表较多,每个企业用户一个分库,每个企业下的表均不同,无法做到聚合,且表可以被用户随意改动,增删改列等,增加表 分析:用户自定义分析,通过拖拽定义图卡,要求实时,点击确认即出现相应结果,其中有无法预判的过滤 问题:随业务增长

    2023年04月08日
    浏览(55)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、达梦等数据库开启日志方法

    目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 2.5达梦 2.4.1安装 2.4.2CDC配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验

    2024年02月05日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包