flink-cdc-学习笔记(一)

这篇具有很好参考价值的文章主要介绍了flink-cdc-学习笔记(一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.flink cdc简介

Flink 1.11 引入了 CDC.
Flink CDC 是一款基于 Flink 打造一系列数据库的连接器。Flink 是流处理的引擎,其主要消费的数据源是类似于一些点击的日志流、曝光流等数据,但在业务场景中,点击流的日志数据只是一部分,具有更大价值的数据隐藏在用户的业务数据库中。Flink CDC 弥补了 Flink 读取这些数据的缺陷,能够通过流式的方式读取数据库中的增量变更的日志。

1.1应用数据场景 CDC

1.日志文件数据(appendOn)
2.数据库数据(CRUD)

1.2同类型产品的对比
基于查询的CDC 基于日志的CDC
开源产品 sqoop,kafka jdbc,datax canal,flink-cdc
执行模式 batch streaming
捕获所有数据的变化 ×
低延迟,不增加数据库的负担 ×
不入侵业务代码(lastUpdate) ×
捕获删除事件 ×
更新记录状态 ×

经过以上对比,我们可以发现基于日志 CDC 有以下这几种优势:

能够捕获所有数据的变化,捕获完整的变更记录。在异地容灾,数据备份等场景中得到广泛应用,如果是基于查询的 CDC 有可能导致两次查询的中间一部分数据丢失
每次 DML 操作均有记录无需像查询 CDC 这样发起全表扫描进行过滤,拥有更高的效率和性能,具有低延迟,不增加数据库负载的优势
无需入侵业务,业务解耦,无需更改业务模型
捕获删除事件和捕获旧记录的状态,在查询 CDC 中,周期的查询无法感知中间数据是否删除

ETL 常用解决方案 采集/计算/传输

传统方案:mysql->kafka-connect->kafka —>app/flink/storm—>ES/Pgsql
flink方案: mysql---->flink---->ES/Pgsql

flink-cdc-connectors 可以用来替换 Debezium+Kafka 的数据采集模块,从而实现 Flink SQL 采集+计算+传输(ETL)一体化,这样做的优点有以下:
开箱即用,简单易上手
减少维护的组件,简化实时链路,减轻部署成本
减小端到端延迟
Flink 自身支持 Exactly Once 的读取和计算
数据不落地,减少存储成本
支持全量和增量流式读取
binlog 采集位点可回溯*

2.flink-cdc-sql-client如何使用

启动服务
./bin/start-cluster.sh
关闭服务
./bin/stop-cluster.sh

启动客户端
./bin/sql-client.sh embedded
默认情况下,SQL 客户端将从 ./conf/sql-client-defaults.yaml 中读取配置。有关环境配置文件结构的更多信息,请参见配置部分。
在批处理环境下执行的查询只能用表格模式或者Tableau模式进行检索
CLI 为维护和可视化结果提供三种模式。
表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
SET execution.result-mode=table;
变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。
SET execution.result-mode=changelog;
Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):
SET execution.result-mode=tableau;

环境配置文件

SQL 查询执行前需要配置相关环境变量。环境配置文件 定义了 catalog、table sources、table sinks、用户自定义函数和其他执行或部署所需属性。
详情见:<flinck-cdc-yaml-demo.md>

重启策略(Restart Strategies)

重启策略控制 Flink 作业失败时的重启方式。与 Flink 集群的全局重启策略相似,更细精度的重启配置可以在环境配置文件中声明。

依赖 (略)

自定义函数(略)

Catalogs(略)

分离的 SQL 查询(略)

SQL 视图(略)

临时表(Temporal Table)(略)

3.flink-cdc-connect 案例

1.Flink JDBC Connector:Flink 与数据库集成最佳实践
2.基于 Flink SQL CDC 的实时数据同步方案

4.使用脚本实现CDC数据同步

1.下载flink-cdc的二进制包flink-cdc-3.0.tar

2.编写一个任务脚本mysql-to-doris.yaml,内容如下.

source:
  type: mysql
  host: localhost
  port: 3306
  username: admin
  password: pass
  tables: db0.commodity, db1.user_table_[0-9]+, [app|web]_order_\.*

