FlinkCDC for mysql to Clickhouse

这篇具有很好参考价值的文章主要介绍了FlinkCDC for mysql to Clickhouse。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

完整依赖

<dependencies>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-core</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-streaming-java_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
 
<!--       <dependency>-->
<!--           <groupId>org.apache.flink</groupId>-->
<!--           <artifactId>flink-jdbc_2.12</artifactId>-->
<!--           <version>1.10.3</version>-->
<!--       </dependency>-->
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-jdbc_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
 
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-java</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-clients_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-api-java-bridge_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-common</artifactId>
           <version>1.13.0</version>
       </dependency>
 
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-planner_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
 
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-planner-blink_2.12</artifactId>
           <version>1.13.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-table-planner-blink_2.12</artifactId>
           <version>1.13.0</version>
           <type>test-jar</type>
       </dependency>
 
       <dependency>
           <groupId>com.alibaba.ververica</groupId>
           <artifactId>flink-connector-mysql-cdc</artifactId>
           <version>1.4.0</version>
       </dependency>
 
 
       <dependency>
           <groupId>com.aliyun</groupId>
           <artifactId>flink-connector-clickhouse</artifactId>
           <version>1.12.0</version>
       </dependency>
       <dependency>
           <groupId>ru.yandex.clickhouse</groupId>
           <artifactId>clickhouse-jdbc</artifactId>
           <version>0.2.6</version>
       </dependency>
       <dependency>
           <groupId>com.google.code.gson</groupId>
           <artifactId>gson</artifactId>
           <version>2.8.6</version>
       </dependency>
   </dependencies>

Flink CDC

package name.lijiaqi.cdc;
 
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
 
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;
 
public class MySqlBinlogSourceExample {
   public static void main(String[] args) throws Exception {
       SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
              .hostname("localhost")
              .port(3306)
              .databaseList("test")
              .username("flinkcdc")
              .password("dafei1288")
              .deserializer(new JsonDebeziumDeserializationSchema())
              .build();
 
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
       // 添加 source
       env.addSource(sourceFunction)
       // 添加 sink
      .addSink(new ClickhouseSink());
 
       env.execute("mysql2clickhouse");
  }
 
   // 将cdc数据反序列化
   public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {
       @Override
       public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
 
           Gson jsstr = new Gson();
           HashMap<String, Object> hs = new HashMap<>();
 
           String topic = sourceRecord.topic();
           String[] split = topic.split("[.]");
           String database = split[1];
           String table = split[2];
           hs.put("database",database);
           hs.put("table",table);
           //获取操作类型
           Envelope.Operation operation = Envelope.operationFor(sourceRecord);
           //获取数据本身
           Struct struct = (Struct)sourceRecord.value();
           Struct after = struct.getStruct("after");
 
           if (after != null) {
               Schema schema = after.schema();
               HashMap<String, Object> afhs = new HashMap<>();
               for (Field field : schema.fields()) {
                   afhs.put(field.name(), after.get(field.name()));
              }
               hs.put("data",afhs);
          }
 
           String type = operation.toString().toLowerCase();
           if ("create".equals(type)) {
               type = "insert";
          }
           hs.put("type",type);
 
           collector.collect(jsstr.toJson(hs));
      }
 
