Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

这篇具有很好参考价值的文章主要介绍了Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Oracle CDC配置(Non-CDB database)

第一步: 开启归档日志

  1. 使用sysdba角色登录到Oracle数据库

  2. 确保Oracle归档日志(Archive Log)已启用

    select log_mode from v$database; -- 查询结果应为ARCHIVELOG。
    
  3. 若未启用归档日志, 需运行以下命令启用归档日志

    1. 设置归档日志存储大小及位置

      • 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等)
        alter system set db_recovery_file_dest_size = 10G;
        
      • 设置恢复文件的实际物理存储路径;scope=spfile参数设置讲persist到spfile参数文件中,即实例重启后也仍然生效
        alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
        
    2. 立即关闭数据库。这会回滚所有未提交的事务,并断开所有连接的会话,然后关闭数据库实例

      shutdown immediate;
      
    3. 启动数据库,但只到‘挂载’阶段,此时数据库文件对用户还不可用。在这个阶段,DBA可以进行一些特殊的管理任务,比如数据库的恢复或者切换日志模式

      startup mount;
      
    4. 将数据库的日志模式切换为归档日志模式。在归档日志模式下,数据库会保存所有的重做日志文件,这对于数据库恢复和数据库备份非常重要。

      alter database archivelog;
      
    5. 将数据库从‘挂载’状态切换到‘开放’状态,此时数据库对用户可用,可进行正常的数据库操作。

      alter database open;
      
    6. 再次确认归档日志是否已启用!

      select log_mode from v$database; -- 查询结果应为ARCHIVELOG。
      

    注意:

    1. 启用归档日志需要数据库重启,请谨慎操作!
    2. 归档日志会占用大量的磁盘空间,需定期清理过期的归档日志!
  4. 启用Supplemental logging
    为捕获数据库中数据变更前的状态,必须在捕获的表或数据库上启用补充日志(Supplemental logging)

    • 为数据库启用supplemental logging
      ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
      
    • 为指定表启用supplemental logging
      ALTER TABLE db.table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
      

第二步: 创建Tablespace表空间

表空间是用来存储数据库对象(如表、索引等)的逻辑结构。

在Oracle中创建一个名为"logminer_tbs"的表空间,数据文件的路径为"/opt/oracle/oradata/SID/logminer_tbs.dbf",大小为25M,并且允许自动扩展,最大大小为无限。

执行以下SQL需要使用sysdba角色登录到Oracle数据库

CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

第三步: 创建用户并授予相应权限

执行以下SQL需要使用sysdba角色登录到Oracle数据库


GRANT CREATE SESSION TO cdc_user; -- 授予用户"cdc_user"创建会话的权限,允许用户连接到数据库。
GRANT SET CONTAINER TO cdc_user; -- 授予用户"cdc_user"切换到不同的容器(Container)的权限。容器是Oracle 12c中的概念,用于隔离和管理数据库资源。
GRANT SELECT ON V_$DATABASE to cdc_user; -- 授予用户"cdc_user"对系统视图"V_$DATABASE"的SELECT权限。该视图提供了关于数据库实例的信息。
GRANT FLASHBACK ANY TABLE TO cdc_user; -- 授予用户"cdc_user"对任意表进行闪回(Flashback)操作的权限。闪回是一种用于还原或查询数据库历史数据的功能。
GRANT SELECT ANY TABLE TO cdc_user; -- 授予用户"cdc_user"对任意表进行SELECT操作的权限。
GRANT SELECT_CATALOG_ROLE TO cdc_user; -- 授予用户"cdc_user"执行SELECT_CATALOG_ROLE角色的权限。SELECT_CATALOG_ROLE角色允许用户查询数据库的元数据信息。
GRANT EXECUTE_CATALOG_ROLE TO cdc_user; -- 授予用户"cdc_user"执行EXECUTE_CATALOG_ROLE角色的权限。EXECUTE_CATALOG_ROLE角色允许用户执行数据库的元数据操作。
GRANT SELECT ANY TRANSACTION TO cdc_user; -- 授予用户"cdc_user"对任意事务进行SELECT操作的权限。
GRANT LOGMINING TO cdc_user; -- 授予用户"cdc_user"进行日志挖掘(Log Mining)的权限。日志挖掘是一种用于分析和提取数据库变更信息的技术。

