MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改)

这篇具有很好参考价值的文章主要介绍了MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改)

把MySQL多库多表的数据通过FlinkCDC DataStream的方式实时同步到同一个Kafka的Topic中,然后下游再写Flink SQL拆分把数据写入到ClickHouse,FlinkCDC DataStream通过自定义Debezium格式的序列化器,除了增加,还能进行删除修改。关于Debezium格式的更多信息,参考Flink官网,网址如下。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/

相关依赖

 <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>${flink.version}</version>
            <type>pom</type>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${flink.scala.version}</artifactId>
            <version>1.13.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>


        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.12.7-SNAPSHOT</version>
        </dependency>

    </dependencies>

FlinkCDC DataStream主程序

  • MySQL FlinkCDC DataStream方式时区有问题,快8小时,需要自己处理一下
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
        env.enableCheckpointing(5000L);

        //2.2 指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://master:8020/flink/cdc"));
        //2.6 设置访问 HDFS 的用户名
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        String hostname = "88.88.88.888";
        int port = 3306;
        String username = "root";
        String password = "password";
        String databaseList = "test";
        String[] databases = databaseList.split(",");
        String tableList = "test.user";
        String[] tables = tableList.split(",");

        MySqlSource<String> mySQLSource = MySqlSource.<String>builder()
        .hostname(hostname)
        .port(port)
        .username(username)
        .password(password)
        .databaseList(databases)
        .tableList(tables)  //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据
        .startupOptions(StartupOptions.initial())
        .deserializer(new MyDebeziumSchema())
        .build();

        DataStreamSource<String> mySQL_source = env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(), "MySQL Source");


        mySQL_source.print();

        // Kafka Sink
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka.bootstrap.servers=master:9092,node1:9092,node2:9092");
        properties.setProperty("transaction.timeout.ms", "300000");
        String topicName = "mysql_test_user";
        KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                return new ProducerRecord<>(
                        topicName, // target topic
                        element.getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };

        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                topicName,             // target topic
                serializationSchema,    // serialization schema
                properties,             // producer config
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

        mySQL_source.addSink(myProducer);

        env.execute();

    }

自定义Debezium序列化器

public class MyDebeziumSchema implements DebeziumDeserializationSchema<String>, CustomConverter {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//        JSONObject result = new JSONObject();

        String topic = sourceRecord.topic();

        String[] fileds = topic.split("\\.");

        String db = fileds[1];
        String tableName = fileds[2];
        Struct value = (Struct) sourceRecord.value();
        // 获取before数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();

        if(before != null){
            //获取列信息
            Schema schema = before.schema();
            List<Field> fields = schema.fields();

            for (Field field : fields) {
                // 调整时区减8个小时
                if("io.debezium.time.Timestamp".equals(field.schema().name())){
                    Object o = before.get(field);
                    if(o!=null) {
                        Long v = (Long) o - 8*60*60*1000;
                        beforeJson.put(field.name(),v);
                    }else {
                        beforeJson.put(field.name(),null);
                    }
                }
                else {
                    beforeJson.put(field.name(),before.get(field));
                }
            }
            //添加库名和表名
            beforeJson.put("db",db);
            beforeJson.put("tableName",tableName);
        }

        // 获取after数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();

        if(after != null){
            //获取列信息
            Schema schema = after.schema();
            List<Field> fields = schema.fields();
            for (Field field : fields) {
                if("io.debezium.time.Timestamp".equals(field.schema().name())){
                    // 调整时区减8个小时
                    Object o = after.get(field);
                    if(o!=null) {
                        Long v = (Long) o - 8*60*60*1000;
                        afterJson.put(field.name(),v);
                    }else {
                        afterJson.put(field.name(),null);
                    }
                }
                else {
                    afterJson.put(field.name(),after.get(field));
                }
                //添加库名和表名
                afterJson.put("db",db);
                afterJson.put("tableName",tableName);
            }
        }



        // 获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String op = operation.toString();

        // 构造DebeziumJson数据
        JSONObject debeziumJson = new JSONObject();

        if(op.equals("UPDATE")){
            // flink update 由 delete 和 create 构成
            debeziumJson.put("before",beforeJson);
            debeziumJson.put("after",null);
            debeziumJson.put("op","d");
            collector.collect(debeziumJson.toJSONString());

            debeziumJson.put("before",null);
            debeziumJson.put("after",afterJson);
            debeziumJson.put("op","c");
            collector.collect(debeziumJson.toJSONString());
        }else if(op.equals("CREATE") || op.equals("READ")){  //READ为全量数据
            debeziumJson.put("before",null);
            debeziumJson.put("after",afterJson);
            debeziumJson.put("op","c");
            collector.collect(debeziumJson.toJSONString());
        }else if(op.equals("DELETE")){
            debeziumJson.put("before",beforeJson);
            debeziumJson.put("after",null);
            debeziumJson.put("op","d");
            collector.collect(debeziumJson.toJSONString());
        }

       // collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    @Override
    public void configure(Properties properties) {

    }

    @Override
    public void converterFor(ConvertedField convertedField, ConverterRegistration converterRegistration) {

    }
}

把Kafka数据接入ClickHouse文章来源地址https://www.toymoban.com/news/detail-553625.html

