spark master组件分析

这篇具有很好参考价值的文章主要介绍了spark master组件分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

master职责和常用成员变量

master只有在local-cluster和standalone部署模式下存在。
Master的职责包括Worker的管理、Application的管理、Driver的管理等。Master负责对整个集群中所有资源的统一管理和分配,它接收各个Worker的注册、更新状态、心跳等消息,也接收Driver和Application的注册。
Worker向Master注册时会携带自身的身份和资源信息(如ID、host、port、内核数、内存大小等),这些资源将按照一定的资源调度策略分配给Driver或Application。Master给Driver分配了资源后,将向Worker发送启动Driver的命令,后者在接收到启动Driver的命令后启动Driver。
Master给Application分配了资源后,将向Worker发送启动Executor的命令,后者在接收到启动Executor的命令后启动Executor。Master接收Worker的状态更新消息,用于“杀死”不匹配的Driver或Application。
Worker向Master发送的心跳消息有两个目的:一是告知Master自己还“活着”,另外则是某个Master出现故障后,通过领导选举选择了其他Master负责对整个集群的管理,此时被激活的Master可能并没有缓存Worker的相关信息,因此需要告知Worker重新向新的Master注册。
master的成员变量比较多,介绍几个主要的。

  • rpcEnv:即RpcEnv。
  • address:RpcEnv的地址(即RpcAddress)。RpcAddress只包含host和port两个属性,用来记录Master URL的host和port。
  • checkForWorkerTimeOutTask:检查Worker超时的任务。
  • forwardMessageThread:包含一个线程的ScheduledThreadPoolExecutor,启动的线程以master-forward-message-thread作为名称。forwardMessageThread主要用于运行checkForWorkerTimeOutTask和recoveryCompletionTask。
  • workers:所有注册到Master的Worker信息(WorkerInfo)的集合。
  • idToApp:Application ID与ApplicationInfo的映射关系。
  • waitingApps:正等待调度的Application所对应的ApplicationInfo的集合。
  • apps:所有ApplicationInfo的集合。
  • idToWorker:Worker id与WorkerInfo的映射关系。
  • addressToWorker:Worker的RpcEnv的地址(RpcAddress)与WorkerInfo的映射关系。
  • endpointToApp:RpcEndpointRef与ApplicationInfo的映射关系。
  • addressToApp:Application对应Driver的RpcEnv的地址(RpcAddress)与App-licationInfo的映射关系。
  • drivers:所有Driver信息(DriverInfo)的集合。
  • waitingDrivers:正等待调度的Driver所对应的DriverInfo的集合。
  • state:Master所处的状态。Master的状态包括支持(STANDBY)、激活(ALIVE)、恢复中(RECOVERING)、完成恢复(COMPLETING_RECOVERY)等。

很大部分都是master用来记录worker、application、driver的对应关系和状态的。这也表明master有负责协调它们的职责。
spark master组件分析,spark,java,spring

master启动

启动Master有作为JVM进程内的对象启动和作为单独的进程启动的两种方式。以对象启动的方式主要用于local-cluster模式,而作为进程启动则用于Standalone模式。
对象启动就是调用startRpcEnvAndEndpoint�方法。进程启动是调用main方法,在main方法中调用startRpcEnvAndEndpoint�方法。所以两种启动方式最后都是startRpcEnvAndEndpoint�方法的调用。
startRpcEnvAndEndpoint�主要做了三件事情:

  1. val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)�创建了rpcEnv。
  2. val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))�new master创建了master,rpcEnv.setupEndpoint向rpcEnv注册。
  3. val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)�发送绑定REST�端口的消息。

spark master组件分析,spark,java,spring
初始化rpcEnv和注册master部分在上一篇组件间通信中讲过了。其中在注册master的时候,会生成一条onStart的消息,发送给master,根据消息类型来处理,这里是onStart消息就会调用onStart方法。
onStart方法中主要也是初始化各种服务,是new master之后的延续。
spark master组件分析,spark,java,spring
spark master组件分析,spark,java,spring
至此,master已经成功启动了。

