Spark Kubernetes 的源码分析系列 - scheduler

这篇具有很好参考价值的文章主要介绍了Spark Kubernetes 的源码分析系列 - scheduler。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 Overview

这一块代码可以理解为 Spark 是如何实现一个基于 K8S 的调度器,来调度生成 Executor Pod 的。

2 分析

/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler
└── cluster
    └── k8s
        ├── ExecutorPodStates.scala
        ├── ExecutorPodsAllocator.scala
        ├── ExecutorPodsLifecycleManager.scala
        ├── ExecutorPodsPollingSnapshotSource.scala
        ├── ExecutorPodsSnapshot.scala
        ├── ExecutorPodsSnapshotsStore.scala
        ├── ExecutorPodsSnapshotsStoreImpl.scala
        ├── ExecutorPodsWatchSnapshotSource.scala
        ├── KubernetesClusterManager.scala
        ├── KubernetesClusterSchedulerBackend.scala
        └── KubernetesExecutorBuilder.scala

2 directories, 11 files

2.1 KubernetesExecutorBuilder

由于上篇文章主要介绍了 Driver 的 Pod 是如何生成的,在讲 scheduler 之前,先补充一下 Executor 的配置步骤。重点代码在下面这个 features 里。步骤跟 Driver 类似,但是少了一些,剩下的就是 一个 Basic 的配置,当然是包含 Pod 或者 Container 的一些 meta 信息。此外,跟 ApiServer 交互请求 Executor Pod 的时候也需要 K8S 的安全认证的机制。然后就是类似 Env 和本地目录挂载的一些配置。

val features = Seq(
  new BasicExecutorFeatureStep(conf, secMgr),
  new ExecutorKubernetesCredentialsFeatureStep(conf),
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new LocalDirsFeatureStep(conf),
  new MountVolumesFeatureStep(conf))

2.2 KubernetesClusterManager

这个是 Spark 这一段,关于 K8S 集群作为 resource manager 的一个管理中心。这个类是继承了 ExternalClusterManager 接口的,主要是控制生成 schedulerBackend 对象。

2.3 KubernetesClusterSchedulerBackend

这是 K8S 集群调度器的封装,SchedulerBackend,简称 SB 就好了...SB 主要是包含了申请 request 和删除 remove Executor 的逻辑。

// 这里是指定初始申请的 Executor 的数量,可以通过 conf 来配置
private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
// 这个是 Executor 出问题 debug 的关键
// 默认情况下 Executor 退出后,会由 Spark 的 K8S 客户端主动进行删除
// 所以 Executor 的日志就找不到了
// 开启这个配置 spark.kubernetes.executor.deleteOnTermination
// 这样 Executor 即时 Failed 了,他的 Pod 也不会被自动删除
private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)

// 移除 Executor 的逻辑,上面说到的 Pod 被删除就是这里的 delete 导致的
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
  logInfo("do send request to kill executors!")
  kubernetesClient
    .pods()
    .withLabel(SPARK_APP_ID_LABEL, applicationId())
    .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
    .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
    .delete()
  // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
}

2.4 ExecutorPodsSnapshotsStore

这个接口是用于管理 Executor Pod,下面简称 EP...EP 的状态,并且用 ExecutorPodsSnapshot 的数据结构来记录变化的情况。

2.5 ExecutorPodsSnapshot

ExecutorPodsSnapshot 是关于 Spark App 在集群里 EP 的状态的不可变视图。

private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {

  import ExecutorPodsSnapshot._

  // 核心方法,witUpdate 通过传入 Pod 参数,通过 new 一个 EP snapshot 视图来记录 EP 的状态,本质上一个 Map(Executor id -> Executor Pod 状态) 的数据结构
  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
    val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
    new ExecutorPodsSnapshot(newExecutorPods)
  }
}

2.6 ExecutorPodsSnapshotsStoreImpl

这是 ExecutorPodsSnapshotsStore 的实现类。下面一段是理解整个 scheduler 的关键,所以建议拿着英文注释认真看一遍,大概就能理解了。

