Spark Shuffle 过程

这篇具有很好参考价值的文章主要介绍了Spark Shuffle 过程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

本篇主要阐述Spark Shuffle过程,在执行 Job 任务时,无论是 MapReduce 或者 Spark Shuffle 过程都是比较消耗性能;因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。因此,在这一过程中进行调参优化,就有可能让 Job 执行效率上更好。

在 Spark 1.2 以前,默认的 Shuffle 计算引擎是 HashShuffleManager。该 ShuffleManager 会产生大量的中间磁盘文件,进而由大量的磁盘 IO 操作影响了性能。

到 Spark 1.2 以后的版本中,默认的 ShuffleManager 改成了 SortShuffleManager。 SortShuffleManager 相较于 HashShuffleManager 来说进行了一定的改进。主要就在于,每个 Task 在进行 Shuffle 操作时,会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并 (merge)成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个 Stage 的 Shuffle Read Task 拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

下面是对 HashShuffleManager 和 SortShuffleManager 的分析与原理阐述。

Spark Shuffle

HashShuffleManager

未被优化的HashShuffleManager

Spark Shuffle 过程

注:假设每个Executor 只有 1 个 CPU core,无论 Executor 上分配多少个 Task 线程,同一时间都只能执行一个 Task 线程。

  1. 首先 Shuffle Write 阶段,主要就是在一个 Stage 结束计算之后,为了下一个 Stage 可以执行 Shuffle 类的算子(比如 reduceBy Key ),而将每个 Task 处理的数据按 Key 进行分类。就是对相同的 Key 执行 Hash算法,从而将相同 Key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 Stage 的一个 Task 。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去;
  2. 每个执行 Shuffle Write 的 Task ,要为下一个 Stage 创建多少个磁盘文件呢?下一个 Stage 的 Task 有多少个,当前 Stage 的每个 Task 就要创建多少份磁盘文件。比如下一个 Stage 总共有 100 个 Task ,那么当前 Stage 的每个 Task 都要创建 100 份磁盘文件。如果当前 Stage 有 50 个 Task ,总共有 10 个 Executor,每个 Executor 执行 5 个 Task ,那么每个 Executor 上总共就要创建 500 个磁盘文件,所有 Executor 上会创建5000 个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的;
  3. Shuffle Read阶段,通常是一个 Stage 刚开始运行的时候。 Stage 中每一个 Task 需要将上一个 Stage 的计算结果( Key 值相同),从各个节点上通过网络都拉取到自己所在的节点上;然后进行 Key 的聚合或连接等操作。由于 Shuffle Write 的过程中, Task 给下游 Stage 的每个 Task 都创建了一个磁盘文件,因此 Shuffle Read 的过程中,每个 Task 只要从上游 Stage 的所有 Task 所在节点上,拉取属于自己的那一个磁盘文件即可;
  4. Shuffle Read 拉取过程是一边拉取一边进行聚合的。每个 Shuffle Read Task 都会有一个自己的 Buffer 缓冲,每次都只能拉取与 Buffer 缓冲相同大小的数据;然后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 Buffer 缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。
优化的HashShuffleManager

Spark Shuffle 过程

注:设置 spark.shuffle.consolidateFiles为 true 即可开启优化机制(默认为 false)

  1. 开启 consolidate 机制之后,在 Shuffle Write 过程中,Task 就不是为下游 Stage 的每个 Task 创建一个磁盘文件了。此时会出现 ShuffleFileGroup 的概念,每个 ShuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游 Stage 的 Task 数量是相同的。一个 Executor 上有多少个 CPU core,就可以并行执行多少个 Task 。而第一批并行执行的每个 Task 都会创建一个 ShuffleFileGroup ,并将数据写入对应的磁盘文件内;
  2. 当 Executor 的 CPU core 执行完一批 Task ,接着执行下一批 Task 时,下一批 Task 就会复用之前已有的 ShuffleFileGroup ,包括其中的磁盘文件。也就是说,此时 Task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制允许不同的 Task 复用同一批磁盘文件,这样就可以有效将多个 Task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 Shuffle Write的性能;
  3. 假设第二个 Stage 有100个 Task ,第一个 Stage 有 50 个 Task ,总共还是有 10 个 Executor,每个 Executor 执行5个 Task 。那么原本使用未经优化的 HashShuffleManager 时,每个 Executor 会产生500个磁盘文件,所有 Executor 会产生 5000 个磁盘文件的。但是此时经过优化之后,每个 Executor 创建的磁盘文件的数量的计算公式为:CPU Core 的数量 * 下一个 Stage 的 Task 数量。也就是说,每个 Executor 此时只会创建 100 个磁盘文件,所有 Executor 只会创建 1000 个磁盘文件。