检查worker超时

在onStart方法中使用forwardMessageThread�线程池周期性的使用send发送CheckForWorkerTimeOut�消息。
spark master组件分析,spark,java,spring
send发送的消息都是one-way message。最后调用的是endpoint的receive方法。
receive中根据消息类型是CheckForWorkerTimeOut,调用timeOutDeadWorkers�方法。
timeOutDeadWorkers�是实际移除超时worker的方法,方法比较简单,主要是判断worker状态和是否超时,满足条件的worker移除。
spark master组件分析,spark,java,spring
spark master组件分析,spark,java,spring

master的高可用

master实现了LeaderElectable特质,可以参与选举。
�LeaderElectionAgent�是负责选举的类,对应master中的leaderElectionAgent�变量。
spark master组件分析,spark,java,spring
master的高可用主要涉及persistenceEngine(保存master的相关数据,以便于恢复)�和leaderElectionAgent�(选主服务),两者都是根据配置生成对应的类。

  • 默认:BlackHolePersistenceEngine(空实现)和MonarchyLeaderAgent(空实现)
  • FILESYSTEM�:FileSystemPersistenceEngine(基于文件持久化)和MonarchyLeaderAgent(空实现)
  • ZOOKEEPER:ZooKeeperPersistenceEngine(基于zk持久化)和ZooKeeperLeaderElectionAgent(基于zk选举)
  • CUSTOM:自定义实现类

一般生产使用ZOOKEEPER。
spark master组件分析,spark,java,spring
ZooKeeperLeaderElectionAgent可以继承了LeaderLatchListener�(zk框架的选主)
是使用curator�来操作zk来进行选主的,具体curator�选主本文不再赘述可以找找资料。
spark master组件分析,spark,java,spring
当节点选中为leader的时候会执行isLeader方法,没选中会执行notLeader方法。
最后都是会执行updateLeadershipStatus�
spark master组件分析,spark,java,spring
判断是否选中leader来执行master对应的方法(electedLeader�、revokedLeadership�)
spark master组件分析,spark,java,spring
执行master的方法,就是发送ElectedLeader/RevokedLeadership�消息
spark master组件分析,spark,java,spring
master收到ElectedLeader消息,判断时候需要恢复。如果需要就调用beginRecovery方法,设置延迟发送�CompleteRecovery�消息。
spark master组件分析,spark,java,spring
beginRecovery� 根据从持久化来的数据重新注册application、driver和worker,并通知对应节点发送了master的主从切换。(节点收到主从切换消息后处理过程不在本文描述,可以猜测应该是更换上报master节点之类的信息)
spark master组件分析,spark,java,spring
master等待一段时间后收到CompleteRecovery�,这期间是用来等待相关节点上报。
CompleteRecovery 移除超时没有上报的worker和application,重新调度超时的driver。最后调用schedule方法进行资源调度。
spark master组件分析,spark,java,spring
master收到RevokedLeadership�消息,直接退出进程(这里我觉得应该可以改成将master切换成standBy,不应该简单的退出)。
spark master组件分析,spark,java,spring
至此master高可用完成

driver调度

master的调度入口都是 schedule方法。
优先调度driver,再调度executor。

  1. 挑选存活的worker,并打乱顺序。随机分配
  2. 遍历需要资源的driver和worker,使用canLaunchDriver�判断这次的循环中worker有没有足够的资源调度driver,没有就继续。
  3. 找到worker的资源可以调度driver,使用launchDriver(worker, driver)�让worker调度driver
  4. !launched && isClusterIdle�则表明没有worker有足够资源调度driver,driver等待下一次调度

spark master组件分析,spark,java,spring
canLaunchDriver判断worker的内存、cpu、资源(可以是gpu、fpga等)的大小是否满足。
dirver和executor都是相同的方式判断。
spark master组件分析,spark,java,spring
launchDriver(worker, driver)�
主要是发送LaunchDriver�消息给worker,让worker拿出相应的资源来调度driver。
同时也更新了driver和worker的状态。
spark master组件分析,spark,java,spring