public class KafkaToClickHouse {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                //.inBatchMode()
                .build();


        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        tableEnvironment.executeSql("CREATE TABLE kafkaTable ( \n" +
                "id Int, \n" +
                "name String,\n"+
                "age Int, \n" +
                "`time` BIGINT, \n" +
                "db String, \n" +
                "tableName String \n" +
                ") WITH (" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'mysql_test_user',\n" +
                "  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset'," +
                "   'debezium-json.ignore-parse-errors' = 'true' ," +
                "  'format' = 'debezium-json'" +
                ")");



        tableEnvironment.executeSql("CREATE TABLE test_user (" +
                "id INT, \n" +
                "name String, \n" +
                "age Int, \n" +
                "`time` TIMESTAMP, \n" +
                "PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (" +
                "    'connector' = 'clickhouse'," +
                "    'url' = 'clickhouse://node2:8123'," +
                "    'database-name' = 'default'," +
                "    'table-name' = 'test_user'," +
                "    'username' = 'default'," +
                "    'password' = 'qwert', " +
                "    'sink.ignore-delete' = 'false', " +
                "    'catalog.ignore-primary-key' = 'false'," +
                "    'sink.batch-size' = '1',\n" +
                "    'sink.flush-interval' = '500'" +
                ")");






        TableResult tableResult = tableEnvironment.executeSql("insert into test_user \n" +
                "select " +
                "   id " +
                "   ,name " +
                "   ,age " +
                "   ,TO_TIMESTAMP(FROM_UNIXTIME(`time` / 1000, 'yyyy-MM-dd HH:mm:ss')) " +
                "from kafkaTable " +
                "where " +
                "   db = 'test' " +
                "   and tableName='user' ");

        tableResult.getJobClient().get().getJobExecutionResult().get();

    }
}

到了这里,关于MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • FlinkCDC for mysql to Clickhouse

    2024年02月08日
    浏览(31)
  • FlinkCDC 实时监控 MySQL

    通过 FlinkCDC 实现 MySQL 数据库、表的实时变化监控,这里只把变化打印了出来,后面会实现如何再写入其他 MySQL 库中; 在 my.cnf 中开启 binlog,我这里指定了 test 库,然后重启 MySQL 在 IDEA 中新建工程 flinkcdc pom.xml resources/log4j.properties 反序列化类: com/zsoft/flinkcdc/MyDeserialization

    2024年02月06日
    浏览(33)
  • 通过kafka connector实现mysql数据自动同步es

    整体思路: 1、使用 io.debezium.connector.mysql.MySqlConnector 自动同步数据到kafka消息队列 2、通过listener监听消息队列,代码控制数据插入es ps:其实有更简单的方式:在此基础上使用ElasticsearchSinkConnector、ksql,完成数据的转换与自动同步es,全程无需代码控制,后续本地跑通流程后

    2024年02月08日
    浏览(46)
  • Clickhouse Engine kafka 将kafka数据同步clickhouse

    根据官方给出的kafka引擎文档,做一个实践记录。 官方地址:https://clickhouse.tech/docs/zh/engines/table-engines/integrations/kafka/ 1、特性介绍 clickhouse支持kafka的表双向同步,其中提供的为Kafka引擎。 其大致情况为如下情况:Kafka主题中存在对应的数据格式,Clickhouse创建一个Kafka引擎表(

    2024年01月16日
    浏览(48)
  • 使用FlinkCDC从mysql同步数据到ES,并实现数据检索

    随着公司的业务量越来越大,查询需求越来越复杂,mysql已经不支持变化多样的复杂查询了。 于是,使用cdc捕获MySQL的数据变化,同步到ES中,进行数据的检索。 springboot集成elasticSearch(附带工具类)

    2024年04月13日
    浏览(33)
  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(45)
  • FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)

    本文介绍了  来源单表-目标源单表同步,多来源单表-目标源单表同步。 注:1.16版本、1.17版本都可以使用火焰图,生产上最好关闭,详情见文章末尾 Flink版本:1.16.2 环境:Linux CentOS 7.0、jdk1.8 基础文件: flink-1.16.2-bin-scala_2.12.tgz、 flink-connector-jdbc-3.0.0-1.16.jar、(maven仓库目录:

    2024年02月11日
    浏览(46)
  • ClickHouse 与 Kafka 整合: 实时数据流处理与分析解决方案

    随着数据量的不断增长,实时数据处理和分析变得越来越重要。ClickHouse 和 Kafka 都是在现代数据技术中发挥着重要作用的工具。ClickHouse 是一个高性能的列式数据库,专为 OLAP 和实时数据分析而设计。Kafka 是一个分布式流处理平台,用于构建实时数据流管道和流处理应用程序

    2024年02月22日
    浏览(51)
  • Kafka实时数据同步

    目录 1 概述 2 捕获Oracle数据到Kafka 2.1 数据捕获设置 2.2 数据发布设置 2.3 捕获到发布数据流映射 2.4 查看任务执行日志 3 订阅Kafka数据到ClickHouse 3.1 数据订阅设置 3.2 数据加载设置 3.3 订阅到加载数据流映射 3.4 查看任务执行日志  4 校验数据一致性 BeeDI 支持实时捕获业务系统变

    2024年02月07日
    浏览(42)
  • 阿里云RDS MySQL 数据如何快速同步到 ClickHouse

    云数据库 RDS MySQL 和 云数据库 ClickHouse 是阿里云推出的两个备受欢迎的数据库解决方案,它们为用户提供了可靠的数据存储方案、分析数仓方案,本文介绍如何快速将 RDS MySQL 的数据同步到云数据库 ClickHouse。 如何快速将RDSMySQL的数据同步到云数据库 云数据库 RDS MySQL 和云数据

    2024年02月04日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包