Flink动态更新维表

这篇具有很好参考价值的文章主要介绍了Flink动态更新维表。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.Lookup join

     概念:Lookup join是针对于由作业流表触发,关联右侧维表来补全数据的场景 。

默认情况下,在流表有数据变更,都会触发维表查询(可以通过设置维表是否缓存,来减轻查询压力),由于不保存状态,因此对内存占用较小。(以上来自网络)

具体配置如下:        

SET execution.checkpointing.interval=5000;
SET state.checkpoints.dir=hdfs://hadoop01:9000/flink/checkpoints/2023090510549999;
SET execution.runtime-mode=streaming;

// 定义维表,维表 可以有主键,不能有水位线字段
CREATE TEMPORARY TABLE  source_dim_dept (
`id` BIGINT,
`dept` STRING,
PRIMARY KEY(id) NOT ENFORCED
)  WITH (
    'connector'='jdbc',    
    'url'='jdbc:mysql://1.1.1.1:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
    'username'='data_manage',
    'password'='xxxxxx',
    'table-name'='source_dim_dept ',
    'lookup.cache'='PARTIAL',    
    'lookup.partial-cache.expire-after-write'='30s',
    'lookup.cache.ttl' = '30s',
    'lookup.cache.max-rows'='5',
    'lookup.partial-cache.max-rows'='5'    
);

// 流表 -流表必须要有 PROCTIME(),和 水位线字段
CREATE TABLE source_kafka_stream_data(
`id` BIGINT,
`ts` BIGINT,
`price` FLOAT,
proc_time AS PROCTIME(),
`auto_row_time` AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR `auto_row_time` AS `auto_row_time` - INTERVAL '0' SECONDS
 )  WITH (
    'connector'='kafka',
    'properties.group.id'='data_processing_producer',
    'scan.startup.mode'='group-offsets',
    'properties.auto.offset.reset'='latest',
    'topic'='data-paimon-test',
    'properties.bootstrap.servers'='1.1.1.1:9192',
    'format'='json'
);

CREATE VIEW transform_tableJoin_XeH7VK1o2U AS
select `status`,`id`,`ts`,`price`, PROCTIME() as auto_row_time from (
  select c.`dept` as `status`,c.`id` as `id`,O.`ts` as `ts`,O.`price` as `price`
   from source_kafka_stream_data AS O
   JOIN source_dim_dept FOR SYSTEM_TIME AS OF O.proc_time AS c
  on O.id=c.id ) ;


*注意坑在这里:上述的 连接中流表的时间字段一定要用 PROCTIME() 类型的 AS OF O.proc_time,如果 用水位线字段则Flink 会转为TemperalJoin

而不是Lookup join



// 以下是输出,无特殊配置
CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://hadoop01:9000/painmon/data-processing/paimon_ods'  
);

USE CATALOG paimon;
create database if not exists paimon.paimon_ods_db;
drop table if exists paimon_ods_db.paimon_test_stream_join;
CREATE TABLE if not exists paimon_ods_db.paimon_test_stream_join(
`uuid` STRING,
`status` STRING,
`id` BIGINT,
`ts` BIGINT,
`price` DOUBLE
)  WITH (
   'sink.parallelism'='8',
   'bucket'='8',
   'bucket-key'='uuid',
   'write-mode'='append-only',
   'sink.use-managed-memory-allocator'='true',
   'sink.managed.writer-buffer-memory'='512MB',
   'num-sorted-run.compaction-trigger'='20',
   'write-buffer-size'='1024MB',
   'write-buffer-spillable'='true'  
);

INSERT INTO paimon_ods_db.paimon_test_stream_join select uuid(),`status`,`id`,`ts`,`price` from default_catalog.default_database.transform_tableJoin_XeH7VK1o2U;文章来源地址https://www.toymoban.com/news/detail-699996.html



2.Temporal join 时态表连接

时态表是一个随时间演变的表,在Flink中也称为动态表。

时态表中的行与一个或多个时态周期相关联,并且所有Flink表都是时态的(动态的)。时态表包含一个或多个版本化的表快照,它可以是跟踪更改的更改历史表(例如数据库更改日志,包含所有快照),也可以是具体化更改的维表(例如包含最新快照的数据库表)。

时态表可以分为版本表和普通表。

版本表:如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog (如mysql binlog)可以定义成版本表,版本表内的数据始终不会自动清理,只能通过upsert触发。
普通表:如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 、redis的表可以定义成普通表。
(以上来自网络)
      

SET execution.checkpointing.interval=5000;
SET state.checkpoints.dir=hdfs://hadoop01:9000/flink/checkpoints/2023090510549999;
SET execution.runtime-mode=streaming;

// 定义维表,维表一定要有主键和水位线字段
CREATE TEMPORARY TABLE  source_dim_dept (
`id` BIGINT,
`dept` STRING,

`auto_row_time` AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR `auto_row_time` AS `auto_row_time` - INTERVAL '0' SECONDS
PRIMARY KEY(id) NOT ENFORCED
)  WITH (
    'connector'='jdbc',    
    'url'='jdbc:mysql://1.1.1.1:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
    'username'='data_manage',
    'password'='xxxxx',
    'table-name'='source_dim_dept ',
    'lookup.cache'='PARTIAL',    
    'lookup.partial-cache.expire-after-write'='30s',
    'lookup.cache.ttl' = '30s',
    'lookup.cache.max-rows'='5',
    'lookup.partial-cache.max-rows'='5'    
);

