flink cdc数据同步,DataStream方式和SQL方式的简单使用

这篇具有很好参考价值的文章主要介绍了flink cdc数据同步,DataStream方式和SQL方式的简单使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、flink cdc介绍

1、什么是flink cdc

2、flink cdc能用来做什么

3、flink cdc的优点

二、flink cdc基础使用

1、使用flink cdc读取txt文本数据

2、DataStream的使用方式

3、SQL的方式

总结


一、flink cdc介绍

1、什么是flink cdc

flink cdc是一个由阿里研发的,一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的source组件。

2、flink cdc能用来做什么

flink cdc能感知数据库的所有修改、新增、删除操作,并以流的形式,进行实时的触发和反馈。如:你想监听一个表的数据是否有变动,并且需要把变动的数据读取出来,插入到另外的表里,或者对该数据进行其他处理。在我们传统的开发里,如果不使用cdc技术,是不是就只能通过定时任务去定时的获取数据?或者在执行数据修改操作时调用指定的接口来进行数据上报?并且还要拿新数据和旧数据进行比较,才能得到自己想要的结果?flink cdc就是解决这种问题的,它是cdc里面的佼佼者,它能在数据表被修改时,进行实时的反馈。

3、flink cdc的优点

① 低延迟:毫秒级的延迟

② 高吞吐:每秒能处理数百万个事件

③ 高可用及结果的准确性、良好的容错性,动态扩展、全天候24小时运行

二、flink cdc基础使用

1、使用flink cdc读取txt文本数据

① 项目目录

flink cdc sql,数据库,java,flink

② 需要用到的flink依赖(有些可以不用的,看实际需要使用哪些功能):

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-parser</artifactId>
            <version>1.13.0</version>
        </dependency>

③ 具体代码(TestFlinkController)

package com.bug.controller;

import com.bug.util.flink.TextFlatUtil;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.springframework.web.bind.annotation.*;

/**
 * 1、flink读取本地txt文件数据
 */
public class TestFlinkController {

    /**
     * 1、flink读取本地txt文件数据
     * @param args args
     */
    public static void main(String[] args) throws Exception {
        String path = "D:\\javaprojects\\my_springboot1\\my_springboot1\\src\\main\\resources\\flinkText\\flinkTest.txt";
        //创建执行环境
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        //读取txt文件数据
        DataSet<String> dataSet = environment.readTextFile(path);
        //处理读取的数据
        DataSet<Tuple3<String, String, String>> out = dataSet.flatMap(new TextFlatUtil());
        //输出
        out.print();
    }

}

 TextFlatUtil代码:

package com.bug.util.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

/**
 * 1、flink读取本地txt文件数据
 */
public class TextFlatUtil implements FlatMapFunction<String, Tuple3<String, String, String>> {

    @Override
    public void flatMap(String value, Collector<Tuple3<String, String, String>> collector) {
        for(String word : value.split("\n")){
            String[] res = word.split("\t");
            collector.collect(new Tuple3<>(res[0],res[1],res[2]));
        }
    }
}

flinkTest.txt文件值: 

801165935581855745	小明1	年龄1
801165936156475393	小明3	年龄3
801165936567517185	小明5	年龄5
801165936991141889	小明7	年龄7
801165937460903937	小明9	年龄9

④ 输出效果

flink cdc sql,数据库,java,flink

2、DataStream的使用方式

① 数据库修改配置my.cnf文件:binlog_format=row

flink cdc sql,数据库,java,flink

 ② 直接上代码

package com.bug.flinkcdc;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 2、DataStream的方式
 */
public class TestFlinkStream {
    /**
     * 2、DataStream的方式
     */
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);//线程数
        //开启ck
//        env.enableCheckpointing(60*1000);//60秒启动一次checkpoint
//        env.getCheckpointConfig().setCheckpointTimeout(30*1000);//设置超时时间,默认是10min
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//Checkpoint级别,EXACTLY_ONCE精准一次,AT_LEAST_ONCE最多一次
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//设置两次checkpoint的最小时间间隔
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//允许的最大checkpoint并行度
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));//设置checkpoint的地址
        //构建sourceFunction环境,正式开发可以把一些配置提取出来写成公共配置即可
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("***.***.***.***")//ip地址
                .port(***)//端口号
                .username("***")//用户名
                .password("***")//密码
                .databaseList("xiaobug")//数据库名称
                .tableList("xiaobug.test_flink")//表名称
                .deserializer(new StringDebeziumDeserializationSchema())//反序列化
                .startupOptions(StartupOptions.initial())//同步方式,initial全量和增量,latest增量
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        //数据输出
        dataStreamSource.print();
        //启动
        env.execute();
    }

}

