【大数据】Flink SQL 语法篇(六):Temporal Join

这篇具有很好参考价值的文章主要介绍了【大数据】Flink SQL 语法篇(六):Temporal Join。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Temporal Join 定义(支持 Batch / Streaming):Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 Join 这个 拉链快照表 的 Join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做 Versioned Table,使用一个明细表去 Join 这个 Versioned Table 的 Join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table 其实就是对同一条 key(在 DDL 中以 Primary Key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择 Versioned Table 对应时间区间内的快照数据进行 Join。

应用场景:比如常见的汇率数据(实时的根据汇率计算总金额),在 12 : 00 12:00 12:00 之前(事件时间),人民币和美元汇率是 7 : 1 7:1 7:1,在 12 : 00 12:00 12:00 之后变为 6 : 1 6:1 6:1,那么在 12 : 00 12:00 12:00 之前数据就要按照 7 : 1 7:1 7:1 进行计算, 12 : 00 12:00 12:00 之后就要按照 6 : 1 6:1 6:1 计算。在事件时间语义的任务中,事件时间 12 : 00 12:00 12:00 之前的数据,要按照 7 : 1 7:1 7:1 进行计算, 12 : 00 12:00 12:00 之后的数据,要按照 6 : 1 6:1 6:1 进行计算。这其实就是离线中快照的概念,维护具体汇率的表在 Flink SQL 体系中就叫做 Versioned Table

1.Versioned Table 的两种定义方式

Verisoned Table:Verisoned Table 中存储的数据通常是来源于 CDC 或者会发生更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。举一个汇率场景的案例来看一下一个 Versioned Table 的两种定义方式。

1.1 PRIMARY KEY 定义方式

-- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    -- PRIMARY KEY 定义方式
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'value.format' = 'debezium-json',
   /* ... */
);

1.2 Deduplicate 定义方式

-- 定义一个 append-only 的数据源表
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time
) WITH (
    'connector' = 'kafka',
    'value.format' = 'debezium-json',
    /* ... */
);

-- 将数据源表按照 Deduplicate 方式定义为 Versioned Table
CREATE VIEW versioned_rates AS
SELECT currency, conversion_rate, update_time   -- 1. 定义 `update_time` 为时间字段
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency  -- 2. 定义 `currency` 为主键
         ORDER BY update_time DESC              -- 3. ORDER BY 中必须是时间戳列
      ) AS rownum 
      FROM currency_rates)
WHERE rownum = 1; 

2.应用案例

Temporal Join 支持的时间语义:事件时间、处理时间。

2.1 案例一(事件时间)

-- 1. 定义一个输入订单表
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'value.format' = 'debezium-json',
   /* ... */
);

SELECT 
     order_id,
     price,
     currency,
     conversion_rate,
     order_time,
FROM orders
-- 3. Temporal Join 逻辑
-- SQL 语法为:FOR SYSTEM_TIME AS OF
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率:

order_id  price  货币       汇率             order_time
========  =====  ========  ===============  =========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
  • 事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark。
  • 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中。

2.2 案例二(处理时间)

10:15> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

-- 10:42 时,Euro 的汇率从 114 变为 116
10:52> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        116     <====114 变为 116
Yen           1

-- 从 Orders 表查询数据
SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== 在处理时间 10:15 到达的一条数据
     1 US Dollar        <== 在处理时间 10:30 到达的一条数据
     2 Euro             <== 在处理时间 10:52 到达的一条数据

-- 执行关联查询
SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

-- 结果如下:
amount currency     rate   amount*rate
====== ========= ======= ============
     2 Euro          114          228    <== 在处理时间 10:15 到达的一条数据
     1 US Dollar     102          102    <== 在处理时间 10:30 到达的一条数据
     2 Euro          116          232    <== 在处理时间 10:52 到达的一条数据

可以发现处理时间就比较好理解了,因为处理时间语义中是根据左流数据到达的时间决定拿到的汇率值。Flink 就只为 LatestRates 维护了最新的状态数据,不需要关心历史版本的数据。文章来源地址https://www.toymoban.com/news/detail-840376.html

