Paimon 与 Spark 的集成(二):查询优化

这篇具有很好参考价值的文章主要介绍了Paimon 与 Spark 的集成(二):查询优化。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Paimon

Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。Paimon 采用开放的数据格式和技术理念,可以与 Flink / Spark / Trino 等诸多业界主流计算引擎进行对接,共同推进 Streaming Lakehouse 架构的普及和发展。

Paimon x Spark

‍‍Apache Spark,作为大数据处理的统一计算分析引擎,不仅支持多种语言的高级API使用,也支持了丰富的大数据场景应用,包括结构化数据处理的Spark SQL、用于机器学习的MLlib,用于图形处理的GraphX,以及用于增量计算和流处理的Structured Streaming。Spark已经成为了大数据领域软件栈中必不可少的组成部分。对 Paimon 来说,为了在准实时和离线湖仓场景更加便利的落地,与 Spark 深度、全面的集成势在必行。

在之前的Paimon Release版本,我们着重丰富Paimon在功能上和Spark SQL生态的集成,包括Schema Evolution,Structured Streaming Read/Write,Dynamic Insert Overwrite Partition,Update/Merge Into等等。在最近发布的0.6和0.7版本,我们开始在Paimon基于Spark SQL查询性能上做一些工作。在初期我们会结合Spark SQL已有的优化规则和框架,让Paimon充分利用到这些。通过一系列优化,我们将 Paimon x Spark 在 TpcDS 上的性能提高了37+%,已基本和 Parquet x Spark 持平。下文将对其中的关键优化点进行详细介绍。‍‍

动态分区裁剪

动态分区裁剪(Dynamic Partition Prunning,DPP)在SQL优化中是常见的优化点,本质上是谓词下推(Predicate PushDown)的一种拓展,其目的是最小化从数据源中读取数据的IO成本,也进而减少了计算成本。

在数仓中,常常将较大的事实表和很小的维度表关联查询,且事实表需要根据维表中的字段信息来进行过滤,如下面TpcDS Q14中的SQL片段:

select ss_quantity quantity ,ss_list_price list_price
from store_sales, date_dim
where ss_sold_date_sk = d_date_sk and d_year between 1999 and 1999 + 2
order by quantity limit 10;

在不支持DPP的情况下的执行计划简化如下:

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

Paimon应用的是Spark DataSource V2的查询框架,该框架在Spark3.2后提供了 SupportsRuntimeFiltering 接口用于V2表实现运行时的动态过滤。理论上,任何字段(包括普通数据字段和分区字段)的过滤条件都能被应用,但一般而言仅分区字段的过滤条件能够被完全应用,即无需上层的Filter的节点再使用该过滤条件去选择数据。Paimon表通过该接口实现了动态分区裁剪的能力。在支持DPP后执行计划如下所示:

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

在1T的TpcDS数据集下,应用DPP后 store_sales 表参与join的数据量从27亿 减少到16亿。仅应用到该优化后,Q14运行时间减少到原来的~55%,1T TpcDS数据集的查询性能整体提升20+%;

相关代码:

https://github.com/apache/incubator-paimon/pull/2411

https://github.com/apache/incubator-paimon/pull/2421

Exchange复用

Exchange是Spark中物理计划中一个关键的操作,对应逻辑计划中的Shuffle。在执行阶段,Exchange可以代表某个SQL中部分Plan输出的数据。在复杂的SQL中,我们可以通过公共表表达式(Common Table Expression,CTE)语法定义一个SQL片段,用于简化整个SQL或者被多次使用。以下面简化的TpcDS Q23为例,定义的其中一个CTE frequent_ss_items 在整个SQL中被两次使用。

