Spark AQE源码探索

这篇具有很好参考价值的文章主要介绍了Spark AQE源码探索。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

介绍

AQE全称是Adaptive Query Execution,官网介绍如下

Performance Tuning - Spark 3.5.0 Documentation

AQE做了什么

AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

特性

  1. 自动分区合并:在 Shuffle 过后,Reduce Task 数据分布参差不齐,AQE 将自动合并过小的数据分区。
  2. Join 策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从 Shuffle Sort Merge Join 降级(Demote)为执行效率更高的 Broadcast Hash Join。
  3. 自动倾斜处理:结合配置项,AQE 自动拆分 Reduce 阶段过大的数据分区,降低单个 Reduce Task 的工作负载。

AQE开启前后对比:

在非AQE的情况下,Spark会在规划阶段确定了物理执行计划后,根据每个算子的定义生成RDD对应的DAG。然后 Spark DAGScheduler通过shuffle来划分RDD Graph并创建stage,然后提交Stage以供执行

AQE:

首先会将逻辑树拆分为多个QueryStages, 在执行时先将它的子 QueryStages 被提交(先提交mapStage),收集它们的MapOutputStatistics对象。根据收集到的 shuffle 数据统计信息,将当前 QueryStage 的执行计划优化为更好的执行计划。然后转换为DAG图再执行Stage

源码

策略源码

AQE做了这么多事情,肯定有对应的执行方法(AdaptiveSparkPlanExec类),介绍几个核心的

规则方法 描述
AdaptiveExecutionRule 所有自适应执行规则的基类。所有的规则都需要继承这个类,并实现applyInternal方法来对物理计划进行优化
CustomShuffleReaderExec 用于处理倾斜的join和shuffle分区合并
CoalesceShufflePartitions 动态合并shuffle分区
OptimizeSkewedJoin 优化倾斜的join
OptimizeLocalShuffleReader 优化本地shuffle读取的策略

AQE执行源码

首先AQE 只作用在 exchange 阶段,即需要发生数据交换的阶段,spark AQE 优化都是发生在 shuffle map 之后。

核心类:AdaptiveSparkPlanExec,AQE的主要执行类,它会首先执行初始物理计划,并且在执行过程中收集有关数据统计信息,然后基于这些信息对物理计划进行优化

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

Spark经典的doExecute()

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

获取更新后的plan数

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

getFinalPhysicalPlan 是核心方法

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

看这行代码

var result = createQueryStages(currentPhysicalPlan)

将原始的物理计划通过 createQueryStages 方法进行替换==========================

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化


对参数中传递的 sparkplan 的所有节点进行自下而上的递归。根据节点类型的不同进行针对处理

Exchange 节点:Exchange 节点被替换为 QueryStageExec ,QueryStageExec是每个查询阶段的抽象基类

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

最后调用CreateStageResult,参数中包含 QueryStageExec,同时 allChildStagesMaterialized 参数表示当前 plan 的所有子节点的输出是否已经具像化

========================== createQueryStages 返回 CreateStageResult ,会在 AQE 的循环中判断是否可以结束。这部分逻辑又回到 getFinalPhysicalPlan 中

然后也会替换plan,replaceWithQueryStagesInLogicalPlan和reOptimize都很关键,一个一个看

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

replaceWithQueryStagesInLogicalPlan 做了

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

再返回上诉代码,再看看reoptimize干了什么

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

这个里面其实又有策略执行了

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

返回主程序,然后接着往下走

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

然后这个方法会执行策略

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

Spark AQE源码探索,# 源码探索文章迁移,spark,AQE,Spark源码,Spark优化

大致上是这样的流程,当然还有很多细节,代码还是很庞大的,只能给大家看看整体流程和关键的方法,其余有深入可以关注我的公众号一起探讨

总体来看还是很多固定策略的,可以根据上一stage的write的统计信息去动态修改下游的执行计划

闲聊

spark的树,在aqe节点有两个num partition,但是会出现两者不一致且有时会相差很大,即AQE不被采用,源码可以看到实际执行和打印日志确实是独立的,但是没搞明白这个设计的特殊含义文章来源地址https://www.toymoban.com/news/detail-830015.html

