flink 1.13.x集成 CDC 2.3.0

这篇具有很好参考价值的文章主要介绍了flink 1.13.x集成 CDC 2.3.0。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用 flink 1.13.0 和 CDC 2.3.0 的 demo

public class TMySqlCDC {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);
        Properties dbProps = new Properties();
        dbProps.put("database.serverTimezone", "UTC");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.18.126")
                .port(3306)
                .databaseList("xn_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
                .tableList("xn_test.hl_t")// set captured table
                .includeSchemaChanges(true)
                .username("root")
                .password("123456")
                .debeziumProperties(dbProps)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_cdc")
                .print();

        env.execute();
    }

}

踩坑一

Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

原因

https://github.com/ververica/flink-cdc-connectors/pull/1407

简单讲,Flink 运行机器时区和Mysql Server 时区不匹配,database.serverTimezone 配置配置影响
具体代码可以查看CDC com.ververica.cdc.connectors.mysql.MySqlValidator#checkTimeZone

解决办法

手动指定下Flink 运行的时区,和连接的数据库时区信息保持一致

dbProps.put("database.serverTimezone", "UTC");

踩坑二

Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder

原因

flink 为了解决包冲突,对一些通用的工具包做了shaded,传送门flink-shaded

flink-cdc-connectors 2.3.0 版本引用了 flink 1.16.0,这个版本的flink使用了 flink-shaded-guava:30.1.1-jre-15.0版本。
而 flink 1.13.0 使用的是 flink-shaded-guava:18.0-13.0 版本,两个版本的 shaded package 不一样引起的

解决

那既然是 shaded 引用,在 cdc 中再次 shaded 一下,让 cdc 里面引用到的 guava30 变为 guava18
clone cdc,基于tag release-2.3.0 创建分支,修改 flink-cdc-connectors 的 pom.xml 文件,引入cdc 后排除 guava 依赖。

# 编译基于已经 release 的tag
git branch supos/release-2.3.0 release-2.3.0
git checkout supos/release-2.3.0

修改shaded配置

<!-- 在 maven-shade-plugin 插件中添加configuration -->
<relocation>
    <pattern>org.apache.flink.shaded.guava30</pattern>
    <shadedPattern>org.apache.flink.shaded.guava18</shadedPattern>
</relocation>

使用 mvn version 修改版本

# 要发布到公司内部仓库,修改为 snapshot 版本
 mvn versions:set -DnewVersion=supos-2.3.0-SNAPSHOT -DgenerateBackupPoms=false

执行编译

# -Drat.skip=true 发布审计插件,文件需要有 license 头,可选  [-T 8 # 多线程编译]
mvn clean install -Drat.skip=true -DskipTests -T 8
# 将源码打包到jar 包,发布到内部私有仓库上
mvn clean source:jar install deploy -Drat.skip=true -DskipTests -T 8

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:48 min
[INFO] Finished at: 2023-03-03T14:14:05+08:00
[INFO] ------------------------------------------------------------------------

引入cdc

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.cdc.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>flink-shaded-guava</artifactId>
            <groupId>org.apache.flink</groupId>
        </exclusion>
    </exclusions>
</dependency>

run ~

参考
maven-shaded-plugin
maven-version-plugin
maven-source-plugin
maven snapshot

maven 插件生命周期

flink-cdc 2.4.0 发布了,要适配 flink1.13,研究了一下 maven 生命周期

cdc 2.4.0 开始,有多个模块使用了 guava30,如果要挨个模块中配置就不太方便了,通过maven 父子模块管理,在父模块中统一处理

<!-- flink-cdc-connectors pom -->
<configuration>
      <relocations>
          <relocation>
              <pattern>org.apache.flink.shaded.guava30</pattern>
              <shadedPattern>org.apache.flink.shaded.guava18</shadedPattern>
          </relocation>
      </relocations>
</configuration>

注意,在子模块中,要引用一下父工程

    <parent>
        <artifactId>flink-cdc-connectors</artifactId>
        <groupId>com.ververica</groupId>
        <version>{version}</version>
    </parent>

构建

mvn clean install -Drat.skip=true -DskipTests -T 8

