FlinkCDC实现主数据与各业务系统数据的一致性(瀚高、TIDB)

这篇具有很好参考价值的文章主要介绍了FlinkCDC实现主数据与各业务系统数据的一致性(瀚高、TIDB)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

         文章末尾附有flinkcdc对应瀚高数据库flink-cdc-connector代码下载地址

1、业务需求

        目前项目有主数据系统和N个业务系统,为保障“一数一源”,各业务系统表涉及到主数据系统的字段都需用主数据系统表中的字段进行实时覆盖,这里以某个业务系统的一张表举例说明:业务系统表TableB字段col_b3与主数据系统表TableA中col_a3不一致,需要用col_a3实时覆盖col_b3生成目标表TableB_new中间表,业务系统存储为国产瀚高数据库,中间库用TIDB。

flink cdc主子表,企业级大数据应用实战(持续更新...),tidb,flink,数据仓库

2、需求分析

         业务系统已上线多年,存在历史数据和新数据,需要分两个阶段进行处理。

        第一阶段,历史数据通过TableA、TableB联合关联生成中间表TableC,其中TableC中的主数据字段已用主数据进行了更新,再将TableC实时同步到瀚高数据库中生成一个新的业务表TableB_new(TableC和TableB_new表结构一致);

        第二阶段,历史数据处理结束后,业务系统直接割接到新表TableB_new,后期新的业务数据用TableB_new与主数据表TableA关联,实时生成中间表TableC,再用FlinkCDC,实时同步TableC数据覆盖TableB_new主数据字段。

3、具体实现

        第一阶段流程图,历史数据处理,由TableA和TableB实时关联生成中间表TableC,再实时同步TableC到新的业务表TableB_new,完成历史数据主数据字段的覆盖:

flink cdc主子表,企业级大数据应用实战(持续更新...),tidb,flink,数据仓库

        第二阶段流程图,业务割接到新表TableB_new实时同步,直接由TableA和TableB_new关联生成TableC,再用cdc任务实时同步到新业务 表TableB_new中,即可完成主数据的覆盖:

      flink cdc主子表,企业级大数据应用实战(持续更新...),tidb,flink,数据仓库

4、FlinkSQL脚本

4.1、第一阶段脚本

4.1.1、TableA实时关联TableB生成中间表TableC
//指定任务名称
set pipeline.name=task_TablA_Table_B_TableC;
//主数据源表TableA
DROP TABLE IF EXISTS TableA;
CREATE TABLE TableA(
   col_a1  varchar(255),
   col_a2  varchar(255),
   col_a3  varchar(255)
) WITH (
   'connector' = 'highgo-cdc',
   'hostname' = '10.*.*.*',
   'port' = '5866',
   'username' = 'cdcuser',
   'password' = '123456a?',
   'database-name' = 'databaseA',
   'schema-name' = 'public',
   'table-name' = 'TableA',
   'slot.name' = 'TableA',
   'decoding.plugin.name' = 'pgoutput',
   'scan.incremental.snapshot.enabled' = 'false'
);
//业务数据源表TableB
DROP TABLE IF EXISTS tableB;
CREATE TABLE tableB(
   col_b1  varchar(255),
   col_b2  varchar(255),
   col_b3  varchar(255)
) WITH (
   'connector' = 'highgo-cdc',
   'hostname' = '10.*.*.*',
   'port' = '5866',
   'username' = 'cdcuser',
   'password' = '123456a?',
   'database-name' = 'databaseB',
   'schema-name' = 'public',
   'table-name' = 'TableB',
   'slot.name' = 'TableB',
   'decoding.plugin.name' = 'pgoutput',
   'scan.incremental.snapshot.enabled' = 'false'
);
//中间表TableC
DROP TABLE IF EXISTS TableC;
CREATE TABLE TableC(
   col_c1  varchar(255),
   col_c2  varchar(255),
   col_c3  varchar(255)
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://10.8.8.8:4000/databaseC',
   'username' = 'root',
   'password' = '*****',
   'table-name' = 'TableC',
   'driver' = 'com.mysql.jdbc.Driver'
);
insert  into tableC  
select 
b.col_b1 as col_c1,
b.col_b2 as col_c2,
CASE
  WHEN a.col_a3 IS NOT NULL THEN  a.col_a3
  ELSE b.col_b3
