Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (DataStream方式)

这篇具有很好参考价值的文章主要介绍了Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (DataStream方式)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、前置条件

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

FLUSH PRIVILEGES;

二、创建项目

基于jdk1.8 + springboot2.7.x + elasticsearch7.x

1、pom 主要依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>2.7.2</version>
</dependency>
<!-- Flink CDC connector for MySQL -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

2、yml 配置 MySQL 连接信息

mysql-cdc-es:
  infos:
  ip: 192.168.xxx.xxx
  port: 3306
  dbs: 对应MySQL的schema名字,如 mysql-es
  user: username
  pwd: password
  tables: mysql-es.表名

3、MySQL 连接信息对应配置类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

@Data
@Component
@RefreshScope
@ConfigurationProperties(prefix = "mysql-cdc-es")
public class MysqlCdcInfo {
    private String ip;
    private int port;
    private String dbs;
    private String user;
    private String pwd;
    private String tables;
}

4、创建数据变更对象

import lombok.Data;

@Data
public class DataChangeInfo {
    /**
     * 变更类型: 0 初始化 1新增 2修改 3删除 4导致源中的现有表被截断的操作
     */
    private Integer operatorType;
    /**
     * 变更前数据
     */
    private String beforeData;
    /**
     * 变更后数据
     */
    private String afterData;

    /**
     * 操作的数据
     */
    private String data;
    /**
     * binlog文件名
     */
    private String fileName;
    /**
     * binlog当前读取点位
     */
    private Integer filePos;
    /**
     * 数据库名
     */
    private String database;
    /**
     * 表名
     */
    private String tableName;
    /**
     * 变更时间
     */
    private Long operatorTime;

}

5、实现MySQL消息读取自定义序列化

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Map;
import java.util.Optional;

@Slf4j
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";

    /**
     * 获取操作类型 READ CREATE UPDATE DELETE TRUNCATE;
     * 变更类型: 0 初始化 1新增 2修改 3删除 4导致源中的现有表被截断的操作
     */
    private static final Map<String, Integer> OPERATION_MAP = ImmutableMap.of(
            "READ", 0,
            "CREATE", 1,
            "UPDATE", 2,
            "DELETE", 3,
            "TRUNCATE", 4);

    /**
     * 反序列化数据,转为变更JSON对象
     *
     * @param sourceRecord sourceRecord
     * @param collector    collector
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        DataChangeInfo dataChangeInfo = new DataChangeInfo();
        // 获取操作类型 READ CREATE UPDATE DELETE TRUNCATE;
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();
        int eventType = OPERATION_MAP.get(type);
        // fixme 一般情况是无需关心其之前之后数据的,直接获取最新的数据即可,但这里为了演示,都进行输出
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
        if (eventType == 3) {
            dataChangeInfo.setData(getJsonObject(struct, BEFORE).toJSONString());
        } else {
            dataChangeInfo.setData(getJsonObject(struct, AFTER).toJSONString());
        }
        dataChangeInfo.setOperatorType(eventType);
        dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(
                Optional.ofNullable(source.get(POS))
                        .map(x -> Integer.parseInt(x.toString()))
                        .orElse(0)
        );
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        dataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS))
                .map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
        // 输出数据
        collector.collect(dataChangeInfo);
    }

    /**
     * 从元素数据获取出变更之前或之后的数据
     *
     * @param value        value
     * @param fieldElement fieldElement
     * @return JSONObject
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}

6、自定义实现用户定义的接收器功能

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DataChangeSink implements SinkFunction<DataChangeInfo> {

    @Override
    public void invoke(DataChangeInfo dataChangeInfo, Context context) {
        // 变更类型: 0 初始化 1新增 2修改 3删除 4导致源中的现有表被截断的操作
        Integer operatorType = dataChangeInfo.getOperatorType();
        // TODO 数据处理,不能在方法外注入需要的bean,会报错必须实例化才可以,
        // 所以使用SpringUtil 获取需要的 bean,比如获取 extends ElasticsearchRepository<T, ID>的接口如下所示,然后就可以使用封装的方法进行增删改操作了
        // XXXXXSearchRepository repository = SpringUtil.getBean(XXXXXSearchRepository.class);

    }

}

7、实现MySQL变更监听

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Slf4j
@Component
@RequiredArgsConstructor
public class MysqlEventListener implements ApplicationRunner {

    private final DataChangeSink dataChangeSink;
    private final MysqlCdcInfo mysqlCdcInfo;

    @Override
    public void run(ApplicationArguments args) {
        CompletableFuture.runAsync(() -> {
            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                // 设置2个并行源任务
                env.setParallelism(2);
                MySqlSource<DataChangeInfo> mySqlSource = buildDataChangeSource(mysqlCdcInfo);
                DataStream<DataChangeInfo> streamSource = env
                        .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source")
                        //对接收器使用并行1来保持消息的顺序
                        .setParallelism(1);
                streamSource.addSink(dataChangeSink);
                env.executeAsync("mysql-cdc-es");
            } catch (Exception e) {
                log.error("mysql --> es, Exception=", e);
            }
        }).exceptionally(ex -> {
            ex.printStackTrace();
            return null;
        });
    }

    /**
     * 构造变更数据源
     *
     * @return DebeziumSourceFunction<DataChangeInfo>
     */
    private MySqlSource<DataChangeInfo> buildDataChangeSource(MysqlCdcInfo mysqlCdcInfo) {
        return MySqlSource.<DataChangeInfo>builder()
                .hostname(mysqlCdcInfo.getIp())
                .port(mysqlCdcInfo.getPort())
                .databaseList(mysqlCdcInfo.getDbs())
                // 支持正则匹配
                .tableList(mysqlCdcInfo.getTables())
                .username(mysqlCdcInfo.getUser())
                .password(mysqlCdcInfo.getPwd())
                // initial:初始化快照,即全量导入后增量导入(检测更新数据写入)
                .startupOptions(StartupOptions.initial())
                .deserializer(new MysqlDeserialization())
                .serverTimeZone("GMT+8")
                .build();
    }

}

