Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)

这篇具有很好参考价值的文章主要介绍了Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则,CoalesceShufflePartitions的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。

结论

OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)这种情况的时候,如果col的值是固定的,比如说值永远是20240320,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。

分析

直接到OptimizeSkewInRebalancePartitions中的代码中来:

  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
      return plan
    }

    plan transformUp {
      case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
        tryOptimizeSkewedPartitions(stage)
    }
  }

如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false(默认是true),那么就不会应用此规则,那么如果Col为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:

Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions),大数据,spark,分布式,spark,大数据,分布式
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。

在看合并的计算公式,该数据流如下:

 tryOptimizeSkewedPartitions
      ||
      \/
 optimizeSkewedPartitions
      ||
      \/
 ShufflePartitionsUtil.createSkewPartitionSpecs
      ||
      \/
 ShufflePartitionsUtil.splitSizeListByTargetSize

splitSizeListByTargetSize方法中涉及到的参数解释如下 :

  • 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
    表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:

Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions),大数据,spark,分布式,spark,大数据,分布式
比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.

  • 参数targetSize 为 spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如说是256MB
  • 参数smallPartitionFactor为spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值,默认是0.2
    这里有个计算公式:
    def tryMergePartitions() = {
      // When we are going to start a new partition, it's possible that the current partition or
      // the previous partition is very small and it's better to merge the current partition into
      // the previous partition.
      val shouldMergePartitions = lastPartitionSize > -1 &&
        ((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||
        (currentPartitionSize < targetSize * smallPartitionFactor ||
          lastPartitionSize < targetSize * smallPartitionFactor))
      if (shouldMergePartitions) {
        // We decide to merge the current partition into the previous one, so the start index of
        // the current partition should be removed.
        partitionStartIndices.remove(partitionStartIndices.length - 1)
        lastPartitionSize += currentPartitionSize
      } else {
        lastPartitionSize = currentPartitionSize
      }
    }
    。。。
    while (i < sizes.length) {
      // If including the next size in the current partition exceeds the target size, package the
      // current partition and start a new partition.
      if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
        tryMergePartitions()
        partitionStartIndices += i
        currentPartitionSize = sizes(i)
      } else {
        currentPartitionSize += sizes(i)
      }
      i += 1
    }
    tryMergePartitions()
    partitionStartIndices.toArray

这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask
可以看到tryMergePartitions有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB 的话,也还是会合并到前一个分区中去,如果smallPartitionFactor设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:

    val targetSize = 100
    val smallPartitionFactor2 = 0.5
    // merge last two partition if their size is not bigger than smallPartitionFactor * target
    val sizeList5 = Array[Long](50, 50, 40, 5)
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
      sizeList5, targetSize, smallPartitionFactor2).toSeq ==
      Seq(0))

    val sizeList6 = Array[Long](40, 5, 50, 45)
    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
      sizeList6, targetSize, smallPartitionFactor2).toSeq ==
      Seq(0))

这种情况下,就会只有一个reduce任务运行。文章来源地址https://www.toymoban.com/news/detail-851201.html

到了这里,关于Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • hive/spark数据倾斜解决方案

    数据倾斜主要表现在,mapreduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条Key所在的reduce节点所处理的数据量比其他节点就大很多,

    2024年02月11日
    浏览(42)
  • spark 的group by ,join数据倾斜调优

    spark任务中最常见的耗时原因就是数据分布不均匀,从而导致有些task运行时间很长,长尾效应导致的整个job运行耗时很长 首先我们要定位数据倾斜,我们可以通过在spark ui界面中查看某个stage下的task的耗时,如果发现某些task耗时很长,对应要处理的数据很多,证明有数据倾斜

    2024年02月21日
    浏览(43)
  • spark sql 数据倾斜--join 同时开窗去重的问题优化

    背景: 需求:在一张查询日志表中,有百亿数据,需要join上维表,再根据几个字段进行去重 开窗去重和join 一定要分步进行 ,按照需求先做join再开窗,或者去重完成后在进行join。 dwd_tmp1 中存在百亿用户查询日志数据 数据倾斜 数据量超百亿,资源给到200 * 2c * 20G,执行引擎

    2024年02月11日
    浏览(52)
  • 被修饰成单栋的倾斜摄影处理思路

    作者:kele 倾斜摄影数据是三维项目系统中的常客。在某些项目中,为了给倾斜摄影上的建筑赋予属性信息,实现点击建筑高亮并展示属性的功能,客户将倾斜摄影数据进行了模型单体化(使用pdmodeler或者其它软件,将倾斜摄影中的建筑提取成单个单个的对象)。这样处理后

    2023年04月27日
    浏览(37)
  • 如何提高倾斜摄影超大场景的三维模型轻量化处理速度和效率?

     倾斜摄影超大场景的三维模型轻量化处理是将高精度的三维模型进行降采样、简化等处理,以达到减少数据大小和提高渲染性能的目的。为了提高轻量化处理速度,可以从以下方面入手: 1、选择合适的轻量化算法。当前已有很多成熟的三维模型轻量化算法,如基于多分辨率

    2024年02月01日
    浏览(72)
  • Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单实战案例 之六 简单图像倾斜校正处理效果

    目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单实战案例 之六 简单图像倾斜校正处理效果 一、简单介绍 二、简单图像倾斜校正处理效果实现原理 三、简单图像倾斜校正处理效果案例实现简单步骤 四、注意事项 Python是一种跨平台的计算机程序设计语言。是一种面向对

    2024年04月13日
    浏览(62)
  • 【flink番外篇】2、flink的23种算子window join 和interval join 数据倾斜、分区介绍及详细示例(3)- 数据倾斜处理、分区示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(55)
  • OSGB 倾斜摄影数据处理为 3DTiles、I3S、S3M 的流程

    HONG KONG GEODATA STORE 从该网站下载倾斜摄影数据(OSGB)格式。 点击 Notes,可以下载元数据文件(meta.xml) 将下载的 zip 包,逐一解压,放到同级目录下。 解压前: 由于下载的 zip 包名称为 7-NW-9B-1、7-NW-9B-2 等的名称,还不符合 OSGB 倾斜摄影数据处理的文件夹目录结构,需要在解

    2024年02月10日
    浏览(64)
  • 如何实现倾斜摄影三维模型OSGB格式转换3DTILES格式的模型轻量化和格式转换一体化处理?

    如何实现倾斜摄影三维模型OSGB格式转换3DTILES格式的模型轻量化和格式转换一体化处理?  为了实现倾斜摄影三维模型OSGB格式的轻量化和3DTILES格式转换的一体化处理,可以采用以下方法: 1、数据预处理 在进行格式转换之前,需要对OSGB格式的倾斜摄影三维模型进行预处理。

    2024年01月21日
    浏览(61)
  • 谁能讲清楚Spark之Spark逻辑处理流程

            本次主要介绍Spark是如何将应用程序转化为逻辑处理流程的,包括RDD数据模型概念、数据操作概念,以及数据依赖关系的建立规则等。 一.spark处理流程概览 spark 典型的逻辑处理流程包括四部分: 1 数据源:数据源表示的是原始数据,数据可以存放在本地文件系统和分

    2024年02月12日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包