Flink CDC介绍和简单实用

这篇具有很好参考价值的文章主要介绍了Flink CDC介绍和简单实用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

种类

基于查询和基于binlog
Flink CDC介绍和简单实用

基于日志的 CDC 方案介绍

从 ETL 的角度进行分析,一般采集的都是业务库数据,这里使用 MySQL 作为需要采集的数据库,通过 Debezium 把 MySQL Binlog 进行采集后发送至 Kafka 消息队列,然后对接一些实时计算引擎或者 APP 进行消费后把数据传输入 OLAP 系统或者其他存储介质。
Flink 希望打通更多数据源,发挥完整的计算能力。我们生产中主要来源于业务日志和数据库日志,Flink 在业务日志的支持上已经非常完善,但是在数据库日志支持方面在 Flink 1.11 前还属于一片空白,这就是为什么要集成 CDC 的原因之一。
Flink SQL 内部支持了完整的 changelog 机制,所以 Flink 对接 CDC 数据只需要把CDC 数据转换成 Flink 认识的数据,所以在 Flink 1.11 里面重构了 TableSource 接口,以便更好支持和集成 CDC。
Flink CDC介绍和简单实用
Flink CDC介绍和简单实用
重构后的 TableSource 输出的都是 RowData 数据结构,代表了一行的数据。在RowData 上面会有一个元数据的信息,我们称为 RowKind 。RowKind 里面包括了插入、更新前、更新后、删除,这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的 JSON 格式,包含了旧数据和新数据行以及原数据信息,op 的 u表示是 update 更新操作标识符,ts_ms 表示同步的时间戳。因此,对接 Debezium JSON 的数据,其实就是将这种原始的 JSON 数据转换成 Flink 认识的 RowData。

flink作为etl工具

原工作原理
Flink CDC介绍和简单实用
优化后
Flink CDC介绍和简单实用
Flink SQL 采集+计算+传输(ETL)一体化优点:
 开箱即用,简单易上手
 减少维护的组件,简化实时链路,减轻部署成本
 减小端到端延迟
 Flink 自身支持 Exactly Once 的读取和计算
 数据不落地,减少存储成本
 支持全量和增量流式读取
 binlog 采集位点可回溯

应用场景

 实时数据同步,数据备份,数据迁移,数仓构建
优势:丰富的上下游(E & L),强大的计算(T),易用的 API(SQL),流式计算低延迟
 数据库之上的实时物化视图、流式数据分析
 索引构建和实时维护
 业务 cache 刷新
 审计跟踪
 微服务的解耦,读写分离
 基于 CDC 的维表关联

开源地址

https://github.com/ververica/flink-cdc-connectors

最新flink cdc官方文档分享

https://flink-learning.org.cn/article/detail/eed4549f80e80cc30c69c406cb08b59a

流程图

个人理解作图
Flink CDC介绍和简单实用

1.X痛点

Flink CDC介绍和简单实用所以设计目标
Flink CDC介绍和简单实用
设计实现上
在对于有主键的表做初始化模式,整体的流程主要分为5个阶段:
1.Chunk切分;2.Chunk分配;(实现并行读取数据&CheckPoint)
3.Chunk读取;(实现无锁读取)
4.Chunk汇报;
5.Chunk分配。
对于并发线程
会对比各个读取切分的最高和最低的位置区间,超过区间进行更新

目前支持开发方式

个人理解作图
Flink CDC介绍和简单实用

开发测试大致流程

个人理解作图
Flink CDC介绍和简单实用

使用

mysql开启binlog

vi /etc/my.cnf 底部追加

server_id=2
log_bin=mysql-bin
binlog_format=ROW
# 下面这行可写可不写  监控对应的数据库
binlog_do_db=elebap_bak

重启mysqld服务, 并启动mysql

systemctl restart mysqld
或者
bin/mysqld --initialize --user=root --basedir=/usr/local/mysql --datadir=/data/mysql
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      154 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+

