【Spark源码分析】Spark的RPC通信一-初稿

这篇具有很好参考价值的文章主要介绍了【Spark源码分析】Spark的RPC通信一-初稿。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark的RPC通信一-初稿

Spark的RPC顶层设计

RpcEnv中定义了RPC通信框架的启动、停止和关闭等抽象方法,表示RPC的顶层环境。唯一的子类NettyRpcEnv

RpcEndpoints 需要向 RpcEnv 注册自己的名称,以便接收信息。然后,RpcEnv 将处理从 RpcEndpointRef 或远程节点发送的信息,并将它们传送到相应的 RpcEndpoints。对于 RpcEnv 捕捉到的未捕获异常,RpcEnv 会使用 RpcCallContext.sendFailure 将异常发回给发送者,如果没有发送者或出现 NotSerializableException,则记录异常。

RpcEnv 还提供了一些方法来检索给定名称或 uriRpcEndpointRefs

RpcEnvFactory中定义了创建RpcEnv的抽象方法,在NettyRpcEnvNettyRpcEnvFactory中使用Netty对继承的方式进行了实现。

NettRpcEnv中启动终端点方法setEndpoint中,会将RpcEndpointRpcEndpointRef相互以键值对的形式存储到ConcurrentHashMap中,最后在RpcEnvobject类中通过反射方式实现创建RpcEnv的实例的静态方法。

核心类NettyRpcEnv

NettyRpcEnv的核心成员和核心方法

  • transportConfTransportConf的实例对象,加载一些关于RPC的配置项
  • dispatcherDispatcher的实例对象,消息转发器,将RPC消息路由到要该对此消息处理的RpcEndpoint
  • streamManagerNettyStreamManager的实例对象,流的管理器,为NettyRpcEnv提供流式服务。
  • transportContextTransportContext的实例对象
  • clientFactory: 用于构造发送和接收响应的TransportClient
  • fileDownloadFactory: 用于文件下载的独立客户端工厂。这样可以避免使用与主 RPC 上下文相同的 RPC 处理程序,从而将这些客户端引起的事件与主 RPC 流量隔离开来。它还允许对某些属性进行不同的配置,例如每个对等节点的连接数。
  • serverTransportServer,提供高效的底层流媒体服务。
  • ConcurrentHashMap[RpcAddress, Outbox] outboxes:远程地址与Outbox的映射map。
  • startServer(bindAddress: String, port: Int)
    • 创建一个TransportServer
    • 向消息转发器中注册RpcEndpointVerifierRpcEndpointVerifier的注册名称为endpoint-verifier,用来校验RpcEndpoint是否存在的RpcEndpoint服务
  • send(message: RequestMessage): Unit
    • 发送消息时,将本地消息交于InBox,远程消息交于OutBox
  • ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout)
    • 若请求消息的接收者的地址与当前的NettyRpcEnv的地址相同,将消息交通过dispatcher.postLocalMessage(message, p)方法处理,p中是成功和失败的回调函数。
    • 若请求消息的的接收者的地址与当前的NettyRpcEnv的地址不同时,将消息通过postToOutbox(message.receiver, rpcMessage)方法处理,主要是将消息放入outbox,然后传输到远程地址上。
    • 在方法的最后设定了一个定时器,实现消息请求的超时机制。
  • postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage):将消息传到远程节点上
    • 如果receiver.client不为空,那么消息将直接通过TransportClient发送到远端节点
    • 如果receiver.client为空,则获取远端结点地址对应的Outbox,若没有则新建一个
    • 如果NettyRpcEnv已经停止,移除该Outbox并停止,否则调用Outbox.send()发送消息。

核心类RpcEndpoint

RpcEndpoint是对能够处理RPC请求,给某一特定服务提供本地调用及跨节点调用的RPC组件的抽象,所有运行于RPC框架之上的实体都应该继承RpcEndpoint

RPC 的RpcEndpoint,它定义了给定消息时要触发的函数。保证按调用顺序为 onStartreceiveonStopRpcEndpoint的生命周期为constructor -> onStart -> receive* -> onStop。receive 可以并发调用。如果希望接收是线程安全的,则需要请使用 ThreadSafeRpcEndpoint。如果 RpcEndpoint 方法(onError 除外)抛出任何错误,onError 将被调用并说明原因。如果 onError 抛出错误,RpcEnv会将忽略。