Controls the propagation of the Spark application's executor pods state to subscribers that react to that state. Roughly follows a producer-consumer model. Producers report states of executor pods, and these states are then published to consumers that can perform any actions in response to these states. Producers push updates in one of two ways. An incremental update sent by updatePod() represents a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that the passed pods are all of the most up to date states of all executor pods for the application. The combination of the states of all executor pods for the application is collectively known as a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that most recent snapshot - either by incrementally updating the snapshot with a single new pod state, or by replacing the snapshot entirely on a full sync. Consumers, or subscribers, register that they want to be informed about all snapshots of the executor pods. Every time the store replaces its most up to date snapshot from either an incremental update or a full sync, the most recent snapshot after the update is posted to the subscriber's buffer. Subscribers receive blocks of snapshots produced by the producers in time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different time intervals.

以上就是他的设计思想,简单来说就是依照生产消费者模式,订阅者订阅的是 EP 的状态,而这个状态是上文提到的 Snapshot。

SNAPSHOT_LOCK // 锁
subscribers // 订阅者
pollingTasks // ?
currentSnapshot // 当前的 Snapshot

2.7 ExecutorPodsWatchSnapshotSource

这里面主要是继承 K8S 客户端的一个 Wathcher 监听器,主要监听 Pod 的事件。因为 EP 被增删改出错,等都需要被 SB 感知。

enum Action {
  ADDED, MODIFIED, DELETED, ERROR
}

2.8 ExecutorPodsPollingSnapshotSource

这个是通过 K8S client 轮询 ApiServer 获取 Pod 状态并且保存到 Snapshot 里的过程。

private class PollRunnable(applicationId: String) extends Runnable {

  override def run(): Unit = Utils.tryLogNonFatalError {
    logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
    // 核心方法,将得到的 Pod 的状态,通过 replaceSnapshot 来记录
    snapshotsStore.replaceSnapshot(kubernetesClient
      .pods()
      .withLabel(SPARK_APP_ID_LABEL, applicationId)
      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
      .list()
      .getItems
      .asScala)
  }
}

// 轮询默认是30s一次
private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)

2.9 ExecutorPodsLifecycleManager

这个就是一个 EP 生命周期的 Manager,本质上 Pod 是创建在 K8S 集群的,Driver Pod 对 EP 的管理需要通过 K8S 的 ApiServer,而当 Pod 发生状态改变了,对应的也要告知 Driver。

private def onNewSnapshots(
    schedulerBackend: KubernetesClusterSchedulerBackend,
    snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
  val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
  snapshots.foreach { snapshot =>
    snapshot.executorPods.foreach { case (execId, state) =>
      state match {
        case deleted@PodDeleted(_) =>
          logDebug(s"Snapshot reported deleted executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}")
          removeExecutorFromSpark(schedulerBackend, deleted, execId)
          execIdsRemovedInThisRound += execId
        case failed@PodFailed(_) =>
          logDebug(s"Snapshot reported failed executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}")
          onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
        case succeeded@PodSucceeded(_) =>
          logDebug(s"Snapshot reported succeeded executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
            s" unusual unless Spark specifically informed the executor to exit.")
          onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
        case _ =>
      }
    }
  }

  if (snapshots.nonEmpty) {
    val latestSnapshot = snapshots.last
    (schedulerBackend.getExecutorIds().map(_.toLong).toSet
      -- latestSnapshot.executorPods.keySet
      -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
      if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
        val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
          s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
          s" executor may have been deleted but the driver missed the deletion event."
        logDebug(exitReasonMessage)

        val exitReason = ExecutorExited(
          UNKNOWN_EXIT_CODE,
          exitCausedByApp = false,
          exitReasonMessage)
        schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason)
        execIdsRemovedInThisRound += missingExecutorId
      }
    }
  }

  if (execIdsRemovedInThisRound.nonEmpty) {
    logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
      s" from Spark that were either found to be deleted or non-existent in the cluster.")
  }
}

3 Summary

Scheduler 的粗浅分析就到这里,其实不是太难理解的,调度器的功能就是找到给 Driver 分配和在合适的时候移除 Executor,至于如何找合适的节点来跑 Executor,那是 K8S 的事情,这里是把 K8S 作为一个外部的集群模式,具体的调度工作是交给 K8S 的。文章来源地址https://www.toymoban.com/news/detail-859356.html

