Flink CDC实时同步PG数据库

这篇具有很好参考价值的文章主要介绍了Flink CDC实时同步PG数据库。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

版本:

JDK:1.8

Flink:1.16.2

Scala:2.11

Hadoop:3.1.3

github地址:https://github.com/rockets0421/FlinkCDC-PG.git 

一、前置准备工作

1、更改配置文件postgresql.conf

# 更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20 # max number of replication slots

# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20 # max number of walsender processes
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable 

wal_level是必须更改的,其它参数选着性更改,如果同步表数量超过10张建议修改为合适的值

更改配置文件postgresql.conf完成,需要重启pg服务生效

2、新建用户并且给用户复制流权限

-- pg新建用户

CREATE USER user WITH PASSWORD 'pwd';

-- 给用户复制流权限
ALTER ROLE user replication;

-- 给用户登录数据库权限
grant CONNECT ON DATABASE test to user;


-- 把当前库public下所有表查询权限赋给用户

GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;

3、发布表

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;

4、更改表的复制标识包含更新和删除的值

-- 更改复制标识包含更新和删除之前值
ALTER TABLE xxxxxx REPLICA IDENTITY FULL;
-- 查看复制标识(为f标识说明设置成功)
select relreplident from pg_class where relname='xxxxxx';

 二、Flink读取PG数据

1、加载依赖

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>

注意:如果依赖中有flink-connector-kafka,可能会有冲突,需要手动<exclisions>排除冲突

2、使用Flink CDC创建pg的source

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public static SourceFunction getPGSource(String database, String schemaList,String tableList, String slotName) {
        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "always"); //always:全量+增量  never:增量
        properties.setProperty("debezium.slot.name", "pg_cdc");
        //在作业停止后自动清理 slot
        properties.setProperty("debezium.slot.drop.on.stop", "true");
        properties.setProperty("include.schema.changes", "true");
        // PostGres 数据库
        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database(database) // monitor postgres database
                .schemaList(schemaList)  // monitor inventory schema
                .tableList(tableList) // monitor products table 支持正则表达式
                .username("postgres")
                .password("postgres")
                .decodingPluginName("pgoutput")
                //Flink CDC 默认一张表占用一个 slot。多个未指定 slot.name 的连接会产生冲突。
                .slotName(slotName)
                .deserializer(new MyDebezium()) // 自定义序列化器,解决pg数据库日期格式的数据问题和时区问题
                //.deserializer(new JsonDebeziumDeserializationSchema()) // 
                .debeziumProperties(properties)
                .build();
        return sourceFunction;
    }