sink:
  type: doris
  fenodes: FE_IP:HTTP_PORT
  username: admin
  password: pass

pipeline:
  name: mysql-sync-doris
  parallelism: 4

3.提交任务到flink集群

# Submit Pipeline
$ ./bin/flink-cdc.sh mysql-to-doris.yaml
Pipeline "mysql-sync-doris" is submitted with Job ID "DEADBEEF".

5.springboot集成flink-cdc-mysql-connector实现CDC

1.[Flink SQL Client]创建一个cdc源头

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'inventory',
 'table-name' = 'products'
);

-- read snapshot and binlog data from mysql, and do some transformation, and show on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;

2.项目里pom.xml引入cdc-connector的包

<dependency>
  <groupId>com.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>

  <version>3.0.0</version>
</dependency>

3.编写实现类调用任务流程

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
  }
}

通过下面的指令下载flink-cdc源码本地进行编译

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

这样flink-cdc就安装到本地maven库了.

参考资料

flink官网

Apache官方社区

wiki

文档

视频介绍
基于 Flink SQL CDC 的实时数据同步方案

阿里云-开发者社区

案例

基于 Flink SQL CDC 的实时数据同步方案
Flink JDBC Connector:Flink 与数据库集成最佳实践文章来源地址https://www.toymoban.com/news/detail-846240.html

到了这里,关于flink-cdc-学习笔记(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Flinkx/Datax/Flink-CDC 优劣势对比

    Flinkx/Datax/Flink-CDC 优劣势对比

    Flinkx/Datax/Flink-CDC 优劣势对比_HiBoyljw的博客-CSDN博客        FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步,其由袋鼠云于2016年初步研发完成,目前有稳定的研发团队持续维护,已在Github上开源(开源地址详见文章末尾),并维

    2024年02月07日
    浏览(11)
  • flink-cdc同步mysql数据到elasticsearch

    flink-cdc同步mysql数据到elasticsearch

    CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 cdc项目地址:https://github.com/ver

    2024年02月13日
    浏览(11)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(20)
  • 【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    我一开始是flink-cdc,oracle2Mysql,sql 我一开始直接用的oracle【date】类型,mysql【date】类型,sql的校验通过了,但是真正操作数据的时候报错,告诉我oracle的数据格式的日期数据,不可以直接插入到mysql格式的日期数据,说白了就是数据格式不一致导致的 我想的是既然格式不对

    2024年02月12日
    浏览(15)
  • flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读. 在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务, 然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交, 本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命

    2024年02月19日
    浏览(14)
  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

    SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

    最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka。 监听数据变化,进行异步通知,做系统内异步任务。 架构方案(懒得写了,看图吧): -- 创建pg 高线数据同步用

    2024年02月02日
    浏览(14)
  • Flink-CDC Cannot instantiate the coordinator for operator Source

    在使用flink1.14.6版本cdc时出现报错: Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.11-1.14.6.jar:1.14.6] at java.util.concurrent.CompletableFuture.uniWhenComp

    2024年02月12日
    浏览(9)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、达梦等数据库开启日志方法

    目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 2.5达梦 2.4.1安装 2.4.2CDC配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验

    2024年02月05日
    浏览(56)
  • 为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

    为什么Flink-CDC读取Decimal等数值类型变成了非数值字符串

    每遇到一个问题,在经过努力研究明白之后,总想写点东西记录。怎奈又没这个好习惯,过了一两天这个激情就没了,想写也写不出来了。最近在做一个flink-cdc采集数据的测试和产品化开发,遇到一个数据转换的问题,折腾了我两个早上,有些心血来潮,就记录一下吧,对我

    2023年04月09日
    浏览(13)
  • 【现场问题】flink-cdc,Oracle2Mysql的坑,Oracle区分大小写导致

    【现场问题】flink-cdc,Oracle2Mysql的坑,Oracle区分大小写导致

    Column ‘id’ is NOT NULL, however, a null value is being written into it. You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘DROP’ to suppress this exception and drop such records silently 大致意思就是不能插入为空的数值。 为什么会报这个错误,我们来看DML的执行语句: insert into t_wx_target select

    2024年02月12日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包