ThreadSafeRpcEndpoint是继承自RpcEndpoint的特质,需要 RpcEnv 以线程安全方式向其发送消息的特性。主要用于对消息的处理,必须是线程安全的场景。ThreadSafeRpcEndpoint对消息的处理都是串行的,即前一条消息处理完才能接着处理下一条消息。

核心类RpcEndpointRef

远程 RpcEndpoint 的引用。RpcEndpointRef 是线程安全的。用于消息发送方持有并发送消息。

核心成员

  • maxRetries最大尝试连接次数。可以通过spark.rpc.numRetries参数指定,默认3次
  • retryWaitMs每次尝试连接最大等待毫秒值。可以通过spark.rpc.retry.wait,默认3秒
  • defaultAskTimeoutRPC ask操作的超时时间。可以通过spark.rpc.askTimeout,默认120秒
  • address:远程RpcEndpoint引用的地址
  • name:远程RpcEndpoint引用的名称

核心方法

  • send():发送单向异步信息。只管发送,不管结果。
  • ask()系列:向远程的RpcEndpoint.receiveAndReply()方法发送消息,并带有超时机制的Future。该类方法只发送一次消息,从不重试。
  • askSync()系列:向相应的 RpcEndpoint.receiveAndReply 发送消息,并在指定超时内获取结果,如果失败则抛出异常。
    这是一个阻塞操作,可能会耗费大量时间,因此不要在 RpcEndpoint 的消息循环中调用它。

NettyRpcEndpointRef是其唯一的继承类。重写了ask()send()方法,主要是消息封装成RequestMessage,然后通过nettyEnvasksend方法将消息发送出去。

