Flink CDC实现一个Job同步多个表

这篇具有很好参考价值的文章主要介绍了Flink CDC实现一个Job同步多个表。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

直接使用Flink CDC SQL的写法,一个Job只能同步一个表的数据,至于原因,在此不再赘述。

直接上代码吧

第一步,自定义 DebeziumDeserializationSchema

将SourceRecord类转化为自定义的JsonRecord类型

public class JsonStringDebeziumDeserializationSchema
        implements DebeziumDeserializationSchema<JsonRecord> {

    @Override
    public void deserialize(SourceRecord record, Collector<JsonRecord> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        String tableName = record.topic();
        //out.collect("source table name is :" + tableName);
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            String insertMapString = extractAfterRow(value, valueSchema);
            JsonRecord jsonRecord = new JsonRecord(tableName, "i", insertMapString);
            out.collect(jsonRecord);
        } else if (op == Envelope.Operation.DELETE) {
            String deleteString = extractBeforeRow(value, valueSchema);
            JsonRecord jsonRecord = new JsonRecord(tableName, "d", deleteString);
            out.collect(jsonRecord);
        } else if (op == Envelope.Operation.UPDATE) {
            String updateString = extractAfterRow(value, valueSchema);
            JsonRecord jsonRecord = new JsonRecord(tableName, "u", updateString);
            out.collect(jsonRecord);
        }
    }

    @Override
    public TypeInformation<JsonRecord> getProducedType() {
        return TypeInformation.of(new TypeHint<JsonRecord>(){});
    }


    private String extractAfterRow(Struct value, Schema valueSchema) throws Exception {
        Struct after = value.getStruct(Envelope.FieldName.AFTER);
        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
        Map<String, Object> map = getRowMap(after, afterSchema);
        ObjectMapper objectMapper = new ObjectMapper();
        return  objectMapper.writeValueAsString(map);
    }

    private String extractBeforeRow(Struct value, Schema valueSchema)
            throws Exception {
        Struct beforeValue = value.getStruct(Envelope.FieldName.BEFORE);
        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
        Map<String, Object> map =  getRowMap(beforeValue, beforeSchema);
        ObjectMapper objectMapper = new ObjectMapper();
        return  objectMapper.writeValueAsString(map);
    }

    private Map<String, Object> getRowMap(Struct value, Schema valueSchema) {
        Map<String, Object> map = new HashMap<>();
        for (Field field : valueSchema.fields()) {
            map.put(field.name(), value.get(field.name()));
        }
        return map;
    }

JsonRecord类定义如下:

@Data
public class JsonRecord {
    private String tableName;
    private String op;
    private String fieldValue;
}

其中fieldValue为字段map序列化后的字符串

第二步,构建DebeziumSourceFunction

public class OracleDebeziumFunctionBuilder {

    public DebeziumSourceFunction build(OracleConnectionOption option) {

        OracleSource.Builder builder = OracleSource.builder();
        builder.hostname(option.getHostName());

        builder.port(option.getPort());

        builder.username(option.getUserName());
        builder.password(option.getPassword());

        builder.database(option.getDatabaseName());

        String[] tableArray = new String[option.getTableNames().size()];
        int count = 0;
        for (String tableName : option.getTableNames()) {
            tableArray[count] = option.getSchemaName() + "." + tableName;
            count++;
        }

        String[] schemaArray = new String[]{option.getSchemaName()};
        builder.tableList(tableArray);
        builder.schemaList(schemaArray);

        // dbzProperties
        Properties dbzProperties = new Properties();
        dbzProperties.setProperty("database.tablename.case.insensitive", "false");
        if (option.isUseLogmine()) {
            dbzProperties.setProperty("log.mining.strategy", "online_catalog");
            dbzProperties.setProperty("log.mining.continuous.mine", "true");
        } else {
            dbzProperties.setProperty("database.connection.adpter", "xstream");
            dbzProperties.setProperty("database.out.server.name", option.getOutServerName());
        }
        builder.debeziumProperties(dbzProperties);
        builder.deserializer(new JsonStringDebeziumDeserializationSchema());
        builder.startupOptions(option.getStartupOption());
        return builder.build();
    }
}
OracleConnectionOption类定义如下:
public class OracleConnectionOption {

    private String hostName;
    private int port;
    private String databaseName;
    private String userName;
    private String password;

    /** 是否支持logmine */
    private boolean useLogmine;

    private String outServerName;

    private List<String> tableNames;

    private String schemaName;

    private StartupOptions startupOption;
}

第三步,编写main函数

通过OutputTag实现分流文章来源地址https://www.toymoban.com/news/detail-525573.html