with frequent_ss_items as (
  select substr(i_item_desc,1,30) itemdesc, i_item_sk item_sk, d_date solddate, count(*) cnt
  from store_sales, date_dim, item
  where ss_sold_date_sk = d_date_sk and ss_item_sk = i_item_sk and d_year in (2000,2000+1,2000+2,2000+3)
  group by substr(i_item_desc,1,30),i_item_sk,d_date
  having count(*) >4
),
max_store_sales as (...),
best_ss_customer as (...)
select sum(sales)
from (
  select cs_quantity*cs_list_price sales
       from catalog_sales
           ,date_dim
       where d_year = 2000
         and d_moy = 2
         and cs_sold_date_sk = d_date_sk
         and cs_item_sk in (select item_sk from frequent_ss_items)
         and cs_bill_customer_sk in (select c_customer_sk from best_ss_customer)
      union all
      select ws_quantity*ws_list_price sales
       from web_sales
           ,date_dim
       where d_year = 2000
         and d_moy = 2
         and ws_sold_date_sk = d_date_sk
         and ws_item_sk in (select item_sk from frequent_ss_items)
         and ws_bill_customer_sk in (select c_customer_sk from best_ss_customer)
) y
limit 100;

显然在执行阶段,我们希望 frequent_ss_items 仅被执行一次,执行后的数据可以缓存,然后分别执行后续和 catalog_sales 以及 web_sales 表的Join操作。针对这个场景,Spark提供了Exchange复用的优化,期待的执行计划简化如下所示:

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

但该优化依赖算子Plan中各个物理操作的 hashCode 来确定实际运行时是否可以复用。我们定位并解决了Paimon中存在的实现问题,使得Paimon可以使用到Spark提供的Exchange复用的优化,从而减少不必要的冗余计算,也降低了IO和网络的开销。仅应用到该优化后,Q23运行时间减少到原来的~50%,1T TpcDS数据集的查询性能整体提升13+%;

相关代码:

https://github.com/apache/incubator-paimon/pull/2488

动态调整Scan并发

任务实际执行时的并发度是影响作业运行性能的关键之一。Spark提供了 spark.sql.shuffle.partitions 参数来调整Join或者Agg等算子的并发,也提供了自适应查询执行(Adative Query Execution,AQE)框架动态调整并发,但这些都无法影响到读取数据源Scan阶段的并发。

在DataSource V2的框架下,数据源的Scan方式包括并发完全由DataSource自己决定。我们以TpcDS Q19为例:

select  i_brand_id brand_id, i_brand brand, i_manufact_id, i_manufact,
   sum(ss_ext_sales_price) ext_price
 from date_dim, store_sales, item,customer,customer_address,store
 where d_date_sk = ss_sold_date_sk
   and ss_item_sk = i_item_sk
   and i_manager_id=8
   and d_moy=11
   and d_year=1998
   and ss_customer_sk = c_customer_sk 
   and c_current_addr_sk = ca_address_sk
   and substr(ca_zip,1,5) <> substr(s_zip,1,5) 
   and ss_store_sk = s_store_sk 
 group by i_brand
      ,i_brand_id
      ,i_manufact_id
      ,i_manufact
 order by ext_price desc
         ,i_brand
         ,i_brand_id
         ,i_manufact_id
         ,i_manufact
limit 100 ;

其中 customer_addressstore 基于 substr(ca_zip,1,5) <> substr(s_zip,1,5) 条件Join。

在未引入CBO对join重排序的情况下,这两张表通过BroadcastNestedLoopJoin来实现,没有引入Exchange调整Join的并发。执行计划如下图所示:

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

在未引入优化之前,由于 customer_address 表的数据分片较小,但任务计算负载较高(数据Join后严重膨胀),整体执行性能很差。

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

Paimon根据这种问题提供了基于当前作业的可用core数来动态调整数据源的数据分片的能力,也进而调整并发,从而提升查询效率。

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

仅应用该优化后,Q19运行时间减少到原来的~25%,1T TpcDS数据集的查询性能整体提升14+%;

相关代码:

https://github.com/apache/incubator-paimon/pull/2482

合并标量子查询

类似于Exchange复用,合并标量子查询优化会遍历整个SQL逻辑执行计划,提取出标量子查询(ScalarSubQuery),尝试将多个标量子查询合并起来,使得仅执行一次子查询得到多个标量值。

我们以TpcDS Q9的片段为例,整个Q9由5个case-when语句构成。