GRANT LOCK ANY TABLE TO cdc_user; -- 授予用户"flinkuser"锁定任意表的权限。(需开启。不开启的话,无法采集数据)

GRANT EXECUTE ON DBMS_LOGMNR TO cdc_user; -- 授予用户"cdc_user"执行DBMS_LOGMNR包中的过程和函数的权限。DBMS_LOGMNR包提供了用于日志挖掘的功能。
GRANT EXECUTE ON DBMS_LOGMNR_D TO cdc_user; -- 授予用户"cdc_user"执行DBMS_LOGMNR_D包中的过程和函数的权限。DBMS_LOGMNR_D包扩展了DBMS_LOGMNR包的功能。

GRANT SELECT ON V_$LOG TO cdc_user; -- 授予用户"cdc_user"对系统视图"V_$LOG"的SELECT权限。该视图提供了关于日志文件的信息。
GRANT SELECT ON V_$LOG_HISTORY TO cdc_user; -- 授予用户"cdc_user"对系统视图"V_$LOG_HISTORY"的SELECT权限。该视图提供了关于历史日志文件的信息。
GRANT SELECT ON V_$LOGMNR_LOGS TO cdc_user; -- 授予用户"cdc_user"对系统视图"V_$LOGMNR_LOGS"的SELECT权限。该视图提供了关于日志挖掘所使用的日志文件的信息。
GRANT SELECT ON V_$LOGMNR_CONTENTS TO cdc_user; -- 授予用户"cdc_user"对系统视图"V_$LOGMNR_CONTENTS"的SELECT权限。该视图提供了关于日志挖掘的内容信息。
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO cdc_user; -- 授予用户"cdc_user"对系统视图"V_$LOGMNR_PARAMETERS"的SELECT权限。该视图提供了关于日志挖掘参数的信息。
GRANT SELECT ON V_$LOGFILE TO cdc_user; -- 授予用户"cdc_user"对系统视图"V_$LOGFILE"的SELECT权限。该视图提供了关于日志文件的信息。
GRANT SELECT ON V_$ARCHIVED_LOG TO cdc_user; -- 授予用户"cdc_user"对系统视图"V_$ARCHIVED_LOG"的SELECT权限。该视图提供了关于已归档日志文件的信息。
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO cdc_user; -- 授予用户"cdc_user"对系统视图"V_$ARCHIVE_DEST_STATUS"的SELECT权限。该视图提供了关于归档目标状态的信息。

Oracle CDC DataStream API实现

所使用软件的版本

  • java 1.8
  • Scala 2.11
  • Flink 1.14.2
  • Flink CDC 2.3.0
  • Source Oracle 19c
  • Sink MySQL 5.7
  • jackson 2.10.2

Oracle CDC DataStream API可实现一个job监控采集一个数据库的多个表.

1. 定义OracleSource


//源数据库连接配置文件
Properties sourceDbProps = DbConfigUtil.loadConfig("oracle.properties");

//Debezium配置
Properties debeziumProps = new Properties();
//参考 https://debezium.io/documentation/reference/1.6/connectors/oracle.html#oracle-property-log-mining-strategy
debeziumProps.setProperty("log.mining.strategy", "online_catalog");
debeziumProps.setProperty("log.mining.continuous.mine", "true");
//decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string
//precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)
//以double值来表示它们,这可能会到值精度丢失
//string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)
debeziumProps.setProperty("decimal.handling.mode","string");

//Oracle CDC数据源
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
        .hostname(sourceDbProps.getProperty("host"))
        .port(Integer.parseInt(sourceDbProps.getProperty("port")))
        .database(sourceDbProps.getProperty("database")) // monitor database
        .schemaList(sourceDbProps.getProperty("schema_list").split(",")) // monitor schema
        .tableList(sourceDbProps.getProperty("table_list").split(",")) // monitor table
        .username(sourceDbProps.getProperty("username"))
        .password(sourceDbProps.getProperty("password"))
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(debeziumProps)
        .startupOptions(StartupOptions.initial())
        .build();
 

