Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理

这篇具有很好参考价值的文章主要介绍了Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flink-cdc官网:Oracle CDC Connector — CDC Connectors for Apache Flink® documentation

Flink环境依赖:

(1)下载postgresql jdbc  jar包 
postgresql-42.3.5 和 flink-sql-connector-oracle-cdc-2.2.0.jar将包放到flink 下 lib目录里面
下载地址https://jdbc.postgresql.org/download.html

flink-connector-jdbc_2.12_1.14.4.jar 包
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.12/1.14.4

(2)以 DBA 身份连接到数据库
ORACLE_SID=SID
export ORACLE_SID
sqlplus /nolog
  CONNECT sys/password AS SYSDBA

(3)启用日志归档

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
•	启用日志归档需要重启数据库,尝试时注意
•	归档日志会占用大量磁盘空间,建议定期清理过期日志

(4)检查是否启用了日志归档

-- Should now "Database log mode: Archive Mode"
archive log list;
必须为捕获的表或数据库启用补充日志记录,以便数据更改捕获已更改数据库行的之前状态。下面说明了如何在表/数据库级别进行配置。
-- Enable supplemental logging for a specific table:
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

(5)创建具有权限的 Oracle 用户

(5.1)。创建表空间

sqlplus sys/password@host:port/SID AS SYSDBA;
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;


(5.2)。创建用户并授予权限

sqlplus sys/password@host:port/SID AS SYSDBA;
  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;

  GRANT CREATE TABLE TO flinkuser;
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

Flink SQL 客户端连接器测试:

  1. 创建Oracle链接器
CREATE TABLE TEST_source (
       ID INT,
       PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'Oracle_IP地址',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'ORA19C',
    'schema-name' = 'FLINKUSER',
    'table-name' = 'TEST'
    'debezium.log.mining.strategy'='online_catalog'

);
2.创建postgresql链接器接收端
create table flink_cdc_sink1(
ID INT,
primary key(ID) NOT ENFORCED)
with(
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://pg库_IP地址:5432/ postgres?currentSchema=public', 
'username' = 'postgres',
'password' = '123456',  
'table-name' = 'sink1'
);
3.插入数据 
insert into flink_cdc_sink1  select ID from  TEST_source;
4.问题:数据同步不过去

Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理

解决方案:检查flink-connector-jdbc.jar 版本问题 替换即可
FLINK Oracle to Postgresql JAVA
1. java编码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 测试 flink cdc 实时获取oracle数据变化
 */
public class FlinkCdcOracleExample {

    public static void main(String[] args) throws Exception {

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        String sourceDDL ="CREATE TABLE oracle_source (\n" +
                "     ID INT, \n" +
                "     NAME STRING, \n" +
                "     PRIMARY KEY (ID) NOT ENFORCED \n" +
                "     ) WITH (\n" +
                "     'connector' = 'oracle-cdc',\n" +
                "     'hostname' = 'Oracle_IP地址',\n" +
                "     'port' = '1521',\n" +
                "     'username' = 'flinkuser',\n" +
                "     'password' = 'flinkpw',\n" +
                "     'database-name' = 'ORA19C',\n" +
                "     'schema-name' = 'FLINKUSER',\n" +           // 注意这里要大写
                "     'table-name' = 'tablename',\n" + 

"     'debezium.log.mining.strategy'='online_catalog'\n"+
                "     )";
        // 创建一张用于输出的表
        String sinkDDL = "CREATE TABLE outTable (\n" +
                " id INT,\n" +
                " name STRING, \n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:postgresql://PG库_IP地址:5432/postgres?currentSchema=public',\n" +
                " 'username' = 'postgres',\n" +
                " 'password' = '123456',\n" +
                " 'table-name' = 'pg_sink'\n" +
                ")";
        /*String transformSQL =
                "select * from  oracle_source ";*/
        String transformSQL =
                "INSERT INTO outTable " +
                        "SELECT ID,NAME " +
                        "FROM oracle_source";
        //执行source表ddl
        tableEnv.executeSql(sourceDDL);
        //TableResult tableResult = tableEnv.executeSql("select * from oracle_source");
        //tableResult.print();
        //执行sink表ddl
      tableEnv.executeSql(sinkDDL);
        //执行逻辑sql语句
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        tableResult.print();
        env.execute();
    }
}

返回内容 以上代码是修改后的应不会有下图报错

 Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理

注:报这个错误,但数据可以同步过去

 错误:可以读取oracle表内的数据,但jdbc连接postgres 报错,数据传不过去

Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理

解决:修改maven依赖
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>2.2.0</version>
</dependency>

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.5</version>
</dependency>
flink sql  创建oracle 接收器
create table flink_cdc_sink (
ID INT,
NAME STRING
)with(
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@192.168.58.202:1521:ORA19C',
'username' = 'flinkuser',
 'password' = 'flinkpw', 
'table-name' = 'TEST2',
 'driver' = 'oracle.jdbc.driver.OracleDriver');
报错:

Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理文章来源地址https://www.toymoban.com/news/detail-514038.html

jdbc 连接oracle错误处理

