spark rpc(组件间通信)

这篇具有很好参考价值的文章主要介绍了spark rpc(组件间通信)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

spark 组件间通信原本使用的是akka。后来改成了用netty实现了一个类似akka的框架。
主要类在 spark-core的rpc包下面。

  • RpcEnv:接口,rpc运行的环境
  • RpcEndpoint:RPC端点是对Spark的RPC通信实体的统一抽象,所有运行于RPC框架之上的实体都应该继承RpcEndpoint。
  • RpcEndpointRef:RPC端点的引用,抽象类RpcEndpoint-Ref定义了所有RpcEndpoint引用的属性与接口
  • NettyRpcEnv:RpcEnv的唯一实现
  • Dispatcher:Dispatcher负责将RPC消息路由到要该对此消息处理的RpcEndpoint(RPC端点)
  • MessageLoop:MessageLoop的主要职责是处理和调度从网络接收到的消息。
  • Inbox:是Spark中的一个消息收件箱,它也位于每个本地节点上。
  • Outbox:是Spark中的一个消息发件箱,它也位于每个本地节点上。
  • NettyRpcCallContext:回调方法,在消息被处理后,回调处理成功或者失败的情况

spark rpc(组件间通信),spark,rpc,大数据

rpc启动

以spark中master启动举例
程序入口是Master类的main方法。在main中调用startRpcEnvAndEndpoint方法。
startRpcEnvAndEndpoint中 RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) 是开始创建一个RpcEnv
spark rpc(组件间通信),spark,rpc,大数据
首先创建RpcEnvConfig。参数含义在下图。其中clientMode没有传入,默认是false。代表不是客户端。
再通过NettyRpcEnvFactory创建NettyRpcEnv
spark rpc(组件间通信),spark,rpc,大数据
spark rpc(组件间通信),spark,rpc,大数据
创建一个NettyRpcEnv
启动NettyRpcEnv
spark rpc(组件间通信),spark,rpc,大数据

NettyRpcEnv创建

NettyRpcEnv创建的时候,同时初始化了一些成员变量。
transportContext、clientFactory、transportConf 在上一篇网络通信是说过了。
下面主要讲讲 dispatcher
spark rpc(组件间通信),spark,rpc,大数据

dispatcher创建

dispatcher创建的时候,为了能路由消息,有两个缓存endpoints、endpointRefs。
同时创建了一个 sharedLoop,在同一个线程池中并行处理多个rpc实例的请求。
spark rpc(组件间通信),spark,rpc,大数据

ShareMessageLoop创建

创建线程池,提交循环任务到线程池中执行。
循环是不断获取inbox消息进行处理。
spark rpc(组件间通信),spark,rpc,大数据
spark rpc(组件间通信),spark,rpc,大数据

NettyRpcEnv启动

回到NettyRpcEnvFactory的create方法中。
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1是启动的入口。
其中 startNettyRpcEnv 是一个函数。
在 Utils.startServiceOnPort 中 会调用这个函数来启动NettyRpcEnv。
spark rpc(组件间通信),spark,rpc,大数据
选择合适的端口,尝试多次调用startService启动。
因为spark要启动的service很多,抽象成了Utils里面的一个公共方法,包含了生成端口和重试的通用逻辑。每个service启动的具体方式都是在传入startService函数中实现
spark rpc(组件间通信),spark,rpc,大数据
startNettyRpcEnv实现就是nettyEnv.startServer。创建了server并在dispatcher中注册。
创建server逻辑在network-common包中,上一篇已经讲过了。主要是使用netty创建了一个server,同时绑定了rpcHandler的处理逻辑,并启动。

val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
  nettyEnv.startServer(config.bindAddress, actualPort)
  (nettyEnv, nettyEnv.address.port)
}

spark rpc(组件间通信),spark,rpc,大数据

注册rpc实例

