Spark3 新特性之AQE

这篇具有很好参考价值的文章主要介绍了Spark3 新特性之AQE。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark3 AQE

一、 背景

Spark 2.x 在遇到有数据倾斜的任务时,需要人为地去优化任务,比较费时费力;如果任务在Reduce阶段,Reduce Task 数据分布参差不齐,会造成各个excutor节点资源利用率不均衡,影响任务的执行效率;Spark 3新特性AQE极大地优化了以上任务的执行效率。

二、 Spark 为什么需要AQE? (Why)

RBO(Rule Based Optimization,基于规则的优化),它往往基于一些规则和策略实现,如谓词下推、列剪枝,这些规则和策略来源于数据库领域已有的应用经验。RBO实际上算是一种经验主义。

经验主义的弊端就是对待相似的问题和场景都使用同一类套路。Spark 社区正是因为意识到了 RBO 的局限性,因此在 2.x 版本中推出了CBO(Cost Based Optimization,基于成本的优化)。

CBO 是基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。CBO 支持的统计信息很丰富,比如数据表的行数、每列的基数(Cardinality)、空值数、最大值、最小值等。因为有统计数据做支持,所以 CBO 选择的优化策略往往优于 RBO 选择的优化规则。

但是,CBO 也有三个方面的不足:

  • 适用面太窄,CBO 仅支持注册到 Hive Metastore 的数据表,但在其他的应用场景中,数据源往往是存储在分布式文件系统的各类文件,如 Parquet、ORC、CSV 等。

  • 统计信息的搜集效率比较低。对于注册到 Hive Metastore 的数据表,开发者需要调用 ANALYZE TABLE COMPUTE STATISTICS 语句收集统计信息,而各类信息的收集会消耗大量时间。

  • 静态优化,RBO、CBO执行计划一旦制定完成,就会按照该计划坚定不移地执行;如果在运行时数据分布发生动态变化,先前制定的执行计划并不会跟着调整、适配。

基于CBO的执行计划
Spark3 新特性之AQE,Spark,大数据,spark

  • Spark parses the query and creates the Unresolved Logical Plan 创建Unresolved Logical Plan
    • Validates the syntax of the query. 验证语法
    • Doesn’t validate the semantics meaning column name existence, data types. 不验证语义、字段是否存在、数据类型
  • Analysis: Using the Catalyst, it converts the Unresolved Logical Plan to Resolved Logical Plan a.k.a Logical Plan. 转换为Logical Plan
    • The catalog contains the column names and data types, during this step, it validates the columns mentioned in a query with catalog.
  • Optimization: Converts Logical Plan into Optimized Logical Plan. 转换为 Optimized Logical Plan
  • Planner: Now it creates One or More Physical Plans from an optimized Logical plan. 创建一个或多个Physical Plans
  • Cost Model: In this phase, calculates the cost for each Physical plan and select the Best Physical Plan. CBO择优
  • RDD Generation: RDD’s are generated, this is the final phase of query optimization which generates RDD in Java bytecode.

三、 AQE 到底是什么?(What)

考虑到 RBO 和 CBO 的种种限制,Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution,自适应查询执行)。用一句话来概括,AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

AQE 赖以优化的统计信息与 CBO 不同,这些统计信息并不是关于某张表或是哪个列,而是 Shuffle Map 阶段输出的中间文件。

基于AQE的执行计划
Spark3 新特性之AQE,Spark,大数据,spark

四、AQE怎么用?(How)

AQE三大特性:自动分区合并 、自动数据倾斜处理、Join 策略调整

4.1 自动分区合并

在 Shuffle 过后,Reduce Task 数据分布参差不齐,AQE 将自动合并过小的数据分区。

那么AQE是如何确定多小的分区需要合并,以及分区合并到多大时停止合并?

对于所有的数据分区,无论大小,AQE 按照分区编号从左到右进行扫描,边扫描边记录分区尺寸,当相邻分区的尺寸之和大于 “推荐尺寸”时,AQE 就把这些扫描过的分区进行合并。然后,继续向右扫描,并采用同样的算法,按照目标尺寸合并剩余分区,直到所有分区都处理完毕。

“推荐尺寸”是由spark.sql.adaptive.advisoryPartitionSizeInBytes设置

