《Flink SQL 语法篇》系列,共包含以下 10 篇文章:
- Flink SQL 语法篇(一):CREATE
- Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
- Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
- Flink SQL 语法篇(四):Group 聚合、Over 聚合
- Flink SQL 语法篇(五):Regular Join、Interval Join
- Flink SQL 语法篇(六):Temporal Join
- Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
- Flink SQL 语法篇(八):集合、Order By、Limit、TopN
- Flink SQL 语法篇(九):Window TopN、Deduplication
- Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints
😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!
Temporal Join 定义(支持 Batch / Streaming):Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 Join 这个 拉链快照表 的 Join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做 Versioned Table
,使用一个明细表去 Join 这个 Versioned Table
的 Join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table
其实就是对同一条 key
(在 DDL 中以 Primary Key 标记同一个 key
)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择 Versioned Table
对应时间区间内的快照数据进行 Join。
应用场景:比如常见的汇率数据(实时的根据汇率计算总金额),在
12
:
00
12:00
12:00 之前(事件时间),人民币和美元汇率是
7
:
1
7:1
7:1,在
12
:
00
12:00
12:00 之后变为
6
:
1
6:1
6:1,那么在
12
:
00
12:00
12:00 之前数据就要按照
7
:
1
7:1
7:1 进行计算,
12
:
00
12:00
12:00 之后就要按照
6
:
1
6:1
6:1 计算。在事件时间语义的任务中,事件时间
12
:
00
12:00
12:00 之前的数据,要按照
7
:
1
7:1
7:1 进行计算,
12
:
00
12:00
12:00 之后的数据,要按照
6
:
1
6:1
6:1 进行计算。这其实就是离线中快照的概念,维护具体汇率的表在 Flink SQL 体系中就叫做 Versioned Table
。
1.Versioned Table 的两种定义方式
Verisoned Table
:Verisoned Table 中存储的数据通常是来源于 CDC 或者会发生更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。举一个汇率场景的案例来看一下一个 Versioned Table 的两种定义方式。
1.1 PRIMARY KEY 定义方式
-- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time,
-- PRIMARY KEY 定义方式
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
1.2 Deduplicate 定义方式
-- 定义一个 append-only 的数据源表
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
-- 将数据源表按照 Deduplicate 方式定义为 Versioned Table
CREATE VIEW versioned_rates AS
SELECT currency, conversion_rate, update_time -- 1. 定义 `update_time` 为时间字段
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- 2. 定义 `currency` 为主键
ORDER BY update_time DESC -- 3. ORDER BY 中必须是时间戳列
) AS rownum
FROM currency_rates)
WHERE rownum = 1;
2.应用案例
Temporal Join 支持的时间语义:事件时间、处理时间。
2.1 案例一(事件时间)
-- 1. 定义一个输入订单表
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
SELECT
order_id,
price,
currency,
conversion_rate,
order_time,
FROM orders
-- 3. Temporal Join 逻辑
-- SQL 语法为:FOR SYSTEM_TIME AS OF
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率:文章来源:https://www.toymoban.com/news/detail-840376.html
order_id price 货币 汇率 order_time
======== ===== ======== =============== =========
o_001 11.11 EUR 1.14 12:00:00
o_002 12.51 EUR 1.10 12:06:00
- 事件时间的 Temporal Join 一定要给左右两张表都设置 Watermark。
- 事件时间的 Temporal Join 一定要把 Versioned Table 的主键包含在 Join on 的条件中。
2.2 案例二(处理时间)
10:15> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
10:30> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
-- 10:42 时,Euro 的汇率从 114 变为 116
10:52> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 116 <==== 从 114 变为 116
Yen 1
-- 从 Orders 表查询数据
SELECT * FROM Orders;
amount currency
====== =========
2 Euro <== 在处理时间 10:15 到达的一条数据
1 US Dollar <== 在处理时间 10:30 到达的一条数据
2 Euro <== 在处理时间 10:52 到达的一条数据
-- 执行关联查询
SELECT
o.amount, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
-- 结果如下:
amount currency rate amount*rate
====== ========= ======= ============
2 Euro 114 228 <== 在处理时间 10:15 到达的一条数据
1 US Dollar 102 102 <== 在处理时间 10:30 到达的一条数据
2 Euro 116 232 <== 在处理时间 10:52 到达的一条数据
可以发现处理时间就比较好理解了,因为处理时间语义中是根据左流数据到达的时间决定拿到的汇率值。Flink 就只为 LatestRates
维护了最新的状态数据,不需要关心历史版本的数据。文章来源地址https://www.toymoban.com/news/detail-840376.html
到了这里,关于【大数据】Flink SQL 语法篇(六):Temporal Join的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!