executor调度

  1. 获取application的单个executor的资源需求(比如 1cpu/2G)
  2. 挑选出至少满足单个executor的资源需求canLaunchExecutor�的存活的worker节点,并将worker节点按照空间cpu数倒排。为了优先分配给内核资源充足的worker
  3. 按照规则将executor分配给这些worker
  4. 按照分配结果发送消息给worker来启动executor

spark master组件分析,spark,java,spring
分配规则
分配规则有点长,其实就是两种模式。

  • 一个是密集型,优先在一个worker上分配,直到这个worker没有资源,再往下一个worker上分配。
  • 一个是分散型,在所有的worker上依次分配,完成后再按照顺序再次依次分配。
  1. 获取计算各种参数。主要是这几个
    • coresToAssign�:本次需要分配的core。取的是application未分配的core数和worker剩余core数的最小值。
    • minCoresPerExecutor�:一个executor的最小core数。没有指定coresPerExecutor�(一个executor分配core数)的话取值1.
    • oneExecutorPerWorker�:是否在一个worker上只分配一个executor,coresPerExecutor�空则为true。
    • spreadOutApps�:false集中分配,true分散分配
  2. 依次调用canLaunchExecutorForApp�获取可以分配executor的worker节点的下标�
  3. 循环分配,直到分配完成或者没有空闲的worker停止。
    • var keepScheduling = true�keepScheduling表示worker是否还参与调度,和spreadOutApps�参数相关。
    • oneExecutorPerWorker�是控制worker上分配executor的个数,如果为true最多分配一个,后续调度的话在这个worker上不增加executor的个数,增加core数和内存。

spark master组件分析,spark,java,spring
canLaunchExecutorForApp
主要从这几个方面判断资源是否满足

  • keepScheduling:全部剩余待分配的core数大于等于单个executor最小分配core数�。
  • enoughCores :这个worker有足够的core。
  • enoughMemory :这个worker有足够的内存。
  • enoughResources :这个worker有足够的资源。
  • underLimit�:新加一个executor,app的executor总数不会超过app的executor最大限制。

如果是oneExecutorPerWorker�为true,一个worker只能有一个executor,并且这个worker已经分配了executor,只添加core。只判断keepScheduling和enoughCores。
spark master组件分析,spark,java,spring
正式调度executor

  1. 根据assignedCores和coresPerExecutor�计算出要在worker上分配几个executor,如果coresPerExecutor为空则默认1个
  2. 计算coresToAssign�每个executor分配几个cpu核,如果coresPerExecutor为空则所有的cpu核(assignedCores)都分配到一个上面。
  3. 按照executor数量,调用launchExecutor�方法依次在worker上启动。launchExecutor�方法主要就是发送LaunchExecutor�消息到worker和发送ExecutorAdded消息给driver�。

spark master组件分析,spark,java,spring
spark master组件分析,spark,java,spring

worker注册

一个集群刚开始的时候只有Master,为了让后续启动的Worker加入到Master的集群中,每个Worker都需要在启动的时候向Master注册,Master接收到Worker的注册信息后,将把Worker的各种重要信息(如ID、host、port、内核数、内存大小等信息)缓存起来,以便进行资源的分配与调度。Master为了容灾,还将Worker的信息通过持久化引擎进行持久化,以便经过领导选举出的新Master能够将集群的状态从错误或灾难中恢复。

  1. master状态是standby,发送MasterInStandby消息给worker
  2. idToWorker中含有此次注册的worker的id,发送RegisteredWorker�消息给worker
  3. worker注册成功(registerWorker�),发送RegisteredWorker�消息给worker、持久化、开启调度。注册失败的话发送RegisterWorkerFailed�消息给worker。

