Flink Series Articles
I. The Flink column
The Flink column systematically introduces individual Flink topics, each illustrated with concrete examples.
1. Flink deployment series
This part covers the basics of deploying and configuring Flink.
2. Flink fundamentals series
This part covers Flink fundamentals such as terminology, architecture, the programming model, the programming guides, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL basics series
This part covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL advanced topics and applications series
This part covers applied Table API and SQL content that is closer to real production use and more demanding to develop.
5. Flink monitoring series
This part relates to day-to-day operations and monitoring work.
II. The Flink examples column
The Flink examples column supplements the Flink column: it generally does not introduce concepts but provides concrete, ready-to-use examples. It is not subdivided into categories; each article's title shows what it covers.
For the entry point to all articles in both columns, see: Flink 系列文章汇总索引
This article gives a detailed introduction to Flink CDC and provides two examples of how to use it: observing data synchronization through the Flink SQL client, and capturing and verifying data changes through the DataStream API.
For more, see the author's Flink column, which covers the topics in a more up-to-date and systematic way.
Besides the Maven dependencies, this article requires a Flink cluster environment and MySQL.
This topic is split into the following articles:
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-Streaming ELT介绍及示例(2)
60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-完整版
I. Introduction to Flink CDC Connectors
This article is based on CDC Connectors 2.4 (the project has since released 3.0). This Flink column is based on Flink 1.17, and CDC 2.4 supports Flink up to 1.17.
1. CDC Connectors overview and architecture
CDC Connectors for Apache Flink® is a set of source connectors for Apache Flink®, ingesting changes from different databases using change data capture (CDC). CDC Connectors for Apache Flink® integrates Debezium as the engine that captures data changes, so it can take full advantage of Debezium's capabilities.
See the Debezium documentation to learn more about Debezium.
Or refer to: 37、Flink 的CDC 格式:debezium部署以及mysql示例
2. Supported connectors
3. Flink CDC and Flink version compatibility
4. Features
- Supports reading database snapshots and continuing to read the binlog even after failures.
- CDC connectors for the DataStream API let users consume changes from multiple databases and tables in a single job without deploying Debezium and Kafka.
- CDC connectors for the Table/SQL API let users create a CDC source with SQL DDL to monitor changes on a single table.
The following table shows the connectors' current features:
5. Integrating Flink CDC with the Flink SQL client
1) Integration steps
1. A running Flink cluster is required.
For setup details, see: 2、Flink1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式(前两种及session和per-job)验证详细步骤
2. Download the Flink CDC connector jar and place it under FLINK_HOME/lib/.
Download: https://github.com/ververica/flink-cdc-connectors/releases
3. Restart the Flink cluster.
2) Example: capturing changes to the MySQL user table
This example assumes the MySQL binlog is already enabled; for setup details see:
37、Flink 的CDC 格式:debezium部署以及mysql示例
Then create a CDC source table in the Flink SQL client:
Flink SQL> CREATE TABLE mysql_binlog_user (
> id INT NOT NULL,
> name STRING,
> age INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.168.10.44',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'cdctest',
> 'table-name' = 'user'
> );
[INFO] Execute statement succeed.
Query the table, then insert, update, and delete rows in MySQL. The op column shows the changelog operation for each row: +I (insert), -U (update-before), +U (update-after), -D (delete).
Flink SQL> select * from mysql_binlog_user;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 4 | test456 | 8888 |
| +I | 2 | alanchan | 20 |
| +I | 3 | alanchanchn | 33 |
| +I | 1 | alan | 18 |
| -U | 4 | test456 | 8888 |
| +U | 4 | test123 | 8888 |
| -U | 4 | test123 | 8888 |
| +U | 4 | test123 | 66666 |
| -D | 4 | test123 | 66666 |
| +I | 4 | alanchanchn2 | 100 |
An aggregation over the CDC table is continuously updated as the underlying MySQL data changes:
Flink SQL> select name ,sum(age) from mysql_binlog_user group by name;
+----+--------------------------------+-------------+
| op | name | EXPR$1 |
+----+--------------------------------+-------------+
| +I | alanchanchn2 | 100 |
| +I | alanchan | 20 |
| +I | alanchanchn | 33 |
| +I | alan | 18 |
6. Integrating Flink CDC with the Flink DataStream API
This example captures data changes to the user table in the MySQL cdctest database.
1) Maven dependency
For Flink CDC itself, the following dependency is sufficient; the dependencies for the Flink runtime environment still have to be added separately.
<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
</dependency>
2) Code implementation
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.extern.slf4j.Slf4j;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Capture changes to the MySQL cdctest.user table with Flink CDC and log each record as a JSON string
 */
@Slf4j
public class TestFlinkCDCFromMysqlDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpointing is required so the MySQL CDC source can commit binlog offsets
        env.enableCheckpointing(3000);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.10.44")
                .port(3306)
                .databaseList("cdctest")   // database to capture; to capture every table in it, set tableList to ".*"
                .tableList("cdctest.user") // table to capture, qualified as database.table
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema()) // convert SourceRecord to a JSON string
                .build();

        DataStream<String> result = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
        log.info(result.toString());

        // log every change record as it arrives
        result.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                log.info("value ======={}", value);
                return value;
            }
        });

        env.execute();
    }
}
3) Verification
After the program is running, insert, update, and delete rows in the cdctest.user table and watch the program's console log output.
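One hypothetical way to drive the test is a small standalone JDBC program like the sketch below; it is not part of the original setup, the class name is made up, the connection details are taken from the example above, and it assumes the MySQL JDBC driver (mysql-connector-j) is on the classpath.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical helper: issues a few changes on cdctest.user so the CDC job has events to capture
public class GenerateUserChanges {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://192.168.10.44:3306/cdctest";
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement()) {
            // insert a row, update it, then delete it; each statement produces one change event
            stmt.executeUpdate("INSERT INTO user (id, name, age) VALUES (4, 'test456', 999000)");
            stmt.executeUpdate("UPDATE user SET age = 8888 WHERE id = 4");
            stmt.executeUpdate("DELETE FROM user WHERE id = 4");
        }
    }
}
The console log output of the CDC job then looks like the following: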
08:50:26.819 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":null,"after":{"id":2,"name":"alanchan","age":20},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626222,"transaction":null}
08:50:26.821 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":null,"after":{"id":3,"name":"alanchanchn","age":33},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626223,"transaction":null}
08:50:26.821 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":null,"after":{"id":1,"name":"alan","age":18},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626221,"transaction":null}
08:50:26.822 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":null,"after":{"id":4,"name":"test456","age":999000},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626223,"transaction":null}
Jan 22, 2024 8:50:27 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to 192.168.10.44:3306 at alan_master_logbin.000004/10816 (sid:6116, cid:565)
08:50:56.030 [Source: MySQL Source -> Map (1/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":{"id":4,"name":"test456","age":999000},"after":{"id":4,"name":"test456","age":8888},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1705884032000,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":11010,"row":0,"thread":557,"query":null},"op":"u","ts_ms":1705884655747,"transaction":null}
4) The Debezium data format
For more about Debezium, see: 37、Flink 的CDC 格式:debezium部署以及mysql示例
With this version of Flink CDC, the Debezium records need no special handling. By default they look like the example below, i.e. without the schema section; the example above simply logs them as strings, and a sketch of how to parse them follows the sample.
{
"before": {
"name": "alan_test",
"scores": 666.0
},
"after": {
"name": "alan_test",
"scores": 888.0
},
"source": {
"version": "1.7.2.Final",
"connector": "mysql",
"name": "ALAN",
"ts_ms": 1705717298000,
"snapshot": "false",
"db": "cdctest",
"sequence": null,
"table": "userscoressink",
"server_id": 1,
"gtid": null,
"file": "alan_master_logbin.000004",
"pos": 4931,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1705717772785,
"transaction": null
}
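As a reference, here is a minimal parsing sketch for the schema-less format above. It is not from the original article: the class name DebeziumJsonMapFunction is made up, and it assumes the Jackson databind library is available on the classpath and that the field names (op, before, after, source.table) are exactly as shown.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical map function that extracts the operation type and row images from a Debezium JSON record
public class DebeziumJsonMapFunction implements MapFunction<String, String> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public String map(String value) throws Exception {
        JsonNode root = MAPPER.readTree(value);
        String op = root.path("op").asText();   // "c"/"r" = insert/snapshot read, "u" = update, "d" = delete
        JsonNode before = root.path("before");  // row image before the change (null for inserts)
        JsonNode after = root.path("after");    // row image after the change (null for deletes)
        String table = root.path("source").path("table").asText();
        return String.format("table=%s, op=%s, before=%s, after=%s", table, op, before, after);
    }
}
It could be plugged into the DataStream example above in place of the identity MapFunction, e.g. result.map(new DebeziumJsonMapFunction()).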
In some cases you may need records that also carry the schema, as in the example below. To get them, change JsonDebeziumDeserializationSchema() to JsonDebeziumDeserializationSchema(true); a sketch follows the sample.
In general the default, schema-less format is recommended.
{
"schema": {
"type": "struct",
"fields": [{
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"field": "name"
}, {
"type": "double",
"optional": true,
"field": "scores"
}],
"optional": true,
"name": "ALAN.cdctest.userscoressink.Value",
"field": "before"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"field": "name"
}, {
"type": "double",
"optional": true,
"field": "scores"
}],
"optional": true,
"name": "ALAN.cdctest.userscoressink.Value",
"field": "after"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "connector"
}, {
"type": "string",
"optional": false,
"field": "name"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": true,
"field": "sequence"
}, {
"type": "string",
"optional": true,
"field": "table"
}, {
"type": "int64",
"optional": false,
"field": "server_id"
}, {
"type": "string",
"optional": true,
"field": "gtid"
}, {
"type": "string",
"optional": false,
"field": "file"
}, {
"type": "int64",
"optional": false,
"field": "pos"
}, {
"type": "int32",
"optional": false,
"field": "row"
}, {
"type": "int64",
"optional": true,
"field": "thread"
}, {
"type": "string",
"optional": true,
"field": "query"
}],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
}, {
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": true,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id"
}, {
"type": "int64",
"optional": false,
"field": "total_order"
}, {
"type": "int64",
"optional": false,
"field": "data_collection_order"
}],
"optional": true,
"field": "transaction"
}],
"optional": false,
"name": "ALAN.cdctest.userscoressink.Envelope"
},
"payload": {
"before": {
"name": "alan_test",
"scores": 666.0
},
"after": {
"name": "alan_test",
"scores": 888.0
},
"source": {
"version": "1.7.2.Final",
"connector": "mysql",
"name": "ALAN",
"ts_ms": 1705717298000,
"snapshot": "false",
"db": "cdctest",
"sequence": null,
"table": "userscoressink",
"server_id": 1,
"gtid": null,
"file": "alan_master_logbin.000004",
"pos": 4931,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1705717772785,
"transaction": null
}
}
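For reference, a minimal sketch (an illustration, not code from the original article) of switching the DataStream source above to this schema-carrying format; the only change is the boolean flag passed to the deserializer.
MySqlSource<String> mySqlSourceWithSchema = MySqlSource.<String>builder()
        .hostname("192.168.10.44")
        .port(3306)
        .databaseList("cdctest")
        .tableList("cdctest.user")
        .username("root")
        .password("123456")
        // true = include the Debezium schema section in every JSON record
        .deserializer(new JsonDebeziumDeserializationSchema(true))
        .build();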
To sum up, this article gave a detailed introduction to Flink CDC and provided two examples of how to use it: observing data synchronization through the Flink SQL client, and capturing and verifying data changes through the DataStream API.