SortShuffleManager

普通运行机制

Spark Shuffle 过程

  1. 首先数据会先写入一个内存数据结构中,此时根据不同的 Shuffle 算子,可能选用不同的数据结构。如果是 reduceBy Key 这种聚合类的 Shuffle 算 子,那么会选用 Map数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 Shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之 后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构;
  2. 在溢写到磁盘文件之前,会先根据 Key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数 据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的 形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再 一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能;
  3. Task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文 件。最后会将之前所有的临时磁盘文件都进行合并,这就是 Merge 过程,此时会将之前所有临时磁盘 文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 Task 就只对应一个磁盘 文件,也就意味着该 Task 为下游 Stage 的 Task 准备的数据都在这一个文件中,因此还会单独写一份索引 文件,其中标识了下游各个 Task 的数据在文件中的 Start offset 与 End offset;
  4. SortShuffleManager 由于有一个磁盘文件 Merge 的过程,因此大大减少了文件数量。比如第一个 Stage 有 50 个 Task ,总共有 10 个 Executor ,每个 Executor 执行 5 个 Task ,而第二个 Stage 有 100 个 Task 。由于每 个 Task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁 盘文件。
bypass 运行机制

Spark Shuffle 过程

  1. Task 会为每个下游 Task 都创建一个临时磁盘文件,并将数据按 Key 进行 Hash 然后根据 Key 的 Hash 值,将 Key 写入对应的磁盘文件之中。写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件;
  2. 该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,Shuffle Read的性能会更好;
  3. 而该机制与普通 SortShuffleManager 运行机制的不同在于:磁盘写机制不同、不会进行排序。启用该机制的最大好处在于,Shuffle Write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
SortShuffleManager 源码说明
执行 runTask
org.apache.spark.scheduler.Task

final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem): T = {
    SparkEnv.get.blockManager.registerTask(taskAttemptId)
        
    ................. 中间省略部分代码 ....................

    try {

      /**
       * @Author: Small_Ran
       * @Date: 2022/6/20
       * @Description: 开始运行 Task(ResultTask、ShuffleMapTask)
       */
      runTask(context)
    } catch {
      case e: Throwable =>
        // Catch all errors; run task failure callbacks, and rethrow the exception.
        try {
          context.markTaskFailed(e)
        } catch {
          case t: Throwable =>
            e.addSuppressed(t)
        }
        context.markTaskCompleted(Some(e))
        throw e
    } finally {
        
      	................. 中间省略部分代码 ....................
    }
  }
org.apache.spark.scheduler.ResultTask

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()

    /**
     * @Author: Small_Ran
     * @Date: 2022/6/20
     * @Description:  ResultTask 是 JOb 的最后一个环节;主要的作用就是输出结果
     * 包括:
     *      1、collect 收集结果到 Driver
     *      2、foreach 打印输出
     *      3、saveAsTextFile 写出到外部系统 HDFS、MySQL、Hive
     */
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

注:接下来主要说明 ShuffleMapTask 的实现方式文章来源地址https://www.toymoban.com/news/detail-402763.html

选择 Shuffle 写入器
org.apache.spark.scheduler.ShuffleMapTask.runTask(context: TaskContext)

//      这里的 shuffleManager 就是SparkContext初始化中默认值 manager = org.apache.spark.shuffle.sort.SortShuffleManager
      val manager = SparkEnv.get.shuffleManager
//      获取 Shuffle 选择器(SerializedShuffleHandle、BypassMergeSortShuffleHandle、BaseShuffleHandle)
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
org.apache.spark.shuffle.sort.SortShuffleManager

override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)