到了这里,关于【大数据】Flink SQL 语法篇(六):Temporal Join的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【大数据】Flink SQL 语法篇(一):CREATE

    CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。 目前 Flink SQL 支持下列 CREATE 语句: CREATE TABLE CREATE DATABASE CREATE VIEW CREATE FUNCTION 下面的 SQL 语句就是建表语句的定义,根据指定的表名创建一个表,如果同

    2024年02月21日
    浏览(36)
  • Flink Temporal Join 系列 (4):用 Temporal Table Function 实现基于处理时间的关联

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年04月23日
    浏览(36)
  • flink1.14 sql基础语法(二) flink sql表定义详解

    每一个表的标识由 3 部分组成: catalog name (常用于标识不同的“源”,比如 hive catalog,inner catalog 等) database name(通常语义中的“库”) table name (通常语义中的“表”) 1个flinksql程序在运行时,tableEnvironment 通过持有一个 map 结构来记录所注册的 catalog; Flinksql中的表,可以是 vi

    2024年02月14日
    浏览(40)
  • flink1.14 sql基础语法(一) flink sql表查询详解

    语法示例: flink从1.13开始,提供了时间窗口聚合计算的TVF语法。 表值函数的使用约束: (1)在窗口上做 分组聚合 ,必须带上window_start 和 window_end 作为分组的key; (2)在窗口上做 topn计算 ,必须带上window_start 和 window_end 作为partition的key; (3)带条件的 join ,必须包含

    2024年02月06日
    浏览(42)
  • Flink-SQL join 优化 -- MiniBatch + local-global

    问题1. 近期在开发flink-sql期间,发现数据在启动后,任务总是进行重试,运行一段时间后,container heartbeat timeout,内存溢出(GC overhead limit exceede) ,作业无法进行正常工作 问题2. 未出现container心跳超时的,作业运行缓慢,超过一天 ,作业仍存在反压情况 查看日志内容发现,出

    2024年02月06日
    浏览(40)
  • (二开)Flink 修改源码拓展 SQL 语法

    1、Flink 扩展 calcite 中的语法解析 1)定义需要的 SqlNode 节点类-以 SqlShowCatalogs 为例 a)类位置 flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java 核心方法 : b)类血缘 2)修改 includes 目录下的 .ftl 文件,在 parserImpls.ftl 文件中添加语法逻辑 a)文件位

    2024年02月07日
    浏览(38)
  • 【大数据】Flink 详解(十):SQL 篇 Ⅲ(Flink SQL CDC)

    《 Flink 详解 》系列(已完结),共包含以下 10 10 10 篇文章: 【大数据】Flink 详解(一):基础篇(架构、并行度、算子) 【大数据】Flink 详解(二):核心篇 Ⅰ(窗口、WaterMark) 【大数据】Flink 详解(三):核心篇 Ⅱ(状态 State) 【大数据】Flink 详解(四):核心篇

    2024年01月25日
    浏览(48)
  • 【大数据】Flink 详解(十):SQL 篇 Ⅲ

    《 Flink 详解 》系列(已完结),共包含以下 10 10 10 篇文章: 【大数据】Flink 详解(一):基础篇(架构、并行度、算子) 【大数据】Flink 详解(二):核心篇 Ⅰ(窗口、WaterMark) 【大数据】Flink 详解(三):核心篇 Ⅱ(状态 State) 【大数据】Flink 详解(四):核心篇

    2024年01月18日
    浏览(56)
  • 【大数据】Flink 详解(九):SQL 篇 Ⅱ

    《 Flink 详解 》系列(已完结),共包含以下 10 10 10 篇文章: 【大数据】Flink 详解(一):基础篇(架构、并行度、算子) 【大数据】Flink 详解(二):核心篇 Ⅰ(窗口、WaterMark) 【大数据】Flink 详解(三):核心篇 Ⅱ(状态 State) 【大数据】Flink 详解(四):核心篇

    2024年01月16日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包