2. 数据处理

参考: MySQL CDC配置及DataStream API实现代码文章来源地址https://www.toymoban.com/news/detail-754748.html

3. Sink到MySQL

参考: MySQL CDC配置及DataStream API实现代码

参考

  1. https://debezium.io/documentation/reference/1.6/connectors/oracle.html
  2. https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/oracle-cdc.html
  3. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/

到了这里,关于Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink DataStream API详解

    参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html Data Sources Source是程序读取其输入的位置,您可以使用 env.addSource(sourceFunction) 将Source附加到程序中。Flink内置了许多预先实现的SourceFunction,但是您始终可以通过实现SourceFunction(non-parallel sources)来编写自定

    2024年02月14日
    浏览(26)
  • Flink学习——DataStream API

            一个flink程序,其实就是对DataStream的各种转换。具体可以分成以下几个部分: 获取执行环境(Execution Environment) 读取数据源(Source) 定义基于数据的转换操作(Transformations) 定义计算结果的输出位置(Sink) 触发程序执行(Execute)         flink 程序可以在各种上

    2024年02月05日
    浏览(28)
  • Flink oracle cdc - Oracle Logminer CDC性能问题

    最近的项目中有用到Flink Oracle CDC实时到监听数据库变化,将变化的数据sink到Kafka。Oracle CDC依赖Debezium组件解析Redo Log与Archive Log,Debezium 通过Oracle 的Logminer解析Log。在我们生产环境遇到 运行一段时间后,再也查询不到数据,直到报miss log file异常(线上环境cron job 将一

    2024年02月08日
    浏览(77)
  • Flink基础之DataStream API

    union联合:被unioin的流中的数据类型必须一致 connect连接:合并的两条流的数据类型可以不一致 connec后,得到的是ConnectedStreams 合并后需要根据数据流是否经过keyby分区 coConnect: 将两条数据流合并为同一数据类型 keyedConnect 目前所使用的大多数Sink, 都是基于2PC的方式来保证状态

    2024年02月05日
    浏览(33)
  • 【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流 是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流 是一个

    2024年02月06日
    浏览(29)
  • Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 概览 学习笔记如下: Flink 的 DataStream API: 数据里的起始是各种 source,例如消息队列、socket 流、文件等; 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等; 结果通过 sink 返回,例如可以将数据写入文件或标准输出。 Da

    2024年01月23日
    浏览(40)
  • Flink CDC系列之:Oracle CDC Connector

    2023年08月23日
    浏览(42)
  • Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记

    学习文档:《Flink 官方文档 - DataStream API - 算子 - 窗口》 学习笔记如下: 窗口(Window):窗口是处理无界流的关键所在。窗口可以将数据流装入大小有限的 “桶” 中,再对每个 “桶” 加以处理。 Keyed Windows 在 Keyed Windows 上使用窗口时,要调用 keyBy(...) 而后再调用 window(..

    2024年01月18日
    浏览(36)
  • 《Flink学习笔记》——第五章 DataStream API

    一个Flink程序,其实就是对DataStream的各种转换,代码基本可以由以下几部分构成: 获取执行环境 读取数据源 定义对DataStream的转换操作 输出 触发程序执行 获取执行环境和触发程序执行都属于对执行环境的操作,那么其构成可以用下图表示: 其核心部分就是Transform,对数据

    2024年02月10日
    浏览(33)
  • Flink CDC系列之:Oracle CDC 导入 Elasticsearch

    Flink CDC系列之:Oracle CDC Connector 该 Docker Compose 中包含的容器有: Oracle: Oracle 11g, 已经预先创建了 products 和 orders表,并插入了一些数据 Elasticsearch: orders 表将和 products 表进行join,join的结果写入Elasticsearch中 Kibana: 可视化 Elasticsearch 中的数据 在 docker-compose.yml 所在目录下运行如下

    2024年02月12日
    浏览(23)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包