Spark Kubernetes 的源码分析系列 - features

这篇具有很好参考价值的文章主要介绍了Spark Kubernetes 的源码分析系列 - features。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 Overview

features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML 文件。

2 分析

看看 features 包里的代码。这里面都是 Spark 在 K8S 中构建各种资源的步骤。

/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features
├── BasicDriverFeatureStep.scala
├── BasicExecutorFeatureStep.scala
├── DriverCommandFeatureStep.scala
├── DriverKubernetesCredentialsFeatureStep.scala
├── DriverServiceFeatureStep.scala
├── EnvSecretsFeatureStep.scala
├── ExecutorKubernetesCredentialsFeatureStep.scala
├── HadoopConfDriverFeatureStep.scala
├── KerberosConfDriverFeatureStep.scala
├── KubernetesFeatureConfigStep.scala
├── LocalDirsFeatureStep.scala
├── MountSecretsFeatureStep.scala
├── MountVolumesFeatureStep.scala
└── PodTemplateConfigMapStep.scala

还记得 Spark Kubernetes 的源码分析系列 - submit 文章里提到的,在 KubernetesDriverBuilder 中,有一个 features 这个变量,这里需要 new 很多配置,也就是具体的用来配置 Pod 的一些步骤。

val features = Seq(
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))

下面我们按照顺序来分析一下。

2.1 BasicDriverFeatureStep

类名就告诉我们,他是干嘛用的了,就是 Driver Feature 相对 Basic 的部分 feature,那么 Baisc 的 feature 包括什么呢?

driverPodName // Driver Pod 的名字
driverContainerImage // Driver Container 
driverCpuCores // Driver 需要的 Cpu Cores
driverCoresRequest // Driver 的 Request Cpu Cores(K8S相关)
driverLimitCores // Driver 的 Limit Cpu Cores(K8S相关)
driverMemoryMiB // Driver 的内存 MiB
overheadFactor // 这个稍后会讲到
memoryOverheadMiB // 这个稍后会讲到
driverMemoryWithOverheadMiB // 这个稍后会讲到

以上的参数,生成后,主要是用于配置 Pod 和 Container 的参数。这一块内容又长又臭,可以看看我写在里面的注释。

# 一堆的 Builder
val driverContainer = new ContainerBuilder(pod.container)
  # Container Name
  .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
  # Image Name
  .withImage(driverContainerImage)
  # Image 拉取的策略
  .withImagePullPolicy(conf.imagePullPolicy)
  # Driver 的端口
  .addNewPort()
    .withName(DRIVER_PORT_NAME)
    .withContainerPort(driverPort)
    .withProtocol("TCP")
    .endPort()
  .addNewPort()
    # Block Manager 的 Port 相关配置
    .withName(BLOCK_MANAGER_PORT_NAME)
    .withContainerPort(driverBlockManagerPort)
    .withProtocol("TCP")
    .endPort()
  .addNewPort()
     # Spark UI 的端口配置
    .withName(UI_PORT_NAME)
    .withContainerPort(driverUIPort)
    .withProtocol("TCP")
    .endPort()
  .addNewEnv()
    # 一些环境变量
    .withName(ENV_SPARK_USER)
    .withValue(Utils.getCurrentUserName())
    .endEnv()
  .addAllToEnv(driverCustomEnvs.asJava)
  .addNewEnv()
    .withName(ENV_DRIVER_BIND_ADDRESS)
    .withValueFrom(new EnvVarSourceBuilder()
      .withNewFieldRef("v1", "status.podIP")
      .build())
    .endEnv()
  .editOrNewResources()
     # cpu 相关配置
    .addToRequests("cpu", driverCpuQuantity)
    .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
    .addToRequests("memory", driverMemoryQuantity)
    .addToLimits("memory", driverMemoryQuantity)
    .addToLimits(driverResourceQuantities.asJava)
    .endResources()
    # 终于 build 完
  .build()