//    根据条件判断选择不同的 ShuffleManager,handle值是通过上面的 registerShuffle 方法进行判断
    val env = SparkEnv.get
    handle match {

//      不需要进行 Map 阶段的预聚合、Partition 数量小于16777216
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
//      不需要进行 Map 阶段的预聚合、Partition 数量小于 200,返回 BypassMergeSortShuffleHandle 对象
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
//        其他情况,通用情况
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }
ShuffleManager 选择判断
org.apache.spark.shuffle.sort.SortShuffleManager

override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
//    对 BypassMergeSortShuffleHandle 进行的判断
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
//    对 SerializedShuffleHandle 进行的判断
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }
org.apache.spark.shuffle.sort.SortShuffleWriter

def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
//    首先使用 bypass 运行机制的前提是,在map阶段是否存在预聚合操作(Combine)
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
//      这里就是判断是否使用 bypass 运行机制,如果任务数量(分区数量) <= 200(参数:spark.shuffle.sort.bypassMergeThreshold)则使用 bypass 运行机制。
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
org.apache.spark.shuffle.sort.SortShuffleManager

def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
//    判断支不支持序列化器
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
        s"${dependency.serializer.getClass.getName}, does not support object relocation")
      false
//      判断是否有聚合操作
    } else if (dependency.aggregator.isDefined) {
      log.debug(
        s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
      false
//      判断分区数是否大于16777216
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
        s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
      false
    } else {
      log.debug(s"Can use serialized shuffle for shuffle $shufId")
      true
    }
  }
ShuffleManager 读写数据
org.apache.spark.scheduler.ShuffleMapTask

/**
       * @Author: Small_Ran
       * @Date: 2022/6/20
       * @Description:  rdd 计算的核心方法就是 iterator 方法
       * SortShuffleWriter 的 write 方法可以分为几个步骤:
       *    1、将上游 RDD 计算出的数据(通过调用 rdd.iterator方法)写入内存缓冲区,在写的过程中如果超过内存阈值就会溢写磁盘文件(可能会写多个文件);
       *    2、最后将溢写的文件和内存中剩余的数据一起进行归并排序后写入到磁盘中形成一个大的数据文件(按照 Key 分区排序);
       *    3、在归并排序后写的过程中,每写一个分区就会手动刷写一遍,并记录下这个分区数据在文件中的位移;
       *    4、最后写完一个 Task 的数据后,磁盘上会有两个文件:数据文件和记录每个 reduce 端 partition 数据位移的索引文件;
       *
       */
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
org.apache.spark.shuffle.sort.SortShuffleWriter

override def write(records: Iterator[Product2[K, V]]): Unit = {

//    获取一个排序器,根据是否需要 map 端聚合传递不同的参数
    sorter = if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }

//    将数据插入排序器中,这个过程或溢写出多个磁盘文件
    sorter.insertAll(records)

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
//    根据 ShuffleID 和分区 ID 获取一个磁盘文件名,MapId 就是 ShuffleMap 端 RDD 的 PartitionID
//    将多个溢写的磁盘文件和内存中的排序数据进行归并排序并写到一个文件中,同时返回每个reduce端分区的数据在这个文件中的位移
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

//    将索引写入一个索引文件,并将数据文件的文件名由临时文件名改成正式的文件名;为输出文件名加一个uuid后缀
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)

//      这一步将溢写到的磁盘的文件和内存中的数据进行归并排序,并溢写到一个文件中,这一步写的文件是临时文件名
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)

//      写入索引文件,使用 File.renameTo 方法将临时索引和临时数据文件重命名为正常的文件名
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)

//      返回一个状态对象,包含 shuffle 服务 Id 和各个分区数据在文件中的位移
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }
org.apache.spark.util.collection.ExternalSorter

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined


    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()

//        向内存缓冲中插入一条数据,map的键为(partitionId, key)
        map.changeValue((getPartition(kv._1), kv._1), update)

//        每写入一条数据就检查一遍内存。如果缓冲超过阈值,就会溢写到磁盘生成一个文件
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }

