【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准

这篇具有很好参考价值的文章主要介绍了【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

关联文章:
各种时间类型和timezone关系浅析

一、测试目的和值

1. 测试一般的数据库不含time zone的类型的时区。

  • mysql timestamp(3) 类型
  • postgres timestamp(3) 类型
  • sqlserver datetime2(3) 类型
  • oracle类型 TIMESTAMP(3) 类型
    在以下测试之中均为ts字段

2.测试CDC中元数据op_ts 时区

op_tsTIMESTAMP_LTZ(3) NOT NULL当前记录表在数据库中更新的时间。如果从表的快照而不是 binlog 读取记录,该值将始终为0。|

在以下测试中cdc表建表均使用ts_ms TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL 表示。
cdc在读取表时候分两个阶段:

  1. 全量读取阶段,特点是jdbc读取,读取数据中op=r
  2. 增量读取阶段,特点是log读取,读取数据中op=c或u或d
    op在截图中看到如3="r" 或者 3="r",3是op字段的索引值。
    ts_ms在全量阶段读取数据以下成为READ数据
    ts_ms在增量阶段读取数据以下成为CREATE数据

3. flink 数据时间表示和时区

flink Table中时间必须使用org.apache.flink.table.data.TimestampData对象表示。

@PublicEvolving  
public final class TimestampData implements Comparable<TimestampData> {  
    private final long millisecond;  
    private final int nanoOfMillisecond;
}

此类型使用如下两个值联合表示记录时间。并不记录时区数据。

实战测试:

@Test  
public void testTimeZone(){  
  
    // 常识:Epoch就是值utc的0时间点,是全局绝对时间点,本质是`ZoneOffset.of("+0")`下的0时间。与`January 1, 1970, 00:00:00 GMT`视为等同。  
    // GMT是前世界标准时,UTC是现世界标准时。UTC 比 GMT更精准,以原子时计时,适应现代社会的精确计时。  
    // 28800000=8*3600*1000。8小时毫秒值。  
  
    // 如下时间是+8时区的数据库存储的不带时区的时间:2023-09-28T09:43:20.320  
    long ts=1695894200320L;  
  
    // 如果将ts当做utc时间0时刻转为字符串则会导致时间+8 hour。2023-09-28 17:43:20。这是一般常用的在线转换时间的结果。因其默认是是epoch时间,所以转换后会+8h。  
    // 可见数据库读取的不带timezone时间的毫秒值,并不是以utc0时间(epoch)为基准的,而是以当前时区0为基准的。  
  
    // LocalDateTime对象本质支持LocalDate和LocalTime两个对象,LocalDate持有Integer的`年`,`月`,`日`。LocalTime则持有Integer的`时`,`分`,`秒`等和java.util.Date类型并不一样。  
    // LocalDateTime 的带有ZoneOffset方法比较难理解,此处:  
    // epochSecond 当然值的是epoch的秒数,是绝对时间概念和`java.util.Date.getTime()/1000`对应的,而offset是指此epoch秒数需要偏移的时间量。  
    // 内部代码是`long localSecond = epochSecond + offset.getTotalSeconds();`。  
  
    // 如下代码是正确的,因为java中的`java.util.Date`类和`java.sql.Timestamp`类型都是持有绝对时间的类,`Date.getTime`获得也是相对于Epoch的毫秒值(Returns the number of milliseconds since January 1, 1970, 00:00:00 GMT)。  
    LocalDateTime ldtFromDate = LocalDateTime.ofEpochSecond(new Date().getTime() / 1000, 0, ZoneOffset.of("+8"));  
    System.out.println(ldtFromDate);  // 2023-09-28T16:16:45。此时时钟也是16:17:44。  
    Date date0 = new Date(0); // number of milliseconds since the standard base time known as "the epoch"  
    System.out.println(date0.getTime()); // 0, date0.getTime()方法返回绝对时间Returns the number of milliseconds since January 1, 1970, 00:00:00 GMT  
  
    // 如下的提供`ZoneOffset.UTC`可以理解是告诉LocalDateTime我提供的epochSecond已是`localSecond=当地时间-当地时间的0点`不需要再做转换了。  
    LocalDateTime ldt0 = LocalDateTime.ofEpochSecond(0L, 0, ZoneOffset.UTC);  
    System.out.println(ldt0); // 1970-01-01T00:00  
    LocalDateTime ldt8 = LocalDateTime.ofEpochSecond(0L, 0, ZoneOffset.of("+8"));  
    System.out.println(ldt8); // 1970-01-01T08:00  
  
    // TimestampData 默认不会进行任何时区转换。也不存储任何时区信息。内部仅靠`long millisecond`和`int nanoOfMillisecond`存储信息,以便于序列化。  
    // millisecond 一般可以认为是本地时间。因其在toString方法中会不会进行时区转换,toString方法仅是调用了`toLocalDateTime()`,中进行简单运算,并最终调用`LocalDateTime.toString`方法。  
    TimestampData td0 = TimestampData.fromEpochMillis(0); // 相当于LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)。  
    System.out.println(td0); // 1970-01-01T00:00。可见TimestampData输出转字符串的时间就是以utc时间为基准的这和java.util.Date类型是一致的。  
  
    LocalDateTime ldt = LocalDateTime.ofEpochSecond(  
            ts / 1000  
            , (int) (ts % 1000 * 1_000_000)  
            , ZoneOffset.UTC);  
    System.out.println(ldt); // 2023-09-28T09:43:20.320  
    TimestampData td = TimestampData.fromEpochMillis(ts);  
    System.out.println(td); // 2023-09-28T09:43:20.320  
  
    Date date = new Date(ts); // 注意:参数date(the specified number of milliseconds since the standard base time known as "the epoch")应该是epoch但此时ts并不是epoch基准的而是本地local基准的。  
    System.out.println(date); // Thu Sep 28 17:43:20 CST 2023,CST就是北京时间了,其在toString方法中`BaseCalendar.Date date = normalize();`进行了时区转换即+8了。  
}