public class CdcStartup {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        OracleConnectionOption connectionOption = new OracleConnectionOption();
        connectionOption.setHostName(...);
        connectionOption.setPort(...);
        connectionOption.setDatabaseName(...);
        connectionOption.setUserName(...);
        connectionOption.setPassword(...);
        connectionOption.setUseLogmine(...);
        connectionOption.setOutServerName(...);
        connectionOption.setSchemaName(...);
        connectionOption.setStartupOption(StartupOptions.initial());
        List<String> tableNames =new ArrayList();
        // 添加要同步的表名;
        tableNames.add("") ;
        OracleDebeziumFunctionBuilder functionBuilder = new OracleDebeziumFunctionBuilder();
        DebeziumSourceFunction sourceFunction = functionBuilder.build(connectionOption);
        DataStreamSource<JsonRecord> dataStreamSource = env.addSource(sourceFunction );
        //sink
        Map<String, OutputTag<JsonRecord>> outputTagMap = new HashMap<>();
        for (String tableName : tableNames) {
            outputTagMap.put(tableName , new OutputTag(tableName, TypeInformation.of(JsonRecord.class)));
        }

        SingleOutputStreamOperator mainStream = dataStreamSource.process(new ProcessFunction<JsonRecord, Object>() {
            @Override
            public void processElement(JsonRecord value, Context ctx, Collector<Object> out) throws Exception {
                int index = value.getTableName().lastIndexOf(".");
                String originalName= value.getTableName().substring(index + 1);
                ctx.output(outputTagMap.get(originalName), value);
            }
        });

       for (String tableName : tableNames) {
            
            DataStream outputStream = mainStream.getSideOutput(outputTagMap.get(tableName));
            CustomSinkFunction sinkFunction = new CustomSinkFunction ();//自定义sink
            outputStream.addSink(sinkFunction).name(tableName);
        }
        env.execute();
    }
}

到了这里,关于Flink CDC实现一个Job同步多个表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink 实现 MySQL CDC 动态同步表结构

    作者:陈少龙,腾讯 CSIG 高级工程师 使用 Flink CDC(Change Data Capture) 实现数据同步被越来越多的人接受。本文介绍了在数据同步过程中,如何将 Schema 的变化实时地从 MySQL 中同步到 Flink 程序中去。 MySQL 存储的数据量大了之后往往会出现查询性能下降的问题,这时候通过 Flin

    2024年02月04日
    浏览(78)
  • flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读. 在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务, 然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交, 本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命

    2024年02月19日
    浏览(41)
  • 基于大数据平台(XSailboat)的计算管道实现MySQL数据源的CDC同步--flink CDC

    笔者在先前的一篇文档《数据标签设计 – 大数据平台(XSailboat)的数据标签模块》 提到了关于数据标签的模块,现已实现并应用于项目中。在项目中遇到这样一种情形: 如果打标信息和业务数据是在一个数据库实例中,那么只需要连接两张表进行查询即可。但是数据标签作为

    2024年01月17日
    浏览(68)
  • FLink多表关联实时同步

    Oracle-Debezium-Kafka-Flink-PostgreSQL Flink消费Kafka中客户、产品、订单(ID)三张表的数据合并为一张订单(NAME)表。 Oracle内创建三张表 PostgreSQL内创建一张表 其他前置环境 Oracle、PostgreSQL、Kafka、FLink、Debezium-Server的部署参见本系列其他文章搭建。 采用前置条件中的语句建表即可,

    2023年04月25日
    浏览(65)
  • Flink CDC数据同步

    一、什么是FLink Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 接下来,我们来介绍一下 Flink 架构中的重要方面。 任何类型的数据都可以形成一种事

    2024年02月08日
    浏览(47)
  • Flink CDC整库同步

    背景 项目需要能够捕获外部数据源的数据变更,实时同步到目标数据库中,自动更新数据,实现源数据库和目标数据库所有表的数据同步更新,本文以mysql - greenplumn场景记录实现方案。 实现 1.引入依赖 2.创建FlinkCDCSource 创建FlinkCDC连接器,设置数据源的连接信息,日志捕获的

    2024年04月14日
    浏览(53)
  • FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)

    本文介绍了  来源单表-目标源单表同步,多来源单表-目标源单表同步。 注:1.16版本、1.17版本都可以使用火焰图,生产上最好关闭,详情见文章末尾 Flink版本:1.16.2 环境:Linux CentOS 7.0、jdk1.8 基础文件: flink-1.16.2-bin-scala_2.12.tgz、 flink-connector-jdbc-3.0.0-1.16.jar、(maven仓库目录:

    2024年02月11日
    浏览(46)
  • 基于 Flink CDC 的实时同步系统

    摘要: 本文整理自科杰科技大数据架构师张军,在 FFA 2022 数据集成专场的分享。本篇内容主要分为四个部分: 功能概述 架构设计 技术挑战 生产实践 Tips: 点击 「阅读原文」 查看原文视频演讲 ppt 科杰科技是专门做大数据服务的供应商,目前的客户包括能源、金融、证券等

    2024年02月05日
    浏览(48)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(56)
  • flink oracle cdc实时同步(超详细)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文参照官方文档来记录Oracle CDC 的配置。 在本文开始前,需要先安装Oracle,有兴趣的同学可以参考博主之前写的《docker下安装oracle11g(一次安装成功)》。 如果要做oracle的实时同步

    2024年02月12日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包