使用解释计划调试 Apache Spark 性能

在数据处理领域,Apache Spark已成为一个强大且多功能的框架。然而,随着数据量和复杂性不断增长,确保最佳性能变得至关重要。 

在这篇博文中,我们将探讨解释 计划如何成为调试和优化 Spark 应用程序的秘密武器。我们将深入探讨 Spark Scala 的基础知识并提供清晰的示例,以帮助您了解如何利用这个有价值的工具。

解释计划是什么?

解释计划是 Spark 处理数据所遵循的逻辑和物理执行步骤的全面细分。将其视为指导您完成 Spark 作业内部运作的路线图。

Spark 解释计划的两个重要组成部分是:

  1. 逻辑计划:逻辑计划代表 Spark 应用程序中指定的高级转换和操作。它是对您想要对数据执行的操作的抽象描述。

  2. 物理计划:另一方面,物理计划提供了 Spark 如何将逻辑计划转化为一组具体操作的具体细节。它揭示了 Spark 如何优化您的工作以获得性能。

Explain API还有一些其他重载方法:

  • explain()- 打印物理计划。

  • explain(extended: Boolean)- 打印计划(逻辑和物理)。

  • explain(mode: String)- 使用给定解释模式指定的格式打印计划(逻辑和物理): 

    • simple仅打印物理计划。

    • extended:打印逻辑计划和物理计划。

    • codegen:打印物理计划并生成代码(如果可用)。

    • cost:打印逻辑计划和统计数据(如果有)。

    • formatted:将解释输出分为两部分:物理计划大纲和节点详细信息。

解释计划的使用

我们可以使用explain()DataFrame 或 Dataset 上的方法来做到这一点。这是使用解释计划的一个简单示例:

import org.apache.spark.sql.SparkSession

// 创建 SparkSession
val spark = SparkSession.builder()
  .appName("ExplainPlanExample")
  .getOrCreate()

// 为员工创建一个虚拟 DataFrame
val employeesData = Seq(
  (1, "Alice", "HR"),
  (2, "Bob", "Engineering"),
  (3, "Charlie", "Sales"),
  (4, "David", "Engineering")
)

val employeesDF = employeesData.toDF("employee_id", "employee_name", "department")

// 为工资创建另一个虚拟 DataFrame
val salariesData = Seq(
  (1, 50000),
  (2, 60000),
  (3, 55000),
  (4, 62000)
)

val salariesDF = salariesData.toDF("employee_id", "salary")

// 将 DataFrame 注册为 SQL 临时表
employeesDF.createOrReplaceTempView("employees")
salariesDF.createOrReplaceTempView("salaries")

// 使用Spark SQL计算每个部门的平均工资
val avgSalaryDF = spark.sql("""
  SELECT department, AVG(salary) as avg_salary
  FROM employees e
  JOIN salaries s ON e.employee_id = s.employee_id
  GROUP BY department
""")


// 使用扩展模式调用解释计划来打印物理和逻辑计划
avgSalaryDF.explain(true)

// 停止 SparkSession
spark.stop()

在上面的示例中,我们创建了一个实例employeeData和salariesData  DataFrame,并执行连接,然后进行聚合以获得部门的平均工资。以下是给定数据框的解释计划。