4. 测试组件版本

  • flink 1.13
  • flink-cdc 2.2.1
  • flink-connector-jdbc 自己定制的,根据3.1.1-1.17版本修改而来。

二、本测试共测试四大数据库:

  • mysql
  • postgres
  • sqlserver
  • oracle

二、每种数据库测试8项:

  • database-SQL
    直接从数据中读取数据,是测试的基准值
  • cdc-RowData
    使用cdc的SQL API从数据库中读取值并在 com.ververica.cdc.debezium.table.AppendMetadataCollector#collect 方法中debug得到数据
  • cdc-SQL(测试除ts_ms的字段)
    使用cdc的SQL API读取值使用flink sql-client查询,用于测试除ts_ms的字段。因ts_ms准确性需分两种情况讨论。
  • cdc-SQL-RealTime(测试ts_ms)
    使用cdc的SQL API从读取值,左上角是系统时间,下侧是实时读取的数据。
  • cdc-Read数据(测试snapshot读取ts_ms字段)
    测试snapshot读取ts_ms字段,即全量读取阶段的ts_ms值,按照flink-cdc官方解释此四个数据的全量阶段值均为0(1970-01-01 00:00:00)。非0即为不正确。
  • cdc-Create数据(测试incremental读取ts_ms字段)
    测试incremental读取ts_ms字段,即增量读取阶段的ts_ms值。按照flink-cdc官方解释此四个数据的增量阶段值为数据日志记录时间。
  • jdbc-RowData
    使用flink SQL API 读取connector是jdbc的表数据org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat#nextRecord的方法中debug得到数据。。不含tm_ms数据。
  • jdbc-SQL
    使用flink SQL API 读取connector是jdbc的表数据。使用flink sql-client查询。。不含tm_ms数据。

三、测试过程数据

3.1 mysql