spark master组件分析,spark,java,spring
registerWorker

  1. 移除workers缓存中相同address的dead的worker节点
  2. 判断addressWorker中是否存在相同地址的worker节点。
    1. 如果存在且状态为UNKOWN,这表明是master是在恢复的状态,移除这个旧的worker。
    2. 存在状态不是UNKOWN的,就是ACTIVE。相同address只能有一个worker(新的worker和旧的worker的id不同,地址相同),新的worker不能注册。
  3. 更新master中worker相关的缓存

spark master组件分析,spark,java,spring
worker向master注册成功后会发送一个WorkerLatestState�消息给master
WorkerLatestState�消息中携带了worker上运行的executor和driver的信息。
遍历executor和driver,匹配不上的发送KillExecutor/KillDriver�消息给master。
spark master组件分析,spark,java,spring

处理心跳

master收到Heartbeat�消息

  1. 从idToWorker�中获取worker:更新最后的心跳时间为当前时间。
  2. idToWorker�不存在worker,但是worker存在:说明这个worker注册过,但是超时了。向这个worker发送ReconnectWorker消息
  3. �worker在idToWorker和workers中都不存在:要么就是没有注册过,要么就是注册过但是超时时间超过了reaperIterations * workerTimeoutMs�。忽略这次心跳

spark master组件分析,spark,java,spring

注册application

Driver一般启动在client。
Driver在启动后需要向Master注册Application信息,这样Master就能将Worker上的资源及Executor分配给Driver。

  1. 创建新的application
  2. registerApplication(app)�注册application
  3. 将application信息持久化
  4. 开始调度,给application分配资源

spark master组件分析,spark,java,spring
spark master组件分析,spark,java,spring

处理请求executor

如果Driver启用了ExecutorAllocationManager,那么ExecutorAllocationManager将通过StandaloneAppClient中的ClientEndpoint,向Master发送RequestExecutors消息请求Executor。
ExecutorAllocationManager:根据工作负载动态分配executor
master收到RequestExecutors消息后,会调用handleRequestExecutors�方法。
spark master组件分析,spark,java,spring
handleRequestExecutors�
调整application的executor的数量限制,开始调度。

  • executor限制上调:只要有具有足够资源的workers,就会启动新的executor。
  • executor限制下调:在明确收到终止请求之前,我们不会终止现有的executor。�

spark master组件分析,spark,java,spring

处理executor状态变化

Executor在运行的整个生命周期中,会向Master不断发送ExecutorStateChanged消息报告自身的状态改变。

  • LAUNCHING:启动
  • RUNNING:运行中
  • KILLED:被driver killed
  • FAILED:运行报错
  • LOST:失联
  • EXITED�:退出,exitCode�为0正常退出,非0为异常退出

ExecutorStateChanged�消息处理:

  1. 状态为RUNNING,重置app的重试次数
  2. 状态为KILLED, FAILED, LOST, EXITED�,则为完成。
    1. 先移除对应的资源,在判断是否异常退出。
    2. 异常退出且达到最大重试次数,Executor所属的Application在没有任何Executor处于RUNNING状态时将被彻底移除。

spark master组件分析,spark,java,spring文章来源地址https://www.toymoban.com/news/detail-794938.html

