Flink cdc同步mysql到starrocks(日期时间格式/时区处理)

这篇具有很好参考价值的文章主要介绍了Flink cdc同步mysql到starrocks(日期时间格式/时区处理)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

环境

flink 1.15.3(此时最新版本为1.16.1)
mysql 5.7+
starrocks 2.5.2

mysql同步表结构

mysql中的timestamp字段是可以正常同步的,但是多了8小时,设置了mysql链接属性也没效果

CREATE TABLE `temp_flink` (
  `id` int(11) NOT NULL,
  `name` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `remark` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `create_time` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

参考下方的链接有两种方式;

这里使用单独的转换器代码如下


import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * mysql日期字段时区/格式处理
 * @author JGMa
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;

    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;

    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;

    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;

    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties props) {

    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {

        String sqlType = column.typeName().toUpperCase();

        SchemaBuilder schemaBuilder = null;

        Converter converter = null;

        if ("DATE".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");

            converter = this::convertDate;

        }

        if ("TIME".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");

            converter = this::convertTime;

        }

        if ("DATETIME".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");

            converter = this::convertDateTime;


        }

        if ("TIMESTAMP".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");

            converter = this::convertTimestamp;

        }

        if (schemaBuilder != null) {

            registration.register(schemaBuilder, converter);

        }

    }


    private String convertDate(Object input) {

        if (input == null) {
            return null;
        }

        if (input instanceof LocalDate) {

            return dateFormatter.format((LocalDate) input);

        }

        if (input instanceof Integer) {

            LocalDate date = LocalDate.ofEpochDay((Integer) input);

            return dateFormatter.format(date);

        }

        return String.valueOf(input);

    }


    private String convertTime(Object input) {

        if (input == null) {
            return null;
        }

        if (input instanceof Duration) {

            Duration duration = (Duration) input;

            long seconds = duration.getSeconds();

            int nano = duration.getNano();

            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);

            return timeFormatter.format(time);

        }

        return String.valueOf(input);

    }


    private String convertDateTime(Object input) {

        if (input == null) {
            return null;
        }

        if (input instanceof LocalDateTime) {

            return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");

        }

        return String.valueOf(input);

    }


    private String convertTimestamp(Object input) {

        if (input == null) {
            return null;
        }

        if (input instanceof ZonedDateTime) {

            // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间

            ZonedDateTime zonedDateTime = (ZonedDateTime) input;

            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();

            return timestampFormatter.format(localDateTime).replaceAll("T", " ");

        }
        return String.valueOf(input);
    }
}

使用

