FlinkCDC 入门之数据同步和故障恢复

这篇具有很好参考价值的文章主要介绍了FlinkCDC 入门之数据同步和故障恢复。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

FlinkCDC 是一款基于 Change Data Capture(CDC)技术的数据同步工具,可以用于将关系型数据库中的数据实时同步到 Flink 流处理中进行实时计算和分析,下图来自官网的介绍。FlinkCDC 入门之数据同步和故障恢复

下图1是 FlinkCDC 与其它常见 开源 CDC 方案的对比:

FlinkCDC 入门之数据同步和故障恢复

可以看见的是相比于其它开源产品,FlinkCDC 不仅支持增量同步,还支持全量/全量+增量的同步,同时 FlinkCDC 还支持故障恢复(基于检查点机制实现),能够快速恢复数据同步的进度,并且支持的数据源也很丰富2(在 2.3 版本已支持 MongoDB、MySQL、OceanBase、Oracle、PostgressSQL、SQLServer、TiDB、Db2 等数据源)。

本文将介绍 FlinkCDC 在数据同步和故障恢复等方面的内容(以 MySQL 和 Oracle 为例),同时完整代码也已上传到GitHub。

效果展示

MySQL

FlinkCDC 入门之数据同步和故障恢复

Oracle(相比 MySQL 延迟会稍高)

FlinkCDC 入门之数据同步和故障恢复

数据库配置

MySQL(5.7)

修改my.cnf配置文件(Windows 下是 my.ini 文件),增加以下配置内容:

[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 对于 MySQL 集群, 不同节点的 server_id 必须不同
server_id=1
# 过期时间
expire_logs_days=30

Tips: 修改完成后需要重启 MySQL 服务

建库建表:

# 建库
create database flink;
# 建表
create table flink.`user` (
	`id` bigint(20) not null,
	`username` varchar(20) default null,
	`password` varchar(63) default null,
	`status` int(2) default null,
	`create_time` datetime default null,
	primary key (`id`)
) ENGINE = InnoDB default CHARSET = utf8mb4;

创建用户并授权:

# 创建用户 flink
CREATE USER flink IDENTIFIED BY 'flink';
# 授权
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
# 将 flink 库的所有权限授权给 flink 用户
GRANT ALL PRIVILEGES ON flink.* TO 'flink'@'%';
# 刷新权限
FLUSH PRIVILEGES;

Oracle(11g)

以 DBA 身份连接:

# SID 需要根据实际情况进行设置, 比如: XE.
export ORACLE_SID=SID
sqlplus /nolog
CONNECT sys/manager AS SYSDBA

配置日志:

alter system set db_recovery_file_dest_size = 20G;
# 日志文件的地址可以根据自己的情况进行设置
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

确认是否配置成功:

archive log list;

FlinkCDC 入门之数据同步和故障恢复

创建用户并授权:

CREATE USER flink IDENTIFIED BY flink;
GRANT CREATE SESSION TO flink;
GRANT FLASHBACK ANY TABLE TO flink;
GRANT SELECT ANY TABLE TO flink;
GRANT SELECT_CATALOG_ROLE TO flink;
GRANT EXECUTE_CATALOG_ROLE TO flink;
GRANT SELECT ANY TRANSACTION TO flink;
GRANT CREATE TABLE TO flink;

建表并增加日志记录:

# 建表
CREATE TABLE flink."user" (
	id NUMBER NOT NULL,
    username VARCHAR2(20),
    password VARCHAR2(63),
    status INTEGER,
    create_time TIMESTAMP,
    PRIMARY KEY(id)
);
# 日志配置
ALTER TABLE flink."user" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

代码配置

运行环境

依赖 版本
Java 17
flink-connector 2.1.0
flink 1.13.0
maven 3.6.2

连接配置

flinkcdc:
  data-source:
    # 默认类型为 MySQL
    addr: localhost:3306
    database: flink
    username: flink
    password: flink
    table-list:
      - user

Tips: 关于数据源的连接完整配置属性可参考 DataSourceProperties.java 文件,关于检查点的配置可参考 CheckPointProperties.java 文件

恢复点配置

为了实现故障恢复(应用停止运行过程中数据库有增删改操作的情况)的情况,需要在代码中进行恢复点的相关配置:

// 获取配置的恢复点路径, 首次运行不存在会默认进行创建
var saveDir = checkPointProperties.getSaveDir();
var folder = new File(saveDir);
if (!folder.exists() && !folder.isDirectory()) {
    if (!folder.mkdirs()) {
        throw new IllegalStateException("文件夹创建失败");
    }
}
var dataSourceType = dataSourceProperties.getType().name().toLowerCase();
var dataSourceSaveDir = saveDir + File.separator + dataSourceType;
var savepointDir = SavepointUtils.getSavepointRestore(dataSourceSaveDir);
var configuration = new Configuration();
if (savepointDir != null) {
    // 设置恢复点路径
    var savepointRestoreSettings = SavepointRestoreSettings.forPath(savepointDir);
    SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration);
}
// 启用检查点并设置检查点的保存路径
var env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.enableCheckpointing(checkPointProperties.getInterval(), CheckpointingMode.EXACTLY_ONCE);
var checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage(checkPointProperties.getStorageType().getPrefix() + dataSourceSaveDir);