mysql> show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
6 rows in set (0.01 sec)

log_bin显示ON开启状态。
mysql的建表以及插入数据:

CREATE TABLE study(
	ID INT NOT NULL PRIMARY KEY AUTO_INCREMENT ,
	NAME VARCHAR(20) NOT NULL,
	AGE INT(10)
);

INSERT INTO study VALUES(1 , 'a' , 10);
INSERT INTO study VALUES(2 , 'b' , 11);
INSERT INTO study VALUES(3 , 'c' , 12);
INSERT INTO study VALUES(4 , 'd' , 13);
INSERT INTO study VALUES(5 , 'e' , 14);
INSERT INTO study VALUES(6 , 'f' , 15);

代码

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>


import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * @author wyi
 * @date 2022/8/18 11:06
 * @description
 */
public class flinkcdcTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties pops = new Properties();
        pops.setProperty("debezium.snapshot.locking.mode", "none");

        DebeziumSourceFunction<JSONObject> mysqlSource = MySQLSource.<JSONObject>builder()
                .hostname("192.168.80.161")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList(BussinessConstant.DATABASE_LIST)
                .tableList(BussinessConstant.ABLE_LIST_ALARM_CONFIG_CAP_UNBALANCE)
                .deserializer(new TestRuleDeserialization())
                .build();
        SingleOutputStreamOperator<Object> map = env.addSource(mysqlSource).map(new MapFunction<JSONObject, Object>() {
            @Override
            public Object map(JSONObject jsonObject) throws Exception {
                return jsonObject.toString();
            }
        });
        map.print();
        env.execute("CdcMysqlSource");
    }
}

自定义的序列化类

package com.cosmosource.da.cdc;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
 * @author wyi
 * @date 2022/8/18 10:32
 * @description 这是一个demo,测试flink-cdc连接mysql的反序列化类
 */
public class TestRuleDeserialization implements DebeziumDeserializationSchema<JSONObject> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<JSONObject> collector) throws Exception {
        //获取主题
        String topic = sourceRecord.topic();
        String[] arr = topic.split("\\.");
        String db = arr[1];
        String tableName = arr[2];

        System.out.println(arr[1]);
        System.out.println(arr[2]);

        //获取操作类型 READ DELETE UPDATE CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        //获取值信息并转换为Struct类型
        Struct value = (Struct) sourceRecord.value();

        System.out.println("value:"+value);
        //获取变化后的数据
        Struct after = value.getStruct("after");

        //创建JSON对象用于存储数据信息
        JSONObject data = new JSONObject();
        for (Field field : after.schema().fields()) {
            Object o = after.get(field);
            data.put(field.name(), o);
        }

        //创建JSON对象用于封装最终返回值数据信息
        JSONObject result = new JSONObject();
        result.put("operation", operation.toString().toLowerCase());
        result.put("data", data);
        result.put("database", db);
        result.put("table", tableName);

        //发送数据至下游
        collector.collect(result);


    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {

        return TypeInformation.of(JSONObject.class);
    }
}

结果:文章来源地址https://www.toymoban.com/news/detail-461224.html

……………………………………
……………………………………
……………………………………

wy
study
value:Struct{after=Struct{ID=9,NAME=1,AGE=15},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=wy,table=study,server_id=0,file=mysql-bin.000001,pos=3128,row=0},op=c,ts_ms=1660793058775}
八月 18, 2022 11:24:19 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 192.168.80.161:3306 at mysql-bin.000001/3128 (sid:5501, cid:24)
5> {"database":"wy","data":{"ID":4,"NAME":"d","AGE":13},"operation":"create","table":"study"}
1> {"database":"wy","data":{"ID":8,"NAME":"1","AGE":15},"operation":"create","table":"study"}
8> {"database":"wy","data":{"ID":7,"NAME":"f","AGE":15},"operation":"create","table":"study"}
4> {"database":"wy","data":{"ID":3,"NAME":"c","AGE":12},"operation":"create","table":"study"}
2> {"database":"wy","data":{"ID":1,"NAME":"a","AGE":10},"operation":"create","table":"study"}
2> {"database":"wy","data":{"ID":9,"NAME":"1","AGE":15},"operation":"create","table":"study"}
3> {"database":"wy","data":{"ID":2,"NAME":"b","AGE":11},"operation":"create","table":"study"}
6> {"database":"wy","data":{"ID":5,"NAME":"e","AGE":14},"operation":"create","table":"study"}
7> {"database":"wy","data":{"ID":6,"NAME":"f","AGE":15},"operation":"create","table":"study"}