val driverPod = new PodBuilder(pod.pod)
   # 如果 Pod 是存在的,表示要么修改,否则就是新增
  .editOrNewMetadata()
    # Pod 的名字
    .withName(driverPodName)
    # Pod 的 Label
    .addToLabels(conf.labels.asJava)
    .addToAnnotations(conf.annotations.asJava)
    .endMetadata()
  .editOrNewSpec()
    # Pod 的重启策略
    .withRestartPolicy("Never")
    # Pod 的 NodeSelector 特性
    .addToNodeSelector(conf.nodeSelector.asJava)
    # 拉取镜像的 Repository 密码(ru
    .addToImagePullSecrets(conf.imagePullSecrets: _*)
    .endSpec()
  .build()

此外 getAdditionalPodSystemProperties() 还需要这个方法是拉取其他的配置,比如说 spark.app.id 等等,不赘述了。

2.2 DriverKubernetesCredentialsFeatureStep

这个 Step 是用于配置 Driver 的安全认证相关的配置,一般认为就是 K8S 那一套安全认证的机制了。

maybeMountedOAuthTokenFile // OAuthToken 文件
maybeMountedClientKeyFile // Client Key 文件
maybeMountedClientCertFile // Cient Cert 文件
maybeMountedCaCertFile // Ca Cert 文件
driverServiceAccount // Driver 的 Service Account
oauthTokenBase64 // OauthToken Base64 编码
caCertDataBase64 // CaCert 里面的数据 Base64 编码
clientKeyDataBase64 // Client Key 数据的 Base64 编码
clientCertDataBase64 // Client Cert 数据的 Base 64 编码
shouldMountSecret // 是否需要挂载 Secret
driverCredentialsSecretName // Driver 的认证 Secret 名

这里有很多关于访问 ApiServer 的安全认证的细节,如果不熟悉 K8S 的同学,需要补补课。下面是这个 Step 的关键方法,也就是把这些安全相关的文件通过 secret 保存下来。

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  // 如果 conf 存在以上提及的一些认证文件,则会进行挂载 Secret
  if (shouldMountSecret) {
    Seq(createCredentialsSecret())
  } else {
    Seq.empty
  }
}

2.3 DriverServiceFeatureStep

这个就是配置 Driver Service 的 Step,因为 Pod 在 K8S 集群里,创建 Executor 需要不同的 Executor Pod 访问到 Driver Pod,才能注册上,也包括 Block Manager 以及 Spark UI 的端口和服务负载配置。

preferredServiceName // Service Name
resolvedServiceName // 上面的 Service Name 超过63个字符的话需要重新配置
driverPort // Driver 的端口
driverBlockManagerPort // Block Manager 的端口
driverUIPort // Spark UI 的端口

上面的 Service Name 超过63个字符的话需要重新配置。

private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
  preferredServiceName
} else {
  // 超过63个字符,就是需要系统内部重置这个名字了
  val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
  logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
    s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
    s"$shorterServiceName as the driver service's name.")
  shorterServiceName
}

2.4 MountSecretsFeatureStep

2.5 EnvSecretsFeatureStep

2.6 LocalDirsFeatureStep

resolvedLocalDirs // 本地目录
useLocalDirTmpFs // 如果 conf 配置为 true,则表示本地目录会用其他的存储系统,例如内存,具体请看 spark.kubernetes.local.dirs.tmpfs

2.7 MountVolumesFeatureStep

2.8 DriverCommandFeatureStep

这是关于 Driver 命令行的一些配置,具体看看注释是怎么解释的。

/**
 * Creates the driver command for running the user app, and propagates needed configuration so
 * executors can also find the app code.
 */

2.9 HadoopConfDriverFeatureStep

这是用于挂载 Hadoop 配置文件的 Step,例如访问 HDFS 的时候,需要 core-site.x ml,hdfs-site.xml 等等。

confDir // Hadoop 相关的环境变量 HADOOP_CONF_DIR
existingConfMap // spark.kubernetes.hadoop.configMapName 提交任务的 configMap 名字,这些可以提前生成,直接挂载
confFiles // 配置文件

然后具体看看 Hadoop 的配置文件是如何通过 configMap 挂载到 Driver Pod 上的。