通用注意点

为了避免数值类型显示是一堆字符串,需要增加以下配置:

// 详见 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#%E9%80%9A%E7%94%A8-faq Q5
prop.setProperty("bigint.unsigned.handling.mode","long");
prop.setProperty("decimal.handling.mode","double");

ORACLE 配置注意点

为了避免日志增长过快以及读取日志满的问题,需要增加以下配置:

// 详见 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#oracle-cdc-faq Q1
prop.setProperty("log.mining.strategy", "online_catalog");
prop.setProperty("log.mining.continuous.mine", "true");

对于 Oracle 11g,连接配置中需要增加:

// 详见 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#oracle-cdc-faq Q2
prop.setProperty("database.tablename.case.insensitive", "false");

项目运行及使用介绍

下载代码

由于本人将博客相关的示例代码都集中到了一个仓库,因此如果不想拉取整个仓库,推荐使用GitZip for github这个插件,就可以只下载部分的文件(选中指定文件后点击右下角的下载按钮):

FlinkCDC 入门之数据同步和故障恢复

使用介绍

对于需要监控的表,只需要创建相应的实体类,并新建一个类继承AbstractMessageListener(可重写其中的 create、delete、update、read等方法处理相应的事件)即可,其中 FlickCdcMessageListener 注解内的参数填相应的表名即可监听相应的表变更事件(同时需要在 yaml 文件中 tableList 中增加要监听的表,如果是 Oracle 数据库还需要增加日志配置):

import cn.butterfly.flinkcdc.annotation.FlickCdcMessageListener;
import cn.butterfly.flinkcdc.pojo.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 用户表消息监听器
 *
 * @author zjw
 * @date 2023-03-14
 */
@Slf4j
@Component
@FlickCdcMessageListener("user")
public class UserMessageListener extends AbstractMessageListener<User> {

    @Override
    public void create(User user) {
        log.info("新增用户: {}", user);
    }

}

其它注意点

  1. FlinkCDC 默认的同步策略是第一次运行先进行全量同步,后续即可进行增量读取,因此表数据量比较大的时候,重写 AbstractMessageListener#read 方法时需要特别注意处理大量数据的情况。
  2. 由于 Flink CDC 是根据数据库的事务日志来获取数据更改的,如果恢复点之后发生了数据更改,那么在恢复点之后的数据将被重复读取,因此需要考虑重复读取的情况。

总结

本文简单介绍了 FlinkCDC 的数据同步和故障恢复方面的内容,对相关基础知识进行了省略(例如检查点),如果是第一次接触和使用 FlinkCDC,建议先结合官网的示例进行学习,同时建议先通读一篇官方的FAQ。文章来源地址https://www.toymoban.com/news/detail-466157.html

参考文献

  1. 基于 Flink CDC 实现海量数据的实时同步和转换
  2. https://github.com/ververica/flink-cdc-connectors#supported-tested-databases

