十分钟掌握 Flink CDC,实现Mysql数据增量备份到Clickhouse [纯干货,建议收藏]

这篇具有很好参考价值的文章主要介绍了十分钟掌握 Flink CDC,实现Mysql数据增量备份到Clickhouse [纯干货,建议收藏]。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Clickhouse的优点.

  1. 真正的面向列的 DBMS

ClickHouse 是一个 DBMS,而不是一个单一的数据库。它允许在运行时创建表和数据库、加载数据和运行

查询,而无需重新配置和重新启动服务器。

  1. 数据压缩

一些面向列的 DBMS(InfiniDB CE 和 MonetDB)不使用数据压缩。但是,数据压缩确实提高了性能。

  1. 磁盘存储的数据

  2. 在多个服务器上分布式处理

  3. SQL支持

  4. 数据不仅按列存储,而且由矢量 - 列的部分进行处理,这使开发者能够实现高 CPU 性能

Clickhouse的缺点

  1. 没有完整的事务支持,

  2. 缺少完整的Update/Delete操作,缺少高频率、低延迟的修改或删除已存在数据的能力,仅能用于批量删

除或修改数据

  1. 聚合结果必须小于一台机器的内存大小:

  2. 不适合key-value存储,

什么时候不可以用Clickhouse?

  1. 事物性工作(OLTP)

  2. 高并发的键值访问

  3. Blob或者文档存储

  4. 超标准化的数据

Flink CDC

=========

Flink cdc connector 消费 Debezium 里的数据,经过处理再sink出来,这个流程还是相对比较简单的

flinkcdc clickhouse,程序员,flink,mysql,clickhouse

首先创建 Source 和 Sink(对应的依赖引用,在文末)

SourceFunction sourceFunction = MySQLSource.builder()

.hostname(“localhost”)

.port(3306)

.databaseList(“test”)

.username(“flinkcdc”)

.password(“dafei1288”)

.deserializer(new JsonDebeziumDeserializationSchema())

.build();

// 添加 source

env.addSource(sourceFunction)

// 添加 sink

.addSink(new ClickhouseSink());

这里用到的JsonDebeziumDeserializationSchema,是我们自定义的一个序列化类,用于将Debezium输出的数据,序列化

public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {

@Override

public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

Gson jsstr = new Gson();

HashMap<String, Object> hs = new HashMap<>();

String topic = sourceRecord.topic();

String[] split = topic.split(“[.]”);

String database = split[1];

String table = split[2];

hs.put(“database”,database);

hs.put(“table”,table);

//获取操作类型

Envelope.Operation operation = Envelope.operationFor(sourceRecord);

//获取数据本身

Struct struct = (Struct)sourceRecord.value();

Struct after = struct.getStruct(“after”);

if (after != null) {

Schema schema = after.schema();

HashMap<String, Object> afhs = new HashMap<>();

for (Field field : schema.fields()) {

afhs.put(field.name(), after.get(field.name()));

}

hs.put(“data”,afhs);

}

String type = operation.toString().toLowerCase();

if (“create”.equals(type)) {

type = “insert”;

}

hs.put(“type”,type);

collector.collect(jsstr.toJson(hs));

}

@Override

public TypeInformation getProducedType() {

return BasicTypeInfo.STRING_TYPE_INFO;

}

}

这里是将数据序列化成如下Json格式

{“database”:“test”,“data”:{“name”:“jacky”,“description”:“fffff”,“id”:8},“type”:“insert”,“table”:“test_cdc”}

接下来就是要创建Sink,将数据变化存入Clickhouse中,这里我们仅以insert为例

public static class ClickhouseSink extends RichSinkFunction{

Connection connection;

PreparedStatement pstmt;

private Connection getConnection() {

Connection conn = null;

try {

Class.forName(“ru.yandex.clickhouse.ClickHouseDriver”);

String url = “jdbc:clickhouse://localhost:8123/default”;

conn = DriverManager.getConnection(url,“default”,“dafei1288”);

} catch (Exception e) {

e.printStackTrace();

}

return conn;

}

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

connection = getConnection();

String sql = “insert into sink_ch_test(id,name,description) values (?,?,?)”;

pstmt = connection.prepareStatement(sql);

}

