Flink:流式 Join 类型 / 分类 盘点 (一)

这篇具有很好参考价值的文章主要介绍了Flink:流式 Join 类型 / 分类 盘点 (一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink:流式 Join 类型 / 分类 盘点 (一),大数据专题,flink,join,类型,Temporal,Interval,Lookup,维表 博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

在Flink中,实现流之间连接的操作可以分为两类。第一类是基于原生State状态存储的Connect算子操作,这种方式可以实现低延迟的数据连接和转换;第二类则是基于窗口的JOIN操作,这种方式又可以细分为window join和interval join两种,通过对数据进行时间窗口和滑动窗口的划分,实现不同粒度的数据关联和计算。

1. Regular Join(常规 Join):

从 SQL 上看,它只是一条普通的 SQL,和批处理的 SQL 无异:

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

在为了维持常规 Join 结果的准确性,不难判断的是:Flink 需要将 Join 输入的两边数据永远保持在状态中,所以,计算查询结果所需的状态可能会无限增长,对于长时间运行的大数据量的流来说,这种 Join 的代价是负担不起的。当然,我们可以通过配置状态的 TTL 来缓解这一问题,但这可能会导致结果不准确。总得来说就是:在流上,常规 Join 是可用的,但要慎用。

Regular Join 又会细分为 INNER Equi-JOIN 和 OUTER Equi-JOIN,具体参考文档,此处不再赘述

2. Interval Join(时间区间 Join)

Regular Join 的条件太宽松,导致 Join 成本巨大,Interval Join 会添加一个时间范围限制,让流上仅处于指定时间区间内的数据参与 Join。

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

上述 SQL 是一个典型的 Interval Join,它试图关联订单和它的发货记录以便获得更多信息,如果业务上保证:下单之后 4 小时以内即可发货,那上述 SQL 就能保证 order 和它的 shipment 可以关联上。我们看到,从 SQL 上来说,Interval Join 区别于 Regular Join 的地方就是:它在 Regular Join 的基础上又追加了时间范围条件,这就大大地减轻了维持 Join 状态数据的负担。以下是一些典型的 Interval Join 条件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

3. Temporal Join (版本表 Join)

Temporal Join 并没有对应一个精准的中文称为,但别简单地把它称为 Temporal Table Join,因为它 Join 的是 Temporal Table (时态表)中的版本表,如果要精准描述的话,应该说是:版本表 Join。

要了解 Temporal Join 必须得先了解很么是 Temporal Table (时态表),对此,请参考 《关于 动态表 / 时态表 / 版本表 概念的澄清》一文,本文就不再解释了。Temporal Join 就是 join 了一张版本表,那这到底有何不同呢?我们知道,既然版本表中一条记录在不同时刻可能会有不同的值(版本),那这就会引申出一个问题:当我们 join 一张版本表时,应该 join 一条记录的哪个版本呢?如果没有特别配置,那么默认行为自然是应该 join 当前的最新值(版本),那有没有需要 join 过去某个时间点的值(版本)的场景呢?有!并且有很多!官方文档给出的就是一个典型的例子:对于一张订单,我们总是应该参照下单时的汇率表去转换为一种统一货币的总价,这就需要订单表去 Join 汇率表在下单时刻的那个版本值

3.1. 基于事件时间的 Temporal Join

为了便于描述,我们按官方文档的介绍,让版本表作为被关联表,用”右表“指代,把主动需要关联的表称为”左表“。既然 Temporal Join 关联的右表是版本表,则关联的一方,也就是版本表必然已经定义了事件时间属性,如果关联的另一张表,也就是”左表“,也定义了事件时间属性(通过 Wartermark),且在 Join 时通过 FOR SYSTEM_TIME AS OF 关键字指定了左表上的这个事件时间属性,那么,这就是一个”基于事件时间的 Temporal Join“,以下是一个示例:

-- 左表:orders, 注意 orders 表也定义了事件时间列:order_time
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (/* ... */);

-- 右表:currency_rates 是一张版本表,因为它定义了主键和事件时间
-- 这种表的数据通常来自 CDC 数据,也就是 数据库的 changelog
-- 显然,这里使用的是存放在kafka中的debezium-json格式的 changelog 数据 
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'value.format' = 'debezium-json',
   /* ... */
);

SELECT 
     order_id,
     price,
     orders.currency,
     conversion_rate,
     order_time
FROM orders
-- 关键字:FOR SYSTEM_TIME AS OF 用于指定左表中的一个时间类型的字段,Flink会根据这个时间和
-- 版本表上的指定的事件时间字段(即 currency_rates.update_time)进行比对,找到对应版本的记录
-- 与之进行关联。这里“对应版本”的逻辑应该是:在order_time这个时刻,currency_rates 所对应着的
-- 当时版本的记录值
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

在上面这个基于事件时间的 Temporal Join 中,最核心的一个逻辑在于:在按汇率计算一个 order 的总价时,到底是读的 currency_rates 版本表中的哪一个版本值?我们假设: 一个 order 的 order_time 是 13:42,currency_rates 对应货币的汇率在 13:4013:45各有一个版本(假设每5分钟更新一次),分别是 6.886.89,若当前时间 是13:46 分,则这个 order join 的哪一个汇率呢?显然是 6.88

从 基于事件时间的 Temporal Join 的行为特征上不难看出:对于正在实时 Join 的两个流来说,如我们需要一张表总是 Join 其记录所代表的事件在发生的当时另一张表上当时的数据,此时就应该使用 “基于事件时间的 Temporal Join”,简单总结一下的话可以说是:当时对当时,这应该符合大多数流式的 Join 需求。

3.2. 基于处理时间的 Temporal Join

基于处理时间的 Temporal Join常常用在使用外部系统来丰富流的数据,典型的例子是:维表 Join。

从 基于处理时间的 Temporal Join 的行为特征上不难看出:对于正在实时 Join 的两个流来说,如我们需要一张表总是 Join 另一张表上当前的最新数据,此时就应该使用 “基于处理时间的 Temporal Join”,简单总结一下的话可以说是:当时对现在,维表 Join 通常是此类情形的典型代表(通常维表的变化是很缓慢的)文章来源地址https://www.toymoban.com/news/detail-840794.html

到了这里,关于Flink:流式 Join 类型 / 分类 盘点 (一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包