这里的rpc实例是RpcEndpointVerifier(主要用于验证和监控其他RPC端点的健康状态和连接性),并不是master。
1.创建endpointRef,添加到endpointRefs缓存中(这里在endpoint还没有注册完成就提起添加引用,是因为后面endpoint注册的时候会产生一个onStart的消息,需要endpointRef来处理onStart消息,此时endpointRef就会处理onStart消息)
2.在shareLoop中注册endpoint
3.endpoint注册完成,添加endpoints缓存中
spark rpc(组件间通信),spark,rpc,大数据
创建inbox消息,同时将onStart消息放到message队列中。
messageLoop注册 实例名和inbox到缓存中。
执行setActive,将inbox放入处理队列进行处理。
spark rpc(组件间通信),spark,rpc,大数据
spark rpc(组件间通信),spark,rpc,大数据
spark rpc(组件间通信),spark,rpc,大数据

endPoint启动

inbox中调用process处理onStart消息。
可以看到首先是判断message类型。这里传入的是onStart消息,所以走OnStart部分。
endPoint.onStart()
这个endPoint是什么?它是在dispatcher.registerRpcEndpoint的时候传入的是,是RpcEndpointVerifier(主要用于验证和监控其他RPC端点的健康状态和连接性)。RpcEndpointVerifier的onStart方法默认什么都没有做。
spark rpc(组件间通信),spark,rpc,大数据
到此NettyRpc创建完成并且启动。

Master注册到RpcEnv

new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)是master的创建,里面内容太多了。放在master创建里面单独开一篇。现在可以把它当成一个endPoint对待就行。
spark rpc(组件间通信),spark,rpc,大数据
setupEndpoint就是在dispatcher中注册endPoint。跟上面NettyRpcEnv启动的时候注册RpcEndpointVerifier一样。最后会调用endPoint的onStart方法。
spark rpc(组件间通信),spark,rpc,大数据
调用master中onStart方法。在上面new Master()是master的创建,onStart是master的启动。这部分也会放到master部分再讲。
spark rpc(组件间通信),spark,rpc,大数据
至此 master中关于RpcEnv启动完成。

发送rpc消息

NettyRpcEnv主要是以下两个发送消息的方法:
send:发送单向异步的消息。所谓“单向”就是发送完后就会忘记此次发送,不会有任何状态要记录,也不会期望得到服务端的回复。send采用了at-most-once的投递规则。RpcEndpointRef的send方法非常类似于Akka中Actor的tell方法。
ask:发送同步的请求,此类请求将会被RpcEndpoint接收,并在指定的超时时间内等待返回类型为T的处理结果。

spark rpc(组件间通信),spark,rpc,大数据

send方法

NettyRpcEndpointRef发送massage
生成requestMassage,三个参数(发送方的地址、接收方的地址、内容)
调用nettyRpcEnv的send方法
spark rpc(组件间通信),spark,rpc,大数据
判断接收方地址是不是本地地址。是的话就走local,不是的话走remote。
local比较简单,我们从remote分析
可以看到remote最后调用的是postToOutbox方法,顾名思义就是把消息放到发件箱。
spark rpc(组件间通信),spark,rpc,大数据

ask方法

我们从send方法停一下,看一下ask方法。
ask和send方法类似,也是先生成requestMassage再调用nettyRpcEnv的同名的askAbortable方法。但是ask参数多了一个超时时间,同时返回一个futrue对象,表明ask是需要回复的。
spark rpc(组件间通信),spark,rpc,大数据
可以看到最后调用的也是postToOutbox方法。表明send和ask最后都是将消息放到outbox发件箱中来处理的。
spark rpc(组件间通信),spark,rpc,大数据

postToOutbox放到发件箱

将消息加到massages队列中,调用drainOutbox方法处理队列的消息。
spark rpc(组件间通信),spark,rpc,大数据
drainOutbox中为了避免线程竞争,采用了分段加锁和判断状态提前返回的方式。
假设是首次调用这个方法,我们看看它会怎么做。
stopped是false,通过
connectFuture是null,通过
client是null,进入launchConnectTask方法,launchConnectTask完成后会return退出方法,所以launchConnectTask中肯定还有drainOutbox调用
spark rpc(组件间通信),spark,rpc,大数据

