Flink CDC 3.0 表结构变更的处理流程

这篇具有很好参考价值的文章主要介绍了Flink CDC 3.0 表结构变更的处理流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink CDC 3.0 表结构变更的处理流程

​ 表结构变更主要涉及到三个类SchemaOperatorDataSinkWriterOperatorSink端)和SchemaRegistry(协调器);SchemaOperator接收结构变更消息时会通知sink端和协调器,并等待结构变更操作在协调器执行完毕后在处理后续数据,具体流程参考如下。

前提条件

cdc版本:Flink-cdc 3.0

Flink版本:Flink 1.18

SchemaOperator类

Source抓表结构变更事件推送到SchemaOperator时,SchemaOperator会向协调器(也就是SchemaRegistry)发起变更请求;如果是表结构变更,则向Sink发送flushEvent,让其(Sinkflush内存中数据(Sink是经过DataSinkWriterOperator包装),最后阻塞数据流;

SchemaOperator处理表结构变更事件

# SchemaOperator
@Override
public void processElement(StreamRecord<Event> streamRecord) {
    Event event = streamRecord.getValue();
    //如果是schame change事件
    if (event instanceof SchemaChangeEvent) {
        TableId tableId = ((SchemaChangeEvent) event).tableId();
        LOG.info(
                "Table {} received SchemaChangeEvent and start to be blocked.",
                tableId.toString());
        //处理schame change事件
        handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
        return;
    }
    output.collect(streamRecord);
}

private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
    // The request will need to send a FlushEvent or block until flushing finished
    //向协调节点(SchemaRegistry)发送表结构变更请求,是表结构变更会返回true 如果是建表则返回false
    SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
    if (response.isShouldSendFlushEvent()) {
        LOG.info(
                "Sending the FlushEvent for table {} in subtask {}.",
                tableId,
                getRuntimeContext().getIndexOfThisSubtask());
        //向sink发送 flush事件和schame信息
        output.collect(new StreamRecord<>(new FlushEvent(tableId)));
        output.collect(new StreamRecord<>(schemaChangeEvent));
        // The request will block until flushing finished in each sink writer
        // 这个请求查询协调器,当前schame是否执行完毕,如果没有则阻塞等待,直到协调器完成schame change操作
        requestReleaseUpstream();
    }
}

Sink端

Sinkflush掉变更前的数据,并上报给协调器(SchemaRegistry)缓存刷新完成

Sink端处理表结构变更事件,并上报给协调器

# DataSinkWriterOperator 创建sink时会使用DataSinkWriterOperator包装,用于处理FlushEventCreateTableEvent事件
@Override
public void processElement(StreamRecord<Event> element) throws Exception {
    Event event = element.getValue();

    // 处理FlushEvent事件
    if (event instanceof FlushEvent) {
        handleFlushEvent(((FlushEvent) event));
        return;
    }

    // CreateTableEvent marks the table as processed directly
    if (event instanceof CreateTableEvent) {
        processedTableIds.add(((CreateTableEvent) event).tableId());
        this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
                .processElement(element);
        return;
    }

    // Check if the table is processed before emitting all other events, because we have to make
    // sure that sink have a view of the full schema before processing any change events,
    // including schema changes.
    ChangeEvent changeEvent = (ChangeEvent) event;
    if (!processedTableIds.contains(changeEvent.tableId())) {
        emitLatestSchema(changeEvent.tableId());
        processedTableIds.add(changeEvent.tableId());
    }
    processedTableIds.add(changeEvent.tableId());
    this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
            .processElement(element);
}

# handleFlushEvent 向协调节点(SchemaRegistry)发送`FlushSuccess`请求
private void handleFlushEvent(FlushEvent event) throws Exception {
    copySinkWriter.flush(false);
    schemaEvolutionClient.notifyFlushSuccess(
            getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
}

协调器

​ 协调节点收到所有Sinkflush完成通知后,然后执行结构变更操作,最后通知完成给等待的requestReleaseUpstream请求。

协调节点处理FlushSuccess请求

public void flushSuccess(TableId tableId, int sinkSubtask) {
    flushedSinkWriters.add(sinkSubtask);
    //所有节点都处理完成
    if (flushedSinkWriters.equals(activeSinkWriters)) {
        LOG.info(
                "All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());
        PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
        //执行表结构变更操作
        applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
        //通知等待的SchemaOperator,结构变更完成!
        waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));
        if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
            startNextSchemaChangeRequest();
        }
    }
}