END  as col_c3
from TableB  t1   left join TableA t2  on b.fk=a.id;
4.1.2、TableC实时同步到TableB_new
//指定任务名称
set pipeline.name=task_TableC_TableB_new;
//中间表TableC  TIDB
DROP TABLE IF EXISTS TableC;
CREATE TABLE TableC(
   col_c1  varchar(255),
   col_c2  varchar(255),
   col_c3  varchar(255)
) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = '10.*.*.*:4000',
    'database-name' = 'databaseC',
    'table-name' = 'TableC'
);
//业务结果表 写入瀚高数据库
DROP TABLE IF EXISTS TableB_new;
CREATE TABLE TableB_new(
   col_b1  varchar(255),
   col_b2  varchar(255),
   col_b3  varchar(255)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:highgo://10.*.*.*:5866/databaseB?currentSchema=public',
    'username' = 'sysdba',
    'password' = '****',
    'table-name' = 'TableB_new',
    'driver' = 'com.highgo.jdbc.Driver'
);
insert  into TableB_new 
select
 col_c1 as col_b1,
 col_c2 as col_b2,
 col_c3 as col_b3,
from  TableC;

4.2、第二阶段脚本

4.2.1、TableA实时关联TableB_new生成中间表TableC
//指定任务名称
set pipeline.name=task_TablA_Table_B_TableC;
//主数据源表TableA
DROP TABLE IF EXISTS TableA;
CREATE TABLE TableA(
   col_a1  varchar(255),
   col_a2  varchar(255),
   col_a3  varchar(255)
) WITH (
   'connector' = 'highgo-cdc',
   'hostname' = '10.*.*.*',
   'port' = '5866',
   'username' = 'cdcuser',
   'password' = '123456a?',
   'database-name' = 'databaseA',
   'schema-name' = 'public',
   'table-name' = 'TableA',
   'slot.name' = 'TableA',
   'decoding.plugin.name' = 'pgoutput',
   'scan.incremental.snapshot.enabled' = 'false'
);
//业务数据源表 也是目标表TableB_new
DROP TABLE IF EXISTS TableB_new;
CREATE TABLE tableB(
   col_b1  varchar(255),
   col_b2  varchar(255),
   col_b3  varchar(255)
) WITH (
   'connector' = 'highgo-cdc',
   'hostname' = '10.*.*.*',
   'port' = '5866',
   'username' = 'cdcuser',
   'password' = '123456a?',
   'database-name' = 'databaseB',
   'schema-name' = 'public',
   'table-name' = 'TableB_new',
   'slot.name' = 'TableB_new',
   'decoding.plugin.name' = 'pgoutput',
   'scan.incremental.snapshot.enabled' = 'false'
);
//中间表TableC
DROP TABLE IF EXISTS TableC;
CREATE TABLE TableC(
   col_c1  varchar(255),
   col_c2  varchar(255),
   col_c3  varchar(255)
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://10.8.8.8:4000/databaseC',
   'username' = 'root',
   'password' = '*****',
   'table-name' = 'TableC',
   'driver' = 'com.mysql.jdbc.Driver'
);
insert  into tableC  
select 
b.col_b1 as col_c1,
b.col_b2 as col_c2,
CASE
  WHEN a.col_a3 IS NOT NULL THEN  a.col_a3
  ELSE b.col_b3
END  as col_c3
from TableB_new  t1   left join TableA t2  on b.fk=a.id;
4.2.2、TableC实时同步到TableB_new

        与4.1.2脚本一致,略

      备:flink-cdc-connector代码:支持瀚高数据库Highgo下载地址:

 https://github.com/lujisen/flink-cdc-connectors.githttp://xn--flink-cdc-connector-jz52b18z5q4dqpxn文章来源地址https://www.toymoban.com/news/detail-779141.html