override def configurePod(original: SparkPod): SparkPod = {

    original.transform { case pod if hasHadoopConf =>

      // 如果有环境变量,就从环境变量指定的路径获取
      val confVolume = if (confDir.isDefined) {
        val keyPaths = confFiles.map { file =>
          new KeyToPathBuilder()
            .withKey(file.getName())
            .withPath(file.getName())
            .build()
        }
        new VolumeBuilder()
          .withName(HADOOP_CONF_VOLUME)
          .withNewConfigMap()
            .withName(newConfigMapName)
            .withItems(keyPaths.asJava)
            .endConfigMap()
          .build()
      } else {
        // 没有环境变量的话,就直接用存在的 configMap
        new VolumeBuilder()
          .withName(HADOOP_CONF_VOLUME)
          .withNewConfigMap()
            .withName(existingConfMap.get)
            .endConfigMap()
          .build()
      }

      // 修改 Pod,通过 editSpec 方法
      val podWithConf = new PodBuilder(pod.pod)
        .editSpec()
          .addNewVolumeLike(confVolume)
            .endVolume()
          .endSpec()
          .build()

      // Container Mount 需要的 Volume
      val containerWithMount = new ContainerBuilder(pod.container)
        .addNewVolumeMount()
          .withName(HADOOP_CONF_VOLUME)
          .withMountPath(HADOOP_CONF_DIR_PATH)
          .endVolumeMount()
        .addNewEnv()
          .withName(ENV_HADOOP_CONF_DIR)
          .withValue(HADOOP_CONF_DIR_PATH)
          .endEnv()
        .build()

      SparkPod(podWithConf, containerWithMount)
    }
  }

2.10 KerberosConfDriverFeatureStep

这是关于 Kerberos 配置的 Step。

/**
 * Provide kerberos / service credentials to the Spark driver.
 *
 * There are three use cases, in order of precedence:
 * Kerberos 的服务,有三种场景
 *
 * - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will
 *   manage the kerberos login and the creation of delegation tokens.
 * - existing tokens: if a secret containing delegation tokens is provided, it will be mounted
 *   on the driver pod, and the driver will handle distribution of those tokens to executors.
 * - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation
 *   tokens which will be provided to the driver. The driver will handle distribution of the
 *   tokens to executors.
 */
principal // 指的是 KDC 中账号的 Principal
keytab // 指的是 Kerberos 生成的 Keytab
existingSecretName  // 存在的 secret name
existingSecretItemKey // secret 中的 item key
krb5File // Kerberos 服务的配置文件
krb5CMap // krb5 的 configMap
hadoopConf // 多余?
delegationTokens // Hadoop 体系中的轻量级认证 DT

生成 token 的关键代码如下。

private lazy val delegationTokens: Array[Byte] = {
  // 如果 keytab 和 secret 都是空的,就去生成 DT
  if (keytab.isEmpty && existingSecretName.isEmpty) {
    val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
      SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null)
    val creds = UserGroupInformation.getCurrentUser().getCredentials()
    tokenManager.obtainDelegationTokens(creds)
    // If no tokens and no secrets are stored in the credentials, make sure nothing is returned,
    // to avoid creating an unnecessary secret.
    if (creds.numberOfTokens() >  || creds.numberOfSecretKeys() > ) {
      SparkHadoopUtil.get.serialize(creds)
    } else {
      null
    }
  } else {
    null
  }
}

2.11 PodTemplateConfigMapStep

可以指定 Executor 的 Pod 的模板 spark.kubernetes.executor.podTemplateFile。所以这个 Step 主要就是来解析这个 Pod Template 的。

3 Summary

可以看到 Driver 的构建是通过多个 feature 的配置来组装起来的,终都会通过 K8S 的 Java 客户端来跟 ApiServer 交互来在 K8S 集群中生成 Driver。文章来源地址https://www.toymoban.com/news/detail-847044.html

