【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性

这篇具有很好参考价值的文章主要介绍了【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink SQL 基础概念》系列,共包含以下 5 篇文章:

  • Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API
  • Flink SQL 基础概念(二):数据类型
  • Flink SQL 基础概念(三):SQL 动态表 & 连续查询
  • Flink SQL 基础概念(四):SQL 的时间属性
  • Flink SQL 基础概念(五):SQL 时区问题

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间事件时间摄入时间 三种时间语义。

三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。

1.Flink 三种时间属性简介

  • 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
  • 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取 System.currentTimeMillis()),在生产环境中用的次多。
  • 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。

小伙伴们要注意到:

  • 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
  • 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。

2.Flink 三种时间属性的应用场景

讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?

先说结论,在 Flink 中时间的作用:

  • 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
  • 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。

博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。

2.1 事件时间案例

还是以之前的 clicks 表拿来举例。

【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性,# Flink SQL,flink,sql,事件时间,处理时间,摄入时间,窗口,WaterMark
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)

上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。

后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。

2.2 处理时间案例

还是以之前的 clicks 表拿来举例。

还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。

那么这种触发机制就是处理时间。

2.3 摄入时间案例

在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。

3.SQL 指定时间属性的两种方式

如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。

那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转的 Table 中使用

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

4.SQL 事件时间案例

来看看 Flink 中如何指定事件时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
  -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办?

解决方案必须要有啊,如下。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- 1. 这个 ts 就是常见的毫秒级别时间戳
  ts BIGINT,
  -- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  -- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
  -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
  • DataStream 中指定事件时间。

之前介绍了 TableDataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例:

public class DataStreamSourceEventTimeTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 分配 watermark
        DataStream<Row> r = env.addSource(new UserDefinedSource())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {
                    @Override
                    public long extractTimestamp(Row element) {
                        return (long) element.getField("f2");
                    }
                });
        // 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳
        Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");

        tEnv.createTemporaryView("source_table", sourceTable);

        // 3. 在 tumble window 中使用 f2
        String tumbleWindowSql =
                "SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
                + "FROM source_table\n"
                + "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)"
                ;

        Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

        tEnv.toDataStream(resultTable, Row.class).print();

        env.execute();
    }


    private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {

            int i = 0;

            while (!this.isCancel) {

                sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

                Thread.sleep(10L);
                i++;
            }

        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
                    TypeInformation.of(Long.class));
        }
    }
}

5.SQL 处理时间案例

来看看 Flink SQL 中如何指定处理时间。文章来源地址https://www.toymoban.com/news/detail-849592.html

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- 使用下面这句来将 user_action_time 声明为处理时间
  user_action_time AS PROCTIME()
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
  • DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 分配 watermark
        DataStream<Row> r = env.addSource(new UserDefinedSource());

        // 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳
        Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");

        tEnv.createTemporaryView("source_table", sourceTable);

        // 3. 在 tumble window 中使用 f2
        String tumbleWindowSql =
                "SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
                + "FROM source_table\n"
                + "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)"
                ;

        Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

        tEnv.toDataStream(resultTable, Row.class).print();

        env.execute();
    }


    private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {

            int i = 0;

            while (!this.isCancel) {

                sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

                Thread.sleep(10L);
                i++;
            }

        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
                    TypeInformation.of(Long.class));
        }
    }
}