到了这里,关于Spark Kubernetes 的源码分析系列 - scheduler的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • default-scheduler In Kubernetes

    Once upon a time, in a world governed by technology and innovation, there existed a vast interconnected network known as the Cloud. Within this vast realm, a fascinating phenomenon known as Kubernetes emerged, revolutionizing the way applications were deployed and managed. At the heart of this revolution was a mysterious and powerful creature called the Defa

    2024年02月10日
    浏览(32)
  • 【Spark源码分析】Spark的RPC通信一-初稿

    在 RpcEnv 中定义了RPC通信框架的启动、停止和关闭等抽象方法,表示RPC的顶层环境。唯一的子类 NettyRpcEnv 。 RpcEndpoints 需要向 RpcEnv 注册自己的名称,以便接收信息。然后, RpcEnv 将处理从 RpcEndpointRef 或远程节点发送的信息,并将它们传送到相应的 RpcEndpoints 。对于 RpcEnv 捕捉

    2024年02月04日
    浏览(41)
  • 【Spark源码分析】Spark的RPC通信二-初稿

    传输层主要还是借助netty框架进行实现。 TransportContext 包含创建 TransportServer 、 TransportClientFactory 和使用 TransportChannelHandler 设置 Netty Channel 管道的上下文。 TransportClient 提供两种通信协议:control-plane RPCs 和data-plane的 “chunk fetching”。RPC 的处理在 TransportContext 的范围之外进行(

    2024年02月03日
    浏览(47)
  • 深入理解 Spark(一)spark 运行模式简介与启动流程源码分析

    以 standalone-client 为例,运行过程如下: SparkContext 连接到 Master,向 Master 注册并申请资源(CPU Core 和 Memory); Master 根据 SparkContext 的资源申请要求和 Worker 心跳周期内报告的信息决定在哪个 Worker 上分配资源,然后在该 Worker 上获取资源,然后启动 StandaloneExecutorBackend; Stan

    2024年02月02日
    浏览(40)
  • 【Spark原理系列】Accumulator累加器原理用法示例源码详解

    源自专栏《SparkML:Spark ML系列专栏目录》 Accumulator是Spark中的一种分布式变量,用于在并行计算中进行累加操作。它是由MapReduce模型中的“全局计数器”概念演化而来的。 Accumulator提供了一个可写的分布式变量,可以在并行计算中进行累加操作。在Spark中,当一个任务对Accum

    2024年03月14日
    浏览(62)
  • 深入理解 Spark(二)SparkApplication 提交和运行源码分析

    对于运行失败的 Task,TaskSetManager 会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task 池子中等待重新执行,当重试次数过允许的最大次数,整个 Application失败。在记录 Task 失败次数过程中,TaskSetManager 还会记录它上一次失败所在的

    2024年01月17日
    浏览(37)
  • 深入源码分析kubernetes informer机制(三)Resync

    [阅读指南] 这是该系列第三篇 基于kubernetes 1.27 stage版本 为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。 如果看过上一篇,大概能了解,client数据主要通过reflector 的list/watch进行同步。 回顾一下informer整体的数据同步逻辑。 informer初始化时,调用l

    2024年02月12日
    浏览(36)
  • 深入源码分析kubernetes informer机制(二)Reflector

    [阅读指南] 这是该系列第二篇 基于kubernetes 1.27 stage版本 为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。 reflector在informer中就像是一个对外的窗口,它与api-server建立连接,监听和获取来自api-server的资源变化信息,并把这些信息放进deltaFIFO中,交给下

    2024年02月12日
    浏览(41)
  • 深入源码分析kubernetes informer机制(四)DeltaFIFO

    [阅读指南] 这是该系列第四篇 基于kubernetes 1.27 stage版本 为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。 如下图,clinet-go中定义了存储类型接口store,用来提供存储对象的基本能力。 queue继承了store接口,并提供了队列的能力,队列中可以保存需要增

    2024年02月13日
    浏览(37)
  • 深入源码分析kubernetes informer机制(零)简单了解informer

    [阅读指南] 基于kubernetes 1.27 stage版本 为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。 client-go是kubernetes节点与服务端进行资源交互的客户端库,提供了非常多的功能与组件,用来与Kubernetes API 进行交互与操作。常见的功能有管理和同步kubernetes资源、

    2024年02月12日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包