scala> avgSalaryDF.explain(true)
== Parsed Logical Plan ==
'Aggregate ['department], ['department, 'AVG('salary) AS avg_salary#150]
+- 'Join Inner, ('e.employee_id = 's.employee_id)
   :- 'SubqueryAlias e
   :  +- 'UnresolvedRelation [employees], [], false
   +- 'SubqueryAlias s
      +- 'UnresolvedRelation [salaries], [], false

== Analyzed Logical Plan ==
department: string, avg_salary: double
Aggregate [department#135], [department#135, avg(salary#147) AS avg_salary#150]
+- Join Inner, (employee_id#133 = employee_id#146)
   :- SubqueryAlias e
   :  +- SubqueryAlias employees
   :     +- View (`employees`, [employee_id#133,employee_name#134,department#135])
   :        +- Project [_1#126 AS employee_id#133, _2#127 AS employee_name#134, _3#128 AS department#135]
   :           +- LocalRelation [_1#126, _2#127, _3#128]
   +- SubqueryAlias s
      +- SubqueryAlias salaries
         +- View (`salaries`, [employee_id#146,salary#147])
            +- Project [_1#141 AS employee_id#146, _2#142 AS salary#147]
               +- LocalRelation [_1#141, _2#142]

== Optimized Logical Plan ==
Aggregate [department#135], [department#135, avg(salary#147) AS avg_salary#150]
+- Project [department#135, salary#147]
   +- Join Inner, (employee_id#133 = employee_id#146)
      :- LocalRelation [employee_id#133, department#135]
      +- LocalRelation [employee_id#146, salary#147]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department#135], functions=[avg(salary#147)], output=[department#135, avg_salary#150])
   +- Exchange hashpartitioning(department#135, 200), ENSURE_REQUIREMENTS, [plan_id=271]
      +- HashAggregate(keys=[department#135], functions=[partial_avg(salary#147)], output=[department#135, sum#162, count#163L])
         +- Project [department#135, salary#147]
            +- BroadcastHashJoin [employee_id#133], [employee_id#146], Inner, BuildRight, false
               :- LocalTableScan [employee_id#133, department#135]
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=266]
                  +- LocalTableScan [employee_id#146, salary#147]


scala>

正如您在上面看到的,当extended标志设置为 时true,我们有解析的逻辑计划、分析的逻辑计划、优化的逻辑计划和物理计划。 

在尝试理解计划之前,我们需要自下而上地阅读所有计划。因此我们将在底部看到任何数据帧的创建或读取。

我们将了解其中的每一项: 

解析逻辑计划

这是Spark解析用户提供的SQL或 DataFrame 操作并创建查询的解析表示的初始阶段。使用 Spark SQL 查询时,任何语法错误都会在此处捕获。如果我们在这里观察,列名称尚未解析。

初始阶段示意图

在上面解析的逻辑计划中,我们可以看到 UnresolvedRelation。这意味着架构尚未解决。解析后的逻辑计划概述了查询的逻辑结构,包括聚合和连接操作。employees它还定义别名并标识和DataFrame的源salaries。未解析的关系将在查询执行期间解析为其实际数据源。

分析逻辑计划

解析之后,Spark 会经历一个称为语义分析或解析的过程。在此阶段,Spark 根据可用表和列的目录检查查询,解析列名称,验证数据类型,并确保查询在语义上正确。结果是经过分析的逻辑计划,其中包含有关所涉及的表和列的元数据和信息。

语义分析或解析的过程

该计划表示解析和语义分析后查询的初始逻辑结构。它显示了包含两列的结果架构,department和avg_salary。该计划由两个子查询e,和组成s,它们对应于employees和salariesDataFrame。employee_id内连接操作作为连接键应用于这些子查询之间。该计划还包括别名 (e和s) 以及用于选择特定列的投影操作。

优化的逻辑计划

一旦分析了查询并且 Spark 对数据和模式有了清晰的了解,它就会继续优化查询。在优化过程中,Spark会对查询计划应用各种逻辑优化来提高性能。这可能涉及谓词下推、常量折叠和基于规则的转换等技术。优化的逻辑计划代表了在数据检索和处理方面更高效的查询版本。

1.png

优化阶段简化了计划以获得更好的性能。在这种情况下,计划被简化以删除子查询和不必要的投影操作。它使用连接键直接连接两个本地关系 (employees和salaries) employee_id,然后应用聚合来计算每个部门的平均工资。

物理计划

物理计划也称为执行计划,是查询优化的最后阶段。此时,Spark 会生成一个关于如何在集群上物理执行查询的计划。它考虑了数据分区、数据洗牌和跨节点的任务分配等因素。物理计划是实际执行查询的蓝图,它考虑了可用资源和并行性以有效地执行查询。

2.png

物理计划概述了 Spark 执行查询所采取的实际执行步骤。它涉及聚合、联接和数据扫描,以及广播联接等优化技术以提高效率。该计划反映了 Spark 将遵循的执行策略来计算查询结果。现在,让我们仔细检查每一行以更深入地了解(从下到上)。

  • LocalTableScan:这些是本地表的扫描。在本例中,它们代表表或 DataFrames employee_id#133、department#135、employee_id#146和salary#147。这些扫描从本地分区检索数据。

  • BroadcastExchange:此操作将较小的 DataFrame (employee_id#146和salary#147) 广播到所有工作节点以进行广播连接。它将广播模式指定为HashedRelationBroadcastMode并指示应广播输入数据。

  • BroadcastHashJoin:这是两个数据源(employee_id#133和employee_id#146)之间使用inner join. 它构建连接的右侧,因为它被标记为“BuildRight”。此操作执行广播连接,这意味着它将较小的 DataFrame(右侧)广播到较大的 DataFrame(左侧)所在的所有节点。当一个 DataFrame 明显小于另一个 DataFrame 时,这样做是为了优化目的。

  • 项目:此操作从数据中选择department和列。salary

  • HashAggregate (partial_avg):这是一个部分聚合操作,计算每个部门的平均工资。它包括附加列 sum#162和count#163L,分别表示工资总和和记录数。

  • Exchange hashpartitioning:此操作基于列对数据执行哈希分区department#135。它的目标是将数据均匀分布在 200 个分区中。该ENSURE_REQUIREMENTS属性表明该操作保证了后续操作的要求。

  • HashAggregate:avg(salary#147)这是一个聚合操作,计算列中每个唯一值的平均工资 ( ) department。输出包括两列:department#135和avg_salary#150。

  • AdaptiveSparkPlan:这表示 Spark 查询的顶级执行计划。该属性isFinalPlan=false表明该计划尚未最终确定,这表明 Spark 可能会在执行期间根据运行时统计数据调整该计划。

结论

了解 Spark SQL 生成的执行计划对于开发人员来说非常有价值,体现在以下几个方面:

  • 查询优化:通过检查物理计划,开发人员可以深入了解 Spark 如何优化其 SQL 查询。它可以帮助他们查看查询是否有效地使用可用资源、分区和联接。

  • 性能调优:开发人员可以识别计划中潜在的性能瓶颈。例如,如果他们注意到不必要的改组或数据重新分配,他们可以修改查询或调整 Spark 配置以提高性能。

  • 调试:当查询未产生预期结果或发生错误时,物理计划可以提供有关问题可能出在哪里的线索。开发人员可以查明计划中存在问题的阶段或转换。

  • 高效连接:了解广播连接等连接策略可以帮助开发人员就广播哪些表做出明智的决定。这可以显着减少shuffle并提高查询性能。


文章来源地址https://www.toymoban.com/diary/problem/479.html

到此这篇关于使用解释计划调试 Apache Spark 性能的文章就介绍到这了,更多相关内容可以在右上角搜索或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

原文地址:https://www.toymoban.com/diary/problem/479.html

如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请联系站长进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用
大容量分布式数据处理中倾斜数据集的处理方法
上一篇 2023年10月28日 00:46
下一篇 2023年10月28日 01:15

相关文章

  • 什么是API网关,解释API网关的作用和特点?解释什么是数据流处理,如Apache Flink和Spark Streaming的应用?

    API网关是一种在分布式系统中的组件,用于管理不同系统之间的通信和交互。API网关的作用是在不同系统之间提供统一的接口和协议,从而简化系统之间的集成和互操作性。 API网关的特点包括: 路由和分发请求:API网关可以根据请求的URL、方法、参数等信息,将请求分发到

    2024年02月11日
    浏览(47)
  • Apache Spark 练习六:使用Spark分析音乐专辑数据

    本章所分析的数据来自于Kaggle公开的、人工合成的音乐专辑发行数据(https://www.kaggle.com/datasets/revilrosa/music-label-dataset)。以下,我们只针对albums.csv文件进行分析。该数据具体包括以下字段: id: the album identifier; artist_id: the artist identifier; album_title: the title of the album; genre: the

    2024年02月15日
    浏览(63)
  • 使用Apache Spark处理Excel文件的简易指南

    在日常的工作中,表格内的工具是非常方便的x,但是当表格变得非常多的时候,就需要一些特定的处理。Excel作为功能强大的数据处理软件,广泛应用于各行各业,从企业管理到数据分析,可谓无处不在。然而,面对大型且复杂的数据,Excel的处理能力可能力不从心。 对此,

    2024年01月19日
    浏览(46)
  • Azure - 机器学习:使用 Apache Spark 进行交互式数据整理

    关注TechLead,分享AI全维度知识。作者拥有10+年互联网服务架构、AI产品研发经验、团队管理经验,同济本复旦硕,复旦机器人智能实验室成员,阿里云认证的资深架构师,项目管理专业人士,上亿营收AI产品研发负责人。 数据整理已经成为机器学习项目中最重要的步骤之一。

    2024年02月08日
    浏览(50)
  • 如何在Amazon EMR上使用RAPIDS加速Apache Spark流水线

    : [Amazon Web Services re:Invent 2023, Rapids Spark, Accelerate Apache Spark Pipelines, Amazon Emr, Rapids, Nvidia, Gpu Acceleration] 本文字数: 1000, 阅读完需: 5 分钟 如视频不能正常播放,请前往bilibili观看本视频。 https://www.bilibili.com/video/BV1uw41187VA RAPIDS加速器可以为Amazon EMR上的Apache Spark数据处理流

    2024年02月04日
    浏览(49)
  • 性能测试工具 ab(Apache Bench)使用详解

    Apache Bench (ab) 是一个由 Apache 提供的非常流行的、简单的性能测试工具,用于对 HTTP 服务器进行压力测试。下面是 ab 工具的一些基本使用方法。 安装 在大多数 Unix 系统中,ab 通常作为 Apache HTTP 服务器的一部分预装在系统中。你可以通过在终端中运行 ab -V 来检查 ab 的版本,

    2024年04月11日
    浏览(40)
  • openGauss学习笔记-259 openGauss性能调优-使用Plan Hint进行调优-指定不使用全局计划缓存的Hint

    259.1 功能描述 全局计划缓存打开时,可以通过no_gpc Hint来强制单个查询语句不在全局共享计划缓存,只保留会话生命周期的计划缓存。 259.2 语法格式 说明: 本参数仅在enable_global_plancache=on时对PBE执行的语句生效。 259.3 示例 dbe_perf.global_plancache_status视图中无结果即没有计划被

    2024年04月13日
    浏览(39)
  • Spark Explain:查看执行计划

    Spark SQL explain 方法有 simple、extended、codegen、cost、formatted 参数,具体如下 从 3.0 开始,explain 方法有一个新的 mode 参数,指定执行计划展示格式 只展示物理执行计划,默认 mode 是 simple spark.sql(sqlstr).explain() 展示物理执行计划和逻辑执行计划 spark.sql(sqlstr).explain(mode=“extended”

    2024年02月16日
    浏览(44)
  • unity 使用模拟器进行Profiler性能调试

    这篇文章主要记录如何实现通过模拟器对打包的app游戏进行Profiler调试。主要记录一些比较重要的点。 首先你要能够打包unity的安卓包,如果没有安装安卓组件,请先安装组件。 安装完成以后,会在unity的安装目录找到相应的SDK 这个platform-tools后面会用到,而这些组件设置了

    2024年02月07日
    浏览(57)
  • Spark面试整理-解释Spark Streaming是什么

    Spark Streaming是Apache Spark的一个组件,它用于构建可扩展、高吞吐量、容错的实时数据流处理应用。Spark Streaming使得可以使用Spark的简单编程模型来处理实时数据。以下是Spark Streaming的一些主要特点: 1. 微批处理架构 微批处理: Spark Streaming的核心是微批处理模型。它将实

    2024年04月13日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包