实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据

这篇具有很好参考价值的文章主要介绍了实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

前言:

1、springboot引入依赖:

2、yml配置文件

3、创建SQL server CDC变更数据监听器

4、反序列化数据,转为变更JSON对象

5、CDC 数据实体类

6、自定义ApplicationContextUtil

7、自定义sink 交由spring管理,处理变更数据


前言:

        我的场景是从SQL Server数据库获取指定表的增量数据,查询了很多获取增量数据的方案,最终选择了Flink的 flink-connector-sqlserver-cdc ,这个需要用到SQL Server 的CDC(变更数据捕获),通过CDC来获取增量数据,处理数据前需要对数据库进行配置,如果不清楚如何配置可以看看我这篇文章:《SQL Server数据库开启CDC变更数据捕获操作指引》

废话不多说,直接上干货,如有不足还请指正

1、springboot引入依赖:

    <properties>
        <flink.version>1.16.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>9.4.0.jre8</version>
        </dependency>        
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.13.6</version>
        </dependency>
    </dependencies>

2、yml配置文件

spring:
    datasource:
    url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
    username: sa
    password: root
    driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# 实时同步SQL Server数据库配置
CDC:
  DataSource:
    host: 127.0.0.1
    port: 1433
    database: HM_5001
    tableList: dbo.t1,dbo.Tt2,dbo.t3,dbo.t4
    username: sa
    password: sa

3、创建SQL server CDC变更数据监听器

import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
 * SQL server CDC变更监听器
 **/
@Component
@Slf4j
public class SQLServerCDCListener implements ApplicationRunner, Serializable {

    /**
     * CDC数据源配置
     */
    @Value("${CDC.DataSource.host}")
    private String host;
    @Value("${CDC.DataSource.port}")
    private String port;
    @Value("${CDC.DataSource.database}")
    private String database;
    @Value("${CDC.DataSource.tableList}")
    private String tableList;
    @Value("${CDC.DataSource.username}")
    private String username;
    @Value("${CDC.DataSource.password}")
    private String password;

    private final DataChangeSink dataChangeSink;

    public SQLServerCDCListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("开始启动Flink CDC获取ERP变更数据......");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeInfoMySqlSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("SQLServer-stream-cdc");
    }

    /**
     * 构造CDC数据源
     */
    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = tableList.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(host)
                .port(Integer.parseInt(port))
                .database(database) // monitor sqlserver database
                .tableList(tables) // monitor products table
                .username(username)
                .password(password)
                /*
                 *initial初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest:只进行增量导入(不读取历史变化) 
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
    }
}

4、反序列化数据,转为变更JSON对象

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

/**
 * SQLServer消息读取自定义序列化
 **/
@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";

    /**
     *
     * 反序列化数据,转为变更JSON对象
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // 获取操作类型  CREATE UPDATE DELETE  1新增 2修改 3删除
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            //7.输出数据
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("SQLServer消息读取自定义序列化报错:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     *
     * 从源数据获取出变更之前或之后的数据
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }



    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}

5、CDC 数据实体类

import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * CDC 数据实体类
 */
@Data
public class DataChangeInfo implements Serializable {

    /**
     * 数据库名
     */
    private String database;
    /**
     * 表名
     */
    private String tableName;
    /**
     * 变更时间
     */
    private LocalDateTime changeTime;
    /**
     * 变更类型 1新增 2修改 3删除
     */
    private Integer eventType;
    /**
     * 变更前数据
     */
    private String beforeData;
    /**
     * 变更后数据
     */
    private String afterData;
}

6、自定义ApplicationContextUtil

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
    /**
     * 上下文
     */
    private static ApplicationContext context;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }
    public static ApplicationContext getApplicationContext() {
        return context;
    }
    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}

7、自定义sink 交由spring管理,处理变更数据

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

