60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

这篇具有很好参考价值的文章主要介绍了60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文详细的介绍了Flink CDC的应用,并且提供2个示例进行说明如何使用,即使用Flink sql client的观察数据同步的情况、通过DataStream API 捕获数据变化情况及验证。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,本文依赖Flink 集群环境、mysql。

本专题分为以下几篇文章:
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-Streaming ELT介绍及示例(2)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-完整版文章来源地址https://www.toymoban.com/news/detail-827770.html

一、Flink CDC Connectors介绍

本文介绍的CDC是基于2.4版本,当前版本已经发布至3.0,本Flink 专栏介绍是基于Flink 1.17版本,CDC 2.4版本支持到1.17版本。

1、CDC Connectors介绍及架构图

Apache Flink®的CDC连接器是用于Apache Flnk®的一组源连接器,使用更改数据捕获(CDC)接收来自不同数据库的更改。Apache Flink®的CDC连接器将Debezium集成为捕获数据更改的引擎。因此,它可以充分利用Debezium的能力。

了解更多关于Debezium的信息。

或者参考:37、Flink 的CDC 格式:debezium部署以及mysql示例

flinkcdc streaming elt,# Flink专栏,flink,mysql,kafka,flink hive,flink sql,elasticsearch,flink cdc

2、支持的连接

flinkcdc streaming elt,# Flink专栏,flink,mysql,kafka,flink hive,flink sql,elasticsearch,flink cdc

3、Flink CDC与 Flink 版本关系

flinkcdc streaming elt,# Flink专栏,flink,mysql,kafka,flink hive,flink sql,elasticsearch,flink cdc

4、特性介绍

  • 支持读取数据库快照,并在处理失败后立即继续读取binlog。
  • CDC连接器用于DataStream API,用户可以在一个作业中使用多个数据库和表的更改,而无需部署Debezium和Kafka。
  • 用于Table/SQL API的CDC连接器,用户可以使用SQL DDL创建CDC源以监视单个表上的更改。

下表显示了连接器的当前功能:
flinkcdc streaming elt,# Flink专栏,flink,mysql,kafka,flink hive,flink sql,elasticsearch,flink cdc

5、flink sql client集成flink cdc

1)、集成步骤

1、需要有一个flink的集群环境
具体搭建参考:2、Flink1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式(前两种及session和per-job)验证详细步骤

2、下载flink cdc的jar并放在FLINK_HOME/lib/目录下面
下载地址:https://github.com/ververica/flink-cdc-connectors/releases

3、重启flink集群

2)、示例:捕获mysql的user表数据变化情况

本示例的前提是设置好了binlog,具体设置方式可以参考文章:
37、Flink 的CDC 格式:debezium部署以及mysql示例

Flink SQL> CREATE TABLE mysql_binlog_user (
>  id INT NOT NULL,
>  name STRING,
>  age INT,
>  PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = '192.168.10.44',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = '123456',
>  'database-name' = 'cdctest',
>  'table-name' = 'user'
> );
[INFO] Execute statement succeed.

Flink SQL> select * from mysql_binlog_user;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           4 |                        test456 |        8888 |
| +I |           2 |                       alanchan |          20 |
| +I |           3 |                    alanchanchn |          33 |
| +I |           1 |                           alan |          18 |
| -U |           4 |                        test456 |        8888 |
| +U |           4 |                        test123 |        8888 |
| -U |           4 |                        test123 |        8888 |
| +U |           4 |                        test123 |       66666 |
| -D |           4 |                        test123 |       66666 |
| +I |           4 |                   alanchanchn2 |         100 |

Flink SQL> select name ,sum(age) from mysql_binlog_user group by name;
+----+--------------------------------+-------------+
| op |                           name |      EXPR$1 |
+----+--------------------------------+-------------+
| +I |                   alanchanchn2 |         100 |
| +I |                       alanchan |          20 |
| +I |                    alanchanchn |          33 |
| +I |                           alan |          18 |


6、flink datastream API集成flink cdc

本示例是捕获mysql cdctest库的user表数据变化情况。

1)、maven依赖

使用flink cdc添加如下依赖即可,但flink本身的运行环境相关依赖需要添加。

<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency>
	<groupId>com.ververica</groupId>
	<artifactId>flink-sql-connector-mysql-cdc</artifactId>
	<version>2.4.0</version>
	<scope>provided</scope>
</dependency>

2)、代码实现