配置 说明
spark.sql.adaptive.coalescePartitions.enabled When true and ‘spark.sql.adaptive.enabled’ is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by ‘spark.sql.adaptive.advisoryPartitionSizeInBytes’), to avoid too many small tasks.
开启分区自动合并,默认开启
spark.sql.adaptive.advisoryPartitionSizeInBytes The advisory size in bytes of the shuffle partition during adaptive optimization (when spark.sql.adaptive.enabled is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partition.
分区合并后的推荐尺寸,默认为64M
spark.sql.adaptive.coalescePartitions.minPartitionSize The minimum size of shuffle partitions after coalescing. This is useful when the adaptively calculated target size is too small during partition coalescing.
可合并分区尺寸大小,默认为1M

假设推荐尺寸为100M,shuffle后每一个分区的大小为70M、30M、80M、90M、10M、20M

按照正常情况(顺序处理),会启动4个reduce task:

第一个处理:70M、30M

第二个处理:80M

第三个处理:90M

第四个处理:10M、20M

spark3.0版本按照上述情况合并之后,各分区数据还是出现了不均衡,从而导致后续计算出现小的数据倾斜

查看spark 3.2官网新增了 spark.sql.adaptive.coalescePartitions.minPartitionSize(合并分区后,最小分区尺寸),如果把该参数设置为和推荐尺寸一致,那是不是只会启动3个 reduce task,3个都处理100M的数据?(个人猜想,不是按照顺序合并,而是会先遍历分区大小,保证合并后的分区大小相近)

You do not need to set a proper shuffle partition number to fit your dataset. Spark can pick the proper shuffle partition number at runtime once you set a large enough initial number of shuffle partitions via spark.sql.adaptive.coalescePartitions.initialPartitionNum configuration. 官方建议:分区数不用设置,spark会自动设置合适的分区。

4.2 自动数据倾斜处理

AQE 自动拆分 Reduce 阶段过大的数据分区,降低单个Reduce Task 的工作负载。

AQE 如何判定数据分区是否倾斜呢?它又是怎么把大分区拆分成多个小分区的?

配置 说明
spark.sql.adaptive.skewJoin.enabled When true and ‘spark.sql.adaptive.enabled’ is true, Spark dynamically handles skew in shuffled join (sort-merge and shuffled hash) by splitting (and replicating if needed) skewed partitions.
开启AQE自动数据倾斜处理
spark.sql.adaptive.skewJoin.skewedPartitionFactor A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than ‘spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes’
判定数据分区是否倾斜,默认值为5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than ‘spark.sql.adaptive.skewJoin.skewedPartitionFactor’ multiplying the median partition size. Ideally this config should be set larger than ‘spark.sql.adaptive.advisoryPartitionSizeInBytes’.
判定数据分区是否倾斜,默认值为256M

判定倾斜分区:大于 median partition size * spark.sql.adaptive.skewJoin.skewedPartitionFactor 且 大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

假设数据表 A 有 3 个分区,分区大小分别是80MB、100MB 和 512MB。这些分区按大小个排序后的中位数是 100MB,因为

skewedPartitionFactor 的默认值是 5 ,skewedPartitionThresholdInBytes默认值是256M,512M大于 100MB * 5 = 500MB且大于256M 的分区被判定为倾斜分区。

拆分倾斜分区:上述例子512M分区会被拆分 512/256 =2个分区

4.3 Join 策略调整

如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从 Shuffle Sort Merge Join 降级(Demote)为执行效率更高的 Broadcast Hash Join。

Join 策略调整指的就是 Spark SQL在运行时动态地将原本的 Shuffle Join 策略,调整为执行更加高效的 Broadcast Join。

具体来说,每当 DAG 中的 Map 阶段执行完毕,Spark SQL 就会结合 Shuffle 中间文件的统计信息,重新计算 Reduce 阶段数据表的存储大小。如果发现基表尺寸小于广播阈值, 那么 Spark SQL 就把下一阶段的 Shuffle Join 调整为 Broadcast Join。

Broadcast Join 以广播的方式将小表的全量数据分发到集群中所有的 Executors,大表的数据就可以与小表数据在Process local级别进行关联操作。本地性级别有 4 种:Process local < Node local < Rack local < Any。

配置 说明
spark.sql.adaptive.autoBroadcastJoinThreshold Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework.
可广播的表尺寸阈值,默认10M

五、对比验证

spark3.2 开启AQE

http://szzb-bg-uat-etl-16:18080/history/application_1665940579703_5390/

spark3.2 关闭AQE

http://szzb-bg-uat-etl-16:18080/history/application_1665940579703_5812/

spark2.2 无AQE

http://szzb-bg-uat-etl-11:18080/history/application_1665940579703_9257/

5.1 执行耗时

使用资源: --driver-memory 6g --executor-memory 6g --executor-cores 6

耗时
spark3.2 开启AQE 4.3 min
spark3.2 关闭AQE 6.7 min
spark2.2 无AQE > 9.6min (OOM)
5.2 自动分区合并

Spark3 新特性之AQE,Spark,大数据,spark

5.3 自动数据倾斜处理

查看各stage执行时间

  • spark3.2 开启AQE 每个stage执行时间相差不大 (需要看每个stage的tasks )

  • spark2.2 无AQE 每个stage执行时间相差较大 (需要看每个stage的tasks )
    Spark3 新特性之AQE,Spark,大数据,spark

查看执行时间最长的stage数据分布

  • spark3.2 开启AQE Shuffle Read/Write 数据较均衡

  • spark2.2 无AQE Shuffle Read/Write 数据不均衡

Spark3 新特性之AQE,Spark,大数据,spark

六、结论

spark3.2.2 开启AQE(默认开启),当Reduce Task 数据分布参差不齐时,能够自动合并过小的数据分区;且在 Reduce 阶段存在数据倾斜的情况下,能够拆分大分区;通过对比执行时间,AQE能极大的提升任务的执行效率。文章来源地址https://www.toymoban.com/news/detail-620140.html

到了这里,关于Spark3 新特性之AQE的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hudi0.14.0集成Spark3.2.3(Spark Shell方式)

    1.1 启动Spark Shell

    2024年01月24日
    浏览(38)
  • Windows10系统spark3.0.0配置

    Windows10系统基本环境:spark3.0. 0 +hadoop3.1. 0 +scala2.12.0+java jdk1.8。 环境变量配置路径:电脑→属性→高级系统设置→环境变量 path中加入:%JAVA_HOME%/bin。 注:jdk版本不宜过高。 cmd验证: java -version 官方下载网址:https://www.scala-lang.org/ 选择对应版本,这里我选择的是scala2.12.0版本

    2024年04月26日
    浏览(37)
  • Hive3 on Spark3配置

    大数据组件 版本 Hive 3.1.2 Spark spark-3.0.0-bin-hadoop3.2 OS 版本 MacOS Monterey 12.1 Linux - CentOS 7.6 1)Hive on Spark说明 Hive引擎包括:默认 mr 、 spark 、 Tez 。 Hive on Spark :Hive既作为存储元数据又负责SQL的解析优化,语法是HQL语法,执行引擎变成了Spark,Spark负责采用RDD执行。 Spark on Hive :

    2024年02月04日
    浏览(39)
  • 记录《现有docker中安装spark3.4.1》

    基础docker环境中存储hadoop3--方便后续查看 参考:   实践:            

    2024年02月11日
    浏览(38)
  • Java语言在Spark3.2.4集群中使用Spark MLlib库完成XGboost算法

    XGBoost是一种基于决策树的集成学习算法,它在处理结构化数据方面表现优异。相比其他算法,XGBoost能够处理大量特征和样本,并且支持通过正则化控制模型的复杂度。XGBoost也可以自动进行特征选择并对缺失值进行处理。 1、导入相关库 2、加载数据 3、准备特征向量 4、划分

    2023年04月12日
    浏览(35)
  • hive修改spark版本重新编译,hive3.1.3 on spark3.3.0

    我的是hive3.1.3 spark3.3.0(请先将自己的 hive on mr 搭建完场,有简单了解在搞这个) 1.下载hive源码 2. maven编译:mvn clean -DskipTests package -Pdist (idea 编译不行,能行的评论告诉我) 右键 - Git Bash idea打开项目,右键pom 添加成maven项目 修改pom中自己所需依赖的版本

    2023年04月21日
    浏览(52)
  • Java语言在Spark3.2.4集群中使用Spark MLlib库完成朴素贝叶斯分类器

    贝叶斯定理是关于随机事件A和B的条件概率,生活中,我们可能很容易知道P(A|B),但是我需要求解P(B|A),学习了贝叶斯定理,就可以解决这类问题,计算公式如下:     P(A)是A的先验概率 P(B)是B的先验概率 P(A|B)是A的后验概率(已经知道B发生过了) P(B|A)是

    2023年04月12日
    浏览(36)
  • Hudi0.14.0 集成 Spark3.2.3(IDEA编码方式)

    本次在IDEA下使用Scala语言进行开发,具体环境搭建查看文章 IDEA 下 Scala Maven 开发环境搭建。 1.1 添加maven依赖 创建Maven工程,pom文件:

    2024年01月24日
    浏览(44)
  • 服务器编译spark3.3.1源码支持CDH6.3.2

    1、一定要注意编译环境的配置 2、下载连接 3、安装直接解压,到/opt/softwear/文件夹 4、配置环境变量 5、更改相关配置文件 一定注意下面的修改配置 6、修改mvn地址 6.1、如果编译报错栈已经满了修改如下 7、更改 scala版本 8、执行脚本编译 9、打包完在/opt/softwear/spark-3.3.1 有一

    2023年04月15日
    浏览(55)
  • 【五一创作】使用Scala二次开发Spark3.3.0实现对MySQL的upsert操作

    使用Scala二次开发Spark实现对MySQL的upsert操作 在我们的数仓升级项目中,遇到了这样的场景:古人开发的任务是使用DataStage运算后,按照主键【或者多个字段拼接的唯一键】来做 insert then update ,顾名思义,也就是无则插入,有则后一条数据会覆盖前一条。这其实类似于MySQL的

    2024年02月03日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包