/**
 * 自定义sink 交由spring管理
 * 处理变更数据
 **/
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {

    private static final long serialVersionUID = -74375380912179188L;

    private UserMapper userMapper;

    /**
     * 在open()方法中动态注入Spring容器的类
     * 在启动SpringBoot项目是加载了Spring容器,其他地方可以使用@Autowired获取Spring容器中的类;
     * 但是Flink启动的项目中,默认启动了多线程执行相关代码,导致在其他线程无法获取Spring容器,
     * 只有在Spring所在的线程才能使用@Autowired,故在Flink自定义的Sink的open()方法中初始化Spring容器
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        userMapper = ApplicationContextUtil.getBean(UserMapper.class);
    }

    @Override
    public void invoke(DataChangeInfo dataChangeInfo, Context context) {
        log.info("收到变更原始数据:{}", dataChangeInfo);
        // TODO 开始处理你的数据吧
    }

以上是我亲自验证测试的结果,已发布生产环境,如有不足还请指正。文章来源地址https://www.toymoban.com/news/detail-499803.html

到了这里,关于实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SQL server开启变更数据捕获(CDC)

    多多点赞,会变好看! 多多留言,会变有钱! 变更数据捕获(Change Data Capture ,简称 CDC):记录 SQL Server 表的插入、更新和删除操作。开启cdc的源表在插入、更新和删除操作时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可

    2024年02月11日
    浏览(25)
  • Flink实战-(6)FlinkSQL实现CDC

    FlinkSQL说明 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初

    2023年04月26日
    浏览(39)
  • 基于 Flink SQL CDC 数据处理的终极武器

    来源互联网多篇文章总结 业务系统经常会遇到需要更新数据到多个存储的需求。例如:一个订单系统刚刚开始只需要写入数据库即可完成业务使用。某天 BI 团队期望对数据库做全文索引,于是我们同时要写多一份数据到 ES 中,改造后一段时间,又有需求需要写入到 Redis 缓存

    2024年02月16日
    浏览(21)
  • 深入浅出 SQL Server CDC 数据同步

    SQL Server 是一款老牌关系型数据库,自 1988 年由 Microsoft、Sybase 和 Ashton-Tate 三家公司共同推出,不断迭代更新至今,拥有相当广泛的用户群体。 如今,我们提到 SQL Server 通常指 Microsoft SQL Server 2000 之后的版本。 SQL Server 2008 是一个里程碑版本,加入了大量新特性,包括 新的语法

    2024年02月12日
    浏览(30)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(34)
  • 【flink报错】flink cdc无主键时的操作

    “org.apache.flink.table.api.validationexception: ‘scan.incremental.snapshot.chunk.key-column’ must be set when the table doesn’t have primary keys” 报错提示当表没有主键时,必须设置 ‘scan.incremental.snapshot.chunk.key-column’。 这里表没有主键,不是flink table中设置的primary key,而是物理表中没有主键。 如

    2024年04月23日
    浏览(20)
  • Doris通过Flink CDC接入MySQL实战

    1. 创建MySQL库表,写入demo数据 登录测试MySQL 创建MySQL库表,写入demo数据 注意:MySQL需要开通bin-log log_bin=mysql_bin binlog-format=Row server-id=1 2. 创建Doris库表 创建Doris表 3. 启动Flink 启动flink 创建Flink 任务: 输入如下地址,查看flink任务 http://localhost:8081/#/job/running 数据验证:启动后可

    2023年04月10日
    浏览(31)
  • 基于Flink SQL CDC Mysql to Mysql数据同步

    Flink CDC有两种方式同步数据库: 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步; 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。 本方案使用FlinkSQL方法,同步两表中的数据。 其中Flink应用可以部署在具有公网IP的服务

    2023年04月11日
    浏览(63)
  • flink cdc数据同步,DataStream方式和SQL方式的简单使用

    目录 一、flink cdc介绍 1、什么是flink cdc 2、flink cdc能用来做什么 3、flink cdc的优点 二、flink cdc基础使用 1、使用flink cdc读取txt文本数据 2、DataStream的使用方式 3、SQL的方式 总结 flink cdc是一个由阿里研发的,一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数

    2024年02月13日
    浏览(25)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包