import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.extern.slf4j.Slf4j;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
@Slf4j
public class TestFlinkCDCFromMysqlDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(3000);

		MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
				.hostname("192.168.10.44")
				.port(3306)
				.databaseList("cdctest") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
				.tableList("cdctest.user") // 设置捕获的表
				.username("root")
				.password("123456")
				.deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
				.build();

		DataStream<String> result = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
		log.info(result.toString());
		result.map(new MapFunction<String,String>() {

			@Override
			public String map(String value) throws Exception {
				log.info("value ======={}",value);
				return value;
			}
			
		});

		env.execute();
	}
}

3)、验证

在程序运行起来后,对cdctest.user表的数据进行添加、修改、删除操作,观察程序控制台日志输出情况

08:50:26.819 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":null,"after":{"id":2,"name":"alanchan","age":20},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626222,"transaction":null}
08:50:26.821 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":null,"after":{"id":3,"name":"alanchanchn","age":33},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626223,"transaction":null}
08:50:26.821 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":null,"after":{"id":1,"name":"alan","age":18},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626221,"transaction":null}
08:50:26.822 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":null,"after":{"id":4,"name":"test456","age":999000},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626223,"transaction":null}
一月 22, 2024 8:50:27 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect信息: 
Connected to 192.168.10.44:3306 at alan_master_logbin.000004/10816 (sid:6116, cid:565)
08:50:56.030 [Source: MySQL Source -> Map (1/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo  - value ======={"before":{"id":4,"name":"test456","age":999000},"after":{"id":4,"name":"test456","age":8888},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1705884032000,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":11010,"row":0,"thread":557,"query":null},"op":"u","ts_ms":1705884655747,"transaction":null}

4)、debezium数据格式介绍

关于debezium更多的信息可以参考:37、Flink 的CDC 格式:debezium部署以及mysql示例

在flink cdc的版本中,不需要特别对debezium数据格式进行处理,默认的形如下面的内容,也即不带schema的,解析方式参考上例。

{

	"before": {
		"name": "alan_test",
		"scores": 666.0
	},
	"after": {
		"name": "alan_test",
		"scores": 888.0
	},
	"source": {
		"version": "1.7.2.Final",
		"connector": "mysql",
		"name": "ALAN",
		"ts_ms": 1705717298000,
		"snapshot": "false",
		"db": "cdctest",
		"sequence": null,
		"table": "userscoressink",
		"server_id": 1,
		"gtid": null,
		"file": "alan_master_logbin.000004",
		"pos": 4931,
		"row": 0,
		"thread": null,
		"query": null
	},
	"op": "u",
	"ts_ms": 1705717772785,
	"transaction": null

}

在某些情况下可能需要带schema的,形如下例,

如果需要解析则需要将JsonDebeziumDeserializationSchema()改成JsonDebeziumDeserializationSchema(true)

一般推荐使用系统默认的,不带schema的数据格式。


{
	"schema": {
		"type": "struct",
		"fields": [{
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": true,
				"field": "name"
			}, {
				"type": "double",
				"optional": true,
				"field": "scores"
			}],
			"optional": true,
			"name": "ALAN.cdctest.userscoressink.Value",
			"field": "before"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": true,
				"field": "name"
			}, {
				"type": "double",
				"optional": true,
				"field": "scores"
			}],
			"optional": true,
			"name": "ALAN.cdctest.userscoressink.Value",
			"field": "after"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "version"
			}, {
				"type": "string",
				"optional": false,
				"field": "connector"
			}, {
				"type": "string",
				"optional": false,
				"field": "name"
			}, {
				"type": "int64",
				"optional": false,
				"field": "ts_ms"
			}, {
				"type": "string",
				"optional": true,
				"name": "io.debezium.data.Enum",
				"version": 1,
				"parameters": {
					"allowed": "true,last,false"
				},
				"default": "false",
				"field": "snapshot"
			}, {
				"type": "string",
				"optional": false,
				"field": "db"
			}, {
				"type": "string",
				"optional": true,
				"field": "sequence"
			}, {
				"type": "string",
				"optional": true,
				"field": "table"
			}, {
				"type": "int64",
				"optional": false,
				"field": "server_id"
			}, {
				"type": "string",
				"optional": true,
				"field": "gtid"
			}, {
				"type": "string",
				"optional": false,
				"field": "file"
			}, {
				"type": "int64",
				"optional": false,
				"field": "pos"
			}, {
				"type": "int32",
				"optional": false,
				"field": "row"
			}, {
				"type": "int64",
				"optional": true,
				"field": "thread"
			}, {
				"type": "string",
				"optional": true,
				"field": "query"
			}],
			"optional": false,
			"name": "io.debezium.connector.mysql.Source",
			"field": "source"
		}, {
			"type": "string",
			"optional": false,
			"field": "op"
		}, {
			"type": "int64",
			"optional": true,
			"field": "ts_ms"
		}, {
			"type": "struct",
			"fields": [{
				"type": "string",
				"optional": false,
				"field": "id"
			}, {
				"type": "int64",
				"optional": false,
				"field": "total_order"
			}, {
				"type": "int64",
				"optional": false,
				"field": "data_collection_order"
			}],
			"optional": true,
			"field": "transaction"
		}],
		"optional": false,
		"name": "ALAN.cdctest.userscoressink.Envelope"
	},
	"payload": {
		"before": {
			"name": "alan_test",
			"scores": 666.0
		},
		"after": {
			"name": "alan_test",
			"scores": 888.0
		},
		"source": {
			"version": "1.7.2.Final",
			"connector": "mysql",
			"name": "ALAN",
			"ts_ms": 1705717298000,
			"snapshot": "false",
			"db": "cdctest",
			"sequence": null,
			"table": "userscoressink",
			"server_id": 1,
			"gtid": null,
			"file": "alan_master_logbin.000004",
			"pos": 4931,
			"row": 0,
			"thread": null,
			"query": null
		},
		"op": "u",
		"ts_ms": 1705717772785,
		"transaction": null
	}
}

