flink cdc 连接posgresql 数据库相关问题整理

这篇具有很好参考价值的文章主要介绍了flink cdc 连接posgresql 数据库相关问题整理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flink cdc 连接posgresql 数据库

01 、flink posgresql cdc

前置工作
1,更改配置文件postgresql.conf
# 更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20 # max number of replication slots
# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20 # max number of walsender processes
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable  

wal_level是必须更改的,其它参数选着性更改,如果同步表数量超过10张建议修改为合适的值
更改配置文件postgresql.conf完成,需要重启pg服务生效,所以一般是在业务低峰期更改

2,新建用户并且给用户复制流权限
-- pg新建用户
CREATE USER user WITH PASSWORD 'pwd';

-- 给用户复制流权限
ALTER ROLE user replication;

-- 给用户登录数据库权限
grant CONNECT ON DATABASE test to user;

-- 把当前库public下所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
3,发布表

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;

DataStream Api

1: maveny依赖引入

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>

       <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
2.postgresqlCDC2Kafka.java代码
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class postgresqlCDC2Kafka {
    public static void main(String[] args) throws Exception {

        String fileName = args[0];
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fileName);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.enableCheckpointing(5000L);
        //指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2000L));
        //设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://ip:8020/../.."));
        //设置访问 HDFS 的用户名
        System.setProperty("HADOOP_USER_NAME", "hadoop");

   
        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "initial");
        properties.setProperty("debezium.slot.name", "pg_cdc");
        properties.setProperty("debezium.slot.drop.on.stop", "true");
        properties.setProperty("include.schema.changes", "true");

        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("192.168.1.xxx")
                .port(5432)
                .database("databseName") // monitor postgres database
                .schemaList("schemaName")  // monitor inventory snachema
                .tableList("schemaName.table1,scheamName.tabl2,...") // monitor products table
                .username("userName")
                .password("password")
                .decodingPluginName("pgoutput")                
                .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
                .debeziumProperties(properties)
                .build();

        DataStreamSource<String> pgDataStream =
                env
                .addSource(sourceFunction)
                .setParallelism(1); // use parallelism 1 for sink to keep message ordering

        // 设置kafka配置
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers","ip1:9092");
        kafkaProps.setProperty("transaction.max.timeout.ms",90000);
//         sink到kafka
        FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer<>("topicName"), new SimpleStringSchema(), kafkaProps);
        pgDataStream.addSink(flinkKafkaProducer).name("sink2Kafka");

        env.execute("pg_cdc job");

    }
}

注意:postgresql 11以上,decodingPluginName为pgoutput

02、flink cdc错误整理

1:mysql-cdc指定剔除不需要监听的字段信息时抛出异常:

即指定"‘debezium.column.blacklist’"配置信息时抛出异常


org.apache.kafka.connect.errors.DataException: order_sales is not a valid field name
  at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
  at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
  at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$createRowConverter$508c5858$1(RowDataDebeziumDeserializeSchema.java:364)
  at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$wrapIntoNullableConverter$7b91dc26$1(RowDataDebeziumDeserializeSchema.java:390)
  at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:126)
  at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:101)
  at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:97)
  at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81)
  at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
  at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

分析:指定debezium.column.blacklist该参数的意思是指在debezium监听到事件后会把记录中的指定字段删除,然后在flink做解析转换的时候找不到字段。

2:cdc source扫描mysql表期间,进行加锁操作。

解决方案:
       给使用的mysql用户授予reload权限即可。详细见:https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server
       使用'debezium.snapshot.locking.mode'='none'
3:同步锁表
User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.

原因是连接MySQL的用户缺乏必要的CDC权限。

Flink CDC基于Debezium实现。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。
解决办法:创建一个新的MySQL用户并授予其必要的权限。

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;
4:Flink作业扫描MySQL全量数据出现fail-over