到了这里,关于Spark AQE源码探索的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 13.108.Spark 优化、Spark优化与hive的区别、SparkSQL启动参数调优、四川任务优化实践:执行效率提升50%以上

    13.108.Spark 优化 1.1.25.Spark优化与hive的区别 1.1.26.SparkSQL启动参数调优 1.1.27.四川任务优化实践:执行效率提升50%以上 1.1.25.Spark优化与hive的区别 先理解spark与mapreduce的本质区别,算子之间(map和reduce之间多了依赖关系判断,即宽依赖和窄依赖。) 优化的思路和hive基本一致,比较

    2024年02月10日
    浏览(56)
  • Spark优化和问题

    在配置SparkSQL任务时指定executor核心数 建议为4 (同一 executor [进程]内内存共享,当数据倾斜时,使用相同核心数与内存量的两个任务, executor总量少 的任务不容易OOM,因为单核心最大可用内存大.但是并非越大越好,因为单个exector最大core受服务器剩余core数量限制, 过大的core 数量可能

    2024年01月21日
    浏览(32)
  • 实战干货|Spark 在袋鼠云数栈的深度探索与实践

    Spark 是一个快速、通用、可扩展的大数据计算引擎,具有高性能、易用、容错、可以与 Hadoop 生态无缝集成、社区活跃度高等优点。在实际使用中,具有广泛的应用场景: · 数据清洗和预处理:在大数据分析场景下,数据通常需要进行清洗和预处理操作以确保数据质量和一致

    2024年04月26日
    浏览(47)
  • Spark Streaming实战与优化

    作者:禅与计算机程序设计艺术 Spark Streaming 是 Apache Spark 的一个模块,可以用于对实时数据流进行快速、高容错的处理。它允许用户开发高吞吐量、复杂的实时分析应用程序。Spark Streaming 可以与 Apache Kafka 或 Flume 等工具进行集成,从而实现实时数据采集和 ETL(Extract-Transfo

    2024年02月06日
    浏览(41)
  • 【Spark源码分析】Spark的RPC通信一-初稿

    在 RpcEnv 中定义了RPC通信框架的启动、停止和关闭等抽象方法,表示RPC的顶层环境。唯一的子类 NettyRpcEnv 。 RpcEndpoints 需要向 RpcEnv 注册自己的名称,以便接收信息。然后, RpcEnv 将处理从 RpcEndpointRef 或远程节点发送的信息,并将它们传送到相应的 RpcEndpoints 。对于 RpcEnv 捕捉

    2024年02月04日
    浏览(42)
  • 【Spark源码分析】Spark的RPC通信二-初稿

    传输层主要还是借助netty框架进行实现。 TransportContext 包含创建 TransportServer 、 TransportClientFactory 和使用 TransportChannelHandler 设置 Netty Channel 管道的上下文。 TransportClient 提供两种通信协议:control-plane RPCs 和data-plane的 “chunk fetching”。RPC 的处理在 TransportContext 的范围之外进行(

    2024年02月03日
    浏览(48)
  • 大数据学习之Spark性能优化

    窄依赖(Narrow Dependency):指父RDD的每个分区只被子RDD的一个分区所使用,例如map、filter等这些算子。一个RDD,对它的父RDD只有简单的一对一的关系,也就是说,RDD的每个partition仅仅依赖于父RDD中的一个partition,父RDD和子RDD的partition之间的对应关系,是一对一的。 宽依赖(Shuffl

    2024年02月04日
    浏览(50)
  • spark sql官网优化指南

    缓存数据 调整参数 把数据缓存到内存,spark sql能够只扫描需要列并且会自动压缩数据,占用最小的内存和减小GC压力。这无需多言,内存远远要快于磁盘,spark效率比hive高这个就是一个主要原因。 缓存数据代码 释放缓存 用完后一定要记得释放掉,不要空占的内存浪费资源。

    2024年02月19日
    浏览(36)
  • Spark SQL优化:NOT IN子查询优化解决

    文章最前 : 我是Octopus,这个名字来源于我的中文名--章鱼;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github ;这博客是记录我学习的点点滴滴,如果您对 Python、Java、AI、算法有兴趣,可以关注我的动态,一起学习,共同进步。  相关文章: LEFT ANTI JOIN的使用

    2024年02月05日
    浏览(55)
  • 深入理解 Spark(一)spark 运行模式简介与启动流程源码分析

    以 standalone-client 为例,运行过程如下: SparkContext 连接到 Master,向 Master 注册并申请资源(CPU Core 和 Memory); Master 根据 SparkContext 的资源申请要求和 Worker 心跳周期内报告的信息决定在哪个 Worker 上分配资源,然后在该 Worker 上获取资源,然后启动 StandaloneExecutorBackend; Stan

    2024年02月02日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包