客户端发送请求简单示例图

  1. 若是向本地节点的RpcEndpoint发送消息
    1. 通过调用NettyRpcEndpointRefsend()ask()方法向本地节点的RpcEndpoint发送消息。由于是在同一节点,所以直接调用DispatcherpostLocalMessage()postOneWayMessage()方法将消息放入EndpointData内部Inboxmessages中。
    2. InboxMessage放入后Inbox后,Inbox所属的endPointData就会放入receivers一旦receivers中有数据,原本阻塞的MessageLoop就可以取到数据,
    3. MessageLoop将调用inbox.process()方法消息的处理。对不同的消息类型调用endpoint的不同回调函数,即完成了消息的处理。
  2. 通过调用NettyRpcEndpointRefsend()ask()方法向远端节点的RpcEndpoint发送消息。消息将首先被封装为OutboxMessage,然后放入到远端RpcEndpoint的地址所对应的Outboxmessages中。
  3. 每个OutboxdrainOutbox()方法通过循环,不断从messages列表中取得OutboxMessage,并通过TransportClient发送,底层依赖Netty
  4. TransportClient和远端NettyRpcEnvTransportServer建立了连接后,请求消息首先经过Netty管道的处理,由TransportChannelHandler将消息分发给TransportRequestHandler,最终会调用NettyRpcHandlerStreamManager处理。如果是RPC消息则会调用NettyRpcHandler.receive()方法,之后与第一步所述一致,调用DispatcherpostRemoteMessage()或``postOneWayMessage()`方法。
  5. 如果TransportRequestHandler处理的是RpcRequest,那么server端的TransportRequestHandler处理消息时还会对client端进行响应,依赖Netty将响应消息发送给client端。client端接收到消息时由TransportChannelHandler将消息分发给TransportResponseHandler处理。

Spark RPC消息的发送与接收实现

OutboxMessage在客户端使用,是对外发送消息的封装。InboxMessage在服务端使用,是对接收消息的封装

InboxMessage是一个scala特质类,所有的RPC消息都继承自InboxMessage。下面是继承自InboxMessage的子类

  • OneWayMessageRpcEndpoint处理此类型的消息后不需要向客户端回复信息。
  • RpcMessageRpcEndpoint处理完此消息后需要向客户端回复信息。
  • OnStartInbox实例化后,再通知与此Inbox相关联的RpcEndpoint启动。
  • OnStopInbox停止后,通知与此Inbox相关联的RpcEndpoint停止。
  • RemoteProcessConnected:告诉所有的RpcEndpoint,有远端的进程已经与当前RPC服务建立了连接。
  • RemoteProcessDisconnected:告诉所有的RpcEndpoint,有远端的进程已经与当前RPC服务断开了连接。
  • RemoteProcessConnectionError:告诉所有的RpcEndpoint,与远端某个地址之间的连接发生了错误。

核心类Inbox

InboxRpcEndpoint存储了消息即InboxMessage,并线程安全地发送给RpcEndPoint

private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {

  //相当于给this起了一个别名为inbox,
  inbox =>  
  
}

重要的属性

  • messages所有的消息以消息盒子的方式,通过LinkedList链式存储
  • enableConcurrent是否同时允许多线程同时处理消息
  • numActiveThreadsInbox中正在处理消息的线程数

重要方法

  • post()InboxMessage投递到box中,从下面的代码可以看出使用了synchronized保证线程安全,如果该box已经关闭,消息将会丢弃。

    def post(message: InboxMessage): Unit = inbox.synchronized {
      if (stopped) {
        // 日志进行warning输出
        onDrop(message)
      } else {
        messages.add(message)
        false
      }
    }
    
  • process()处理存储在messages中的消息

      def process(dispatcher: Dispatcher): Unit = {
        var message: InboxMessage = null
        // 1.以synchronized进行并发检查,开启并发则取消息,numActiveThreads自增1。
        inbox.synchronized {
          if (!enableConcurrent && numActiveThreads != 0) {
            return
          }
          message = messages.poll()
          if (message != null) {
            numActiveThreads += 1
          } else {
            return
          }
        }
        while (true) {
          // 安全回调?处理异常的
          safelyCall(endpoint) {
            //对不同消息,通过模式匹配进行通过不同的endpoint进行处理
            message match {
              case RpcMessage(_sender, content, context) =>
                try {
                  endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                    throw new SparkException(s"Unsupported message $message from ${_sender}")
                  })
                } catch {
                  case e: Throwable =>
                    context.sendFailure(e)
                    // Throw the exception -- this exception will be caught by the safelyCall function.
                    // The endpoint's onError function will be called.
                    throw e
                }
    
              case OneWayMessage(_sender, content) =>
                endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
                  throw new SparkException(s"Unsupported message $message from ${_sender}")
                })
    
              case OnStart =>
                endpoint.onStart()
                if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
                  inbox.synchronized {
                    if (!stopped) {
                      enableConcurrent = true
                    }
                  }
                }
    
              case OnStop =>
                val activeThreads = inbox.synchronized { inbox.numActiveThreads }
                assert(activeThreads == 1,
                  s"There should be only a single active thread but found $activeThreads threads.")
                dispatcher.removeRpcEndpointRef(endpoint)
                endpoint.onStop()
                assert(isEmpty, "OnStop should be the last message")
    
              case RemoteProcessConnected(remoteAddress) =>
                endpoint.onConnected(remoteAddress)
    
              case RemoteProcessDisconnected(remoteAddress) =>
                endpoint.onDisconnected(remoteAddress)
    
              case RemoteProcessConnectionError(cause, remoteAddress) =>
                endpoint.onNetworkError(cause, remoteAddress)
            }
          }
    
          inbox.synchronized {
            // 调用 `onStop` 后,"enableConcurrent "将被设置为 false,所以需要每次都检查它。
            if (!enableConcurrent && numActiveThreads != 1) {
              // 此线程退出,降低并发,最终归于一个线程处理剩下的消息
              numActiveThreads -= 1
              return
            }
            message = messages.poll()
            // 没有消息之后,退出当前循环
            if (message == null) {
              numActiveThreads -= 1
              return
            }
          }
        }
      }
    
  • stop()enableConcurrent赋值为false,保证当前是唯一活跃的线程。并在messages中添加onStop消息。

    def stop(): Unit = inbox.synchronized {
      // 优雅关闭,是关闭并发只留一个线程处理消息。确保OnStop为最后一个消息,这样,"RpcEndpoint.onStop "就可以安全地释放资源了。
      if (!stopped) {
        enableConcurrent = false
        stopped = true
        messages.add(OnStop)
      }
    }
    

核心类Dispatcher

Dispatcher负责将RPC消息路由到要该对此消息处理的RpcEndpoint

内部类

  • EndpointData:包装一个Inbox类。一个RpcEndpoint与NettyRpcEndpointRef映射关联在一起。即一个Inbox只为一个映射关系服务。
  • MessageLoop:用于转发信息的循环任务类,从receivers中获取有消息的inbox进行处理。

重要属性

  • endpoints储存nameEndpointData的映射关系EndpointData包含了nameRpcEndpoint, NettyRpcEndpointRefInbox,采用ConcureentHashMap保证线程安全
  • endpointRefs储存RpcEndpointRpcEndpointRef的映射关系。采用ConcureentHashMap保证线程安全
  • receivers存储inbox中可能包含message的EndpointData。在MessageLoop中取出并处理消息。使用阻塞队列LinkedBlockingQueue存储。
  • threadpool用于调度消息的线程池。根据spark.rpc.netty.dispatcher.numThreads创建固定大小的线程池,启动与线程池大小相同个数的MessageLoop任务。

重要方法

  • registerRpcEndpoint()在调度器中注册endpoint。由nameRpcEndpoint构建NettyRpcEndpointRef,并加入到endpoints, endpointRefs, receivers
  • postToAll()将message投递到在注册到该Dispatcher的所有RpcEndpointpostMessage()将message投递到注册到该Dispatcher指定name的RpcEndpoint中,并将EndpointData放入receivers中,该方法中还传入了失败回调函数
  • unregisterRpcEndpoint(), stop():注销所有已注册的RpcEndpoint,从endpoints中移除并在inbox中增加了onstop消息。在receivers中插入哨兵,等待receivers中的所有消息都处理完毕后,关闭线程池。

Dispatcher中的消息处理流程。

  1. postToAll()或者postxx()方法会调用postMessage()方法将InboxMessage放到对应endPointDatainboxmessages列表(调用inbox.post())
  2. InboxMessage放入后inbox后,inbox所属的endPointData就会放入receivers
  3. 一旦receivers中有数据,原本阻塞的MessageLoop就可以取到数据,因为receivers是一个阻塞队列
  4. MessageLoop将调用inbox.process()方法消息的处理。利用模式匹配,对不同的消息类型调用endpoint的不同回调函数,即完成了消息的处理。

核心类Outbox

OutboxMessage是一个特质,内部只有未实现的SendWith方法和onFailure方法。OneWayOutboxMessageRpcOutboxMessage都继承自OutboxMessage特质,实现的SendWith通过调用TransportClientsendRpc()方法发送信息,其中RpcOutboxMessage还增加了超时和发送成功的回调方法。

Outbox的重要属性

  • messages: 保存要发送的OutboxMessageLinkedList类型,线程不安全
  • client: TransportClient
  • stopped: 当前Outbox是否停止的标识
  • draining: 表示当前Outbox内正有线程在处理messages中消息的状态

重要方法

  • send():将要发送的OutboxMessage首先保存到成员变量链表messages中,若Outbox未停止则调用drainOutbox()方法处理messages中的信息。因为messagesLinkedList类型,线程不安全,所以在添加和删除时使用了同步机制。之后调用了私有的drainOutbox()方法发送消息。发送信息。如果没有活动连接,则缓存并启动新连接。如果[[发件箱]]被停止,发送者将收到[[SparkException]]通知。

      def send(message: OutboxMessage): Unit = {
        val dropped = synchronized {
          if (stopped) {
            true
          } else {
            messages.add(message)
            false
          }
        }
        if (dropped) {
          message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
        } else {
          drainOutbox()
        }
      }
    
  • drainOutbox():先判断是否已停止,client是否空等前置条件。取出一条消息,并将draining置为true,接下来将messages中所有消息调用sendWith()方法发送。耗尽消息队列。如果有其他线程正在排空,则直接退出。如果尚未建立连接,则在 nettyEnv.clientConnectionExecutor 中启动一个任务来建立连接。

  • launchConnectTask(): 初始化client

  • stop():停止Outbox

    • Outbox的停止状态stopped置为true
    • 关闭TransportClient
    • 清空messages中的消息

之所以要使用这种机制来发消息,是保证并发发送消息时,所有消息依次添加到Outbox中,并依次传输,同时不会阻塞send()方法文章来源地址https://www.toymoban.com/news/detail-761197.html

到了这里,关于【Spark源码分析】Spark的RPC通信一-初稿的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信

    RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer,且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类,基于Akka的ActorSystem进行封装,为不同的RpcEndpoint创建相应的ActorRef实例。   RpcService主要包含如下两个重要方法。 startServer():用于启动

    2024年02月22日
    浏览(36)
  • 远程通信-RPC

            在分布式微服务架构中,远程通信是最基本的需求。 常见的远程通信方式,有基于 REST 架构的 HTTP协议、RPC 框架。         下面,从三个维度了解一下 RPC。 1、什么是远程调用 2、什么是 RPC 3、RPC 的运用场景和优         RPC 的概念与技术其实是比较早的,

    2024年02月13日
    浏览(37)
  • rpc通信原理浅析

    rpc(remote procedure call),即远程过程调用,广泛用于分布式或是异构环境下的通信,数据格式一般采取protobuf。 protobuf(protocol buffer)是google 的一种数据交换的格式,它独立于平台语言。 google 提供了protobuf多种语言的实现:java、c#、c++、go 和 python,每一种实现都包含了相应语言

    2024年02月15日
    浏览(31)
  • RPC分布式通信框架

    在实际开发中单机服务器存在诸多问题: 1.受限于硬件资源无法提高并发量 2.任意模块的修改都将导致整个项目代码重新编译部署 3.在系统中,有些模块属于CPU密集型,有些属于I/O密集型,各模块对于硬件资源的需求不一样 什么是分布式?分布式是否可以解决以上问题? 分

    2024年04月28日
    浏览(41)
  • 自定义Dubbo RPC通信协议

    Dubbo 协议层的核心SPI接口是 org.apache.dubbo.rpc.Protocol ,通过扩展该接口和围绕的相关接口,就可以让 Dubbo 使用我们自定义的协议来通信。默认的协议是 dubbo,本文提供一个 Grpc 协议的实现。 Google 提供了 Java 的 Grpc 实现,所以我们站在巨人的肩膀上即可,就不用重复造轮子了。

    2024年01月19日
    浏览(52)
  • 微服务通信[HTTP|RPC同步通信、MQ异步通信]

    A服务调用B服务,B服务调C服务,C服务调D服务,即微服务之间的通信(也可以叫微服务之间的调用) 一种轻量级的通信协议,常用于在不同的微服务之间进行通信,也是最简单的通信方式 使用REST ful为开发规范,将服务对外暴露的HTTP调用方式为REST API(如GET、POST、PUT、DELETE等),已经成

    2024年02月09日
    浏览(37)
  • Spark Kubernetes 的源码分析系列 - scheduler

    这一块代码可以理解为 Spark 是如何实现一个基于 K8S 的调度器,来调度生成 Executor Pod 的。 由于上篇文章主要介绍了 Driver 的 Pod 是如何生成的,在讲 scheduler 之前,先补充一下 Executor 的配置步骤。重点代码在下面这个  features  里。步骤跟 Driver 类似,但是少了一些,剩下的

    2024年04月27日
    浏览(27)
  • Spark Kubernetes 的源码分析系列 - features

    features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML 文件。 看看 features 包里的代码。这里面都是 Spark 在 K8S 中构建各种资源的步骤。 还记得 Spark Kubernetes 的源码分析系列 - submit 文章里提到

    2024年04月10日
    浏览(35)
  • 10 - 网络通信优化之通信协议:如何优化RPC网络通信?

    微服务框架中 SpringCloud 和 Dubbo 的使用最为广泛,行业内也一直存在着对两者的比较,很多技术人会为这两个框架哪个更好而争辩。 我记得我们部门在搭建微服务框架时,也在技术选型上纠结良久,还曾一度有过激烈的讨论。当前 SpringCloud 炙手可热,具备完整的微服务生态,

    2024年02月11日
    浏览(32)
  • 聊聊分布式架构04——RPC通信原理

    目录 RPC通信的基本原理 RPC结构 手撸简陋版RPC 知识点梳理 1.Socket套接字通信机制 2.通信过程的序列化与反序列化 3.动态代理 4.反射 思维流程梳理 码起来 服务端时序图 服务端—Api与Provider模块 客户端时序图 RPC通信的基本原理 RPC(Remote Procedure Call)是一种远程过程调用协议,

    2024年02月07日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包