Flink 作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图:
flink cdc 连接posgresql 数据库相关问题整理,Flink,数据库,flink,大数据
原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响),而在 scan 全表过程中是没有 offset 可以记录的(意味着没法做 checkpoint),但是 Flink 框架任何时候都会按照固定间隔时间做 checkpoint,所以此处 mysql-cdc source 做了比较取巧的方式,即在 scan 全表的过程中,会让执行中的 checkpoint 一直等待甚至超时。超时的 checkpoint 会被仍未认为是 failed checkpoint,默认配置下,这会触发 Flink 的 failover 机制,而默认的 failover 机制是不重启。所以会造成上面的现象。
解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:

execution.checkpointing.interval: 10min   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint 失败容忍次数
restart-strategy: fixed-delay  # 重试策略
restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数
5:作业在运行时 mysql cdc source 报 no viable alternative at input ‘alter table std’

flink cdc 连接posgresql 数据库相关问题整理,Flink,数据库,flink,大数据
原因:因为数据库中别的表做了字段修改,CDC source 同步到了 ALTER DDL 语句,但是解析失败抛出的异常。

解决方法:在 flink-cdc-connectors 最新版本中已经修复该问题(跳过了无法解析的 DDL)。升级 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替换 flink/lib 下的旧包。
6:多个作业共用同一张 source table 时,没有修改 server id 导致读取出来的数据有丢失。

原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。

解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:


FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;

7: flinksql cdc时区差8小时的问题
在连接参数中设置 ‘server-time-zone’ = ‘Asia/Shanghai’

比如:WITH (

‘connector’ = ‘mysql-cdc’,

‘hostname’ = ‘xxx’,

‘port’ = ‘3306’,

‘username’ = ‘root’,

‘password’ = ‘root’,

‘database-name’ = ‘xxx’,

‘table-name’ = ‘xxx’,

‘server-time-zone’ = ‘Asia/Shanghai’

不设置的话可能会改变MySQL中时间字段比如datetime减8小时

在sql语句中使用LOCALTIMESTAMP或者手动给时间戳加8小时而不要用current_date等
补充:
如果要sink到MySQL的话,在url后加&serverTimezone=Asia/Shanghai 否则时区也会对不上或者在url上
添加

jdbc:mysql://${hostname}/${db_name}useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true
8:flink cdc Encountered chage event for table xxx.xxxx whose schema isn’t known to this connector

解决方案:

inconsistent.schema.handing.mode=''warn'
9: Flinksql From Mysql-cdc Sink to Hbase Cause Miss Data

flink cdc 连接posgresql 数据库相关问题整理,Flink,数据库,flink,大数据
定位:
1:改源码,增加log
2:查看写入逻辑


#open逻辑,有个定时任务刷新
if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
        this.executor = Executors.newScheduledThreadPool(
          1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
        this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
          if (closed) {
            return;
          }
          try {
            flush();
          } catch (Exception e) {
            // fail the sink and skip the rest of the items
            // if the failure handler decides to throw an exception
            failureThrowable.compareAndSet(null, e);
          }
        }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
      }

# invoke逻辑
if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
      flush();
}

# snapshot逻辑,当队列中还有数据请求未刷新时才满足
while (numPendingRequests.get() != 0) {
      flush();
}

以RowKey=0为例发现操作已经被封住在Mutation中,且已经被刷新了。但在hbase中并未找到该key.猜测可能在Mutator处理乱序数据了。
搜索查证资料:

https://www.jianshu.com/p/1a753ffcbe2ahttps://issues.apache.org/jira/browse/HBASE-8626?focusedCommentId=13669455&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13669455

解决方案:

1:短期方案:设置'sink.buffer-flush.max-rows'='2'暂时规避该问题,但对rs会有较大压力
2:彻底解决:基于issue改造源码
10:相关参数说明:
snapshot.mode的各种参数,以下是测试效果
properties.setProperty("snapshot.mode", "never");//Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector
properties.setProperty("snapshot.mode", "initial");每次重启都会读全量
properties.setProperty("snapshot.mode", "initial_only");//读不到数据
properties.setProperty("snapshot.mode", "when_needed");//跟initial效果类似
properties.setProperty("snapshot.mode", "schema_only");//只会记录最新的更改,历史全量读不到
properties.setProperty("snapshot.mode", "schema_only_recovery");//Could not find existing binlog information while attempting schema only recovery snapshot

