Flink CDC 详解

这篇具有很好参考价值的文章主要介绍了Flink CDC 详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


一、CDC 简介 ?

什么是 CDC ?

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:

基于查询的 CDC 基于 Binlog 的 CDC
开源产品 Sqoop、Kafka JDBC Source Canal、Maxwell、Debezium
执行模式 Batch Streaming
是否可以捕获所有数据变化
延迟性 高延迟 低延迟
是否增加数据库压力

关于 Flink-CDC

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。

目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors

二、Flink CDC 案例实操

1. DataStream 方式的应用

A、导入依赖

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>1.12.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.12</artifactId>
		<version>1.12.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_2.12</artifactId>
		<version>1.12.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>3.1.3</version>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.49</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner-blink_2.12</artifactId>
		<version>1.12.0</version>
	</dependency>
	<dependency>
		<groupId>com.ververica</groupId>
		<artifactId>flink-connector-mysql-cdc</artifactId>
		<version>2.0.0</version>
	</dependency>
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.75</version>
	</dependency>
</dependencies>
<build>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-assembly-plugin</artifactId>
			<version>3.0.0</version>
			<configuration>
				<descriptorRefs>
					<descriptorRef>jar-with-dependencies</descriptorRef>
				</descriptorRefs>
			</configuration>
			<executions>
				<execution>
					<id>make-assembly</id>
					<phase>package</phase>
					<goals>
						<goal>single</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

B、编写代码

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;

public class FlinkCDC {
	public static void main(String[] args) throws Exception {
		//1.创建执行环境
		StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序
		//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
		env.enableCheckpointing(5000L);
		//2.2 指定 CK 的一致性语义
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
		//2.3 设置任务关闭的时候保留最后一次 CK 数据
		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		//2.4 指定从 CK 自动重启策略
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
		//2.5 设置状态后端
		env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
		//2.6 设置访问 HDFS 的用户名
		System.setProperty("HADOOP_USER_NAME", "atguigu");
		//3.创建 Flink-MySQL-CDC 的 Source
		//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
		//latest-offset: Never to perform snapshot on the monitored database tables upon first  startup, just read from the end of the binlog which means only have the changes since the connector was started.
		//timestamp: Never to perform snapshot on the monitored database tables upon first  startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
		//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
	    DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
			.hostname("hadoop102")
			.port(3306)
			.username("root")
			.password("000000")
			.databaseList("gmall-flink")
			.tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
			.startupOptions(StartupOptions.initial())
			.deserializer(new StringDebeziumDeserializationSchema())
			.build();
		//4.使用 CDC Source 从 MySQL 读取数据
		DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
		//5.打印数据
		mysqlDS.print();
		//6.执行任务
		env.execute();
	}
}

C、案例测试

(1) 打包并上传至 Linux

flinkcdc,Flink,flink,数据库,java

(2) 开启 MySQL Binlog 并重启 MySQL

(3) 启动 Flink 集群

[fancy@hadoop102 flink-standalone]$ bin/start-cluster.sh

(4) 启动 HDFS 集群

[fancy@hadoop102 flink-standalone]$ start-dfs.sh

(5) 启动程序

[fancy@hadoop102 flink-standalone]$ bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

(6) 在 MySQL 的 gmall-flink.z_user_info 表中添加、修改或者删除数据

(7) 给当前的 Flink 程序创建 Savepoint

[fancy@hadoop102 flink-standalone]$ bin/flink savepoint JobId 
hdfs://hadoop102:8020/flink/save

(8) 关闭程序以后从 Savepoint 重启程序