select case when (select count(*) 
                  from store_sales 
                  where ss_quantity between 1 and 20) > 74129
            then (select avg(ss_ext_discount_amt) 
                  from store_sales 
                  where ss_quantity between 1 and 20) 
            else (select avg(ss_net_paid)
                  from store_sales
                  where ss_quantity between 1 and 20) end bucket1
from reason
where r_reason_sk = 1;

在该SQL中,case when 的条件,thenelse 语句三个部分使用同样的过滤条件读取同一张表,仅聚合表达式不同。在没有应用到这个优化的情况下,执行计划如下所示:

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

Spark本身提供了 MergeScalarSubQueries 的优化规则,但从实现上没法更好的对接到Paimon这样的DataSource V2表,因此我们在Paimon侧单独实现,并通过Spark提供的Extensions的接口将Paimon自实现的优化注入到了Spark优化器中。在应用该优化后,执行计划如下所示:

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

由此可见,合并标量子查询优化有效的减少了冗余的计算,提升了Paimon在该场景下的查询性能。仅应用该优化后,Q9运行时间减少到原来的~57%。

相关代码:

https://github.com/apache/incubator-paimon/pull/2657

Cost-Based优化

Spark SQL允许使用基于成本的优化(Cost-Based Optimizer,CBO)来提升查询性能,主要用于多路Join的场景,使用动态规划算法来选择Cost最低的Join顺序。要想使得这个优化能更有效,依赖于计算Cost的模型,以及表的表级和列级统计信息的收集,而其中列级统计信息在评估Plan算子节点的运行时统计信息中尤为重要。

新版本的Paimon在元数据中增加了statistics的信息,可以通过原生的Spark Analyze命令完成收集,并对接到了Spark SQL,使得Spark SQL可以利用Paimon的表级/列级信息进行查询优化。我们以TpcDS Q24a为例:

with ssales as
(select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
from store_sales, store_returns, store, item, customer, customer_address
where ss_ticket_number = sr_ticket_number
  and ss_item_sk = sr_item_sk
  and ss_customer_sk = c_customer_sk
  and ss_item_sk = i_item_sk
  and ss_store_sk = s_store_sk
  and c_current_addr_sk = ca_address_sk
  and c_birth_country <> upper(ca_country)
  and s_zip = ca_zip
and s_market_id=8
group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size)
select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
from ssales
where i_color = 'peach'
group by c_last_name, c_first_name, s_store_name
having sum(netpaid) > (select 0.05 * avg(netpaid) from ssales)
order by c_last_name, c_first_name, s_store_name;

其中CTE ssales 的部分,仅提供表级统计信息的情况下的执行计划大致如下所示,包括两个SortMergeJoin,其中左侧虚线框更是大数据量间的Join操作,严重影响性能。

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

而执行Analyze提供了列级统计信息后执行计划大致如下所示,对参与Join的表进行了重排序,且所有Join都是BroadcastHashJoin的方式执行。

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

Paimon提供了完整的statistics,借助于CBO框架,不仅可以提升相应的查询性能,也可以使得在正常资源配置下无法跑通的SQL能够正常运行,比如TpcDS Q72。在之前优化项基础上叠加应用该优化后,Q24运行时间减少到原来的~23%,1T TpcDS数据集的查询性能整体提升~30%;

相关代码:

https://github.com/apache/incubator-paimon/pull/2677

https://github.com/apache/incubator-paimon/pull/2752

https://github.com/apache/incubator-paimon/pull/2798

优 化 效 果

本文使用阿里云 EMR 5.16.0版本,集群节点的属性如下:

  • master: 1 * ecs.g7.8xlarge 32 vCPU 128 GiB

  • core: 6 * ecs.g7.8xlarge 32 vCPU 128 GiB

使用的组件及版本如下:

  • Paimon: 0.8-SNAPSHOT (对应到commit:193df7345aa520f8b45125cdd85588a91a3fc3a9)

  • Spark: 3.3.1 (额外cherry-pick SPARK-41378,以支持DataSource V2下的stats相关功能)

