flink-cdc之读取mysql变化数据

这篇具有很好参考价值的文章主要介绍了flink-cdc之读取mysql变化数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

pom

<properties>
    <flink-version>1.13.0</flink-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.1</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

代码

注意开启checkpoint 和不开启是有区别的(savepoint也可以 启动的flink指定时候 -s savepath)

不开启,如果项目重启了,会重新读取所有的数据

开启了,项目重启了额,会根据保留的信息去读取变化的数据


import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCTest {

    public static void main(String[] args) throws Exception {

        //1.获取Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 开启CK
//        env.enableCheckpointing(5000);
//        env.getCheckpointConfig().setCheckpointTimeout(10000);
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//        
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        //2.通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("9.134.70.1")
                .port(3306)
                .username("root")
                .password("xxxxxxx")
                .databaseList("cc")
                .tableList("cc.student")
//                .deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3.数据打印
        dataStreamSource.print("==FlinkCDC==");

        //4.启动任务
        env.execute("FlinkCDC");

    }

}

 mysql

flink-cdc之读取mysql变化数据,flink,flink,mysql,android

 flink-cdc之读取mysql变化数据,flink,flink,mysql,android

数据库表

flink-cdc之读取mysql变化数据,flink,flink,mysql,android

 增加一条数据

flink-cdc之读取mysql变化数据,flink,flink,mysql,android

打印日志 op:c 是create

==FlinkCDC==> {"before":null,"after":{"id":1,"name":"name1","age":"1"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689245998000,"snapshot":"false","db":"cc","sequence":null,"table":"student","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2781,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1689246005928,"transaction":null}

修改一条数据 age=1 ->age=2  op=u 是update

flink-cdc之读取mysql变化数据,flink,flink,mysql,android

flink打印日志

==FlinkCDC==> {"before":{"id":1,"name":"name1","age":"1"},"after":{"id":1,"name":"name1","age":"2"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689246092000,"snapshot":"false","db":"cc","sequence":null,"table":"student","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":3049,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1689246099578,"transaction":null}

删除这条数据 op=d 是delete

==FlinkCDC==> {"before":{"id":1,"name":"name1","age":"2"},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689246163000,"snapshot":"false","db":"cc","sequence":null,"table":"student","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":3333,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1689246170857,"transaction":null}

由于打印的日志太多 我们可以用fastjson稍微封装下 然后传给sink去处理,根据update delete insert实时更新下游数据,还有一个op=r 是读取的数据

还可以使用flinksql


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSqlCdc {
    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.使用FLINKSQL DDL模式构建CDC 表
        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING primary key, " +
                " name int, " +
                " age STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = '9.134.70.1', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'xxxxx', " +
                " 'database-name' = 'cc', " +
                " 'table-name' = 'student' " +
                ")");

        //3.查询数据并转换为流输出
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //4.启动
        env.execute("FlinkSQLCDC");

    }

}

 同样是增删改查,flink打印日志如下

flink-cdc之读取mysql变化数据,flink,flink,mysql,android文章来源地址https://www.toymoban.com/news/detail-559273.html

到了这里,关于flink-cdc之读取mysql变化数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据技术之 Flink-CDC

    CDC 是 Change Data Capture(变更数据获取)的简称。在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以

    2024年02月05日
    浏览(54)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • 【现场问题】flink-cdc,Oracle2Mysql的坑,Oracle区分大小写导致

    Column ‘id’ is NOT NULL, however, a null value is being written into it. You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘DROP’ to suppress this exception and drop such records silently 大致意思就是不能插入为空的数值。 为什么会报这个错误,我们来看DML的执行语句: insert into t_wx_target select

    2024年02月12日
    浏览(48)
  • 【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    我一开始是flink-cdc,oracle2Mysql,sql 我一开始直接用的oracle【date】类型,mysql【date】类型,sql的校验通过了,但是真正操作数据的时候报错,告诉我oracle的数据格式的日期数据,不可以直接插入到mysql格式的日期数据,说白了就是数据格式不一致导致的 我想的是既然格式不对

    2024年02月12日
    浏览(46)
  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

    最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka。 监听数据变化,进行异步通知,做系统内异步任务。 架构方案(懒得写了,看图吧): -- 创建pg 高线数据同步用

    2024年02月02日
    浏览(45)
  • 【Flink-CDC】Flink CDC 介绍和原理概述

    CDC是( Change Data Capture 变更数据获取 )的简称。 核心思想是, 监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 主要分为基于查询和基于

    2024年01月20日
    浏览(46)
  • flink-cdc-学习笔记(一)

    Flink 1.11 引入了 CDC. Flink CDC 是一款基于 Flink 打造一系列数据库的连接器。Flink 是流处理的引擎,其主要消费的数据源是类似于一些点击的日志流、曝光流等数据,但在业务场景中,点击流的日志数据只是一部分,具有更大价值的数据隐藏在用户的业务数据库中。Flink CDC 弥补

    2024年04月10日
    浏览(83)
  • flink-cdc,clickhouse写入,多路输出

    kafka日志数据从kafka读取 1、关联字典表:完善日志数据 2、判断日志内容级别:多路输出 低级:入clickhouse 高级:入clickhouse的同时推送到kafka供2次数据流程处理。

    2024年02月09日
    浏览(42)
  • Flinkx/Datax/Flink-CDC 优劣势对比

    Flinkx/Datax/Flink-CDC 优劣势对比_HiBoyljw的博客-CSDN博客        FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步,其由袋鼠云于2016年初步研发完成,目前有稳定的研发团队持续维护,已在Github上开源(开源地址详见文章末尾),并维

    2024年02月07日
    浏览(49)
  • 【Flink CDC(一)】实现mysql整表与增量读取

    MySQL CDC 连接器允许从 MySQL 数据库读取快照数据( 比如:flink任务消费时刻的整表数据 )和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。 本篇只关注mysql整表与增量读取的实现,对于并发读取等能力后续再探索。   1.1. Maven dependency   1.2. SQL

    2024年03月27日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包