       @Override
       public TypeInformation<String> getProducedType() {
           return BasicTypeInfo.STRING_TYPE_INFO;
      }
  }
 
 
   public static class ClickhouseSink extends RichSinkFunction<String>{
       Connection connection;
       PreparedStatement pstmt;
       private Connection getConnection() {
           Connection conn = null;
           try {
               Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
               String url = "jdbc:clickhouse://localhost:8123/default";
               conn = DriverManager.getConnection(url,"default","dafei1288");
 
          } catch (Exception e) {
               e.printStackTrace();
          }
           return conn;
      }
 
       @Override
       public void open(Configuration parameters) throws Exception {
           super.open(parameters);
           connection = getConnection();
           String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";
           pstmt = connection.prepareStatement(sql);
      }
 
       // 每条记录插入时调用一次
       public void invoke(String value, Context context) throws Exception {
           //{"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}
           Gson t = new Gson();
           HashMap<String,Object> hs = t.fromJson(value,HashMap.class);
           String database = (String)hs.get("database");
           String table = (String)hs.get("table");
           String type = (String)hs.get("type");
 
           if("test".equals(database) && "test_cdc".equals(table)){
               if("insert".equals(type)){
                   System.out.println("insert => "+value);
                   LinkedTreeMap<String,Object> data = (LinkedTreeMap<String,Object>)hs.get("data");
                   String name = (String)data.get("name");
                   String description = (String)data.get("description");
                   Double id = (Double)data.get("id");
                   // 未前面的占位符赋值
                   pstmt.setInt(1, id.intValue());
                   pstmt.setString(2, name);
                   pstmt.setString(3, description);
 
                   pstmt.executeUpdate();
              }
          }
      }
 
       @Override
       public void close() throws Exception {
           super.close();
 
           if(pstmt != null) {
               pstmt.close();
          }
 
           if(connection != null) {
               connection.close();
          }
      }
  }
}

Flink SQL CDC

package name.lijiaqi.cdc;
 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
public class MysqlToMysqlMain {
   public static void main(String[] args) throws Exception {
       EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
              .useBlinkPlanner()
              .inStreamingMode()
              .build();
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
 
 
 
       tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
 
 
       // 数据源表
       String sourceDDL =
               "CREATE TABLE mysql_binlog (\n" +
                       " id INT NOT NULL,\n" +
                       " name STRING,\n" +
                       " description STRING\n" +
                       ") WITH (\n" +
                       " 'connector' = 'mysql-cdc',\n" +
                       " 'hostname' = 'localhost',\n" +
                       " 'port' = '3306',\n" +
                       " 'username' = 'flinkcdc',\n" +
                       " 'password' = 'dafei1288',\n" +
                       " 'database-name' = 'test',\n" +
                       " 'table-name' = 'test_cdc'\n" +
                       ")";
 
 
       String url = "jdbc:mysql://127.0.0.1:3306/test";
       String userName = "root";
       String password = "dafei1288";
       String mysqlSinkTable = "test_cdc_sink";
       // 输出目标表
       String sinkDDL =
               "CREATE TABLE test_cdc_sink (\n" +
                       " id INT NOT NULL,\n" +
                       " name STRING,\n" +
                       " description STRING,\n" +
                       " PRIMARY KEY (id) NOT ENFORCED \n " +
                       ") WITH (\n" +
                       " 'connector' = 'jdbc',\n" +
                       " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                       " 'url' = '" + url + "',\n" +
                       " 'username' = '" + userName + "',\n" +
                       " 'password' = '" + password + "',\n" +
                       " 'table-name' = '" + mysqlSinkTable + "'\n" +
                       ")";
       // 简单的聚合处理
       String transformSQL =
               "insert into test_cdc_sink select * from mysql_binlog";
 
       tableEnv.executeSql(sourceDDL);
       tableEnv.executeSql(sinkDDL);
       TableResult result = tableEnv.executeSql(transformSQL);
 
       // 等待flink-cdc完成快照
       result.print();
       env.execute("sync-flink-cdc");
  }
 
}

文章来源地址https://www.toymoban.com/news/detail-719372.html