到了这里,关于FlinkCDC 入门之数据同步和故障恢复的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • FlinkCDC系列:数据同步对部分字段的处理,只更新部分字段

    在flinkCDC源数据配置中,只对表中的部分字段关注,通过监控部分字段进行数据更新或者不更新,对数据进行同步。主要通过以下两个参数: column.exclude.list 默认: 空字符串 一个可选的、以逗号分隔的正则表达式列表,与列的完全限定名称匹配以从更改事件记录值中排除。列

    2024年02月06日
    浏览(39)
  • 063、故障处理之快速恢复数据

    目的:尽快修复数据,恢复业务 常用备份恢复技术 MVCC 是TiDB数据库原生的一项功能,默认使用无需配置,它使用多个历史快照的方式来维护数据在某个时间点对并发访问的一致性。 dml方式 tidb_snapshot 参数 ddl方式 flashback table ; recover table dml + ddl 方式 dumpling 工具 步骤一: 查

    2024年02月15日
    浏览(35)
  • 【vsan数据恢复】vsan逻辑架构出现故障的数据恢复案例

    VSAN数据恢复环境: 一套有三台服务器节点的VSAN超融合基础架构,每台服务器节点上配置2块SSD硬盘和4块机械硬盘。 每个服务器节点上配置有两个磁盘组,每个磁盘组使用1个SSD硬盘作为缓存盘,2个机械硬盘作为容量盘。三台服务器节点上共配置6个磁盘组,共同组成VSAN存储空

    2024年01月19日
    浏览(31)
  • 服务器数据恢复-EVA存储磁盘故障导致存储崩溃的数据恢复案例

    EVA系列存储是一款以虚拟化存储为实现目的的中高端存储设备。EVA存储中的数据在EVA存储设备工作过程中会不断进行迁移,如果运行的任务比较复杂,EVA存储磁盘负载加重,很容易出现故障的。EVA存储通过大量磁盘的冗余空间和故障后rss冗余磁盘动态迁移来保护存储中的数据

    2024年02月11日
    浏览(48)
  • HBase的数据高可用与自动故障恢复

    HBase是一个分布式、可扩展、高性能的列式存储系统,基于Google的Bigtable设计。它是Hadoop生态系统的一部分,可以与HDFS、MapReduce、ZooKeeper等组件集成。HBase具有高可用性、高性能和自动故障恢复等特点,适用于大规模数据存储和实时数据处理。 在现代互联网应用中,数据高可

    2024年02月21日
    浏览(31)
  • 服务器数据恢复-V7000存储2块磁盘故障脱机的数据恢复案例

    服务器数据恢复环境: P740+AIX+Sybase+V7000存储,存储阵列柜上共12块SAS机械硬盘(其中一块为热备盘)。 服务器故障: 存储阵列柜中有磁盘出现故障,工作人员发现后更换磁盘,新更换的磁盘数据同步到40%左右时,阵列柜中的另一块磁盘也出现问题,数据同步中断,逻辑盘无

    2024年02月07日
    浏览(43)
  • MySQL-备份+日志:介质故障与数据库恢复

    本关任务: 备份数据库,然后再恢复它。 为了完成本关任务,你需要掌握: 1.MySQL的恢复机制; 2.MySQL提供的备份与恢复工具。 和大多数DBMS一样,MySQL利用备份、日志文件实现恢复。 具体理论知识在此不详细介绍。 MySQL提供了以下工具: 逻辑备份工具:mysqldump 物理备份工具

    2024年02月05日
    浏览(81)
  • 分布式数据库事务故障恢复的原理与实践

    关系数据库中的事务故障恢复并不是一个新问题,自70年代关系数据库诞生之后就一直伴随着数据库技术的发展,并且在分布式数据库的场景下又遇到了一些新的问题。本文将会就事务故障恢复这个问题,分别讲述单机数据库、分布式数据库中遇到的问题和几种典型的解决方

    2024年02月03日
    浏览(40)
  • 服务器数据恢复—nas硬盘故障导致raid6失效、存储无法访问的数据恢复案例

    服务器故障分析: 一台nas存储中有一组由十几块硬盘组建的raid6磁盘阵列。 nas存储中的raid6阵列成员盘出现故障离线,磁盘阵列崩溃,nas存储无法正常访问。 北亚企安数据恢复工程师将nas存储内的所有硬盘编号后取出,经过硬件工程师检测后,发现所有硬盘(包括离线的硬盘

    2024年02月08日
    浏览(32)
  • 【服务器数据恢复】Raid磁盘阵列常见故障类型&原因分析

    由于raid的特点和优势,磁盘阵列技术被广泛应用于服务器和存储等商用领域。由于用户基数大,出现故障的情况也不少。通过这篇文章介绍一下常见的raid磁盘阵列数故障类型和原因。   故障类型一、磁盘阵列处于降级状态时未及时rebuild。 RAID磁盘阵列的数据安全冗余是利用

    2023年04月25日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包