3、自定义序列化器

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.time.*;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class MyDebezium implements DebeziumDeserializationSchema<String> {

    // 日期格式转换时区
    private static String serverTimeZone = "Asia/Shanghai";

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 1. 创建一个JSONObject用来存放最终封装好的数据
        JSONObject result = new JSONObject();

        // 2. 解析主键
        Struct  key = (Struct)sourceRecord.key();
        JSONObject keyJs = parseStruct(key);

        // 3. 解析值
        Struct value = (Struct) sourceRecord.value();
        Struct source = value.getStruct("source");

        JSONObject beforeJson = parseStruct(value.getStruct("before"));
        JSONObject afterJson = parseStruct(value.getStruct("after"));

        //将数据封装到JSONObject中
        result.put("db", source.get("db").toString().toLowerCase());
        //result.put("schema", source.get("schema").toString().toLowerCase()); 架构名 看是否需要
        result.put("table", source.get("table").toString().toLowerCase());
        result.put("key", keyJs);
        result.put("op", value.get("op").toString());
        result.put("op_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(source.getInt64("ts_ms")), ZoneId.of(serverTimeZone)));
        result.put("current_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(value.getInt64("ts_ms")), ZoneId.of(serverTimeZone)));
        result.put("before", beforeJson);
        result.put("after", afterJson);


        //将数据发送至下游
        collector.collect(result.toJSONString());
    }

    private JSONObject parseStruct(Struct valueStruct) {
        if (valueStruct == null) return null;

        JSONObject dataJson = new JSONObject();
        for (Field field : valueStruct.schema().fields()) {
            Object v = valueStruct.get(field);
            String type = field.schema().name();
            Object val = null;

            if (v instanceof Long) {
                long vl = (Long) v;
                val = convertLongToTime(vl, type);
            } else if (v instanceof Integer){
                int iv = (Integer) v;
                val = convertIntToDate(iv, type);
            } else if (v == null) {
                val = null;
            } else {
                val = convertObjToTime(v, type);
            }
            dataJson.put(field.name().toLowerCase(), val);
        }
        return dataJson;
    }

    private Object convertObjToTime(Object obj, String type) {
        Object val = obj;
        if (Time.SCHEMA_NAME.equals(type) || MicroTime.SCHEMA_NAME.equals(type) || NanoTime.SCHEMA_NAME.equals(type)) {
            val = java.sql.Time.valueOf(TemporalConversions.toLocalTime(obj)).toString();
        } else if (Timestamp.SCHEMA_NAME.equals(type) || MicroTimestamp.SCHEMA_NAME.equals(type) || NanoTimestamp.SCHEMA_NAME.equals(type) || ZonedTimestamp.SCHEMA_NAME.equals(type)) {
            val = java.sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, ZoneId.of(serverTimeZone))).toString();
        }
        return val;
    }

    private Object convertIntToDate(int obj, String type) {
        SchemaBuilder date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date");
        Object val = obj;
        if (Date.SCHEMA_NAME.equals(type)) {
            val = org.apache.kafka.connect.data.Date.toLogical(date_schema, obj).toInstant().atZone(ZoneId.of(serverTimeZone)).toLocalDate().toString();
        }
        return val;
    }

    private Object convertLongToTime(long obj, String type) {
        SchemaBuilder time_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Time");
        SchemaBuilder date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date");
        SchemaBuilder timestamp_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp");
        Object val = obj;
        if (Time.SCHEMA_NAME.equals(type)) {
            val = org.apache.kafka.connect.data.Time.toLogical(time_schema, (int)obj).toInstant().atZone(ZoneId.of(serverTimeZone)).toLocalTime().toString();
        } else if (MicroTime.SCHEMA_NAME.equals(type)) {
            val = org.apache.kafka.connect.data.Time.toLogical(time_schema, (int)(obj / 1000)).toInstant().atZone(ZoneId.of(serverTimeZone)).toLocalTime().toString();
        } else if (NanoTime.SCHEMA_NAME.equals(type)) {
            val = org.apache.kafka.connect.data.Time.toLogical(time_schema, (int)(obj / 1000 / 1000)).toInstant().atZone(ZoneId.of(serverTimeZone)).toLocalTime().toString();
        } else if (Timestamp.SCHEMA_NAME.equals(type)) {
            LocalDateTime t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj).toInstant().atZone(ZoneId.of(serverTimeZone)).toLocalDateTime();
            val = java.sql.Timestamp.valueOf(t).toString();
        } else if (MicroTimestamp.SCHEMA_NAME.equals(type)) {
            LocalDateTime t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000).toInstant().atZone(ZoneId.of(serverTimeZone)).toLocalDateTime();
            val = java.sql.Timestamp.valueOf(t).toString();
        } else if (NanoTimestamp.SCHEMA_NAME.equals(type)) {
            LocalDateTime t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000 / 1000).toInstant().atZone(ZoneId.of(serverTimeZone)).toLocalDateTime();
            val = java.sql.Timestamp.valueOf(t).toString();
        } else if (Date.SCHEMA_NAME.equals(type)) {
            val = org.apache.kafka.connect.data.Date.toLogical(date_schema, (int)obj).toInstant().atZone(ZoneId.of(serverTimeZone)).toLocalDate().toString();
        }
        return val;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

注:这段代码来自于https://huaweicloud.csdn.net/63356ef0d3efff3090b56c01.html中的scala版本

三、项目遇坑

1、类型不匹配报错问题

使用DataStream API方式读取多表数据,就无法定义数据格式,所以统一转为JsonObject格式,如果需要使用流中的数据去进行sql占位符的填充,再去pg中执行,就会提示类型不匹配