[fancy@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c 
com.fancy.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

2. FlinkSQL 方式的应用

A、代码实现

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {
	public static void main(String[] args) throws Exception {
		//1.创建执行环境
		StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
		//2.创建 Flink-MySQL-CDC 的 Source
		tableEnv.executeSql("CREATE TABLE user_info (" +
			" id INT," +
			" name STRING," +
			" phone_num STRING" +
			") WITH (" +
			" 'connector' = 'mysql-cdc'," +
			" 'hostname' = 'hadoop102'," +
			" 'port' = '3306'," +
			" 'username' = 'root'," +
			" 'password' = '000000'," +
			" 'database-name' = 'gmall-flink'," +
			" 'table-name' = 'z_user_info'" +
			")"
		);
		tableEnv.executeSql("select * from user_info").print();
		env.execute();
	}
}

3. 自定义反序列化器

A、代码实现

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Properties;
public class Flink_CDCWithCustomerSchema {
	public static void main(String[] args) throws Exception {
		//1.创建执行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		//2.创建 Flink-MySQL-CDC 的 Source
		DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
			.hostname("hadoop102")
			.port(3306)
			.username("root")
			.password("000000")
			.databaseList("gmall-flink")
			.tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
			.startupOptions(StartupOptions.initial())
			.deserializer(
				new DebeziumDeserializationSchema<String>() { //自定义数据解析器
					@Override
					public void deserialize(SourceRecord sourceRecord, Collector<String>collector) throws Exception {
						//获取主题信息,包含着数据库和表名 mysql_binlog_source.gmall-flink.z_user_info
						String topic = sourceRecord.topic();
						String[] arr = topic.split("\\.");
						String db = arr[1];
						String tableName = arr[2];
						//获取操作类型 READ DELETE UPDATE CREATE
						Envelope.Operation operation =	Envelope.operationFor(sourceRecord);
						//获取值信息并转换为 Struct 类型
						Struct value = (Struct) sourceRecord.value();
						//获取变化后的数据
						Struct after = value.getStruct("after");
						//创建 JSON 对象用于存储数据信息
						JSONObject data = new JSONObject();
						for (Field field : after.schema().fields()) {
							Object o = after.get(field);
							data.put(field.name(), o);
						}
						//创建 JSON 对象用于封装最终返回值数据信息
						JSONObject result = new JSONObject();
						result.put("operation", operation.toString().toLowerCase());
						result.put("data", data);
						result.put("database", db);
						result.put("table", tableName);
						//发送数据至下游
						collector.collect(result.toJSONString());
				}
				@Override
				public TypeInformation<String> getProducedType() {
					return TypeInformation.of(String.class);
				}
		}).build();
		//3.使用 CDC Source 从 MySQL 读取数据
		DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
		//4.打印数据
		mysqlDS.print();
		//5.执行任务
		env.execute();
	}
}

三、Flink-CDC 2.0

1.1.x 痛点

flinkcdc,Flink,flink,数据库,java

2.设计目标

flinkcdc,Flink,flink,数据库,java

3.设计实现

A、整体概览

在对于有主键的表做初始化模式,整体的流程主要分为 5 个阶段:

1.Chunk 切分;
2.Chunk 分配;(实现并行读取数据&CheckPoint)
3.Chunk 读取;(实现无锁读取)
4.Chunk 汇报;
5.Chunk 分配。

flinkcdc,Flink,flink,数据库,java
B、Chunk 切分

flinkcdc,Flink,flink,数据库,java
根据 Netflix DBlog 的论文中的无锁算法原理,对于目标表按照主键进行数据分片,设置每个切片的区间为左闭右开或者左开右闭来保证数据的连续性。

C、Chunk 分配

flinkcdc,Flink,flink,数据库,java
将划分好的 Chunk 分发给多个 SourceReader,每个 SourceReader 读取表中的一部分数据,实现了并行读取的目标。

同时在每个 Chunk 读取的时候可以单独做 CheckPoint,某个 Chunk 读取失败只需要单独执行该 Chunk 的任务,而不需要像 1.x 中失败了只能从头读取。

若每个 SourceReader 保证了数据一致性,则全表就保证了数据一致性。

D、Chunk 读取

flinkcdc,Flink,flink,数据库,java
读取可以分为 5 个阶段

1)SourceReader 读取表数据之前先记录当前的 Binlog 位置信息记为低位点;
2)SourceReader 将自身区间内的数据查询出来并放置在 buffer 中;
3)查询完成之后记录当前的 Binlog 位置信息记为高位点;
4)在增量部分消费从低位点到高位点的 Binlog;
5)根据主键,对 buffer 中的数据进行修正并输出。

通过以上5个阶段可以保证每个Chunk最终的输出就是在高位点时该Chunk中最新的数据,但是目前只是做到了保证单个 Chunk 中的数据一致性。

E、Chunk 汇报

flinkcdc,Flink,flink,数据库,java
在 Snapshot Chunk 读取完成之后,有一个汇报的流程,如上图所示,即 SourceReader 需要将 Snapshot Chunk 完成信息汇报给 SourceEnumerator。

F、Chunk 分配

flinkcdc,Flink,flink,数据库,java

FlinkCDC 是支持全量+增量数据同步的,在 SourceEnumerator 接收到所有的 Snapshot Chunk 完成信息之后,还有一个消费增量数据(Binlog)的任务,此时是通过下发 Binlog Chunk 给任意一个 SourceReader 进行单并发读取来实现的。

四、核心原理分析

A、Binlog Chunk 中开始读取位置源码

MySqlHybridSplitAssigner