更多请参考github:参考地址文章来源地址https://www.toymoban.com/news/detail-842433.html

到了这里,关于Flink CDC 3.0 表结构变更的处理流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据

    目录 前言: 1、springboot引入依赖: 2、yml配置文件 3、创建SQL server CDC变更数据监听器 4、反序列化数据,转为变更JSON对象 5、CDC 数据实体类 6、自定义ApplicationContextUtil 7、自定义sink 交由spring管理,处理变更数据         我的场景是从SQL Server数据库获取指定表的增量数据,查

    2024年02月10日
    浏览(90)
  • flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读. 在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务, 然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交, 本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命

    2024年02月19日
    浏览(41)
  • Flink 实现 MySQL CDC 动态同步表结构

    作者:陈少龙,腾讯 CSIG 高级工程师 使用 Flink CDC(Change Data Capture) 实现数据同步被越来越多的人接受。本文介绍了在数据同步过程中,如何将 Schema 的变化实时地从 MySQL 中同步到 Flink 程序中去。 MySQL 存储的数据量大了之后往往会出现查询性能下降的问题,这时候通过 Flin

    2024年02月04日
    浏览(78)
  • 基于 Flink SQL CDC 数据处理的终极武器

    来源互联网多篇文章总结 业务系统经常会遇到需要更新数据到多个存储的需求。例如:一个订单系统刚刚开始只需要写入数据库即可完成业务使用。某天 BI 团队期望对数据库做全文索引,于是我们同时要写多一份数据到 ES 中,改造后一段时间,又有需求需要写入到 Redis 缓存

    2024年02月16日
    浏览(36)
  • Flink cdc同步mysql到starrocks(日期时间格式/时区处理)

    flink 1.15.3(此时最新版本为1.16.1) mysql 5.7+ starrocks 2.5.2 mysql同步表结构 mysql中的timestamp字段是可以正常同步的,但是多了8小时,设置了mysql链接属性也没效果 参考下方的链接有两种方式; 参考资料 https://blog.csdn.net/cloudbigdata/article/details/122935333 https://blog.csdn.net/WuBoooo/article/deta

    2024年02月16日
    浏览(46)
  • 使用 Flink CDC 实现 MySQL 数据,表结构实时入 Apache Doris

    现有数据库:mysql 数据:库表较多,每个企业用户一个分库,每个企业下的表均不同,无法做到聚合,且表可以被用户随意改动,增删改列等,增加表 分析:用户自定义分析,通过拖拽定义图卡,要求实时,点击确认即出现相应结果,其中有无法预判的过滤 问题:随业务增长

    2023年04月08日
    浏览(56)
  • Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理

    flink-cdc官网:Oracle CDC Connector — CDC Connectors for Apache Flink® documentation Flink环境依赖: (3)启用日志归档 (4)检查是否启用了日志归档 (5)创建具有权限的 Oracle 用户 (5.1)。创建表空间 (5.2)。创建用户并授予权限 Flink SQL 客户端连接器测试: 创建 Oracle 链接器 返回内容 以上代

    2024年02月11日
    浏览(44)
  • 【Flink-CDC】Flink CDC 介绍和原理概述

    CDC是( Change Data Capture 变更数据获取 )的简称。 核心思想是, 监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 主要分为基于查询和基于

    2024年01月20日
    浏览(46)
  • Flink系列之:Flink CDC深入了解MySQL CDC连接器

    增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括: (1)在快照读取期间,Source 支持并发读取 (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运

    2024年02月02日
    浏览(50)
  • Flink学习13-Flink CDC

    一、CDC简介 cdc全称 Change Data Capture 变更数据捕获。通俗来讲只要能捕获到变更的数据的技术都可以称为cdc。常见的开源技术有以下几种: canal:https://github.com/alibaba/canal maxwell:https://github.com/zendesk/maxwell Debezium:https://github.com/debezium/debezium flink-cdc:https://github.com/ververica/fli

    2024年01月16日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包