flink cdc 连接posgresql 数据库相关问题整理,Flink,数据库,flink,大数据文章来源地址https://www.toymoban.com/news/detail-726906.html

到了这里,关于flink cdc 连接posgresql 数据库相关问题整理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Flink CDC从数据库采集数据,保证数据不丢失:实现断点续传机制

    大数据技术在当前的数据分析和处理中扮演着重要的角色。Apache Flink作为一种快速、可靠的流处理引擎,在大规模数据处理中广受欢迎。本文将介绍如何使用Flink CDC(Change Data Capture)从数据库采集数据,并通过设置checkpoint来支持数据采集中断恢复,从而保证数据不丢失。

    2024年02月04日
    浏览(34)
  • 实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据

    目录 前言: 1、springboot引入依赖: 2、yml配置文件 3、创建SQL server CDC变更数据监听器 4、反序列化数据,转为变更JSON对象 5、CDC 数据实体类 6、自定义ApplicationContextUtil 7、自定义sink 交由spring管理,处理变更数据         我的场景是从SQL Server数据库获取指定表的增量数据,查

    2024年02月10日
    浏览(79)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(35)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、达梦等数据库开启日志方法

    目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 2.5达梦 2.4.1安装 2.4.2CDC配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验

    2024年02月05日
    浏览(40)
  • Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

    使用sysdba角色登录到Oracle数据库 确保Oracle归档日志(Archive Log)已启用 若未启用归档日志, 需运行以下命令启用归档日志 设置归档日志存储大小及位置 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等) 设置恢复文件的实际物理存储路径;scope=spfile参数

    2024年02月05日
    浏览(38)
  • 11 flink-sql 中基于 mysql-cdc 连接 mysql-pxc 集群无法获取增量数据问题

    问题是来自于 群友, 2024.03.29, 也是花了一些时间 来排查这个问题  大致的问题是用 mysql-cdc 连接了一个 mysql-pxc 集群, 然后创建了一个 test_user 表  使用 \\\"select * from test_user\\\" 获取数据表的数据, 可以拿到 查询时的快照, 但是 无法获取到后续对于 test_user 表的增量操作的数据, 比如

    2024年04月15日
    浏览(42)
  • 【FlinkCdc】Flink MysqlCdc连接数据库失败,SSLHandshakeException怎么破?

    用Flink MysqlCdc同步一个新数据库时,遇到了一个新异常,javax.net.ssl.SSLHandshakeException。根据异常栈信息,mysqlcdc尝试与mysql server建立连接时,连接失败了,报Communications link failure. The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server

    2024年02月03日
    浏览(38)
  • 数据库索引面试的相关问题

    查看索引的执行计划 索引失效的情况 1、索引列上做了计算,函数,类型转换等操作。索引失效是因为查询过程需要扫描整个索引并回表。代价高于直接全表扫描。 Like匹配使用了前缀匹配符“%abc” 字符串不加引号导致类型转换。 原因: 常见索引的优化的方法 1、前缀索引

    2024年02月22日
    浏览(30)
  • 数据库连接问题 1251

    Navicat连接本地数据库时出现的问题 解决办法 : 打开 输入密码 然后输入 ALTER USER ‘root’@‘localhost’ IDENTIFIED WITH mysql_native_password BY ‘123456’; FLUSH PRIVILEGES;

    2024年02月07日
    浏览(34)
  • 功能测试也可以发现数据库相关的性能问题

    很多同学认为功能测试和性能测试是严格分开的,功能测试人员无法发现性能问题。其实不是这样的,功能测试人员在验证功能时也可以发现性能问题;一些功能反而在功能测试环境不好验证,需要在性能环境上测试。     今天咱们就说一下测试涉及数据库操作的功能时如何

    2024年02月14日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包