③ 效果

flink cdc sql,数据库,java,flink

 3、SQL的方式

① 数据库修改配置my.cnf文件:binlog_format=row

 flink cdc sql,数据库,java,flink

 ② 代码

package com.bug.flinkcdc;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * 3、SQL的方式
 */
public class TestFlinkSQL {
    /**
     * 3、SQL的方式
     */
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);//线程数
        StreamTableEnvironment tev = StreamTableEnvironment.create(env);
        //正式开发时可以把这些语句做成单独的sql文件,更方便管理和维护,with的配置也可以做成公共的,然后读取即可
        tev.executeSql("CREATE TABLE test_flink (" +
                " userid String primary key," +
                " username String," +
                " userAge String," +
                " userCardid String" +
                " ) with ( " +
                " 'connector' = 'mysql-cdc'," +     //别名
                " 'hostname' = '***.***.***.***'," +  //数据库ip地址
                " 'port' = '***'," +               //端口号
                " 'username' = '***'," +           //用户名
                " 'password' = '***'," +           //密码
                " 'database-name' = 'xiaobug'," +   //数据库名称
                " 'table-name' = 'test_flink' " +   //表名称
                ")");
        //查询数据sql,也可以写在单独的文件里,然后引用即可,复杂的连表查询也是可以的,但需要其他表也进行加载
        Table table = tev.sqlQuery("select * from test_flink");
        //输出,正式开发可以用sql语句的insert into进行插入,直接实现表到表的同步
        DataStream<Tuple2<Boolean, Row>> dataStream = tev.toRetractStream(table,Row.class);
        dataStream.print();
        //启动
        env.execute("FlinkSQLCDC");
    }

}

 ③ 效果

flink cdc sql,数据库,java,flink

 

总结

搞定啦,就是这么简单!flinkcdc的进阶:怎样确保数据的一致性、可靠性、不重复、不丢失,后面有时间再写啦。

测试的时候还碰到了一个jar包版本的问题,sql的方式一定要使用1.13.0以上的版本,不然会报错!

还有flink-sql-parser的这个包也一定要添加,不然会出现下面这个提示:

org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList;文章来源地址https://www.toymoban.com/news/detail-644983.html

到了这里,关于flink cdc数据同步,DataStream方式和SQL方式的简单使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Flink CDC将Mysql中的数据实时同步到ES

    最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间…… 我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过

    2024年02月11日
    浏览(50)
  • Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

    使用sysdba角色登录到Oracle数据库 确保Oracle归档日志(Archive Log)已启用 若未启用归档日志, 需运行以下命令启用归档日志 设置归档日志存储大小及位置 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等) 设置恢复文件的实际物理存储路径;scope=spfile参数

    2024年02月05日
    浏览(48)
  • 业务数据同步工具介绍和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介绍 Sqoop : SQ L-to-Had oop ( Apache已经终止Sqoop项目 ) 用途:把关系型数据库的数据转移到HDFS(Hive、Hbase)(重点使用的场景);Hadoop中的数据转移到关系型数据库中。Sqoop是java语言开发的,底层使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是数据块的转移,没有使

    2024年02月15日
    浏览(76)
  • flink cdc DataStream api 时区问题

    以postgrsql 作为数据源时,Date和timesatmp等类型cdc同步读出来时,会发现一下几个问题: 源表: sink 表: 解决方案:在自定义序列化时进行处理。 java code scala code mysql cdc也会出现上述时区问题,Debezium默认将MySQL中datetime类型转成UTC的时间戳({@link io.debezium.time.Timestamp}),时区是

    2024年02月07日
    浏览(37)
  • Flink CDC数据同步

    一、什么是FLink Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 接下来,我们来介绍一下 Flink 架构中的重要方面。 任何类型的数据都可以形成一种事

    2024年02月08日
    浏览(43)
  • 基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

    操作系统:ubuntu-22.04,运行于wsl 2【 注意,请务必使用wsl 2 ;wsl 1会出现各种各样的问题】 软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳过此步 (1)pg安装 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出现的问题 sudo -u postgres psql 报错: psql: err

    2024年02月11日
    浏览(34)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(50)
  • Flink CDC实时同步PG数据库

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git  1、更改配置文件postgresql.conf # 更改wal日志方式为logical wal_level = logical # minimal, replica, or logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots max_replication_slots = 20 # m

    2024年02月13日
    浏览(63)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(53)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包