【Flink】 FlinkCDC读取Mysql( DataStream 方式)(带完整源码,直接可使用)

这篇具有很好参考价值的文章主要介绍了【Flink】 FlinkCDC读取Mysql( DataStream 方式)(带完整源码,直接可使用)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介:

    FlinkCDC读取Mysql数据源,程序中使用了自定义反序列化器,完整的Flink结构,开箱即用。

本工程提供

1、项目源码及详细注释,简单修改即可用在实际生产代码

2、成功编译截图

3、自己编译过程中可能出现的问题

4、mysql建表语句及测试数据

5、修复FlinkCDC读取Mysql数据时间戳差8小时问题

6、自定义反序列化器让FlinkCDC读取和后续的处理更方便

一、成功编译成功信息如下

【Flink】 FlinkCDC读取Mysql( DataStream 方式)(带完整源码,直接可使用),Flink解决方案,flink,大数据

二、 自己编译过程中可能出现的问题

FLINK:ClassNotFoundException: org.apache.flink.streaming.api.functions.source.SourceFunction_一杯咖啡半杯糖的博客-CSDN博客

 三、mysql建表语句及测试数据,Mysql版本5.7

CREATE TABLE `cdc_test` (
  `id` int(11) NOT NULL,
  `name` varchar(12) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

INSERT INTO cdc.cdc_test (id,name,create_time) VALUES
     (1,'li',NULL),
     (2,'zhang','2023-09-13 09:27:18'),
     (3,'liu','2023-09-13 09:30:18'),
     (4,'wu1','2023-09-13 10:30:18'),
     (5,'aa','2023-09-13 09:30:18');

四、项目源码及详细注释,简单修改即可用在实际生产代码,修复FlinkCDC读取Mysql数据时间戳差8小时问题,自定义反序列化器让FlinkCDC读取和后续的处理更方便

最后项目源码如下

链接:https://pan.baidu.com/s/1WtTZwftyIPPgLlEQv50qUA?pwd=6tzz 
提取码:6tzz

如果链接失效请私信我

 文章来源地址https://www.toymoban.com/news/detail-733144.html

到了这里,关于【Flink】 FlinkCDC读取Mysql( DataStream 方式)(带完整源码,直接可使用)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink读取数据的5种方式(文件,Socket,Kafka,MySQL,自定义数据源)

    这是最简单的数据读取方式。当需要进行功能测试时,可以将数据保存在文件中,读取后验证流处理的逻辑是否符合预期。 程序代码: 输出结果 用于验证一些通过Socket传输数据的场景非常方便。 程序代码: 测试时,需要先在 172.16.3.6 的服务器上启动 nc ,然后再启动Flink读

    2024年02月16日
    浏览(44)
  • Flink|《Flink 官方文档 - DataStream API - 用户自定义 Functions》学习笔记 + 源码分析

    学习文档:Flink 官方文档 - DataStream API - 用户自定义 Functions 学习笔记如下: 用户可以通过实现接口来完成自定义 Functions。 实现接口并使用的样例: 使用匿名类实现的样例: 使用 Lambda 表达式实现(Java 8)样例: 所有的 Flink 函数类都有其 Rich 版本,在 Rick function 中,可以获

    2024年01月18日
    浏览(48)
  • flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读. 在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务, 然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交, 本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命

    2024年02月19日
    浏览(41)
  • Flink 读写MySQL数据(DataStream和Table API)

    Flink提供了基于JDBC的方式,可以将读取到的数据写入到MySQL中;本文通过两种方式将数据下入到MySQL数据库,其他的基于JDBC的数据库类似,另外,Table API方式的Catalog指定为Hive Catalog方式,持久化DDL操作。 另外,JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将

    2023年04月09日
    浏览(46)
  • 【Flink】FlinkCDC获取mysql数据时间类型差8小时时区解决方案

    1、背景: 在我们使用FlinkCDC采集mysql数据的时候,日期类型是我们很常见的类型,但是FlinkCDC读取出来会和数据库的日期时间不一致,情况如下 FlinkCDC获取的数据中create_time字段1694597238000转换为时间戳2023-09-13 17:27:18  而数据库中原始数据如下,并没有到下午5点,这就导致了

    2024年02月07日
    浏览(53)
  • FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)

    本文介绍了  来源单表-目标源单表同步,多来源单表-目标源单表同步。 注:1.16版本、1.17版本都可以使用火焰图,生产上最好关闭,详情见文章末尾 Flink版本:1.16.2 环境:Linux CentOS 7.0、jdk1.8 基础文件: flink-1.16.2-bin-scala_2.12.tgz、 flink-connector-jdbc-3.0.0-1.16.jar、(maven仓库目录:

    2024年02月11日
    浏览(45)
  • Flink DataStream API CDC同步MySQL数据到StarRocks

    Flink:1.16.1 pom文件如下 Java代码 SourceAndSinkInfo 类,用于定义source和sink的IP、端口、账号、密码信息 DataCenterShine实体类,字段与数据库一一对应。 StarRocksPrimary 实体类 FieldInfo注解类,用于标记字段序号、是否为主键、是否为空,后续生成TableSchema需要使用到。 TableName 注解类,

    2024年02月03日
    浏览(78)
  • Hudi(16):Hudi集成Flink之读取方式

    目录 0. 相关文章链接 1. 流读(Streaming Query) 2. 增量读取(Incremental Query) 3. 限流  Hudi文章汇总          当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支

    2024年02月06日
    浏览(67)
  • 【Flink】 Flink实时读取mysql数据

    准备 你需要将这两个依赖添加到 pom.xml 中 mysql mysql-connector-java 8.0.0 读取 kafka 数据 这里我依旧用的以前的 student 类,自己本地起了 kafka 然后造一些测试数据,这里我们测试发送一条数据则 sleep 10s,意味着往 kafka 中一分钟发 6 条数据。 package com.zhisheng.connectors.mysql.utils; impo

    2024年02月03日
    浏览(44)
  • 【数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式&写入模式&Bucket索引】

    当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。 1.with参数 名称 Required 默认值 说明 read.streaming.enabled false false 设置 true 开启流读模式

    2024年02月14日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包