// 每条记录插入时调用一次

public void invoke(String value, Context context) throws Exception {

//{“database”:“test”,“data”:{“name”:“jacky”,“description”:“fffff”,“id”:8},“type”:“insert”,“table”:“test_cdc”}

Gson t = new Gson();

HashMap<String,Object> hs = t.fromJson(value,HashMap.class);

String database = (String)hs.get(“database”);

String table = (String)hs.get(“table”);

String type = (String)hs.get(“type”);

if(“test”.equals(database) && “test_cdc”.equals(table)){

if(“insert”.equals(type)){

System.out.println("insert => "+value);

LinkedTreeMap<String,Object> data = (LinkedTreeMap<String,Object>)hs.get(“data”);

String name = (String)data.get(“name”);

String description = (String)data.get(“description”);

Double id = (Double)data.get(“id”);

// 未前面的占位符赋值

pstmt.setInt(1, id.intValue());

pstmt.setString(2, name);

pstmt.setString(3, description);

pstmt.executeUpdate();

}

}

}

@Override

public void close() throws Exception {

super.close();

if(pstmt != null) {

pstmt.close();

}

if(connection != null) {

connection.close();

}

}

}

完整代码案例:

package name.lijiaqi.cdc;

import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;

import com.google.gson.Gson;

import com.google.gson.internal.LinkedTreeMap;

import io.debezium.data.Envelope;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;

import org.apache.flink.util.Collector;

import org.apache.kafka.connect.source.SourceRecord;

import org.apache.kafka.connect.data.Field;

import org.apache.kafka.connect.data.Schema;

import org.apache.kafka.connect.data.Struct;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.util.HashMap;

public class MySqlBinlogSourceExample {

public static void main(String[] args) throws Exception {

SourceFunction sourceFunction = MySQLSource.builder()

.hostname(“localhost”)

.port(3306)

.databaseList(“test”)

.username(“flinkcdc”)

.password(“dafei1288”)

.deserializer(new JsonDebeziumDeserializationSchema())

.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 添加 source

env.addSource(sourceFunction)

// 添加 sink

.addSink(new ClickhouseSink());

env.execute(“mysql2clickhouse”);

}

// 将cdc数据反序列化

public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {

@Override

public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

Gson jsstr = new Gson();

HashMap<String, Object> hs = new HashMap<>();

String topic = sourceRecord.topic();

String[] split = topic.split(“[.]”);

String database = split[1];

String table = split[2];

hs.put(“database”,database);

hs.put(“table”,table);

//获取操作类型

Envelope.Operation operation = Envelope.operationFor(sourceRecord);

//获取数据本身

Struct struct = (Struct)sourceRecord.value();

Struct after = struct.getStruct(“after”);

if (after != null) {

Schema schema = after.schema();

HashMap<String, Object> afhs = new HashMap<>();

for (Field field : schema.fields()) {

afhs.put(field.name(), after.get(field.name()));

}

hs.put(“data”,afhs);

}

String type = operation.toString().toLowerCase();

if (“create”.equals(type)) {

type = “insert”;

}

hs.put(“type”,type);

collector.collect(jsstr.toJson(hs));

}

@Override

public TypeInformation getProducedType() {

return BasicTypeInfo.STRING_TYPE_INFO;

}

}

public static class ClickhouseSink extends RichSinkFunction{

Connection connection;

PreparedStatement pstmt;

private Connection getConnection() {

Connection conn = null;

try {

Class.forName(“ru.yandex.clickhouse.ClickHouseDriver”);

String url = “jdbc:clickhouse://localhost:8123/default”;

conn = DriverManager.getConnection(url,“default”,“dafei1288”);

} catch (Exception e) {

e.printStackTrace();

}

return conn;

}

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

connection = getConnection();

String sql = “insert into sink_ch_test(id,name,description) values (?,?,?)”;

pstmt = connection.prepareStatement(sql);

}

// 每条记录插入时调用一次