到了这里,关于FlinkCDC for mysql to Clickhouse的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Clickhouse: One table to rule them all!

    前面几篇笔记我们讨论了存储海量行情数据的个人技术方案。它们之所以被称之为个人方案,并不是因为性能弱,而是指在这些方案中,数据都存储在本地,也只适合单机查询。 数据源很贵 – 在这个冬天,我们已经听说,某些上了规模的机构,也在让员工共享万得账号了。

    2024年01月19日
    浏览(28)
  • clickhouse实时同步MySQL数据

    两种方式         1、使用clickhouse表引擎,直接从MySQL中读取数据(针对表),如果业务需求不是很复杂,可以选择此方式,需要哪张表就配置哪张表,操作简单,数据实时同步;         2、使用clickhouse数据库引擎,同步MySQL数据库,配置稍微复杂一点,我是没有配置成功,

    2024年02月15日
    浏览(46)
  • ClickHouse和MySQL的区别

    ClickHouse和MySQL是两种不同的数据库管理系统,它们具有一些区别和特点。 数据存储结构:ClickHouse是一种列式存储数据库,它以列为单位进行数据存储和处理。这种存储方式在处理大量数据时非常高效,特别适用于分析查询。而MySQL是一种行式存储数据库,以行为单位存储数据

    2024年02月15日
    浏览(42)
  • mysql、clickhouse时间日期加法

    在’2023-10-27 23:59:59’上增加5秒: 按秒: 结果 = toDateTime(‘2022-01-02 23:00:00’) - toDateTime(‘2022-01-01 20:00:00’) 按时:

    2024年02月05日
    浏览(46)
  • 从ClickHouse通往MySQL的几条道路

    ClickHouse 是 Yandex(俄罗斯最大的搜索引擎)开源的一个用于实时数据分析的基于列存储的数据库,其处理数据的速度比传统方法快 100-1000 倍。ClickHouse 的性能超过了目前市场上可比的面向列的 DBMS,每秒钟每台服务器每秒处理数亿至十亿多行和数十千兆字节的数据。它是一个

    2024年02月05日
    浏览(38)
  • 阿里云RDS MySQL 数据如何快速同步到 ClickHouse

    云数据库 RDS MySQL 和 云数据库 ClickHouse 是阿里云推出的两个备受欢迎的数据库解决方案,它们为用户提供了可靠的数据存储方案、分析数仓方案,本文介绍如何快速将 RDS MySQL 的数据同步到云数据库 ClickHouse。 如何快速将RDSMySQL的数据同步到云数据库 云数据库 RDS MySQL 和云数据

    2024年02月04日
    浏览(40)
  • spark 集成 ClickHouse 和 MySQL (读和写操作)(笔记)

    目录 前言: 一.spark读出 1. spark 读出 MySQL表数据 1.2 spark 读出 ClickHouse表数据  二.spark写入 1. spark 写入 MySQL表数据  2.spark 写入 ClickHouse表数据 这篇文章主要记录的是用spark集成ClickHouse和MySQL, 将数据read出,和将数据write写入表的 (记录笔记) 因为这个不是重点,所以先简单创

    2024年02月07日
    浏览(38)
  • SpringBoot 整合 clickhouse和mysql 手把手教程全网最详细

    最近做一个项目 需要 整合mysql clickhouse 多数据源 后台用的是ruoyi框架 需要注意的是官网不建议使用ru.yandex.clickhouse驱动,应该改成com.clickhouse驱动,并且推荐使用0.3.2以上的版本 重点在@DataSource(value = DataSourceType.SLAVE) 注解上 在这里切换从库代表这个类里面的方法都切换成从库

    2024年02月06日
    浏览(44)
  • mysql、clickhouse查询数据库所有的表以及字段信息

    mysql查询数据库所有的表以及字段信息 SELECT     table_schema 数据库名,   table_name 表名,   COLUMN_NAME 列名,   COLUMN_TYPE 数据类型,   DATA_TYPE 字段类型,   CHARACTER_MAXIMUM_LENGTH 长度,   IS_NULLABLE 是否为空,   COLUMN_DEFAULT 默认值,   COLUMN_COMMENT 备注  FROM  INFORMATION_SCHEMA.COLUMNS where -- tab

    2024年02月08日
    浏览(65)
  • docker安装mysql、clickhouse、oracle等各种数据库汇总

    1:docker 安装mongo数据库并使用 官网:https://www.mongodb.com/docs/manual/ mongo shell教程1:http://c.biancheng.net/mongodb2/connection.html 安装1 :https://www.zhihu.com/question/54602953/answer/3047452434?utm_id=0 安装2:https://www.duidaima.com/Group/Topic/ArchitecturedDesign/9182 使用驱动进行java开发:https://mongodb.github.

    2024年02月10日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包