到了这里,关于Spark Shuffle 过程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 什么是shuffle?shuffle的原理及过程

    目录 一、什么是shuffle 二、为什么要引入shuffle,有哪些影响 三、shuffle的工作原理 1、shuffle的阶段 2、shuffle的中间文件 3、Shuffle Write 4、Shuffle Read 四、总结回顾 类比分公司的人与物和 Spark 的相关概念是这样对应的: 集团分公司与Spark相关概念对应关系 工地搬砖任务 类比上边

    2024年02月02日
    浏览(36)
  • 大数据课程K6——Spark的Shuffle详解

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 了解Spark的定义特点目的优缺点; ⚪ 掌握Spark的相关参数配置; ⚪ 掌握Hadoop的插件配置; Shuffle,就是洗牌。之所以需要Shuffle,还是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点

    2024年02月11日
    浏览(38)
  • Spark 提交任务参数设置关于(线程,shuffle,序列化)

    是在使用 Apache Spark 时,为了设置 Java 虚拟机(JVM)的堆栈大小而使用命令行选项。 -Xss 是 Java 虚拟机的一个选项,用于设置线程的堆栈大小。在这个命令行选项中, -Xss6m 表示将线程的堆栈大小设为 6MB。这个选项的作用是为了避免在运行 Spark 任务时出现堆栈溢出的错误。

    2024年02月02日
    浏览(570)
  • 字节跳动 Spark Shuffle 大规模云原生化演进实践

    Spark 是字节跳动内部使用广泛的计算引擎,已广泛应用于各种大规模数据处理、机器学习和大数据场景。目前中国区域内每天的任务数已经超过 150 万,每天的 Shuffle 读写数据量超过 500 PB。同时某些单个任务的 Shuffle 数据能够达到数百 TB 级别。 与此同时作业量与 Shuffle 的数

    2024年02月04日
    浏览(41)
  • Hadoop的shuffle过程及调优

    MapReduce 中的Shuffle 发生在 map 输出到 reduce 输入的过程,它的中文解释是 “洗牌”,顾名思义该过程涉及数据的重新分配,主要分为两部分: map 任务输出的数据分组、排序,写入本地磁盘。 reduce 任务拉取排序。 由于该过程涉及排序、磁盘IO、以及网络IO 等消耗资源和 CPU 比

    2024年02月09日
    浏览(39)
  • 我的第一个项目(十一) :飞机大战分包完成(简单阐述分包思路以及过程)

    好家伙,   代码已开源 Git: https://gitee.com/tang-and-han-dynasties/panghu-planebattle-esm.git NPM: panghu-planebattle-esm - npm (npmjs.com)   现在,比如说,我用Vue写好了个人博客主页的前端 我想在这个主页里面加点东西,让我的博客更缤纷多彩一点 我想在他的主页里面塞个小游戏,他会怎么做   如下

    2023年04月18日
    浏览(47)
  • 【Spark】What is the difference between Input and Shuffle Read

    Spark调参过程中 保持每个task的 input + shuffle read 量在300-500M左右比较合适 The Spark UI is documented here: https://spark.apache.org/docs/3.0.1/web-ui.html The relevant paragraph reads: Input: Bytes read from storage in this stage Output: Bytes written in storage in this stage Shuffle read: Total shuffle bytes and records read, includes b

    2024年02月06日
    浏览(46)
  • 【c语言】详解c语言#预处理期过程 | 宏定义前言

    c语言系列专栏: c语言之路重点知识整合   创作不易,本篇文章如果帮助到了你,还请点赞支持一下♡𖥦)!!  主页专栏有更多知识,如有疑问欢迎大家指正讨论,共同进步! 给大家跳段街舞感谢支持!ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ 代码编译到执

    2024年02月01日
    浏览(52)
  • 【大数据开发 Spark】第一篇:Spark 简介、Spark 的核心组成(5大模块)、Spark 的主要特征(4大特征)、Spark 对比 MapReduce

    初步了解一项技术,最好的方式就是去它的官网首页,一般首页都会有十分官方且准确的介绍,学习 Spark 也不例外, 官方介绍:Apache Spark ™是一种多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习。 我们可以得知,Spark 可以单节点运行,也可以搭

    2024年02月05日
    浏览(48)
  • Matlab相机标定方法及主要参数含义,坐标变换过程

    网上有很多关于matlab相机标定的资料,但找了很久没有相应的参数说明:怎样利用获得参数从世界坐标系变换到图像坐标系,所以这里为了记录一下,也方便新人理解。 首先由图像到参数的获取部分在网上有很多资料,也很容易,在这就不再赘述,我利用的标定板的格子大小

    2024年02月05日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包