解决方法:目前flink 1.14不支持jdbc 连接oracle 需要安装 flink 1.15 处理
Flink 1.15 安装 需要使用java11 
1.官网下载java 11
https://www.oracle.com/java/technologies/downloads/#java11
2.解压 jdk tar 
linux>tar -xzvf jdk-11.0.15.1_linux-x64_bin.tar.gz
3.修改环境配置文件
linux> vim /etc/profile
# Java11环境变量配置
JAVA_HOME=/devtools/java/java11/jdk-11.0.15
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=$JAVA_HOME/lib
export JAVA_HOME CLASSPATH PATH
 
# Java8环境变量配置
JAVA_HOME=/devtools/java/java8/jdk1.8.0_321
PATH=$PATH:$JAVA_HOME/bin:$PATH
CLASSPATH=$JAVA_HOME/lib
export JAVA_HOME PATH CLASSPATH
4.重启电脑生效
5.下载flink 1.15
linux>Wget https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
6.配置 flink 1.15
linux>vim conf/flink-conf.yaml

jobmanager.rpc.address: jobIP地址
# 配置high-availability mode
high-availability: zookeeper
# JobManager的meta信息放在dfs,在zk上主要会保存一个指向dfs路径的指针 
high-availability.storageDir: hdfs://cluster/flinkha/
# 配置zookeeper quorum(hostname和端口需要依据对应zk的实际配置)
high-availability.zookeeper.quorum: IPA:2181,IPB:2181,IPC:2181 
# (可选)设置zookeeper的root目录
#high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
# 注释以下配置
# jobmanager.bind-host: localhost
# taskmanager.bind-host: localhost
#taskmanager.host: localhost
#rest.address: localhost
#rest.bind-address: localhost

#配置yarn 高可用重试次数
yarn.application-attempts: 10
注意:必须要操作上面的“注释以下配置” 否则Web UI 访问不了 其余配置一样,可以参考最上面的搭建。
 
 
                    

到了这里,关于Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink Oracle CDC Connector源码解读

    flink cdc是在flink的基础上对oracle的数据进行实时采集,底层使用的是debezium框架来实现,debezium使用oracle自带的logminer技术来实现。logminer的采集需要对数据库和采集表添加补充日志,由于oracle18c不支持对数据添加补充日志,所以目前支持的oracle11、12、19三个版本。 flink oracle

    2024年02月02日
    浏览(44)
  • Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

    环境说明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地运行 先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation 1. Oracle 开启 log archiving (1).启用 log archiving         a:以DBA用户连接数据库    

    2024年02月11日
    浏览(47)
  • Flink CDC 实时抽取 Oracle 数据-排错&调优

    Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracle 的支持。对该版本进行试用并成功实现了对 Oracle 的实时数据捕获以及性能调优,现将试用过程中的一些关键细节进行分享。 Oracle:11.2.0.4.0(RAC 部署) Flink:1.13.1 Hadoop:3.2.1

    2024年01月16日
    浏览(46)
  • Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql(无主键)

    环境说明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地运行 具体环境设置和maven依赖请看上篇:Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql_彩虹豆的博客-CSDN博客 现在操作的是源表和目标表都无主键数

    2024年02月15日
    浏览(40)
  • 实例讲解C++连接各种数据库,包含SQL Server、MySQL、Oracle、ACCESS、SQLite 和 PostgreSQL、MongoDB 数据库

      C++ 是一种通用的编程语言,可以使用不同的库和驱动程序来连接各种数据库。以下是一些示例代码,演示如何使用 C++ 连接 SQL Server、MySQL、Oracle、ACCESS、SQLite 和 PostgreSQL、MongoDB 数据库。 连接 SQL Server 数据库 要使用 C++ 连接 SQL Server 数据库,可以使用 Microsoft 的 ADODB 库。以

    2024年02月05日
    浏览(66)
  • 【现场问题】flink-cdc,Oracle2Mysql的坑,Oracle区分大小写导致

    Column ‘id’ is NOT NULL, however, a null value is being written into it. You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘DROP’ to suppress this exception and drop such records silently 大致意思就是不能插入为空的数值。 为什么会报这个错误,我们来看DML的执行语句: insert into t_wx_target select

    2024年02月12日
    浏览(47)
  • FLINK CDC postgresql (Stream与SQL)

    Postgres CDC Connector — CDC Connectors for Apache Flink® documentation flink cdc捕获postgresql数据 1)更改配置文件 需要更改 # 更改wal日志方式为logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个 # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样 # 中断

    2023年04月27日
    浏览(43)
  • Flink CDC Oracle 用户权限不足 ORA-01031: insufficient privileges

    Flink CDC Oracle用户权限不足 版本:flink1.14.5 、flinkcdc 2.2.1、oracle11g、 场景:flink cdc 实时抽取oracle的数据表。DBA为了数据库安全考虑,对访问用户权限进行控制。将oracle的flinkuser用户XE下的orders表授权只读权限给readuser用户。授权情况如下: 此时执行flink oracle cdc 任务: taskmange

    2024年02月12日
    浏览(58)
  • flink cdc同步Oracle数据库资料到Doris问题集锦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    浏览(46)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、达梦等数据库开启日志方法

    目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 2.5达梦 2.4.1安装 2.4.2CDC配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验

    2024年02月05日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包