Flink学习13-Flink CDC

这篇具有很好参考价值的文章主要介绍了Flink学习13-Flink CDC。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、CDC简介

cdc全称 Change Data Capture 变更数据捕获。通俗来讲只要能捕获到变更的数据的技术都可以称为cdc。常见的开源技术有以下几种:
canal:https://github.com/alibaba/canal
maxwell:https://github.com/zendesk/maxwell
Debezium:https://github.com/debezium/debezium
flink-cdc:https://github.com/ververica/flink-cdc-connectors
以下是几种技术的横向对比
Flink学习13-Flink CDC,flink,学习,大数据

二、canal+maxwell

两者实现原理类似。canal模拟mysql主从复制过程,把自己当做从库。通过dump操作把binlog从主库读取到从库,然后根据binlog进行数据还原。maxwell原理同理。两者区别在于maxwell是一款轻量级框架,可拓展性较少。比如它支持处理json格式数据,并把数据发送到kafka、redis等中。而canal可以自定义数据格式,而且并不局限于将数据发送到特定的数据存储介质中。
下面以canal举例说明:

2.1 安装canal

安装很简单,选择release版本下载解压即可:https://github.com/alibaba/canal/releases?page=2。
这里需要说明的是deployer是个人开发版。
Flink学习13-Flink CDC,flink,学习,大数据

2.2 配置canal

下载解压完成之后如下:
Flink学习13-Flink CDC,flink,学习,大数据
进入conf/example/instance.porperties中修改如下配置
Flink学习13-Flink CDC,flink,学习,大数据
在canal安装目录下的conf下找到canal.properties,主要修改数据需要发送的介质。默认配置是tcp,这个可以自定义将数据写入到其他地方,如果需要将canal采集到的数据写入kafka topic就选择第二种。
Flink学习13-Flink CDC,flink,学习,大数据
配置完成之后启动:/bin/startup.sh

2.3 配置mysql binlog

mysql数据源需要提前开启binlog,找到my.cnf配置如下:
Flink学习13-Flink CDC,flink,学习,大数据
然后重启mysql服务。
登录mysql执行一下,然后查看是on就代表开启成功
Flink学习13-Flink CDC,flink,学习,大数据

2.4代码实现

pom依赖

<dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
</dependency>

import com.alibaba.fastjson.JSON;
import com.alibaba.google.common.base.CaseFormat;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;

public class CanalClientApp {
    public static void main(String[] args) throws Exception{
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), //本地部署就用localhost,端口默认
                "example", null, null);
        //设置死循环,一直访问connector中数据
        while (true){
            connector.connect(); //进行链接
            connector.subscribe("test.*");//设置需要监控的库表
            Message message = connector.get(100);//设置获取一批数据量大小

            List<CanalEntry.Entry> entries = message.getEntries(); //获取一批消息的list集合
            if(entries.size() > 0){ //如果list中有数据就遍历取出
                for (CanalEntry.Entry entry : entries) {
                    String tableName = entry.getHeader().getTableName(); //获取header中请求到的表名

                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());//value值转换成string类型
                    List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

                    //insert update delete ...
                    CanalEntry.EventType eventType = rowChange.getEventType();

                    if(eventType == CanalEntry.EventType.INSERT){
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

                            HashMap<String, String> map = new HashMap<>();
                            for (CanalEntry.Column column : afterColumnsList) {
                                String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                                map.put(key, column.getValue());
                            }

                            /**
                             * TODO...
                             * 到这一步,下面要做的事情就是把map的数据发送到想要的地方去...
                             */
                            System.out.println("tableName:"+ tableName + " , " + JSON.toJSONString(map));
                        }
                    }
                }
            }

        }
    }
}

mysql写入数据:
Flink学习13-Flink CDC,flink,学习,大数据
idea实时打印输出
Flink学习13-Flink CDC,flink,学习,大数据

三、Debezium+Flink-CDC

Debezium是为Kafka Connect而建的一系列Source Connectors,每个Source Connector会根据对应数据库特性来捕获数据变更记录。不像其他方法,例如,轮询或者双写等。Debezium是基于日志进行捕获变更的。而flink-cdc(1.x版本)和Debezium一脉相承。接下来通过案例简单了解下flink-cdc的使用方式。

3.1 flink-cdc解析

