master职责和常用成员变量
master只有在local-cluster和standalone部署模式下存在。
Master的职责包括Worker的管理、Application的管理、Driver的管理等。Master负责对整个集群中所有资源的统一管理和分配,它接收各个Worker的注册、更新状态、心跳等消息,也接收Driver和Application的注册。
Worker向Master注册时会携带自身的身份和资源信息(如ID、host、port、内核数、内存大小等),这些资源将按照一定的资源调度策略分配给Driver或Application。Master给Driver分配了资源后,将向Worker发送启动Driver的命令,后者在接收到启动Driver的命令后启动Driver。
Master给Application分配了资源后,将向Worker发送启动Executor的命令,后者在接收到启动Executor的命令后启动Executor。Master接收Worker的状态更新消息,用于“杀死”不匹配的Driver或Application。
Worker向Master发送的心跳消息有两个目的:一是告知Master自己还“活着”,另外则是某个Master出现故障后,通过领导选举选择了其他Master负责对整个集群的管理,此时被激活的Master可能并没有缓存Worker的相关信息,因此需要告知Worker重新向新的Master注册。
master的成员变量比较多,介绍几个主要的。
- rpcEnv:即RpcEnv。
- address:RpcEnv的地址(即RpcAddress)。RpcAddress只包含host和port两个属性,用来记录Master URL的host和port。
- checkForWorkerTimeOutTask:检查Worker超时的任务。
- forwardMessageThread:包含一个线程的ScheduledThreadPoolExecutor,启动的线程以master-forward-message-thread作为名称。forwardMessageThread主要用于运行checkForWorkerTimeOutTask和recoveryCompletionTask。
- workers:所有注册到Master的Worker信息(WorkerInfo)的集合。
- idToApp:Application ID与ApplicationInfo的映射关系。
- waitingApps:正等待调度的Application所对应的ApplicationInfo的集合。
- apps:所有ApplicationInfo的集合。
- idToWorker:Worker id与WorkerInfo的映射关系。
- addressToWorker:Worker的RpcEnv的地址(RpcAddress)与WorkerInfo的映射关系。
- endpointToApp:RpcEndpointRef与ApplicationInfo的映射关系。
- addressToApp:Application对应Driver的RpcEnv的地址(RpcAddress)与App-licationInfo的映射关系。
- drivers:所有Driver信息(DriverInfo)的集合。
- waitingDrivers:正等待调度的Driver所对应的DriverInfo的集合。
- state:Master所处的状态。Master的状态包括支持(STANDBY)、激活(ALIVE)、恢复中(RECOVERING)、完成恢复(COMPLETING_RECOVERY)等。
很大部分都是master用来记录worker、application、driver的对应关系和状态的。这也表明master有负责协调它们的职责。
master启动
启动Master有作为JVM进程内的对象启动和作为单独的进程启动的两种方式。以对象启动的方式主要用于local-cluster模式,而作为进程启动则用于Standalone模式。
对象启动就是调用startRpcEnvAndEndpoint�方法。进程启动是调用main方法,在main方法中调用startRpcEnvAndEndpoint�方法。所以两种启动方式最后都是startRpcEnvAndEndpoint�方法的调用。
startRpcEnvAndEndpoint�主要做了三件事情:
-
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)�
创建了rpcEnv。 -
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))�
new master创建了master,rpcEnv.setupEndpoint向rpcEnv注册。 -
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)�
发送绑定REST�端口的消息。
初始化rpcEnv和注册master部分在上一篇组件间通信中讲过了。其中在注册master的时候,会生成一条onStart的消息,发送给master,根据消息类型来处理,这里是onStart消息就会调用onStart方法。
onStart方法中主要也是初始化各种服务,是new master之后的延续。
至此,master已经成功启动了。
检查worker超时
在onStart方法中使用forwardMessageThread�线程池周期性的使用send发送CheckForWorkerTimeOut�消息。
send发送的消息都是one-way message。最后调用的是endpoint的receive方法。
receive中根据消息类型是CheckForWorkerTimeOut,调用timeOutDeadWorkers�方法。
timeOutDeadWorkers�是实际移除超时worker的方法,方法比较简单,主要是判断worker状态和是否超时,满足条件的worker移除。
master的高可用
master实现了LeaderElectable特质,可以参与选举。
�LeaderElectionAgent�是负责选举的类,对应master中的leaderElectionAgent�变量。
master的高可用主要涉及persistenceEngine(保存master的相关数据,以便于恢复)�和leaderElectionAgent�(选主服务),两者都是根据配置生成对应的类。
- 默认:BlackHolePersistenceEngine(空实现)和MonarchyLeaderAgent(空实现)
- FILESYSTEM�:FileSystemPersistenceEngine(基于文件持久化)和MonarchyLeaderAgent(空实现)
- ZOOKEEPER:ZooKeeperPersistenceEngine(基于zk持久化)和ZooKeeperLeaderElectionAgent(基于zk选举)
- CUSTOM:自定义实现类
一般生产使用ZOOKEEPER。
ZooKeeperLeaderElectionAgent可以继承了LeaderLatchListener�(zk框架的选主)
是使用curator�来操作zk来进行选主的,具体curator�选主本文不再赘述可以找找资料。
当节点选中为leader的时候会执行isLeader方法,没选中会执行notLeader方法。
最后都是会执行updateLeadershipStatus�
判断是否选中leader来执行master对应的方法(electedLeader�、revokedLeadership�)
执行master的方法,就是发送ElectedLeader/RevokedLeadership�消息
master收到ElectedLeader消息,判断时候需要恢复。如果需要就调用beginRecovery方法,设置延迟发送�CompleteRecovery�消息。
beginRecovery� 根据从持久化来的数据重新注册application、driver和worker,并通知对应节点发送了master的主从切换。(节点收到主从切换消息后处理过程不在本文描述,可以猜测应该是更换上报master节点之类的信息)
master等待一段时间后收到CompleteRecovery�,这期间是用来等待相关节点上报。
CompleteRecovery 移除超时没有上报的worker和application,重新调度超时的driver。最后调用schedule方法进行资源调度。
master收到RevokedLeadership�消息,直接退出进程(这里我觉得应该可以改成将master切换成standBy,不应该简单的退出)。
至此master高可用完成
driver调度
master的调度入口都是 schedule方法。
优先调度driver,再调度executor。
- 挑选存活的worker,并打乱顺序。随机分配
- 遍历需要资源的driver和worker,使用
canLaunchDriver
�判断这次的循环中worker有没有足够的资源调度driver,没有就继续。 - 找到worker的资源可以调度driver,使用
launchDriver(worker, driver)�
让worker调度driver -
!launched && isClusterIdle�
则表明没有worker有足够资源调度driver,driver等待下一次调度
canLaunchDriver
判断worker的内存、cpu、资源(可以是gpu、fpga等)的大小是否满足。
dirver和executor都是相同的方式判断。launchDriver(worker, driver)�
主要是发送LaunchDriver�消息给worker,让worker拿出相应的资源来调度driver。
同时也更新了driver和worker的状态。
executor调度
- 获取application的单个executor的资源需求(比如 1cpu/2G)
- 挑选出至少满足单个executor的资源需求
canLaunchExecutor�
的存活的worker节点,并将worker节点按照空间cpu数倒排。为了优先分配给内核资源充足的worker - 按照规则将executor分配给这些worker
- 按照分配结果发送消息给worker来启动executor
分配规则
分配规则有点长,其实就是两种模式。
- 一个是密集型,优先在一个worker上分配,直到这个worker没有资源,再往下一个worker上分配。
- 一个是分散型,在所有的worker上依次分配,完成后再按照顺序再次依次分配。
- 获取计算各种参数。主要是这几个
- coresToAssign�:本次需要分配的core。取的是application未分配的core数和worker剩余core数的最小值。
- minCoresPerExecutor�:一个executor的最小core数。没有指定coresPerExecutor�(一个executor分配core数)的话取值1.
- oneExecutorPerWorker�:是否在一个worker上只分配一个executor,coresPerExecutor�空则为true。
- spreadOutApps�:false集中分配,true分散分配
- 依次调用canLaunchExecutorForApp�获取可以分配executor的worker节点的下标�
- 循环分配,直到分配完成或者没有空闲的worker停止。
-
var keepScheduling = true�
keepScheduling表示worker是否还参与调度,和spreadOutApps�参数相关。 - oneExecutorPerWorker�是控制worker上分配executor的个数,如果为true最多分配一个,后续调度的话在这个worker上不增加executor的个数,增加core数和内存。
-
canLaunchExecutorForApp
主要从这几个方面判断资源是否满足
- keepScheduling:全部剩余待分配的core数大于等于单个executor最小分配core数�。
- enoughCores :这个worker有足够的core。
- enoughMemory :这个worker有足够的内存。
- enoughResources :这个worker有足够的资源。
- underLimit�:新加一个executor,app的executor总数不会超过app的executor最大限制。
如果是oneExecutorPerWorker�为true,一个worker只能有一个executor,并且这个worker已经分配了executor,只添加core。只判断keepScheduling和enoughCores。
正式调度executor
- 根据assignedCores和coresPerExecutor�计算出要在worker上分配几个executor,如果coresPerExecutor为空则默认1个
- 计算coresToAssign�每个executor分配几个cpu核,如果coresPerExecutor为空则所有的cpu核(assignedCores)都分配到一个上面。
- 按照executor数量,调用launchExecutor�方法依次在worker上启动。launchExecutor�方法主要就是发送LaunchExecutor�消息到worker和发送ExecutorAdded消息给driver�。
worker注册
一个集群刚开始的时候只有Master,为了让后续启动的Worker加入到Master的集群中,每个Worker都需要在启动的时候向Master注册,Master接收到Worker的注册信息后,将把Worker的各种重要信息(如ID、host、port、内核数、内存大小等信息)缓存起来,以便进行资源的分配与调度。Master为了容灾,还将Worker的信息通过持久化引擎进行持久化,以便经过领导选举出的新Master能够将集群的状态从错误或灾难中恢复。
- master状态是standby,发送MasterInStandby消息给worker
- idToWorker中含有此次注册的worker的id,发送RegisteredWorker�消息给worker
- worker注册成功(registerWorker�),发送RegisteredWorker�消息给worker、持久化、开启调度。注册失败的话发送RegisterWorkerFailed�消息给worker。
registerWorker
- 移除workers缓存中相同address的dead的worker节点
- 判断addressWorker中是否存在相同地址的worker节点。
- 如果存在且状态为UNKOWN,这表明是master是在恢复的状态,移除这个旧的worker。
- 存在状态不是UNKOWN的,就是ACTIVE。相同address只能有一个worker(新的worker和旧的worker的id不同,地址相同),新的worker不能注册。
- 更新master中worker相关的缓存
worker向master注册成功后会发送一个WorkerLatestState�消息给master
WorkerLatestState�消息中携带了worker上运行的executor和driver的信息。
遍历executor和driver,匹配不上的发送KillExecutor/KillDriver�消息给master。
处理心跳
master收到Heartbeat�消息
- 从idToWorker�中获取worker:更新最后的心跳时间为当前时间。
- idToWorker�不存在worker,但是worker存在:说明这个worker注册过,但是超时了。向这个worker发送ReconnectWorker消息
- �worker在idToWorker和workers中都不存在:要么就是没有注册过,要么就是注册过但是超时时间超过了reaperIterations * workerTimeoutMs�。忽略这次心跳
注册application
Driver一般启动在client。
Driver在启动后需要向Master注册Application信息,这样Master就能将Worker上的资源及Executor分配给Driver。
- 创建新的application
-
registerApplication(app)�
注册application - 将application信息持久化
- 开始调度,给application分配资源
处理请求executor
如果Driver启用了ExecutorAllocationManager,那么ExecutorAllocationManager将通过StandaloneAppClient中的ClientEndpoint,向Master发送RequestExecutors消息请求Executor。
ExecutorAllocationManager:根据工作负载动态分配executor
master收到RequestExecutors消息后,会调用handleRequestExecutors�方法。
handleRequestExecutors�
调整application的executor的数量限制,开始调度。
- executor限制上调:只要有具有足够资源的workers,就会启动新的executor。
- executor限制下调:在明确收到终止请求之前,我们不会终止现有的executor。�
处理executor状态变化
Executor在运行的整个生命周期中,会向Master不断发送ExecutorStateChanged消息报告自身的状态改变。
- LAUNCHING:启动
- RUNNING:运行中
- KILLED:被driver killed
- FAILED:运行报错
- LOST:失联
- EXITED�:退出,exitCode�为0正常退出,非0为异常退出
ExecutorStateChanged�消息处理:文章来源:https://www.toymoban.com/news/detail-794938.html
- 状态为RUNNING,重置app的重试次数
- 状态为KILLED, FAILED, LOST, EXITED�,则为完成。
- 先移除对应的资源,在判断是否异常退出。
- 异常退出且达到最大重试次数,Executor所属的Application在没有任何Executor处于RUNNING状态时将被彻底移除。
文章来源地址https://www.toymoban.com/news/detail-794938.html
到了这里,关于spark master组件分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!