到了这里,关于Flink CDC介绍和简单实用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink-CDC】Flink CDC 介绍和原理概述

    CDC是( Change Data Capture 变更数据获取 )的简称。 核心思想是, 监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 主要分为基于查询和基于

    2024年01月20日
    浏览(46)
  • Flink CDC介绍

    1.CDC概述 CDC(Change Data Capture)是一种用于捕获和处理数据源中的变化的技术。它允许实时地监视数据库或数据流中发生的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。 传统上,数据源的变化通常通过周期性地轮询整个数据集进行检查来实现。但是,这

    2024年02月11日
    浏览(32)
  • Flink CDC介绍及原理

    CDC (Change Data Capture) 是一种用于 捕捉数据库变更数据 的技术,Flink 从 1.11 版本开始原生支持 CDC 数据(changelog)的处理,目前已经是非常成熟的变更数据处理方案。 Flink CDC Connectors 是 Flink 的一组 Source 连接器,是 Flink CDC 的核心组件,这些连接器负责从  MySQL、PostgreSQL、Ora

    2024年02月11日
    浏览(24)
  • flink cdc数据同步,DataStream方式和SQL方式的简单使用

    目录 一、flink cdc介绍 1、什么是flink cdc 2、flink cdc能用来做什么 3、flink cdc的优点 二、flink cdc基础使用 1、使用flink cdc读取txt文本数据 2、DataStream的使用方式 3、SQL的方式 总结 flink cdc是一个由阿里研发的,一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数

    2024年02月13日
    浏览(41)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(51)
  • 业务数据同步工具介绍和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介绍 Sqoop : SQ L-to-Had oop ( Apache已经终止Sqoop项目 ) 用途:把关系型数据库的数据转移到HDFS(Hive、Hbase)(重点使用的场景);Hadoop中的数据转移到关系型数据库中。Sqoop是java语言开发的,底层使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是数据块的转移,没有使

    2024年02月15日
    浏览(81)
  • SQL server开启变更数据捕获(CDC)

    多多点赞,会变好看! 多多留言,会变有钱! 变更数据捕获(Change Data Capture ,简称 CDC):记录 SQL Server 表的插入、更新和删除操作。开启cdc的源表在插入、更新和删除操作时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可

    2024年02月11日
    浏览(36)
  • 33、Flink之hive介绍与简单示例

    1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的table api与sql之流式概念-详解的介绍了动

    2024年02月10日
    浏览(63)
  • Flink作业任务的9种状态简单介绍

    ​ 当创建一个Flink任务后,该任务可能会经历多种状态。目前Flink给任务共定义了9种状态,包括: Created , Running , Finished , Cancelling , Canceled , Restarting , Failing , Failed , Suspended 。下面这张图详细展示了一个Job可能会经历的所有状态。 最简单的一种状态就是:作业启动

    2024年02月02日
    浏览(33)
  • flink如何监听kafka主题配置变更

    从前一篇文章我们知道flink消费kafka主题时是采用的手动assign指定分区的方式,这种消费方式是不处理主题的rebalance操作的,也就是消费者组中即使有消费者退出或者进入也是不会触发消费者所消费的分区的,那么疑问就来了,那是否比如kafka主题分区变多,或者新增了满足

    2024年02月14日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包