Flink学习13-Flink CDC,flink,学习,大数据
Flink学习13-Flink CDC,flink,学习,大数据
官方社区的解析已经很清晰明了,通过两种方式对比发现,flink-cdc节省了很大一部分运维成本。传统etl中flink只负责计算,而flink-cdc将采集计算为一体。当然看到了flink-cdc的优势,也需要了解当前版本(1.x)的局限性。因为flink-cdc底层采用了Debezium框架,数据读取分为全量+增量模式。在全量读取数据的时候为了保证数据一致性会加上一个全局锁,如果数据量非常大读取数据会以小时级别计算。切如果在全量读取阶段任务运行失败是无法进行checkpoint的。
简单来说,flink-cdc第一阶段读取全量数据时默认会加一个全局锁,会拒绝其他事务提交update操作,这样可以保证数据一致性,但数据量特别大时,可能会导致数据库hang住。
于是官方又更新了flink-cdc 2.x版本。这个版本主要解决锁还有无法checkpoint的问题。主要原理是chunk算法+SourceEnumerator组件实现。
Flink学习13-Flink CDC,flink,学习,大数据
flink-cdc 2.x版本读取流程如上图所述,首先根据chunk算法对binlog进行切片。每个i切片分区内数据不重合。正在读写的切片如果有数据更新的话,会将更新后的数据输出。从而实现不加锁的方式下保证数据读的一致性。

3.2 flink-cdc读取mysql binlog数据

实现方式分为两种:https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-connectors.html#usage-for-datastream-api
Datastream Api
pom依赖里需要添加如下:

<dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.3.0</version>
            <scope>provided</scope>
        </dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // 默认json反序列化器,进阶版可以用自定义反序列化器
            .build();
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // enable checkpoint
    env.enableCheckpointing(3000);
    
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    
    env.execute("Print MySQL Snapshot + Binlog");
  }
}

Table Api

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkTableCDCApp {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableenv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        tableenv.executeSql("create table mysql_bin(" +
                "id INT primary key, " +
                "name STRING" +
                ") with(" +
                "'connector' = 'mysql-cdc'," +
                "'hostname' = 'ip'," +
                "'port' = '3306'," +
                "'username' = 'yourUsername'," +
                "'password' = 'yourPassword'," +
                "'database-name' = 'yourDatabaseName'," +
                "'table-oname' = 'yourTableName'" +
                ")"
        );
        Table table = tableenv.sqlQuery("select * from mysql_bin");
        tableenv.toRetractStream(table, Row.class).print();

        env.execute();
    }
}

这里需要注意的是当前只是本地编译,没有提交flink。如果任务上环境提交运行需要提前将此jar放在FLINK_HOME/lib/目录下去。
Flink学习13-Flink CDC,flink,学习,大数据
还需要有一点说明的是,我在本地编译的时候选择flink-cdc的版本是1.x,如上图。实际上不同版本的flink-cdc对应的flink版本都不同。以下版本对应关系。
Flink学习13-Flink CDC,flink,学习,大数据文章来源地址https://www.toymoban.com/news/detail-794492.html

到了这里,关于Flink学习13-Flink CDC的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink cdc多种数据源安装、配置与验证 flink cdc多种数据源安装、配置与验证

      搜索 文章目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验证 本文目录结构

    2024年02月09日
    浏览(44)
  • Flink CDC数据同步

    一、什么是FLink Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 接下来,我们来介绍一下 Flink 架构中的重要方面。 任何类型的数据都可以形成一种事

    2024年02月08日
    浏览(46)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • 【大数据】Flink 详解(十):SQL 篇 Ⅲ(Flink SQL CDC)

    《 Flink 详解 》系列(已完结),共包含以下 10 10 10 篇文章: 【大数据】Flink 详解(一):基础篇(架构、并行度、算子) 【大数据】Flink 详解(二):核心篇 Ⅰ(窗口、WaterMark) 【大数据】Flink 详解(三):核心篇 Ⅱ(状态 State) 【大数据】Flink 详解(四):核心篇

    2024年01月25日
    浏览(50)
  • 基于 Flink CDC 的现代数据栈实践

    摘要:本文整理自阿里云技术专家,Apache Flink PMC Member Committer, Flink CDC Maintainer 徐榜江和阿里云高级研发工程师,Apache Flink Contributor Flink CDC Maintainer 阮航,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分: 1.深入解读 Flink CDC 2.3 版本 2.基于 Flink CDC 构建

    2024年02月09日
    浏览(42)
  • 【大数据】Flink CDC 的概览和使用

    CDC ( Change Data Capture , 数据变更抓取 )是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

    2024年01月24日
    浏览(53)
  • Flink CDC 新一代数据集成框架

    前言: 主要讲解了技术原理,入门与生产实践,主要功能:全增量一体化数据集成、实时数据入库入仓、最详细的教程。Flink CDC 是Apache Flink的一个重要组件,主要使用了CDC技术从各种数据库中获取变更流并接入到Flink中,Apache Flink作为一款非常优秀的流处理引擎,其SQL API又

    2024年02月13日
    浏览(62)
  • 大数据技术之 Flink-CDC

    CDC 是 Change Data Capture(变更数据获取)的简称。在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以

    2024年02月05日
    浏览(54)
  • 【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(55)
  • Flink CDC 实时抽取 Oracle 数据-排错&调优

    Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracle 的支持。对该版本进行试用并成功实现了对 Oracle 的实时数据捕获以及性能调优,现将试用过程中的一些关键细节进行分享。 Oracle:11.2.0.4.0(RAC 部署) Flink:1.13.1 Hadoop:3.2.1

    2024年01月16日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包