private MySqlBinlogSplit createBinlogSplit() {
	final List<MySqlSnapshotSplit> assignedSnapshotSplit = snapshotSplitAssigner.getAssignedSplits().values().stream()
		.sorted(Comparator.comparing(MySqlSplit::splitId))
		.collect(Collectors.toList());
	Map<String, BinlogOffset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
	final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
	final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
	BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;
	for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
		// find the min binlog offset
		
		BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
		if (binlogOffset.compareTo(minBinlogOffset) < 0) {
			minBinlogOffset = binlogOffset;
		}
	finishedSnapshotSplitInfos.add(
		new FinishedSnapshotSplitInfo(
			split.getTableId(),
			split.splitId(),
			split.getSplitStart(),
			split.getSplitEnd(),
			binlogOffset)
		);
		tableSchemas.putAll(split.getTableSchemas());
	}
	final MySqlSnapshotSplit lastSnapshotSplit = assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();
	return new MySqlBinlogSplit(
		BINLOG_SPLIT_ID,
		lastSnapshotSplit.getSplitKeyType(),
		minBinlogOffset,
		BinlogOffset.NO_STOPPING_OFFSET,
		finishedSnapshotSplitInfos,
		tableSchemas
	);
}

B、读取低位点到高位点之间的 Binlog

BinlogSplitReader文章来源地址https://www.toymoban.com/news/detail-658317.html

/**
* Returns the record should emit or not.
*
* <p>The watermark signal algorithm is the binlog split reader only sends thebinlog event that
* belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
* since the offset is after its high watermark.
*
* <pre> E.g: the data input is :
* snapshot-split-0 info : [0, 1024) highWatermark0
* snapshot-split-1 info : [1024, 2048) highWatermark1
* the data output is:
* only the binlog event belong to [0, 1024) and offset is after highWatermark0 should send,
* only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
* </pre>
*/
private boolean shouldEmit(SourceRecord sourceRecord) {
	if (isDataChangeRecord(sourceRecord)) {
		TableId tableId = getTableId(sourceRecord);
		BinlogOffset position = getBinlogPosition(sourceRecord);
		// aligned, all snapshot splits of the table has reached max highWatermark
		if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
			return true;
		}
		Object[] key =
			getSplitKey(
				currentBinlogSplit.getSplitKeyType(),
				sourceRecord,
				statefulTaskContext.getSchemaNameAdjuster()
			);
		for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
			if (RecordUtils.splitKeyRangeContains(key, splitInfo.getSplitStart(), splitInfo.getSplitEnd()) && position.isAtOrBefore(splitInfo.getHighWatermark())) {
				return true;
			}
		}
		// not in the monitored splits scope, do not emit
		return false;
	}
	// always send the schema change event and signal event
	// we need record them to state of Flink
	return true;
}

到了这里,关于Flink CDC 详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink cdc同步Oracle数据库资料到Doris问题集锦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    浏览(47)
  • 使用Flink CDC从数据库采集数据,保证数据不丢失:实现断点续传机制

    大数据技术在当前的数据分析和处理中扮演着重要的角色。Apache Flink作为一种快速、可靠的流处理引擎,在大规模数据处理中广受欢迎。本文将介绍如何使用Flink CDC(Change Data Capture)从数据库采集数据,并通过设置checkpoint来支持数据采集中断恢复,从而保证数据不丢失。

    2024年02月04日
    浏览(49)
  • 【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    我一开始是flink-cdc,oracle2Mysql,sql 我一开始直接用的oracle【date】类型,mysql【date】类型,sql的校验通过了,但是真正操作数据的时候报错,告诉我oracle的数据格式的日期数据,不可以直接插入到mysql格式的日期数据,说白了就是数据格式不一致导致的 我想的是既然格式不对

    2024年02月12日
    浏览(47)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、达梦等数据库开启日志方法

    目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 2.5达梦 2.4.1安装 2.4.2CDC配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验

    2024年02月05日
    浏览(59)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(51)
  • Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

    使用sysdba角色登录到Oracle数据库 确保Oracle归档日志(Archive Log)已启用 若未启用归档日志, 需运行以下命令启用归档日志 设置归档日志存储大小及位置 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等) 设置恢复文件的实际物理存储路径;scope=spfile参数

    2024年02月05日
    浏览(51)
  • flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

    大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读. 在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务, 然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交, 本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命

    2024年02月19日
    浏览(41)
  • Flink读取mysql数据库(java)

    代码如下: 运行结果如下:

    2024年02月12日
    浏览(43)
  • [大数据 Flink,Java实现不同数据库实时数据同步过程]

    目录 🌮前言: 🌮实现Mysql同步Es的过程包括以下步骤: 🌮配置Mysql数据库连接 🌮在Flink的配置文件中,添加Mysql数据库的连接信息。可以在flink-conf.yaml文件中添加如下配置: 🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代

    2024年02月10日
    浏览(45)
  • 【大数据】Flink 详解(十):SQL 篇 Ⅲ(Flink SQL CDC)

    《 Flink 详解 》系列(已完结),共包含以下 10 10 10 篇文章: 【大数据】Flink 详解(一):基础篇(架构、并行度、算子) 【大数据】Flink 详解(二):核心篇 Ⅰ(窗口、WaterMark) 【大数据】Flink 详解(三):核心篇 Ⅱ(状态 State) 【大数据】Flink 详解(四):核心篇

    2024年01月25日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包