3.1.1 database-SQL

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.1.2 cdc-RowData

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.1.3 cdc-SQL(测试除ts_ms的字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.1.4 cdc-SQL-RealTime(测试ts_ms)

如下:上侧(win系统显示时间截图),下侧(cdc-query的ts_ms)
如果基本一致(不是差值8h),说明cdc-query的ts_ms是正确的的。
【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.1.5 cdc-Read数据(测试snapshot读取ts_ms字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.1.6 cdc-Create数据(测试incremental读取ts_ms字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.1.7 jdbc-RowData

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.1.8 jdbc-SQL

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.2 postgres

3.2.1 database-SQL

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.2.2 cdc

cdc-RowData
【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.2.3 cdc-SQL(测试除ts_ms的字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.2.4 cdc-SQL-RealTime(测试ts_ms)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.2.5 cdc-Read数据(测试snapshot读取ts_ms字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.2.6 cdc-Create数据(测试incremental读取ts_ms字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.2.7 jdbc

jdbc-RowData
【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.2.8 jdbc-SQL

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.3 sqlserver

3.3.1 database-SQL

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.3.2 cdc-RowData

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.3.3 cdc-SQL(测试除ts_ms的字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.3.4 cdc-SQL-RealTime(测试ts_ms)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.3.5 cdc-Read数据(测试snapshot读取ts_ms字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.3.6 cdc-Create数据(测试incremental读取ts_ms字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.3.7 jdbc-RowData

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.3.8 jdbc-SQL

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.4 oracle

3.4.1 database-SQL

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.4.2 cdc-RowData

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.4.3 cdc-SQL(测试除ts_ms的字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.4.3 cdc-SQL-RealTime(测试ts_ms)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.4.4 cdc-Read数据(测试snapshot读取ts_ms字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.4.5 cdc-Create数据(测试incremental读取ts_ms字段)

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.4.7 jdbc-RowData

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

3.4.8 jdbc-SQL

【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据

四、结论

(1)数据库获取的without time zone在flink中都是以本地时间的存储的。可以使用LocalDateTime.ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset.UTC)直接获取。
(2)Flink中的TimestampData中存储的一般可以认为是本地时间。但需要注意:TimestampData 不可将 instant 相关方法localDateTime 、Timestamp 相关方法混用。因为instant代表与epoch时间差。而后两者代表与local是时间差。
(3)Flink程序中时间的标准值都是local本地的。因其在Sql API(sql-client)中打印出的结果会与原始数据库中打印的一致。

如下图中红色字体的是错误的数据,使用CDC需要额外注意并进行转换。
【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准,flink,大数据文章来源地址https://www.toymoban.com/news/detail-737634.html

五、附录

5.1 查询数据库时区SQL

-- mysql 以:time_zone 为准,system_time_zone至服务器时区
show variables like '%time_zone%';

-- postgres
show time zone;

-- sqlserver
DECLARE
@TimeZone NVARCHAR(255)
EXEC
master.dbo.xp_instance_regread
N'HKEY_LOCAL_MACHINE'
,
N'SYSTEM\CurrentControlSet\Control\TimeZoneInformation'
,
N'TimeZoneKeyName'
,
@TimeZone
OUTPUT
SELECT
@TimeZone 

-- oracle
select dbtimezone from dual;

到了这里,关于【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink SQL 时区 -- 时间字符串转时间戳并转换时区

    将时间字符串格式化,转变成时间戳,再加8小时后写入clickhouse (该方法默认精确度为秒,不适用毫秒) (1)UNIX_TIMESTAMP 作用:将时间字符串转换成时间戳 用法:UNIX_TIMESTAMP(STRING datestr, STRING format) (2)CONVERT_TZ 作用:转换时区 用法:CONVERT_TZ(string1, string2, string3) (实测仅获

    2024年02月04日
    浏览(47)
  • Flink CDC SQL Oracle to Postgresql与jdbc连接oracle报错处理

    flink-cdc官网:Oracle CDC Connector — CDC Connectors for Apache Flink® documentation Flink环境依赖: (3)启用日志归档 (4)检查是否启用了日志归档 (5)创建具有权限的 Oracle 用户 (5.1)。创建表空间 (5.2)。创建用户并授予权限 Flink SQL 客户端连接器测试: 创建 Oracle 链接器 返回内容 以上代

    2024年02月11日
    浏览(44)
  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(52)
  • flink cdc多种数据源安装、配置与验证 flink cdc多种数据源安装、配置与验证

      搜索 文章目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验证 本文目录结构

    2024年02月09日
    浏览(44)
  • 大数据Flink(八十一):SQL 时区问题

    文章目录 SQL 时区问题 ​​​​​​​一、SQL 时区解决的问题

    2024年02月07日
    浏览(41)
  • Flink CDC数据同步

    一、什么是FLink Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 接下来,我们来介绍一下 Flink 架构中的重要方面。 任何类型的数据都可以形成一种事

    2024年02月08日
    浏览(46)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(58)
  • 【大数据】Flink 详解(十):SQL 篇 Ⅲ(Flink SQL CDC)

    《 Flink 详解 》系列(已完结),共包含以下 10 10 10 篇文章: 【大数据】Flink 详解(一):基础篇(架构、并行度、算子) 【大数据】Flink 详解(二):核心篇 Ⅰ(窗口、WaterMark) 【大数据】Flink 详解(三):核心篇 Ⅱ(状态 State) 【大数据】Flink 详解(四):核心篇

    2024年01月25日
    浏览(50)
  • 基于 Flink CDC 的现代数据栈实践

    摘要:本文整理自阿里云技术专家,Apache Flink PMC Member Committer, Flink CDC Maintainer 徐榜江和阿里云高级研发工程师,Apache Flink Contributor Flink CDC Maintainer 阮航,在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分: 1.深入解读 Flink CDC 2.3 版本 2.基于 Flink CDC 构建

    2024年02月09日
    浏览(42)
  • 【大数据】Flink CDC 的概览和使用

    CDC ( Change Data Capture , 数据变更抓取 )是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

    2024年01月24日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包