以上,本文详细的介绍了Flink CDC的应用,并且提供2个示例进行说明如何使用,即使用Flink sql client的观察数据同步的情况、通过DataStream API 捕获数据变化情况及验证。

本专题分为以下几篇文章:
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-Streaming ELT介绍及示例(2)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-完整版

到了这里,关于60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(104)
  • 最新版Flink CDC MySQL同步MySQL(一)

    Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。 连接器 数据库 驱动 mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.4 mysql-cdc MySQL: 5.6, 5.

    2024年02月13日
    浏览(66)
  • Flink 实现 MySQL CDC 动态同步表结构

    作者:陈少龙,腾讯 CSIG 高级工程师 使用 Flink CDC(Change Data Capture) 实现数据同步被越来越多的人接受。本文介绍了在数据同步过程中,如何将 Schema 的变化实时地从 MySQL 中同步到 Flink 程序中去。 MySQL 存储的数据量大了之后往往会出现查询性能下降的问题,这时候通过 Flin

    2024年02月04日
    浏览(78)
  • Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(60)
  • 最新版Flink CDC MySQL同步MySQL(一)_flink 连接mysql(1)

    下载 连接器 SQL jar (或 自行构建 )。 将下载的jar包放在FLINK_HOME/lib/. 重启Flink集群。 注意 :目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的 6.使用 Flink CDC 对 MySQL 进行流式 ETL 本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。 假设我们将产品数

    2024年04月26日
    浏览(48)
  • 基于Flink SQL CDC Mysql to Mysql数据同步

    Flink CDC有两种方式同步数据库: 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步; 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。 本方案使用FlinkSQL方法,同步两表中的数据。 其中Flink应用可以部署在具有公网IP的服务

    2023年04月11日
    浏览(76)
  • flink-cdc同步mysql数据到elasticsearch

    CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 cdc项目地址:https://github.com/ver

    2024年02月13日
    浏览(87)
  • 最新版Flink CDC MySQL同步Elasticsearch(一)

    首先我们要基于Flink CDC MySQL同步MySQL的环境基础上(flink-1.17.1、Java8、MySQL8)搭建Elasticsearch7-17-10和Kibana 7.17.10。笔者已经搭建好环境,这里不做具体演示了,如果需要Es的搭建教程情况笔者其他博客 注意: 建议生产环境统一使用稳定版本Flink1.16.*。笔者这里只是作为教程编写

    2024年02月13日
    浏览(38)
  • Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 具体前提设置,请看这篇,包含 binlog 设置、Maven...... Flink CDC 基于mysql binlog 实时同步mysql表_彩虹豆的博客-CSDN博客 经过不懈努力,终于从阿里help页面找到了支

    2024年02月08日
    浏览(47)
  • 实战:大数据Flink CDC同步Mysql数据到ElasticSearch

    前面的博文我们分享了大数据分布式流处理计算框架Flink和其基础环境的搭建,相信各位看官都已经搭建好了自己的运行环境。那么,今天就来实战一把使用Flink CDC同步Mysql数据导Elasticsearch。 CDC简介 CDC 的全称是 Change Data Capture(变更数据捕获技术) ,在广义的概念上,只要

    2024年02月09日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包