launchConnectTask创建远程连接

创建client并将它赋值到outbox的成员变量上,方便后面使用。
val _client = nettyEnv.createClient(address)
address是remote的地址。
具体创建client方法在上一篇已经写了,这里就不再写了。
最后再次调用drainOutbox,跟上面设想的一样。
spark rpc(组件间通信),spark,rpc,大数据

循环处理outbox的消息

在循环里面也采用分段加锁的方式。
发送消息是message.sendWith(_client)
最后messages队列中没有消息了,就将draining状态置为false,退出循环
spark rpc(组件间通信),spark,rpc,大数据

client发送消息

可以看到是client发送消息。这部分也在上一篇写过了,这里不再赘述了。
spark rpc(组件间通信),spark,rpc,大数据
spark rpc(组件间通信),spark,rpc,大数据
到此,rpc消息发送完成。发送流程可以用下图表示:
spark rpc(组件间通信),spark,rpc,大数据

  • 序号①:表示通过调用NettyRpcEndpointRef的send和ask方法向本地节点的Rpc-Endpoint发送消息。由于是在同一节点,所以直接调用Dispatcher的postLocalMessage或postOneWayMessage方法将消息放入EndpointData内部Inbox的messages列表中。Message-Loop线程最后处理消息,并将消息发给对应的RpcEndpoint处理。
  • 序号②:表示通过调用NettyRpcEndpointRef的send和ask方法向远端节点的Rpc-Endpoint发送消息。这种情况下,消息将首先被封装为OutboxMessage,然后放入到远端RpcEndpoint的地址所对应的Outbox的messages列表中。
  • 序号③:表示每个Outbox的drainOutbox方法通过循环,不断从messages列表中取得OutboxMessage。
  • 序号④:表示每个Outbox的drainOutbox方法使用Outbox内部的TransportClient向远端的NettyRpcEnv发送序号③中取得的OutboxMessage。
  • 序号⑤:表示序号④发出的请求在与远端NettyRpcEnv的TransportServer建立了连接后,请求消息首先经过Netty管道的处理,然后经由NettyRpcHandler处理,最后NettyRpcHandler的receive方法会调用Dispatcher的postRemoteMessage或postOneWay-Message方法,将消息放入EndpointData内部Inbox的messages列表中。MessageLoop线程最后处理消息,并将消息发给对应的RpcEndpoint处理。

接收rpc消息

在server端接收消息并处理,主要是通过rpcHandler。
rpcHandler是一个接口,这里的实现类是NettyRpcHandler,对应方法是receive
receive有两个,一个是有callback,对请求有回复。一个没有callback,不用回复。
都会调用internalReceive方法。
spark rpc(组件间通信),spark,rpc,大数据
收到远程消息的时候,需要调用postToAll通知dispatcher中已经注册的rpcPoint实例。postToAll中是每一个已经注册的endpoint调用postMessage发送RemoteProcessConnected消息。
spark rpc(组件间通信),spark,rpc,大数据
获取到messageLoop,使用messageLoop发送消息。
spark rpc(组件间通信),spark,rpc,大数据
从loop中获取对应的inbox收件箱,将message加入inbox中(就是加入inbox的messages缓存中)。再将inbox放入待处理队列中,等待处理。
spark rpc(组件间通信),spark,rpc,大数据
spark rpc(组件间通信),spark,rpc,大数据
后面处理可以参考上面的onStart消息的处理,流程是一样的。文章来源地址https://www.toymoban.com/news/detail-821810.html