{

    public static void main(String[] args) {
        String tableName = "temp_flink";
        String srcHost = "192.168.10.14";
        String srcDatabase = "xcode";
        String srcUsername = "root";
        String srcPassword = "123456";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        Properties mysqlProperties = new Properties();
//        mysqlProperties.setProperty("characterEncoding","UTF-8");
//        mysqlProperties.setProperty("connectionTimeZone","Asia/Shanghai");
        //自定义时间转换配置
        mysqlProperties.setProperty("converters", "dateConverters");
        mysqlProperties.setProperty("dateConverters.type", "com.txlc.flink.core.MySqlDateTimeConverter");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(srcHost)
//                .jdbcProperties(mysqlProperties)
                .port(3306)
                .databaseList(srcDatabase)
                .tableList(srcDatabase + "." + tableName)
                .username(srcUsername)
                .password(srcPassword)
//                .serverTimeZone("Asia/Shanghai")
// 主要是这里
                .debeziumProperties(mysqlProperties)
                .deserializer(new JsonStringDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[temp_flink-source]")
                .setParallelism(1);

        streamSource.addSink(StarRocksSink.sink(
                // the sink options
                StarRocksSinkOptions.builder()
                        .withProperty("jdbc-url", "jdbc:mysql://192.168.10.245:9030?characterEncoding=utf-8")
                        .withProperty("load-url", "192.168.10.245:8030")
                        .withProperty("database-name", "xcode")
                        .withProperty("username", "root")
                        .withProperty("password", "123456")
                        .withProperty("table-name", tableName)
                        // 自 2.4 版本,支持更新主键模型中的部分列。您可以通过以下两个属性指定需要更新的列。
                        // .withProperty("sink.properties.partial_update", "true")
                        // .withProperty("sink.properties.columns", "k1,k2,k3")
                        .withProperty("sink.properties.format", "json")
                        .withProperty("sink.properties.strip_outer_array", "true")
                        // 设置并行度,多并行度情况下需要考虑如何保证数据有序性
                        .withProperty("sink.parallelism", "1")
                        .build())
        ).name(">>>StarRocks temp_flink Sink<<<");

        try {
            env.execute("temp_flink stream sync");
        } catch (Exception e) {
            e.printStackTrace();
            log.error("[sync error] info : {}", e);
        }


    }

}

参考资料
https://blog.csdn.net/cloudbigdata/article/details/122935333
https://blog.csdn.net/WuBoooo/article/details/127387144文章来源地址https://www.toymoban.com/news/detail-596301.html

到了这里,关于Flink cdc同步mysql到starrocks(日期时间格式/时区处理)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Mysql+ETLCloud CDC+StarRocks实时数仓同步实战

    大型企业需要对各种业务系统中的销售及营销数据进行实时同步分析,例如库存信息、对帐信号、会员信息、广告投放信息,生产进度信息等等,这些统计分析信息可以实时同步到StarRocks中进行分析和统计,StarRocks作为分析型数据库特别适合于对海量数据的存储和分析,我们

    2024年02月16日
    浏览(41)
  • springboot集成starrocks、以及采用flink实现mysql与starrocks亚秒级同步

    (因采用dynamic-datasource-spring-boot-starter动态数据源,所以才是以下配置文件的样式,像redis,druid根据自己情况导入依赖) 这个配置文件的场景是把starrocks当成slave库在用。某些大数据慢查询就走starrocks 就这样配置好后就可把starrocks当mysql用了 重点:采用这种方式有限制,插入

    2024年01月21日
    浏览(35)
  • flink- mysql同步数据至starrocks-2.5.0之环境搭建

    一般需要以下几个服务: mysql flink flink-taskmanager flink-jobmanager starrocks starrocks-fe starrocks-be docker-compose.yml 配置文件 启动: docker-compose up -d : 登陆 starrocks 注意事项 mysql开启 bin log mysql 基于cdc ,采用 binlog模式,所以要开启binlog, conf/my.cnf : docker 服务放到同一docker网络中 如果不在同

    2024年02月10日
    浏览(50)
  • Flink CDC MySQL同步MySQL错误记录

    0、相关Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者从mvnrepository.com下载 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    浏览(53)
  • 最新版Flink CDC MySQL同步MySQL(一)

    Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。 连接器 数据库 驱动 mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.4 mysql-cdc MySQL: 5.6, 5.

    2024年02月13日
    浏览(66)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(104)
  • Flink 实现 MySQL CDC 动态同步表结构

    作者:陈少龙,腾讯 CSIG 高级工程师 使用 Flink CDC(Change Data Capture) 实现数据同步被越来越多的人接受。本文介绍了在数据同步过程中,如何将 Schema 的变化实时地从 MySQL 中同步到 Flink 程序中去。 MySQL 存储的数据量大了之后往往会出现查询性能下降的问题,这时候通过 Flin

    2024年02月04日
    浏览(78)
  • Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(60)
  • 最新版Flink CDC MySQL同步MySQL(一)_flink 连接mysql(1)

    下载 连接器 SQL jar (或 自行构建 )。 将下载的jar包放在FLINK_HOME/lib/. 重启Flink集群。 注意 :目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的 6.使用 Flink CDC 对 MySQL 进行流式 ETL 本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。 假设我们将产品数

    2024年04月26日
    浏览(48)
  • 基于Flink SQL CDC Mysql to Mysql数据同步

    Flink CDC有两种方式同步数据库: 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步; 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。 本方案使用FlinkSQL方法,同步两表中的数据。 其中Flink应用可以部署在具有公网IP的服务

    2023年04月11日
    浏览(76)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包