方法一:使用函数,用PG的函数去进行类型的强转

a.a1 = b.b1::int8 或者 a.a1::varchar = b.b1

缺点:需要改动SQL语句

方法二:隐式类型自动转换

MySQL、Oracle等都是默认对数据类型进行了隐式的转换,在其他数据库varchar等字符串类型和数字可以进行自动的隐式转换,但是PG确没有这么处理,但可以通过PG的自定义类型转换定义自己想要的隐式类型转换

--创建类型转换
--注:创建cast需要有pg_cast系统表的权限
--注:当创建类型转换使用自动隐式转换的话如果出现多个匹配的转换此时pg会因为不知道选择哪一个去处理类型转换而报错,
--如果出现多个隐式自动转换都匹配此时还是需要手动添加转换以达到效果,或者删除多余的类型转换
CREATE CAST (INTEGER AS VARCHAR) WITH INOUT AS IMPLICIT;
CREATE CAST (VARCHAR AS INTEGER) WITH INOUT AS IMPLICIT;
CREATE CAST (BIGINT AS VARCHAR) WITH INOUT AS IMPLICIT;
CREATE CAST (VARCHAR AS BIGINT) WITH INOUT AS IMPLICIT;
CREATE CAST (DATE VARCHAR) WITH INOUT AS IMPLICIT;
CREATE CAST (VARCHAR AS DATE ) WITH INOUT AS IMPLICIT;

--查询当前的类型转换
--这个查询是当前所有的CAST
select 
	(select typname from pg_type where oid = t.castsource) as "castsource",
	(select typname from pg_type where oid = t.casttarget) as "casttarget",
	castcontext,
	castmethod
from pg_cast as t

注意:如果使用隐式转换,需要在连接 URL 中通过设置 stringtype=unspecified 来禁用 JDBC 驱动对数据类型的预测

2、没有发布所有表时

如果已经所有表进行发布,那就不用看这个问题
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

如果上游数据库只发不了某些表,那么在进行CDC操作时,就需要手动指定publication.name的发布名称,否则debezium会自动执行发布所有表这一操作,一旦账号权限不足,会在数据库造成异常,提示permission denied for database xxx;那么也会无法读取数据文章来源地址https://www.toymoban.com/news/detail-546265.html

到了这里,关于Flink CDC实时同步PG数据库的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(35)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(35)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(89)
  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(27)
  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(35)
  • 使用Flink CDC将Mysql中的数据实时同步到ES

    最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间…… 我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过

    2024年02月11日
    浏览(37)
  • 基于 Flink CDC 的实时同步系统

    摘要: 本文整理自科杰科技大数据架构师张军,在 FFA 2022 数据集成专场的分享。本篇内容主要分为四个部分: 功能概述 架构设计 技术挑战 生产实践 Tips: 点击 「阅读原文」 查看原文视频演讲 ppt 科杰科技是专门做大数据服务的供应商,目前的客户包括能源、金融、证券等

    2024年02月05日
    浏览(33)
  • flink oracle cdc实时同步(超详细)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文参照官方文档来记录Oracle CDC 的配置。 在本文开始前,需要先安装Oracle,有兴趣的同学可以参考博主之前写的《docker下安装oracle11g(一次安装成功)》。 如果要做oracle的实时同步

    2024年02月12日
    浏览(30)
  • flink cdc 连接posgresql 数据库相关问题整理

    01 、flink posgresql cdc 前置工作 1,更改配置文件postgresql.conf wal_level是必须更改的,其它参数选着性更改,如果同步表数量超过10张建议修改为合适的值 更改配置文件postgresql.conf完成,需要重启pg服务生效,所以一般是在业务低峰期更改 2,新建用户并且给用户复制流权限 3,发

    2024年02月07日
    浏览(31)
  • Flink系列之:使用Flink CDC从数据库采集数据,设置checkpoint支持数据采集中断恢复,保证数据不丢失

    博主相关技术博客: Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql数据到StarRocks数据库

    2024年02月11日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包