到了这里,关于Spark Kubernetes 的源码分析系列 - features的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spark原理系列】Accumulator累加器原理用法示例源码详解

    源自专栏《SparkML:Spark ML系列专栏目录》 Accumulator是Spark中的一种分布式变量,用于在并行计算中进行累加操作。它是由MapReduce模型中的“全局计数器”概念演化而来的。 Accumulator提供了一个可写的分布式变量,可以在并行计算中进行累加操作。在Spark中,当一个任务对Accum

    2024年03月14日
    浏览(60)
  • 深入理解 Spark(二)SparkApplication 提交和运行源码分析

    对于运行失败的 Task,TaskSetManager 会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task 池子中等待重新执行,当重试次数过允许的最大次数,整个 Application失败。在记录 Task 失败次数过程中,TaskSetManager 还会记录它上一次失败所在的

    2024年01月17日
    浏览(37)
  • 【Spark ML系列】Frequent Pattern Mining频繁挖掘算法功能用法示例源码论文详解

    挖掘频繁项、项集、子序列或其他子结构通常是分析大规模数据集的首要步骤,在数据挖掘领域已经成为一个活跃的研究课题。我们建议用户参考维基百科上关于关联规则学习的相关信息。 FP-growth算法在《Han et al., Mining frequent patterns without candidate generation》一文中进行了描述

    2024年02月19日
    浏览(34)
  • 【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

    需要源码请点赞关注收藏后评论区留言私信~~~ 新闻话题实时统计分析系统以搜狗实验室的用户查询日志为基础,模拟生成用户查询日志,通过Flume将日志进行实时采集、汇集,分析并进行存储。利用Spark Streaming实时统计分析前20名流量最高的新闻话题,并在前端页面实时显示

    2024年02月06日
    浏览(50)
  • Spark实时数据流分析与可视化:实战指南【上进小菜猪大数据系列】

    上进小菜猪,沈工大软件工程专业,爱好敲代码,持续输出干货。 本文介绍了如何利用Apache Spark技术栈进行实时数据流分析,并通过可视化技术将分析结果实时展示。我们将使用Spark Streaming进行数据流处理,结合常见的数据处理和可视化库,实现实时的数据流分析和可视化展

    2024年02月07日
    浏览(50)
  • 【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

    需要源码请点赞关注收藏后评论区留言私信~~~ 1)Kafka 是一个非常通用的系统,你可以有许多生产者和消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase等发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。如果数据被多个系统消

    2024年02月03日
    浏览(49)
  • 如何用Kafka, Cassandra, Kubernetes, Spark 搭建一套系统?

    Kafka、Cassandra、Kubernetes和Spark都是用于构建分布式系统的流行技术。下面是它们各自的职责以及如何将它们组合在一起搭建一套系统的简要说明: 1、Kafka(消息队列): Kafka是一个高吞吐量、可持久化、分布式发布订阅消息系统。它负责处理实时数据流和消息传递。Kafka使用

    2024年02月16日
    浏览(22)
  • Spark系列(一)spark简介及安装配置

    目录 1. spark简介: 2. spark特点: 2.1 Speed:速度快 2.2 Easy of Use:易用性 2.3 Generality:通用性 2.4 Runs Everywhere:到处运行 3. Spark的应用场景 4. 环境要求及准备工作 5. spark搭建模式: 5.1 local模式在解压缩安装后 5.2 Standalone模式搭建(基于hdfs文件存储) 5.1.1 首先配置spark环境变量:

    2023年04月09日
    浏览(36)
  • Spark避坑系列二(Spark Core-RDD编程)

    大家想了解更多大数据相关内容请移驾我的课堂: 大数据相关课程 剖析及实践企业级大数据 数据架构规划设计 大厂架构师知识梳理:剖析及实践数据建模 PySpark避坑系列第二篇,该篇章主要介绍spark的编程核心RDD,RDD的概念,基础操作 RDD(Resilient Distributed Dataset)叫做弹性

    2024年02月02日
    浏览(36)
  • 面试系列之《Spark》(持续更新...)

    job:应用程序中每遇到一个action算子就会划分为一个job。 stage:一个job任务中从后往前划分,分区间每产生了shuffle也就是宽依赖则划分为一个stage,stage这体现了spark的pipeline思想,即数据在内存中尽可能的往后多计算,最后落盘,减少磁盘IO。 task:RDD中一个分区对应一个ta

    2024年02月20日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包