到了这里,关于FlinkCDC实现主数据与各业务系统数据的一致性(瀚高、TIDB)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 并发情况如何实现加锁来保证数据一致性?

    ReentrantLock(可重入锁),指的是一个线程再次对已持有的锁保护的临界资源时,重入请求将会成功。 简单的与我们常用的Synchronized进行比较: ReentrantLock Synchronized 锁实现机制 依赖AQS 监视器模式 灵活性 支持响应超时、中断、尝试获取锁 不灵活 释放形式 必须显示调用unloc

    2024年02月05日
    浏览(49)
  • 深入理解高并发下的MySQL与Redis缓存一致性问题(增删改查数据缓存的一致性、Canal、分布式系统CAP定理、BASE理论、强、弱一致性、顺序、线性、因果、最终一致性)

    一些小型项目,或极少有并发的项目,这些策略在无并发情况下,不会有什么问题。 读数据策略:有缓存则读缓存,然后接口返回。没有缓存,查询出数据,载入缓存,然后接口返回。 写数据策略:数据发生了变动,先删除缓存,再更新数据,等下次读取的时候载入缓存,

    2024年03月20日
    浏览(40)
  • 基于 Flink & Paimon 实现 Streaming Warehouse 数据一致性管理

    摘要:本文整理自字节跳动基础架构工程师李明,在 Apache Paimon Meetup 的分享。本篇内容主要分为四个部分: 背景 方案设计 当前进展 未来规划 点击查看原文视频 演讲PPT ​ 早期的数仓生产体系主要以离线数仓为主,业务按照自己的业务需求将数仓分为不同的层次,例如 DW

    2024年02月14日
    浏览(29)
  • Redis如何实现主从复制?有没有办法保证数据一致性?

    Redis通过主从复制(Master-Slave Replication)实现数据复制和高可用性。主节点负责接收和处理写操作,并将数据同步到从节点上。 主从复制的实现步骤如下: 配置主从关系:在从节点上的配置文件中配置主节点的IP地址和端口号。 从节点连接主节点:从节点启动时会自动连接主

    2024年02月13日
    浏览(31)
  • MySQL 和 Redis 如何保证数据一致性,通过MySQL的binlog实现

    1、简介         MySQL 和 Redis 如何保证数据一致性,目前大多讨论的是先更新Redis后更新MySQL,还是先更新MySQL 后更新Redis,这两种方式在实际的应用场景中都不能确保数据的完全一致性,在某些情况下会出现问题,本文介绍使用 Canal 工具,通过将自己伪装成MySQL的从节点,读

    2024年02月02日
    浏览(46)
  • Spring Boot整合canal实现数据一致性解决方案解析-部署+实战

    🏷️ 个人主页 :牵着猫散步的鼠鼠  🏷️ 系列专栏 :Java全栈-专栏 🏷️ 个人学习笔记,若有缺误,欢迎评论区指正   1.前言 2.canal部署安装 3.Spring Boot整合canal 3.1数据库与缓存一致性问题概述 3.2 整合canel 4.总结 canal [kə\\\'næl]  ,译意为水道/管道/沟渠,主要用途是 基于

    2024年03月19日
    浏览(43)
  • [Etcd]分布式系统中如何使用乐观锁保证Mysql和Etcd数据最终一致性

    在写业务代码时,很多时候需要保证数据存储在不同中间件中的一致性。以笔者为例,就遇到了需要将mysql中已存储的数据转存到etcd中,同时还要考虑到并发场景下如何保证数据最终一致性的问题。 该问题形象地表示的话,可以将时间线展开如下 服务A1更新db数据为 {\\\"key1\\\":

    2024年02月02日
    浏览(40)
  • 分布式系统架构设计之分布式数据存储的扩展方式、主从复制以及分布式一致性

    在分布式系统中,数据存储的扩展是为了适应业务的增长和提高系统的性能。分为水平扩展和垂直扩展两种方式,这两种方式在架构设计和应用场景上有着不同的优势和局限性。 水平扩展是通过增加节点或服务器的数量来扩大整个系统的容量和性能。在数据存储领域,水平扩

    2024年02月03日
    浏览(51)
  • Sharding-JDBC 自定义一致性哈希算法 + 虚拟节点 实现数据库分片策略

    分片操作是分片键 + 分片算法,也就是分片策略。目前Sharding-JDBC 支持多种分片策略: 标准分片策略 对应StandardShardingStrategy。提供对SQL语句中的=, IN和BETWEEN AND的分片操作支持。 复合分片策略 对应ComplexShardingStrategy。复合分片策略。提供对SQL语句中的=, IN和BETWEEN AND的分片操作

    2024年02月02日
    浏览(43)
  • 博客摘录「 Redis( 缓存篇 ==> 超详细的缓存介绍与数据一致性解决方案 &; 代码实现」

    Redis 旁路缓存 由于高并发原因,先更新数据库和先更新缓存策略都会因为延迟时间而导致数据不一致问题。 两种策略 先删除缓存,再更新数据库; 先更新数据库,再删除缓存。 因为缓存的写入通常要远远快于数据库的写入 ,所以先更新数据库再删缓存,删完缓存,下次访

    2024年02月15日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包