// 流表 -流表要有 水位线字段
CREATE TABLE source_kafka_stream_data(
`id` BIGINT,
`ts` BIGINT,
`price` FLOAT,
proc_time AS PROCTIME(),
`auto_row_time` AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR `auto_row_time` AS `auto_row_time` - INTERVAL '0' SECONDS
 )  WITH (
    'connector'='kafka',
    'properties.group.id'='data_processing_producer',
    'scan.startup.mode'='group-offsets',
    'properties.auto.offset.reset'='latest',
    'topic'='data-paimon-test',
    'properties.bootstrap.servers'='1.1.1.1:9192',
    'format'='json'
);

CREATE VIEW transform_tableJoin_XeH7VK1o2U AS
select `status`,`id`,`ts`,`price`, PROCTIME() as auto_row_time from (
  select c.`dept` as `status`,c.`id` as `id`,O.`ts` as `ts`,O.`price` as `price`
   from source_kafka_stream_data AS O
   JOIN source_dim_dept FOR SYSTEM_TIME AS OF O.auto_row_timeAS c
  on O.id=c.id ) ;


*注意坑在这里:上面的Lookup Join 区别也在这里,连接中流表的时间字段一定要用 水位线字段 类型的 AS OF O.auto_row_time



// 以下是输出,无特殊配置
CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://hadoop01:9000/painmon/data-processing/paimon_ods'  
);

USE CATALOG paimon;
create database if not exists paimon.paimon_ods_db;
drop table if exists paimon_ods_db.paimon_test_stream_join;
CREATE TABLE if not exists paimon_ods_db.paimon_test_stream_join(
`uuid` STRING,
`status` STRING,
`id` BIGINT,
`ts` BIGINT,
`price` DOUBLE
)  WITH (
   'sink.parallelism'='8',
   'bucket'='8',
   'bucket-key'='uuid',
   'write-mode'='append-only',
   'sink.use-managed-memory-allocator'='true',
   'sink.managed.writer-buffer-memory'='512MB',
   'num-sorted-run.compaction-trigger'='20',
   'write-buffer-size'='1024MB',
   'write-buffer-spillable'='true'  
);

INSERT INTO paimon_ods_db.paimon_test_stream_join select uuid(),`status`,`id`,`ts`,`price` from default_catalog.default_database.transform_tableJoin_XeH7VK1o2U;

到了这里,关于Flink动态更新维表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink1.18.0 flink维表join新思路

    弊端:         虽然缓存可以减轻维表负担,但是如果事实表数据量很大,每秒千万条,维度表只有百万条,也就是说 你会看到大量的无法关联的数据仍然需要查询维度表.  cache缓存千万数据量内存压力又比较大, 那么怎么减轻维表数据库压力,还能做到低延迟. 以往双流join ; a joi

    2024年01月24日
    浏览(31)
  • 轻松通关Flink第19讲:Flink 如何做维表关联

    在实际生产中,我们经常会有这样的需求,需要以原始数据流作为基础,然后关联大量的外部表来补充一些属性。例如,我们在订单数据中,希望能得到订单收货人所在省的名称,一般来说订单中会记录一个省的 ID,那么需要根据 ID 去查询外部的维度表补充省名称属性。 在

    2024年02月13日
    浏览(33)
  • 58、Flink维表的实战-6种实现方式维表的join

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月02日
    浏览(27)
  • Flink:维表 Join 难点和技术方案汇总

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年04月08日
    浏览(38)
  • Flink CEP(三)pattern动态更新

    目录 1.实现分析 2.代码实现 3.测试验证 4.源码地址          线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就

    2024年02月13日
    浏览(33)
  • Flink CEP(三)pattern动态更新(附源码)

    目录 1.实现分析 2.代码实现 3.测试验证 4.源码地址          线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就

    2024年02月04日
    浏览(36)
  • flink1.15 维表join guava cache和mysql方面优化

    优化前  mysql响应慢,导致算子中数据输出追不上输入,导致显示cpu busy:100% 优化后效果两个图对应两个时刻: - - 图中guava cache命中率是通过guava自带统计,打印出来的. 1 guava缓存数据量上限 = 类中配置的guava缓存数据上线 * task个数(即flink并行度) 缓存越久 命中率越高 数据越陈旧

    2024年01月17日
    浏览(31)
  • 性能提升3-4倍!贝壳基于Flink + OceanBase的实时维表服务

    作者介绍: 肖赞,贝壳找房(北京)科技有限公司 OLAP 平台负责人,基础研发线大数据平台部架构师。 贝壳找房是中国最大的居住服务平台。作为居住产业数字化服务平台,贝壳致力于推进居住服务的产业数字化、智能化进程,通过聚合、助力优质服务者,为中国家庭提供

    2024年02月10日
    浏览(27)
  • Flink系列Table API和SQL之:动态表、持续查询、将流转换成动态表、更新查询、追加查询、将动态表转换为流、更新插入流(Upsert)

    Flink中使用表和SQL基本上跟其他场景是一样的。不过对于表和流的转换,却稍显复杂。当我们将一个Table转换成DataStream时,有\\\"仅插入流\\\"(Insert-Only Streams)和\\\"更新日志流\\\"(Changelog Streams)两种不同的方式,具体使用哪种方式取决于表中是否存在更新操作。 这种麻烦其实是不可避

    2024年02月03日
    浏览(64)
  • 大数据Flink(七十):SQL 动态表 & 连续查询

    文章目录 SQL 动态表 连续查询 一、​​​​​​​SQL 应用于流处理的思路

    2024年02月10日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包