public void invoke(String value, Context context) throws Exception {

//{“database”:“test”,“data”:{“name”:“jacky”,“description”:“fffff”,“id”:8},“type”:“insert”,“table”:“test_cdc”}

Gson t = new Gson();

HashMap<String,Object> hs = t.fromJson(value,HashMap.class);

String database = (String)hs.get(“database”);

String table = (String)hs.get(“table”);

String type = (String)hs.get(“type”);

if(“test”.equals(database) && “test_cdc”.equals(table)){

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。flinkcdc clickhouse,程序员,flink,mysql,clickhouse

flinkcdc clickhouse,程序员,flink,mysql,clickhouse

flinkcdc clickhouse,程序员,flink,mysql,clickhouse

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

flinkcdc clickhouse,程序员,flink,mysql,clickhouse

最后

一次偶然,从朋友那里得到一份“java高分面试指南”,里面涵盖了25个分类的面试题以及详细的解析:JavaOOP、Java集合/泛型、Java中的IO与NIO、Java反射、Java序列化、Java注解、多线程&并发、JVM、Mysql、Redis、Memcached、MongoDB、Spring、Spring Boot、Spring Cloud、RabbitMQ、Dubbo 、MyBatis 、ZooKeeper 、数据结构、算法、Elasticsearch 、Kafka 、微服务、Linux。

这不,马上就要到招聘季了,很多朋友又开始准备“金三银四”的春招啦,那我想这份“java高分面试指南”应该起到不小的作用,所以今天想给大家分享一下。

flinkcdc clickhouse,程序员,flink,mysql,clickhouse

请注意:关于这份“java高分面试指南”,每一个方向专题(25个)的题目这里几乎都会列举,在不看答案的情况下,大家可以自行测试一下水平 且由于篇幅原因,这边无法展示所有完整的答案解析
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!
dnimg.cn/images/e5c14a7895254671a72faed303032d36.jpg" alt=“img” style=“zoom: 33%;” />

最后

一次偶然,从朋友那里得到一份“java高分面试指南”,里面涵盖了25个分类的面试题以及详细的解析:JavaOOP、Java集合/泛型、Java中的IO与NIO、Java反射、Java序列化、Java注解、多线程&并发、JVM、Mysql、Redis、Memcached、MongoDB、Spring、Spring Boot、Spring Cloud、RabbitMQ、Dubbo 、MyBatis 、ZooKeeper 、数据结构、算法、Elasticsearch 、Kafka 、微服务、Linux。

这不,马上就要到招聘季了,很多朋友又开始准备“金三银四”的春招啦,那我想这份“java高分面试指南”应该起到不小的作用,所以今天想给大家分享一下。

[外链图片转存中…(img-FkTyyqdI-1712406568279)]

请注意:关于这份“java高分面试指南”,每一个方向专题(25个)的题目这里几乎都会列举,在不看答案的情况下,大家可以自行测试一下水平 且由于篇幅原因,这边无法展示所有完整的答案解析
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!文章来源地址https://www.toymoban.com/news/detail-850954.html

到了这里,关于十分钟掌握 Flink CDC,实现Mysql数据增量备份到Clickhouse [纯干货,建议收藏]的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 十分钟实现 Android Camera2 相机预览

    因为工作中要使用 Android Camera2 API ,但因为 Camera2 比较复杂,网上资料也比较乱,有一定入门门槛,所以花了几天时间系统研究了下,并在 CSDN 上记录了下,希望能帮助到更多的小伙伴。 Camera2 API 的包名是 android.hardware.camera2 ,是 Android 5.0 后推出的一套调用摄像头设备的接口

    2024年02月13日
    浏览(48)
  • 十分钟实现 Android Camera2 相机拍照

    因为工作中要使用 Android Camera2 API ,但因为 Camera2 比较复杂,网上资料也比较乱,有一定入门门槛,所以花了几天时间系统研究了下,并在 CSDN 上记录了下,希望能帮助到更多的小伙伴。 上篇文章 我们使用 Camera2 实现了相机预览的功能,这篇文章我们接着上文,来实现 Cam

    2024年02月11日
    浏览(39)
  • 十分钟带汝入门大数据开发语言Scala

    大家好,我是百思不得小赵。 创作时间:2022 年 6 月 7 日 博客主页: 🔍点此进入博客主页 —— 新时代的农民工 🙊 —— 换一种思维逻辑去看待这个世界 👀 今天是加入CSDN的第1193天。觉得有帮助麻烦👏点赞、🍀评论、❤️收藏 Scala是一门多范式的编程语言,一种类似Ja

    2024年02月02日
    浏览(39)
  • 《每天十分钟》-红宝书第4版-语言基础-数据类型(一)

    关于ECMAScript 数据类型,“非常6+1” 6:六种简单数据类型(也称为原始类型) Undefined Null Boolean Number String Symbol(ES6新增) 1:一种复杂数据类型 Object 使用 typeof 操作符 (注意是操作符)可以判断一个变量的数据类型 \\\"undefined\\\"表示值未定义; \\\"boolean\\\"表示值为布尔值; \\\"string\\\"表示

    2024年02月13日
    浏览(29)
  • 《每天十分钟》-红宝书第4版-语言基础-数据类型(五)

    这个符号作为一个属性表示“一个布尔值,如果是 true,则意味着对象应 该用 Array.prototype.concat()打平其数组元素”。ES6 中的 Array.prototype.concat()方法会 根 据 接 收 到 的 对 象 类 型 选 择 如 何 将 一 个 类 数 组 对 象 拼 接 成 数 组 实 例 。 覆 盖 Symbol.isConcat- Spreadable 的值可

    2024年02月14日
    浏览(32)
  • 使用cpolar内网穿透实现公网远程访问,十分钟就可以学会使用

    1.自己有公网IP,进入路由器做映射 2.自己有公网服务器搭建内网穿透 3.通过第三方公网服务器进行流量转发,映射本地端口 比较常见是第三种方式,不需要自己搭建服务,也不用去申请公网IP、不用设置路由器,不论是本地开发测试,远程联机还是远程访问都支持,随时可用

    2024年02月12日
    浏览(40)
  • Prometheus技术文档--基本安装-docker安装并挂载数据卷-《十分钟搭建》

    宿主机挂载目录位置: 以及准备对应的挂载目录: /usr/local/docker/promethues/server 准备如下: data、config、rules、ClientAll、server   授权相关文件夹权限  目标容器位置: /etc/prometheus/prometheus.yml 使用代码编辑配置文件: 书写如下配置:  解释配置: 这个 prometheus.yml 文件是Prome

    2024年02月14日
    浏览(30)
  • Grafana技术文档--基本安装-docker安装并挂载数据卷-《十分钟搭建》

    阿丹: Prometheus技术文档--基本安装-docker安装并挂载数据卷-《十分钟搭建》_一单成的博客-CSDN博客         在正确安装了Prometheus之后开始使用并安装Grafana作为Prometheus的仪表盘。 搜索可拉取版本  拉取镜像       访问{ip}:3000 即可,使用账号密码 admin/admin进行登录即可 请按照

    2024年02月14日
    浏览(30)
  • 还在苦恼如何开发一个Chrome插件吗?十分钟带你实现一个实用小插件

    你是否曾考虑过创建自己的 Chrome 插件,但又挠头毫无思路?那么在接下来的几分钟里,我不仅会介绍 Chrome 浏览器扩展的基本知识,还会指导你通过五个简单的步骤来制作自己的扩展。 知道怎么做吗?让我们一探究竟! 今年我们见证了人工智能能力的爆炸式增长。虽然cha

    2024年02月10日
    浏览(31)
  • 零编程经验,通过 GPT-4 十分钟开发了一个浏览器插件,并成功运行,实现了需求目标!

    大佬蓝鸟ID: sundyme 零编程经验,通过 GPT-4 十分钟开发了一个浏览器插件,并成功运行,实现了需求目标!太不可思意了,真正体会到了自然语言编程的魅力! 下一步是利用Pinterest 的 API 接口实现自动发图,已经生成好了代码和步骤(看着挺靠谱),等明天开发者权限审核下

    2023年04月08日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包