spark 组件间通信原本使用的是akka。后来改成了用netty实现了一个类似akka的框架。
主要类在 spark-core的rpc包下面。
- RpcEnv:接口,rpc运行的环境
- RpcEndpoint:RPC端点是对Spark的RPC通信实体的统一抽象,所有运行于RPC框架之上的实体都应该继承RpcEndpoint。
- RpcEndpointRef:RPC端点的引用,抽象类RpcEndpoint-Ref定义了所有RpcEndpoint引用的属性与接口
- NettyRpcEnv:RpcEnv的唯一实现
- Dispatcher:Dispatcher负责将RPC消息路由到要该对此消息处理的RpcEndpoint(RPC端点)
- MessageLoop:MessageLoop的主要职责是处理和调度从网络接收到的消息。
- Inbox:是Spark中的一个消息收件箱,它也位于每个本地节点上。
- Outbox:是Spark中的一个消息发件箱,它也位于每个本地节点上。
- NettyRpcCallContext:回调方法,在消息被处理后,回调处理成功或者失败的情况
rpc启动
以spark中master启动举例
程序入口是Master类的main方法。在main中调用startRpcEnvAndEndpoint方法。
startRpcEnvAndEndpoint中 RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
是开始创建一个RpcEnv
首先创建RpcEnvConfig。参数含义在下图。其中clientMode没有传入,默认是false。代表不是客户端。
再通过NettyRpcEnvFactory创建NettyRpcEnv
创建一个NettyRpcEnv
启动NettyRpcEnv
NettyRpcEnv创建
NettyRpcEnv创建的时候,同时初始化了一些成员变量。
transportContext、clientFactory、transportConf 在上一篇网络通信是说过了。
下面主要讲讲 dispatcher
dispatcher创建
dispatcher创建的时候,为了能路由消息,有两个缓存endpoints、endpointRefs。
同时创建了一个 sharedLoop,在同一个线程池中并行处理多个rpc实例的请求。
ShareMessageLoop创建
创建线程池,提交循环任务到线程池中执行。
循环是不断获取inbox消息进行处理。
NettyRpcEnv启动
回到NettyRpcEnvFactory的create方法中。Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
是启动的入口。
其中 startNettyRpcEnv 是一个函数。
在 Utils.startServiceOnPort 中 会调用这个函数来启动NettyRpcEnv。
选择合适的端口,尝试多次调用startService启动。
因为spark要启动的service很多,抽象成了Utils里面的一个公共方法,包含了生成端口和重试的通用逻辑。每个service启动的具体方式都是在传入startService函数中实现
startNettyRpcEnv实现就是nettyEnv.startServer。创建了server并在dispatcher中注册。
创建server逻辑在network-common包中,上一篇已经讲过了。主要是使用netty创建了一个server,同时绑定了rpcHandler的处理逻辑,并启动。
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
注册rpc实例
这里的rpc实例是RpcEndpointVerifier(主要用于验证和监控其他RPC端点的健康状态和连接性),并不是master。
1.创建endpointRef,添加到endpointRefs缓存中(这里在endpoint还没有注册完成就提起添加引用,是因为后面endpoint注册的时候会产生一个onStart的消息,需要endpointRef来处理onStart消息,此时endpointRef就会处理onStart消息)
2.在shareLoop中注册endpoint
3.endpoint注册完成,添加endpoints缓存中
创建inbox消息,同时将onStart消息放到message队列中。
messageLoop注册 实例名和inbox到缓存中。
执行setActive,将inbox放入处理队列进行处理。
endPoint启动
inbox中调用process处理onStart消息。
可以看到首先是判断message类型。这里传入的是onStart消息,所以走OnStart部分。
endPoint.onStart()
这个endPoint是什么?它是在dispatcher.registerRpcEndpoint的时候传入的是,是RpcEndpointVerifier(主要用于验证和监控其他RPC端点的健康状态和连接性)。RpcEndpointVerifier的onStart方法默认什么都没有做。
到此NettyRpc创建完成并且启动。
Master注册到RpcEnv
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)是master的创建,里面内容太多了。放在master创建里面单独开一篇。现在可以把它当成一个endPoint对待就行。
setupEndpoint就是在dispatcher中注册endPoint。跟上面NettyRpcEnv启动的时候注册RpcEndpointVerifier一样。最后会调用endPoint的onStart方法。
调用master中onStart方法。在上面new Master()是master的创建,onStart是master的启动。这部分也会放到master部分再讲。
至此 master中关于RpcEnv启动完成。
发送rpc消息
NettyRpcEnv主要是以下两个发送消息的方法:
send:发送单向异步的消息。所谓“单向”就是发送完后就会忘记此次发送,不会有任何状态要记录,也不会期望得到服务端的回复。send采用了at-most-once的投递规则。RpcEndpointRef的send方法非常类似于Akka中Actor的tell方法。
ask:发送同步的请求,此类请求将会被RpcEndpoint接收,并在指定的超时时间内等待返回类型为T的处理结果。
send方法
NettyRpcEndpointRef发送massage
生成requestMassage,三个参数(发送方的地址、接收方的地址、内容)
调用nettyRpcEnv的send方法
判断接收方地址是不是本地地址。是的话就走local,不是的话走remote。
local比较简单,我们从remote分析
可以看到remote最后调用的是postToOutbox方法,顾名思义就是把消息放到发件箱。
ask方法
我们从send方法停一下,看一下ask方法。
ask和send方法类似,也是先生成requestMassage再调用nettyRpcEnv的同名的askAbortable方法。但是ask参数多了一个超时时间,同时返回一个futrue对象,表明ask是需要回复的。
可以看到最后调用的也是postToOutbox方法。表明send和ask最后都是将消息放到outbox发件箱中来处理的。
postToOutbox放到发件箱
将消息加到massages队列中,调用drainOutbox方法处理队列的消息。
drainOutbox中为了避免线程竞争,采用了分段加锁和判断状态提前返回的方式。
假设是首次调用这个方法,我们看看它会怎么做。
stopped是false,通过
connectFuture是null,通过
client是null,进入launchConnectTask方法,launchConnectTask完成后会return退出方法,所以launchConnectTask中肯定还有drainOutbox调用
launchConnectTask创建远程连接
创建client并将它赋值到outbox的成员变量上,方便后面使用。val _client = nettyEnv.createClient(address)
address是remote的地址。
具体创建client方法在上一篇已经写了,这里就不再写了。
最后再次调用drainOutbox,跟上面设想的一样。
循环处理outbox的消息
在循环里面也采用分段加锁的方式。
发送消息是message.sendWith(_client)
最后messages队列中没有消息了,就将draining状态置为false,退出循环
client发送消息
可以看到是client发送消息。这部分也在上一篇写过了,这里不再赘述了。
到此,rpc消息发送完成。发送流程可以用下图表示:
文章来源:https://www.toymoban.com/news/detail-821810.html
- 序号①:表示通过调用NettyRpcEndpointRef的send和ask方法向本地节点的Rpc-Endpoint发送消息。由于是在同一节点,所以直接调用Dispatcher的postLocalMessage或postOneWayMessage方法将消息放入EndpointData内部Inbox的messages列表中。Message-Loop线程最后处理消息,并将消息发给对应的RpcEndpoint处理。
- 序号②:表示通过调用NettyRpcEndpointRef的send和ask方法向远端节点的Rpc-Endpoint发送消息。这种情况下,消息将首先被封装为OutboxMessage,然后放入到远端RpcEndpoint的地址所对应的Outbox的messages列表中。
- 序号③:表示每个Outbox的drainOutbox方法通过循环,不断从messages列表中取得OutboxMessage。
- 序号④:表示每个Outbox的drainOutbox方法使用Outbox内部的TransportClient向远端的NettyRpcEnv发送序号③中取得的OutboxMessage。
- 序号⑤:表示序号④发出的请求在与远端NettyRpcEnv的TransportServer建立了连接后,请求消息首先经过Netty管道的处理,然后经由NettyRpcHandler处理,最后NettyRpcHandler的receive方法会调用Dispatcher的postRemoteMessage或postOneWay-Message方法,将消息放入EndpointData内部Inbox的messages列表中。MessageLoop线程最后处理消息,并将消息发给对应的RpcEndpoint处理。
接收rpc消息
在server端接收消息并处理,主要是通过rpcHandler。
rpcHandler是一个接口,这里的实现类是NettyRpcHandler,对应方法是receive
receive有两个,一个是有callback,对请求有回复。一个没有callback,不用回复。
都会调用internalReceive方法。
收到远程消息的时候,需要调用postToAll通知dispatcher中已经注册的rpcPoint实例。postToAll中是每一个已经注册的endpoint调用postMessage发送RemoteProcessConnected消息。
获取到messageLoop,使用messageLoop发送消息。
从loop中获取对应的inbox收件箱,将message加入inbox中(就是加入inbox的messages缓存中)。再将inbox放入待处理队列中,等待处理。
后面处理可以参考上面的onStart消息的处理,流程是一样的。文章来源地址https://www.toymoban.com/news/detail-821810.html
到了这里,关于spark rpc(组件间通信)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!