实测解决 flink cdc mysql 时间字段差8小时/差13小时问题

这篇具有很好参考价值的文章主要介绍了实测解决 flink cdc mysql 时间字段差8小时/差13小时问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        //自定义时间转换配置
        properties.setProperty("converters", "dateConverters");
        properties.setProperty("dateConverters.type", "com.ysservice.utils.MySqlDateTimeConverter");

        //构建mysqlSource
        MySqlSource mysqlCdcSource = MySqlSource.<String>builder()
                .hostname(FlinkConfig.source_hostname)
                .port(3306)
                .databaseList(databaseNameArray) // set captured database
                .tableList(inputTableArray) // set captured table
                .username(FlinkConfig.source_username)
                .password(FlinkConfig.source_password)
                .serverId(FlinkConfig.source_serverId)
                .serverTimeZone("Asia/Shanghai")
                .debeziumProperties(properties)
                .deserializer(new DorisJsonDebeziumDeserializationSchema(inputOutputTableName,tableConfigBean)) // 传入gp的表名映射
                .build();
package com.ysservice.utils;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * @Description:实现CustomConverter接口,重写对应方法对mysql的时间类型进行标准转换
 * @author: WuBo
 * @date:2022/10/11 11:50
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;

    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;

    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;

    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;

    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties props) {

    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {

        String sqlType = column.typeName().toUpperCase();

        SchemaBuilder schemaBuilder = null;

        Converter converter = null;

        if ("DATE".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");

            converter = this::convertDate;

        }

        if ("TIME".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");

            converter = this::convertTime;

        }

        if ("DATETIME".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");

            converter = this::convertDateTime;


        }

        if ("TIMESTAMP".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");

            converter = this::convertTimestamp;

        }

        if (schemaBuilder != null) {

            registration.register(schemaBuilder, converter);

        }

    }


    private String convertDate(Object input) {

        if (input == null) return null;

        if (input instanceof LocalDate) {

            return dateFormatter.format((LocalDate) input);

        }

        if (input instanceof Integer) {

            LocalDate date = LocalDate.ofEpochDay((Integer) input);

            return dateFormatter.format(date);

        }

        return String.valueOf(input);

    }


    private String convertTime(Object input) {

        if (input == null) return null;

        if (input instanceof Duration) {

            Duration duration = (Duration) input;

            long seconds = duration.getSeconds();

            int nano = duration.getNano();

            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);

            return timeFormatter.format(time);

        }

        return String.valueOf(input);

    }


    private String convertDateTime(Object input) {

        if (input == null) return null;

        if (input instanceof LocalDateTime) {

            return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");

        }

        return String.valueOf(input);

    }


    private String convertTimestamp(Object input) {

        if (input == null) return null;

        if (input instanceof ZonedDateTime) {

            // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间

            ZonedDateTime zonedDateTime = (ZonedDateTime) input;

            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();

            return timestampFormatter.format(localDateTime).replaceAll("T", " ");

        }
        return String.valueOf(input);
    }
}


关键代码:

实测解决 flink cdc mysql 时间字段差8小时/差13小时问题,flink,mysql,flink,java
其中的:com.ysservice.utils.MySqlDateTimeConverter,根据自己的MySqlDateTimeConverter类路径进行修改

全量阶段和增量阶段的时间问题还不一样,实测本方式能全部解决,解决的同学记得回来点个赞!文章来源地址https://www.toymoban.com/news/detail-589073.html