通过观察,插件是有生命周期的,如果在父模块中配置过 shade,把 guava30 -> guava18,子模块中配置 guava30 - guava11 是不生效的,需要配置成 guava18 - > guava11 才能生效。

父模块的配置不会影响到子模块,有些统一处理的方案可以在父模块中直接配置为全局

给自己定个flag,没事了写点东西,记录下工作。文章来源地址https://www.toymoban.com/news/detail-596272.html

到了这里,关于flink 1.13.x集成 CDC 2.3.0的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink系列之:使用Flink CDC从数据库采集数据,设置checkpoint支持数据采集中断恢复,保证数据不丢失

    博主相关技术博客: Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql数据到StarRocks数据库

    2024年02月11日
    浏览(71)
  • 使用Flink CDC从数据库采集数据,保证数据不丢失:实现断点续传机制

    大数据技术在当前的数据分析和处理中扮演着重要的角色。Apache Flink作为一种快速、可靠的流处理引擎,在大规模数据处理中广受欢迎。本文将介绍如何使用Flink CDC(Change Data Capture)从数据库采集数据,并通过设置checkpoint来支持数据采集中断恢复,从而保证数据不丢失。

    2024年02月04日
    浏览(44)
  • flink cdc同步Oracle数据库资料到Doris问题集锦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    浏览(43)
  • 【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    我一开始是flink-cdc,oracle2Mysql,sql 我一开始直接用的oracle【date】类型,mysql【date】类型,sql的校验通过了,但是真正操作数据的时候报错,告诉我oracle的数据格式的日期数据,不可以直接插入到mysql格式的日期数据,说白了就是数据格式不一致导致的 我想的是既然格式不对

    2024年02月12日
    浏览(42)
  • 实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据

    目录 前言: 1、springboot引入依赖: 2、yml配置文件 3、创建SQL server CDC变更数据监听器 4、反序列化数据,转为变更JSON对象 5、CDC 数据实体类 6、自定义ApplicationContextUtil 7、自定义sink 交由spring管理,处理变更数据         我的场景是从SQL Server数据库获取指定表的增量数据,查

    2024年02月10日
    浏览(87)
  • Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

    使用sysdba角色登录到Oracle数据库 确保Oracle归档日志(Archive Log)已启用 若未启用归档日志, 需运行以下命令启用归档日志 设置归档日志存储大小及位置 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等) 设置恢复文件的实际物理存储路径;scope=spfile参数

    2024年02月05日
    浏览(48)
  • 实时Flink的数据库与Kafka集成优化案例

    在现代数据处理系统中,实时数据处理和分析是至关重要的。Apache Flink是一个流处理框架,可以用于实时数据处理和分析。在许多场景下,Flink需要与数据库和Kafka等消息系统进行集成,以实现更高效的数据处理。本文将讨论Flink与数据库和Kafka集成的优化案例,并提供实际示

    2024年02月20日
    浏览(41)
  • 实测解决 flink cdc mysql 时间字段差8小时/差13小时问题

    关键代码: 其中的:com.ysservice.utils.MySqlDateTimeConverter,根据自己的MySqlDateTimeConverter类路径进行修改 全量阶段和增量阶段的时间问题还不一样,实测本方式能全部解决,解决的同学记得回来点个赞!

    2024年02月16日
    浏览(39)
  • 《Spring Boot 实战派》--13.集成NoSQL数据库,实现Elasticsearch和Solr搜索引擎

             关于搜索引擎 我们很难实现 Elasticseach 和 Solr两大搜索框架的效果;所以本章针对两大搜索框架,非常详细地讲解 它们的原理和具体使用方法, 首先 介绍什么是搜索引擎 、如何用 MySQL实现简单的搜索引擎,以及Elasticseach 的 概念和接口类; 然后介绍Elasticseach

    2023年04月09日
    浏览(88)
  • Flink CDC 新一代数据集成框架

    前言: 主要讲解了技术原理,入门与生产实践,主要功能:全增量一体化数据集成、实时数据入库入仓、最详细的教程。Flink CDC 是Apache Flink的一个重要组件,主要使用了CDC技术从各种数据库中获取变更流并接入到Flink中,Apache Flink作为一款非常优秀的流处理引擎,其SQL API又

    2024年02月13日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包