到了这里,关于spark rpc(组件间通信)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 远程通信-RPC

            在分布式微服务架构中,远程通信是最基本的需求。 常见的远程通信方式,有基于 REST 架构的 HTTP协议、RPC 框架。         下面,从三个维度了解一下 RPC。 1、什么是远程调用 2、什么是 RPC 3、RPC 的运用场景和优         RPC 的概念与技术其实是比较早的,

    2024年02月13日
    浏览(28)
  • 大数据_面试_ETL组件常见问题_spark&flink

    问题列表 回答 spark与flink的主要区别 flink cdc如何确保幂等与一致性 Flink SQL CDC 实践以及一致性分析-阿里云开发者社区 spark 3.0 AQE动态优化 hbase memorystore blockcache sparksql如何调优 通过webui定位那个表以及jobid,jobid找对应的执行计划 hdfs的常见的压缩算法 hbase的数据倾斜 spark数据处

    2024年02月16日
    浏览(32)
  • 10 - 网络通信优化之通信协议:如何优化RPC网络通信?

    微服务框架中 SpringCloud 和 Dubbo 的使用最为广泛,行业内也一直存在着对两者的比较,很多技术人会为这两个框架哪个更好而争辩。 我记得我们部门在搭建微服务框架时,也在技术选型上纠结良久,还曾一度有过激烈的讨论。当前 SpringCloud 炙手可热,具备完整的微服务生态,

    2024年02月11日
    浏览(25)
  • RPC分布式通信框架

    在实际开发中单机服务器存在诸多问题: 1.受限于硬件资源无法提高并发量 2.任意模块的修改都将导致整个项目代码重新编译部署 3.在系统中,有些模块属于CPU密集型,有些属于I/O密集型,各模块对于硬件资源的需求不一样 什么是分布式?分布式是否可以解决以上问题? 分

    2024年04月28日
    浏览(28)
  • 自定义Dubbo RPC通信协议

    Dubbo 协议层的核心SPI接口是 org.apache.dubbo.rpc.Protocol ,通过扩展该接口和围绕的相关接口,就可以让 Dubbo 使用我们自定义的协议来通信。默认的协议是 dubbo,本文提供一个 Grpc 协议的实现。 Google 提供了 Java 的 Grpc 实现,所以我们站在巨人的肩膀上即可,就不用重复造轮子了。

    2024年01月19日
    浏览(42)
  • 【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信

    RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer,且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类,基于Akka的ActorSystem进行封装,为不同的RpcEndpoint创建相应的ActorRef实例。   RpcService主要包含如下两个重要方法。 startServer():用于启动

    2024年02月22日
    浏览(26)
  • 聊聊分布式架构04——RPC通信原理

    目录 RPC通信的基本原理 RPC结构 手撸简陋版RPC 知识点梳理 1.Socket套接字通信机制 2.通信过程的序列化与反序列化 3.动态代理 4.反射 思维流程梳理 码起来 服务端时序图 服务端—Api与Provider模块 客户端时序图 RPC通信的基本原理 RPC(Remote Procedure Call)是一种远程过程调用协议,

    2024年02月07日
    浏览(26)
  • 大数据系统常用组件理解(Hadoop/hive/kafka/Flink/Spark/Hbase/ES)

    一.Hadoop Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 Hadoop 以一种可靠、高效、可伸缩的方式进行数据处理。 Hadoop的核心是yarn、HDFS和Mapreduce。yarn是资源管理系统,实现资源调度,yarn是Hadoop2.0中的资源管理系统,总体上是master/slave结构。对于yarn可以粗浅将其理解

    2024年02月20日
    浏览(37)
  • 鸿蒙OS跨进程IPC与RPC通信

    基本概念 IPC(Inter-Process Communication)与RPC(Remote Procedure Call)用于实现跨进程通信,不同的是前者使用Binder驱动,用于设备内的跨进程通信,后者使用软总线驱动,用于跨设备跨进程通信。需要跨进程通信的原因是因为每个进程都有自己独立的资源和内存空间,其他进程不能

    2024年02月22日
    浏览(23)
  • 使用thrift进行RPC通信(附c程序示例)

    为了实现不同语言的程序跨进程、跨主机通信,一般可以采用mq或rpc框架来实现。 对于异步通知的场景可以使用mq,如zeroMQ。 但对于某些实时性较强且同步的应用场景,使用成熟的rpc框架来实现也是一种比较更好的选择。 开源的rpc框架有很多,其中跨语言的rpc框架以使用go

    2024年02月05日
    浏览(21)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包