到了这里,关于【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink SQL】Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API

    《 Flink SQL 基础概念 》系列,共包含以下 5 篇文章: Flink SQL 基础概念(一):SQL Table 运行环境、基本概念及常用 API Flink SQL 基础概念(二):数据类型 Flink SQL 基础概念(三):SQL 动态表 连续查询 Flink SQL 基础概念(四):SQL 的时间属性 Flink SQL 基础概念(五):SQL 时区问

    2024年03月21日
    浏览(72)
  • 【Apache-Flink零基础入门】「入门到精通系列」手把手+零基础带你玩转大数据流式处理引擎Flink(特点和优势分析+事件与时间维度分析)

    本文介绍了Apache Flink的定义、架构、基本原理,并辨析了大数据流计算相关的基本概念。同时回顾了大数据处理方式的历史演进以及有状态的流式数据处理的原理。最后,分析了Apache Flink作为业界公认为最好的流计算引擎之一所具备的天然优势,旨在帮助读者更好地理解大数

    2024年02月03日
    浏览(55)
  • 【大数据】Flink 架构(三):事件时间处理

    《 Flink 架构 》系列(已完结),共包含以下 6 篇文章: Flink 架构(一):系统架构 Flink 架构(二):数据传输 Flink 架构(三):事件时间处理 Flink 架构(四):状态管理 Flink 架构(五):检查点 Checkpoint(看完即懂) Flink 架构(六):保存点 Savepoint 😊 如果您觉得这篇

    2024年02月21日
    浏览(34)
  • Flink中事件时间和处理时间(TumblingEventTimeWindows TumblingProcessingTimeWindows)

    TumblingEventTimeWindows 和 TumblingProcessingTimeWindows 是 Flink 中两种不同的窗口类型. 区别如下: 时间类型: TumblingEventTimeWindows 是基于事件时间的窗口类型,可以通过设置 Watermark 和 EventTimeCharacteristic 来确定事件时间;而 TumblingProcessingTimeWindows 是基于处理时间的窗口类型,时间由 F

    2024年02月07日
    浏览(30)
  • flink基础概念之什么是时间语义

    Flink支持三种不同的时间语义,以便处理流式数据中的事件时间、处理时间和摄入时间。 1. 处理时间(Processing Time) 处理时间的概念非常简单,就是指 执行处理操作的机器的系统时间 。 在这种时间语义下处理窗口非常简单粗暴,不需要各个节点之间进行协调同步,也不需要

    2024年01月21日
    浏览(38)
  • Flink SQL 时区 -- 时间字符串转时间戳并转换时区

    将时间字符串格式化,转变成时间戳,再加8小时后写入clickhouse (该方法默认精确度为秒,不适用毫秒) (1)UNIX_TIMESTAMP 作用:将时间字符串转换成时间戳 用法:UNIX_TIMESTAMP(STRING datestr, STRING format) (2)CONVERT_TZ 作用:转换时区 用法:CONVERT_TZ(string1, string2, string3) (实测仅获

    2024年02月04日
    浏览(46)
  • flink1.14 sql基础语法(二) flink sql表定义详解

    每一个表的标识由 3 部分组成: catalog name (常用于标识不同的“源”,比如 hive catalog,inner catalog 等) database name(通常语义中的“库”) table name (通常语义中的“表”) 1个flinksql程序在运行时,tableEnvironment 通过持有一个 map 结构来记录所注册的 catalog; Flinksql中的表,可以是 vi

    2024年02月14日
    浏览(40)
  • flink1.14 sql基础语法(一) flink sql表查询详解

    语法示例: flink从1.13开始,提供了时间窗口聚合计算的TVF语法。 表值函数的使用约束: (1)在窗口上做 分组聚合 ,必须带上window_start 和 window_end 作为分组的key; (2)在窗口上做 topn计算 ,必须带上window_start 和 window_end 作为partition的key; (3)带条件的 join ,必须包含

    2024年02月06日
    浏览(42)
  • Flink(十三)Flink 的table api与sql的基本概念、通用api介绍及入门示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月15日
    浏览(54)
  • 【天衍系列 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理

    Watermark 是用于处理事件时间的一种机制,用于表示事件时间流的进展。在流处理中,由于事件到达的顺序和延迟,系统需要一种机制来衡量事件时间的进展,以便正确触发窗口操作等。Watermark 就是用来标记事件时间的进展情况的一种特殊数据元素。 Watermark 的生成方式通常是

    2024年02月20日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包