启用的Spark相关配置:

spark.executor.cores

4

spark.executor.memory

14g

spark.executor.memoryOverhead

2g

spark.dynamicAllocation.enabled

true

spark.sql.cbo.enabled

true

spark.sql.cbo.joinReorder.enabled

true

spark.sql.autoBroadcastJoinThreshold

128m

Paimon表选用append表(无主键表),使用parquet作为文件格式,设置bucket=-1(最新代码已经默认设置:PAIMON-2829),这样便于和Spark parquet表进行对比。

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式

上图为我们使用parquet表(带有表级统计信息,即rowCount和sizeInByte两个指标)作为基准,以此向右分别为优化前和应用这些优化后的Paimon表(仅带表统计信息),以及Parquet表和Paimon表在收集到Column级别统计信息时的查询较基准的性能对比。

对比可见,在一般情况下(无column级统计信息)优化后的Paimon和Parquet已经基本持平。开启column级统计信息后,Paimon较Parquet慢~8%,这中间的差距也将是性能优化继续跟进的方向之一。

后 续 规 划 

在湖仓体系下,我们认为读写查询优化一直是一项任重而道远的事情。当前的优化主要集中在让Paimon充分利用到Spark SQL现有的优化规则或者优化框架。在继续推进的同时,我们也会利用Paimon自身的特性,比如Index或者Clustering等,以及优化Scan等进一步提升Paimon性能。

另外,在当前湖仓场景下,依然有很多无主键表的使用,后续对append表支持Upsert能力也是重要的规划之一。

< 往 期 精 彩 推 荐 >

Paimon 与 Spark 的集成(二):查询优化,spark,大数据,分布式


▼ 关注「Apache Spark 技术交流社区」,获取更多技术干货 ▼文章来源地址https://www.toymoban.com/news/detail-838055.html

到了这里,关于Paimon 与 Spark 的集成(二):查询优化的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据学习06-Spark分布式集群部署

    配置好IP vim /etc/sysconfig/network-scripts/ifcfg-ens33 修改主机名 vi /etc/hostname 做好IP映射 vim /etc/hosts 关闭防火墙 systemctl status firewalld systemctl stop firewalld systemctl disable firewalld 配置SSH免密登录 ssh-keygen -t rsa 下载Scala安装包 配置环境变量 添加如下配置 使环境生效 验证 Spark官网 解压 上

    2024年02月10日
    浏览(66)
  • 大数据开发之Spark(RDD弹性分布式数据集)

    rdd(resilient distributed dataset)叫做弹性分布式数据集,是spark中最基本的数据抽象。 代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 1.1.1 rdd类比工厂生产 1.1.2 wordcount工作流程 1、一组分区(partition),即是数据集的基本组成单位,

    2024年01月24日
    浏览(65)
  • 大数据课程K2——Spark的RDD弹性分布式数据集

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 了解Spark的RDD结构; ⚪ 掌握Spark的RDD操作方法; ⚪ 掌握Spark的RDD常用变换方法、常用执行方法; 初学Spark时,把RDD看做是一个集合类型(类似于Array或List),用于存储数据和操作数据,但RDD和普通集合的区别

    2024年02月12日
    浏览(50)
  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(49)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(104)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(58)
  • 04_Hudi 集成 Spark、保存数据至Hudi、集成Hive查询、MergeInto 语句

    本文来自\\\"黑马程序员\\\"hudi课程 4.第四章 Hudi 集成 Spark 4.1 环境准备 4.1.1 安装MySQL 5.7.31 4.1.2 安装Hive 2.1 4.1.3 安装Zookeeper 3.4.6 4.1.4 安装Kafka 2.4.1 4.2 滴滴运营分析 4.2.1 需求说明 4.2.2 环境准备 4.2.2.1 工具类SparkUtils 4.2.2.2 日期转换星期 4.2.3 数据ETL保存 4.2.3.1 开发步骤 4.2.3.2 加载CS

    2024年02月13日
    浏览(46)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(65)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(64)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(76)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包