到了这里,关于spark master组件分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Cloud微服务架构组件【Java培训】

    SpringCloud是一系列框架的有序集合,为开发人员构建微服务架构提供了完整的解决方案。Spring Cloud根据分布式服务协调治理的需求成立了许多子项目,每个项目通过特定的组件去实现,下面我们讲解一下Spring Cloud 包含的常用组件以及模块。 (1)Spring Cloud Config:分布式配置中心

    2023年04月25日
    浏览(28)
  • k8s master组件无法重启

    1.案例 k8s的master组件出错,删掉pod重新拉起也无法正常启动 kubectl get pod -n kube-system  可以看到controller和scheduler组件都显示异常  kubectl describe pod kube-apiserver-k8s-master03 -n kube-system 通过describe查看组件容器的详细信息也并没有报错输出 kubectl delete pod kube-controller-manager-k8s-master0

    2024年02月09日
    浏览(32)
  • Java之SpringCloud Alibaba【七】【Spring Cloud微服务网关Gateway组件】

    Java之SpringCloud Alibaba【一】【Nacos一篇文章精通系列】 跳转 Java之SpringCloud Alibaba【二】【微服务调用组件Feign】 跳转 Java之SpringCloud Alibaba【三】【微服务Nacos-config配置中心】 跳转 Java之SpringCloud Alibaba【四】【微服务 Sentinel服务熔断】 跳转 Java之SpringCloud Alibaba【五】【微服务

    2024年02月06日
    浏览(42)
  • K8s 部署 CNI 网络组件+k8s 多master集群部署+负载均衡

    ------------------------------ 部署 CNI 网络组件 ------------------------------ ---------- 部署 flannel ---------- K8S 中 Pod 网络通信: ●Pod 内容器与容器之间的通信 在同一个 Pod 内的容器(Pod 内的容器是不会跨宿主机的)共享同一个网络命名空间,相当于它们在同一台机器上一样,可以用 lo

    2024年02月08日
    浏览(36)
  • spark rpc(组件间通信)

    spark 组件间通信原本使用的是akka。后来改成了用netty实现了一个类似akka的框架。 主要类在 spark-core的rpc包下面。 RpcEnv:接口,rpc运行的环境 RpcEndpoint:RPC端点是对Spark的RPC通信实体的统一抽象,所有运行于RPC框架之上的实体都应该继承RpcEndpoint。 RpcEndpointRef:RPC端点的引用,

    2024年01月24日
    浏览(26)
  • 【Elasticsearch专栏 16】深入探索:Elasticsearch的Master选举机制及其影响因素分析

    Elasticsearch,作为当今最流行的开源搜索和分析引擎,以其分布式、可扩展和高可用的特性赢得了广大开发者的青睐。在Elasticsearch的分布式架构中,集群的稳健性和高可用性很大程度上依赖于其Master节点的选举机制。本文将深入剖析Elasticsearch的Master选举过程,帮助读者更好地

    2024年04月17日
    浏览(28)
  • Java企业级信息系统开发学习笔记(4.2)Spring Boot项目单元测试、热部署与原理分析

    该文章主要为完成实训任务,详细实现过程及结果见【http://t.csdn.cn/pG623】 1. 添加测试依赖启动器和单元测试 修改pom.xml文件,添加依赖 刷新项目依赖 2. 创建测试类与测试方法 在 src/test/java 里创建 cn.kox.boot 包,创建测试类 TestHelloWorld01 给测试类添加测试启动器注解与Spring

    2024年02月10日
    浏览(43)
  • 危大工程智慧工地源码,微服务+Java+Spring Cloud +UniApp +MySql 物联网、人工智能、视频AI分析

    一套智慧工地管理平台源码,PC端+移动APP端+可视货数据管理端源码 智慧工地可视化系统利用物联网、人工智能、云计算、大数据、移动互联网等新一代信息技术,通过工地中台、三维建模服务、视频AI分析服务等技术支撑,实现智慧工地高精度动态仿真,趋势分析、预测、模

    2024年02月14日
    浏览(51)
  • 【Spark源码分析】Spark的RPC通信一-初稿

    在 RpcEnv 中定义了RPC通信框架的启动、停止和关闭等抽象方法,表示RPC的顶层环境。唯一的子类 NettyRpcEnv 。 RpcEndpoints 需要向 RpcEnv 注册自己的名称,以便接收信息。然后, RpcEnv 将处理从 RpcEndpointRef 或远程节点发送的信息,并将它们传送到相应的 RpcEndpoints 。对于 RpcEnv 捕捉

    2024年02月04日
    浏览(28)
  • 【Spark源码分析】Spark的RPC通信二-初稿

    传输层主要还是借助netty框架进行实现。 TransportContext 包含创建 TransportServer 、 TransportClientFactory 和使用 TransportChannelHandler 设置 Netty Channel 管道的上下文。 TransportClient 提供两种通信协议:control-plane RPCs 和data-plane的 “chunk fetching”。RPC 的处理在 TransportContext 的范围之外进行(

    2024年02月03日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包