到此就大功告成啦!代码地址:https://gitee.com/qianxkun/lakudouzi-components/tree/master/flink-cdc-mysql2es

参考文章:https://blog.51cto.com/caidingnu/6100996 非常感谢!文章来源地址https://www.toymoban.com/news/detail-575365.html

到了这里,关于Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (DataStream方式)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(44)
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka

    Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工具。 Flink SQL 使得使用标准 SQL 开发流式应用变得简单,免去代码开发。 Flink CDC 本文使用 MySQL CDC 连接器 允许从

    2024年02月16日
    浏览(46)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(51)
  • Flink CDC MySQL同步MySQL错误记录

    0、相关Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者从mvnrepository.com下载 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    浏览(53)
  • 最新版Flink CDC MySQL同步MySQL(一)

    Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。 连接器 数据库 驱动 mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.4 mysql-cdc MySQL: 5.6, 5.

    2024年02月13日
    浏览(66)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(104)
  • Flink 实现 MySQL CDC 动态同步表结构

    作者:陈少龙,腾讯 CSIG 高级工程师 使用 Flink CDC(Change Data Capture) 实现数据同步被越来越多的人接受。本文介绍了在数据同步过程中,如何将 Schema 的变化实时地从 MySQL 中同步到 Flink 程序中去。 MySQL 存储的数据量大了之后往往会出现查询性能下降的问题,这时候通过 Flin

    2024年02月04日
    浏览(78)
  • Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(60)
  • 最新版Flink CDC MySQL同步MySQL(一)_flink 连接mysql(1)

    下载 连接器 SQL jar (或 自行构建 )。 将下载的jar包放在FLINK_HOME/lib/. 重启Flink集群。 注意 :目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的 6.使用 Flink CDC 对 MySQL 进行流式 ETL 本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。 假设我们将产品数

    2024年04月26日
    浏览(48)
  • 基于Flink SQL CDC Mysql to Mysql数据同步

    Flink CDC有两种方式同步数据库: 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步; 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。 本方案使用FlinkSQL方法,同步两表中的数据。 其中Flink应用可以部署在具有公网IP的服务

    2023年04月11日
    浏览(76)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包