到了这里,关于实测解决 flink cdc mysql 时间字段差8小时/差13小时问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink mysql cdc调试问题记录

    最近需要用到flink cdc作为数据流处理框架,在demo运行中发现一些问题,特此记录问题和解决过程。 Caused by: java.lang.IllegalArgumentException: Can\\\'t find any matched tables, please check your configured database-name: [localdb] and table-name: [flink_cdc_message] at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.

    2023年04月17日
    浏览(54)
  • Flink学习13-Flink CDC

    一、CDC简介 cdc全称 Change Data Capture 变更数据捕获。通俗来讲只要能捕获到变更的数据的技术都可以称为cdc。常见的开源技术有以下几种: canal:https://github.com/alibaba/canal maxwell:https://github.com/zendesk/maxwell Debezium:https://github.com/debezium/debezium flink-cdc:https://github.com/ververica/fli

    2024年01月16日
    浏览(41)
  • 解决服务器时间与实际时间相差8小时问题

    ​ 在实际部署项目到服务器运行时,我们通常会发现程序在服务器上执行的时间实际上时比现实时间要慢8小时的,其实这是因为本地时区和服务器的时区不一样,差了8个小时导致,那么如何去纠正8小时的时差呢?以下是我常用的解决方法 1. 在使用Jar命令启动服务时添加-Duser.ti

    2024年02月09日
    浏览(41)
  • docker tomcat时间少8小时问题解决

    docker容器与系统时间一致并且正确,但是java程序在运行中通过log日志发现发了8小时 解决方法 修改docker容器中tomcat/bin/catalina.sh文件,添加一下内容 JAVA_OPTS=\\\"$JAVA_OPTS -Dfile.encoding=UTF8 -Duser.timezone=GMT+08\\\" 附 操作命令 因docker容器中没有vi,所以需要将其cp出来,修改后,在cp进容器

    2024年02月12日
    浏览(27)
  • Java项目时间字段问题-MySQL

    Java项目时间字段问题-MySQL ​ 在 Java 项目中,与 MySQL 数据库中的时间字段对应的 Java 类型通常使用 java.time 包中的类,这是从 Java 8 开始引入的日期和时间 API。以下是常见的时间字段和它们在 Java 实体类中的对应类型示例: 1、DATETIME 或 TIMESTAMP ​ 在 MySQL 中, DATETIME 和 TIME

    2024年02月10日
    浏览(41)
  • flink 1.13.x集成 CDC 2.3.0

    原因 https://github.com/ververica/flink-cdc-connectors/pull/1407 简单讲,Flink 运行机器时区和Mysql Server 时区不匹配, database.serverTimezone 配置配置影响 具体代码可以查看CDC com.ververica.cdc.connectors.mysql.MySqlValidator#checkTimeZone 解决办法 手动指定下Flink 运行的时区,和连接的数据库时区信息保持

    2024年02月16日
    浏览(36)
  • flink cdc MySQL2Doris 案例分享 解决分库多表同步

    使用flink cdc,完成mysql 多库 多表同时同步到doris中 flink 1.14.4 doris 1.1.0 flink-connector-mysql-cdc 2.2.1版本 一直会报异常 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder 从官网下载依赖,然后本地添加进去flink-sql-connector-mysql-cdc-2.2.0 由于 U

    2023年04月09日
    浏览(44)
  • 11 flink-sql 中基于 mysql-cdc 连接 mysql-pxc 集群无法获取增量数据问题

    问题是来自于 群友, 2024.03.29, 也是花了一些时间 来排查这个问题  大致的问题是用 mysql-cdc 连接了一个 mysql-pxc 集群, 然后创建了一个 test_user 表  使用 \\\"select * from test_user\\\" 获取数据表的数据, 可以拿到 查询时的快照, 但是 无法获取到后续对于 test_user 表的增量操作的数据, 比如

    2024年04月15日
    浏览(52)
  • 【现场问题】flink-cdc,Oracle2Mysql的坑,Oracle区分大小写导致

    Column ‘id’ is NOT NULL, however, a null value is being written into it. You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘DROP’ to suppress this exception and drop such records silently 大致意思就是不能插入为空的数值。 为什么会报这个错误,我们来看DML的执行语句: insert into t_wx_target select

    2024年02月12日
    浏览(47)
  • 时区的坑:数据时间在不同数据库中差8小时、13小时、14小时是怎么回事

    恭喜你,十有八九,就是时区的问题: 首先要明确的是,我们中国的标准时区是东八区,就是世界协调时间(UTC)加上八个小时,也就是UTC+8。 格林尼治标准时间 最坑的是美国时间,因为美国实行的是夏令时,也就是说美国是有两个时区在一天内来回切换,有几个月是西六区,

    2024年02月05日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包