使用 Alluxio 优化 EMR 上 Flink Join

这篇具有很好参考价值的文章主要介绍了使用 Alluxio 优化 EMR 上 Flink Join。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

业务背景&痛点

  • 流式处理的业务场景,经常会遇到实时消息数据需要与历史存量数据关联查询或者聚合,比如电商常见的订单场景,订单表做为实时事实表,是典型的流式消息数据,通常会在 kafka 中,而客户信息,商品 SKU 表是维度表,通常存在业务数据库或者数仓中,是典型的离线数据。实时订单数据在实时处理时通常需要事实表与维度表 join 做 reference 补全,以便拿到订单详情并实时统计当天或截至当天的所有订单的商品分布详情。
亚马逊云科技开发者社区为开发者们提供全球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、活动与竞赛等。帮助中国开发者对接世界最前沿技术,观点,和项目,并将中国优秀开发者或技术推荐给全球云社区。如果你还没有关注/收藏,看到这里请一定不要匆匆划过,点这里(https://dev.amazoncloud.cn/user/register?show=tab1&sc_channel=CSDN)让它成为你的技术宝库!
  • 流式计算通常采用 Flink 做为数据处理平台,上文中提到的实时和离线数据join 的场景,Flink 提供了 Hive/ jdbc/Hudi/ filesystem 各种 connector 实现与离线数据的提取和读写,这样一来在 Flink 应用程序中,即可使用 Table,Sql API 来 join 关联流态表和离线表数据,实现聚合计算等操作

使用 Flink Sql 离线表 Join 流态表的常规 lookup join,是通过 Flink hive sql connector 或者 filesystem connector,对离线 hive 库表或者 S3上离线数据建 Flink Table,然后对 kafka 消息流中的数据建流态表,然后直接做量表做 join 操作

该方式架构如下图所示:

使用 Alluxio 优化 EMR 上 Flink Join

该方式主要面临的问题是:

  • lookup 维度表数据只会在首次拉起 Flink 应用的时候,保存在 task manager state 中,后续持续查询或者开窗聚合等操作时,是不会再次拉取维度表数据,业务需要定期重启 Flink 应用,或者刷新维度表数据到临时表,以便 join 聚合时和最新的维度数据关联:

使用 Alluxio 优化 EMR 上 Flink Join

  • 每次需要重新全量拉取维度表数据,存在冷启动问题,且维度表数据量大的时候(如上千万注册用户信息表,上万的商品 SKU 属性字段),造成很大 IO 开销,存在性能瓶颈
  • Flink 的 checkpoint 机制在持续查询或者开窗聚合时,需要保存 state 状态及处理数据到检查点快照中,造成 state 快照数据膨胀

解决方案思路

基于以上业务难点,本文提出一种解决方案思路,即通过 Alluxio 缓存层,将 hive 维度表数据自动加载至 Alluxio UFS 缓存中,同时通过 Flink 时态表 join,把维度表数据做成持续变化表上某一时刻的视图

同时使用 Flink 的 Temporal table function 表函数,传递一个时间参数,返回 Temporal table 这一指定时刻的视图,这样实时动态表主表与这个 Temporal table 表关联的时候,可以关联到某一个版本(历史上某一个时刻)的维度数据

优化后的整体架构如下图所示:

使用 Alluxio 优化 EMR 上 Flink Join

方案实施落地Detail

本文以 Kafka 中用户行为日志数据做为实时流态的事实表数据,hive 上用户信息数据做为离线维度表数据,采用 Alluxio+Flink temproal 的 demo,来验证其 flink join 优化的解决方案

实时事实表

本实例中我们使用 json-data-generator 开源组件模拟的用户行为 json 数据,实时写入 kafka 中,通过 Flink kafka connector 转换为持续查询的 Flink 流态表,从而做为实时 join 的时候的 Fact 事实表数据

用户行为 json 模拟数据如下格式:

[{          "timestamp": "nowTimestamp()",
                    "system": "BADGE",
                    "actor": "Agnew",
                    "action": "EXIT",
                    "objects": ["Building 1"],
                    "location": "45.5,44.3",
                    "message": "Exited Building 1"
                }]

包含用户行为的业务时间,登录系统,用户署名,行为 activity 动作,操作涉及对象,位置信息,及相关文本消息字段。我们在

Flink Sql 中建选择主要字段建事实表如下

CREATE TABLE logevent_source (`timestamp`  string, 
`system` string,
 actor STRING,
 action STRING
) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup6',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);

Alluxio 缓存维度表

Alluxio 是大数据技术堆栈的分布式缓存,它提供了一个统一的 UFS 文件系统可以对接底层 S3,hdfs 数据,在读写 Alluxio UFS 的时候,可以针对 S3,HDFS 分布式存储层实现 warm up,显著提升吞吐量和减少网络开销,且与上层计算引擎如 Hive,spark,Trino 都有深度的集成,很适合做为离线维度数据的缓存加速器

Amazon EMR 对 Alluxio 提供了良好的集成,可以通过 boostrap 启动脚本方式,在 EMR 创建时自动部署 Alluxio 组件并启动 Alluxio master、worker 进程,详细 EMR 安装和部署 Alluxio 步骤可以参考另一篇文章 Alluxio EMR 集成实践

在集成 Alluxio 的 Amazon EMR 集群中,使用 Alluxio 中创建 hive 离线维表数据的缓存表方法如下:

hive-env.sh中设置设置client jar包:
$ export HIVE_AUX_JARS_PATH=/<PATH_TO_ALLUXIO>/client/alluxio-2.2.0-client.jar:${HIVE_AU

确保安装部署alluxio的EMR集群上ufs已配置,并且表或者db路径已创建
alluxio fs mkdir alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer
alluxio fs chown hadoop:hadoop alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer

在AWS EMR集群上,创建hive表路径指向alluxio namespace uri:
!connect jdbc:hive2://xxx.xxx.xxx.xxx:10000/default;
hive> CREATE TABLE customer(
    c_customer_sk             bigint,
    c_customer_id             string,
    c_current_cdemo_sk        bigint,
    c_current_hdemo_sk        bigint,
    c_current_addr_sk         bigint,
    c_first_shipto_date_sk    bigint,
    c_first_sales_date_sk     bigint,
    c_salutation              string,
    c_first_name              string,
    c_last_name               string,
    c_preferred_cust_flag     string,
    c_birth_day               int,
    c_birth_month             int,
    c_birth_year              int,
    c_birth_country           string,
    c_login                   string,
    c_email_address           string
)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
    STORED AS TEXTFILE
    LOCATION 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer';
OK
Time taken: 3.485 seconds

如上所示,该 Alluxio 表 location 指向的路径即为 hive 维度表所在 S3路径,因此对 Customer 用户维度信息表的写入操作会自动同步到 alluxio 缓存中。

创建好 Alluxio hive 离线维度表后,在 flink sql中,可以通过 hive 的 catalog,连接到 hive 元数据,即可以查看到 alluxio 缓存表的详细信息:

CREATE CATALOG hiveCatalog WITH (  'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/etc/hive/conf/',
    'hive-version' = '3.1.2',
    'hadoop-conf-dir'='/etc/hadoop/conf/'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hiveCatalog;
show create table customer;
create external table customer(
    c_customer_sk             bigint,
    c_customer_id             string,
    c_current_cdemo_sk        bigint,
    c_current_hdemo_sk        bigint,
    c_current_addr_sk         bigint,
    c_first_shipto_date_sk    bigint,
    c_first_sales_date_sk     bigint,
    c_salutation              string,
    c_first_name              string,
    c_last_name               string,
    c_preferred_cust_flag     string,
    c_birth_day               int,
    c_birth_month             int,
    c_birth_year              int,
    c_birth_country           string,
    c_login                   string,
    c_email_address           string
) 
row format delimited fields terminated by '|'
location 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer' 
TBLPROPERTIES (
  'streaming-source.enable' = 'false',  
  'lookup.join.cache.ttl' = '12 h'
)

如上图所示,可以看到该维度表 location 路径是 alluxio 缓存 ufs 路径的 uri,业务程序读写该维度表时,alluxio 会自动更新缓存中的 customer 维度表数据,并异步写入到 alluxio的backend storage 的 S3表路径,实现数据湖的表数据同步更新。

Flink Temporal 时态表 join

Flink 时态表(Temporal table)也是动态表的一种,时态表的每条记录都会有一个或多个时间字段相关联,当我们事实表 join 维度表的时候,通常需要获取实时的维度表数据做 lookup,所以通常需要在事实表 create table 或者 join 时,通过 proctime()函数指定事实表的时间字段,同时在 join 时,通过 FOR SYSTEM_TIME AS OF 语法,指定维度表 lookup 时对应的事实表时间版本的数据

在本 Demo 示例中,客户信息在 hive 离线表作为一个变化的维度表的角色,客户行为在 kafka 中作为事实表的角色,因此在 flink kafka source table 中,通过 proctime()指定时间字段,然后在 flink hive table 做 join 时,使用 FOR SYSTEM_TIME AS OF 指定 lookup 的 kafka source table 的时间字段,从而实现 Flink temporal 时态表 join 业务处理

如下所示,Flink Sql 中通过 Kafka connector 创建用户行为的事实表,其中 ts 字段即为时态表 join 时的时间戳:

CREATE TABLE logevent_source (`timestamp`  string, 
`system` string,
 actor STRING,
 action STRING,
 ts as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup-01',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);

Flink 离线维度表与流式实时表具体 join 方法如下:

select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from 
      (select *, proctime() as proctime from user_logevent_source) as a 
left join customer  FOR SYSTEM_TIME AS OF a.proctime as b on a.actor=b.c_last_name;

如上代码示例,在事实表 logevent_source join lookup 维度表时,通过 proctime 函数获取到维度表的瞬时最新的版本数据,保障 join 时的一致性和实时性

同时,该维度表数据已经在 alluxio cache,因此读取时性能远高于离线读取 s3上的表数据

通过 hive 切换 S3和 alluxio 路径的 customer 信息 维度表,对比测试 flink join 可以看出 alluxio 缓存后性能明显优势

通过 alter table 方便切换本地和 cache 的 location路径:

alter table customer set location "s3://xxxxxx/data/s3/30/customer";
alter table customer  set location "alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer";

选取某一 split 数据分片的 TaskManager 日志:

  • cache 前(S3路径读取): 5s 加载
2022-06-29 02:54:34,791 INFO  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem           [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 02:54:39,971 INFO  org.apache.flink.table.filesystem.FileSystemLookupFunction   [] - Loaded 433000 row(s) into lookup join cache
  • cache 后(alluxio 读取): 2s 加载
2022-06-29 03:25:14,476 INFO  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem           [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 03:25:16,397 INFO  org.apache.flink.table.filesystem.FileSystemLookupFunction   [] - Loaded 433000 row(s) into lookup join cache

在 JobManager 上查看 Timeline,对比 alluxio 和 s3路径下 job 的执行时间可以看到更加清楚

 使用 Alluxio 优化 EMR 上 Flink Join

可以看到, 单个 task 查询提升1倍以上,整体 job 性能提升更加明显

其他需要考虑的问题

持续 Join 每次都需要拉取维度数据做 join,Flink 的 checkpoint state 是否一直膨胀导致 TM 的 RockDB 撑爆或者内存溢出?

state 自带有 ttl 机制,可以设置 ttl 过期策略,触发 Flink 清理过期 state 数据,Flink Sql 可以通过 Hint 方式设置

insert into logevent_sink
select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from 
(select *, proctime() as proctime from logevent_source) as a 
  left join 
customer/*+ OPTIONS('lookup.join.cache.ttl' = '5 min')*/  FOR SYSTEM_TIME AS OF a.proctime as b 
on a.actor=b.c_last_name;

Flink Table/Streaming API 类似:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter() 
    .build();
ValueStateDescriptor<Long> lastUserLogin = 
    new ValueStateDescriptor<>("lastUserLogin", Long.class);
lastUserLogin.enableTimeToLive(ttlConfig);
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);

设置后重新启动 lookup join,从 Flink TM 日志中可以看到,ttl 到期后,会触发清理并重新拉取 hive 维表数据:

2022-06-29 04:17:09,161 INFO  org.apache.flink.table.filesystem.FileSystemLookupFunction   
[] - Lookup join cache has expired after 5 minute(s), reloading

此外,可以通过配置 flink state retain,减少 checkpoint 时候快照数量,从而减少快照时候 state 的占用空间

Flink job中配置:
-D state.checkpoints.num-retained=5

设置后,可以看到 s3 checkpoint 路径上,Flink Job 会自动清理历史快照,只保留最近的5次快照数据,从而确保 checkpoint 快照数据不会堆积

[hadoop@ip-172-31-41-131 ~]$ aws s3 ls s3://salunchbucket/data/checkpoints/7b9f2f9becbf3c879cd1e5f38c6239f8/
                           PRE chk-3/
                           PRE chk-4/
                           PRE chk-5/
                           PRE chk-6/
                           PRE chk-7/

附录

Alluxio整体架构

Alluxio on EMR 快速部署

在 Amazon EMR 中利用 Alluxio 的分层存储架构

EMR Alluxio集成detail

Flink Temporal Join 详细

本篇作者

 使用 Alluxio 优化 EMR 上 Flink Join

唐清原

Amazon 数据分析解决方案架构师,负责 Amazon Data Analytic 服务方案架构设计以及性能优化,迁移,治理等 Deep Dive 支持。10+数据领域研发及架构设计经验,历任 Oracle 高级咨询顾问,咪咕文化数据集市高级架构师,澳新银行数据分析领域架构师职务。在大数据,数据湖,智能湖仓,及相关推荐系统 /MLOps 平台等项目有丰富实战经验

使用 Alluxio 优化 EMR 上 Flink Join

陈昊

Amazon 合作伙伴解决方案架构师,有将近 20 年的 IT 从业经验,在企业应用开发、架构设计及建设方面具有丰富的实践经验。目前主要负责 Amazon (中国)合作伙伴的方案架构咨询和设计工作,致力于 Amazon 云服务在国内的应用推广以及帮助合作伙伴构建更高效的 Amazon 云服务解决方案。

文章来源:https://dev.amazoncloud.cn/column/article/6309af45d4155422a4610a40?sc_channel=CSDN

 文章来源地址https://www.toymoban.com/news/detail-407258.html

 

 

 

到了这里,关于使用 Alluxio 优化 EMR 上 Flink Join的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 怎样使用GPT案例:使用GPT获得OPPO终止ZEKU芯片业务需要的背景知识

    以下引用 @郑昀的微博: #IT那些事儿# 神仙打架那些事儿 主角:美国公司高通(总部在加州),英国公司ARM(总部在剑桥,但2016年被孙正义收购,2022年孙正义从软银退隐之后将全身心投入到ARM的变革工作中) 第一回合:2021年1月13日,高通希望通过收购 Nuvia 推出自研 CPU 架构

    2024年02月06日
    浏览(44)
  • MSQL系列(十三) Mysql实战-left/right/inner join 使用详解及索引优化

    Mysql实战-left/right/inner join 使用详解及索引优化 前面我们讲解了B+Tree的索引结构,也详细讲解下Join的底层驱动表 选择原理,今天我们来了解一下为什么会出现内连接外连接,两种连接方式,另外实战一下内连接和几种最常用的join语法 Left join 左表 left join 右表查询 right join 左

    2024年02月05日
    浏览(50)
  • alluxio简单使用

    本文是基于alluxio官网和自己实践整理。 Alluxio版本:1.8.1 CDH 1.15.2 以内存为中心的分布式虚拟存储系统。Alluxio在上层计算框架和底层存储系统之间架起了桥梁,应用层只需要访问Alluxio即可以访问底层对接了的任意存储系统的数据。作者是李浩源/范斌,都是中国人,所以官网

    2024年02月04日
    浏览(26)
  • golang使用高阶函数优化业务功能

            两个接口(新增Tag和更新Tag),在业务层均需要添加两个校验,校验Tag信息是否重复和Tag的数据中的编码是否重复。         对应的增加两个校验的函数/方法,在接口实现中依次调用两个函数/方法进行校验。         实现简单;但重复代码多,后期再增加其他

    2024年02月07日
    浏览(47)
  • 大数据Flink(九十):Lookup Join(维表 Join)

    文章目录 Lookup Join(维表 Join) Lookup Join 定义(支持 BatchStreaming) :Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。

    2024年02月04日
    浏览(43)
  • 【flink番外篇】2、flink的23种算子window join 和interval join 数据倾斜、分区介绍及详细示例(1)- window join

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(60)
  • Flink SQL Regular Join 、Interval Join、Temporal Join、Lookup Join 详解

    Flink ⽀持⾮常多的数据 Join ⽅式,主要包括以下三种: 动态表(流)与动态表(流)的 Join 动态表(流)与外部维表(⽐如 Redis)的 Join 动态表字段的列转⾏(⼀种特殊的 Join) 细分 Flink SQL ⽀持的 Join: Regular Join:流与流的 Join,包括 Inner Equal Join、Outer Equal Join Interval Joi

    2024年02月04日
    浏览(52)
  • 直击运维痛点,大数据计算引擎 EasyMR 的监控告警设计优化之路

    当企业的业务发展到一定的阶段时,在系统中引入监控告警系统来对系统/业务进行监控是必备的流程。没有监控或者没有一个好的监控,会导致开发人员无法快速判断系统是否健康;告警的实质则是“把人当服务用”,用告警通知人的方式去干预系统达到修正的目的。 监控

    2024年02月14日
    浏览(39)
  • 业务数据同步工具介绍和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介绍 Sqoop : SQ L-to-Had oop ( Apache已经终止Sqoop项目 ) 用途:把关系型数据库的数据转移到HDFS(Hive、Hbase)(重点使用的场景);Hadoop中的数据转移到关系型数据库中。Sqoop是java语言开发的,底层使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是数据块的转移,没有使

    2024年02月15日
    浏览(81)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包