【Spark源码分析】Spark的RPC通信二-初稿

这篇具有很好参考价值的文章主要介绍了【Spark源码分析】Spark的RPC通信二-初稿。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark的RPC通信二-初稿

Spark RPC的传输层

传输层主要还是借助netty框架进行实现。

TransportContext包含创建 TransportServerTransportClientFactory 和使用 TransportChannelHandler 设置 Netty Channel 管道的上下文。TransportClient 提供两种通信协议:control-plane RPCs 和data-plane的 “chunk fetching”。RPC 的处理在 TransportContext 的范围之外进行(即由用户提供的处理程序执行),它负责设置流,这些流可以使用零拷贝 IO 以块为单位通过数据平面进行流式传输。对消息的处理由RpcHandler处理。TransportServerTransportClientFactory 都会为每个通道创建一个 TransportChannelHandler。由于每个 TransportChannelHandler 都包含一个 TransportClient,因此服务器进程可以通过现有通道向客户端发送消息。

传输上下文TransportContext

TransportContext的核心成员与核心方法

  • TransportConf conf:传输的配置信息
  • RpcHandler rpcHandler:对接收的RPC消息进行处理
  • EventLoopGroup chunkFetchWorkers:处理 ChunkFetchRequest 的独立线程池。这有助于控制通过底层通道将 ChunkFetchRequest 信息写回客户端时阻塞的 TransportServer 工作线程的最大数量。
  • createClientFactory():初始化 ClientFactory,在返回新客户端之前运行给定的 TransportClientBootstraps。Bootstraps 将同步执行,并且必须成功运行才能创建客户端。
  • createServer():创建传输服务端TransportServer的实例
  • initializePipeline():对TransportClientTransportRequestHandlerTransportResponseHandler进行初始化,然后在用其构造TransportChannelHandler对象。借助Netty的API对管道进行配置。

TransportContextcreateClientFactory方法创建传输客户端工厂TransportClientFactory的实例。在构造TransportClientFactory的实例时,还会传递客户端引导程序TransportClientBootstrap的列表。TransportClientFactory内部维护每个Socket地址的连接池。通过调用TransportContextcreateServer方法创建传输服务端TransportServer的实例。

核心类TransportClientFactory

用于使用 createClient方法 创建 TransportClients 的工厂。该工厂负责维护与其他主机的连接池,并为同一远程主机返回相同的 TransportClient。它还为所有 TransportClients 共享一个工作线程池。只要有可能,就会重复使用 TransportClients。在完成创建新的 TransportClient 之前,将运行所有给定的 TransportClientBootstraps

TransportClientFactory的核心成员和核心方法

  • 静态内部类ClientPool:一种简单的数据结构,用于跟踪两个对等节点之间的客户端连接池,保障其可以复用,由于线程不安全,所以增加了客户端对应的锁。

      private static class ClientPool {
        TransportClient[] clients;
        Object[] locks;
    
        ClientPool(int size) {
          clients = new TransportClient[size];
          locks = new Object[size];
          for (int i = 0; i < size; i++) {
            locks[i] = new Object();
          }
        }
      }
    
  • TransportContext context:TransportContext 的实例对象

  • TransportConf conf:链接配置信息的实例对象

  • List<TransportClientBootstrap> clientBootstraps:客户端的引导程序,主要是客户端在建立连接的时候,进行一些初始化的准备操作。

  • ConcurrentHashMap<SocketAddress, ClientPool> connectionPool:维护了连接地址上的客户端连接池的映射表。

  • createClient(String remoteHost, int remotePort)

    • 首先根据远程地址,确认客户端连接池connectionPool中是否存在关于这个地址的客户端池clientPool,如果没有就新建一个客户端池放入连接池中。
    • 检查通道是否超时和客户端是否存活,如果客户端失活,则需要重建一个客户端。创建客户端的在createClient(InetSocketAddress address)方法中。
      public TransportClient createClient(String remoteHost, int remotePort)
          throws IOException, InterruptedException {
        // 此处使用未解析地址,以避免每次创建客户端时都进行 DNS 解析。
        final InetSocketAddress unresolvedAddress =
          InetSocketAddress.createUnresolved(remoteHost, remotePort);
    
        // 如果clientPool不存在,则新建.
        ClientPool clientPool = connectionPool.get(unresolvedAddress);
        if (clientPool == null) {
          connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
          clientPool = connectionPool.get(unresolvedAddress);
        }
    
        int clientIndex = rand.nextInt(numConnectionsPerPeer);
        TransportClient cachedClient = clientPool.clients[clientIndex];
    
        if (cachedClient != null && cachedClient.isActive()) {
          // 更新处理程序的最后使用时间,确保通道不会超时
          TransportChannelHandler handler = cachedClient.getChannel().pipeline()
            .get(TransportChannelHandler.class);
          synchronized (handler) {
            handler.getResponseHandler().updateTimeOfLastRequest();
          }
          // 然后检查客户端是否还活着,以防在代码更新之前超时。
          if (cachedClient.isActive()) {
            logger.trace("Returning cached connection to {}: {}",
              cachedClient.getSocketAddress(), cachedClient);
            return cachedClient;
          }
        }
    
        // 如果我们到达这里,就没有打开现有连接,尝试创建一个新连接。
        final long preResolveHost = System.nanoTime();
        final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
        final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
        if (hostResolveTimeMs > 2000) {
          logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
        } else {
          logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
        }
        // 多个线程可能会竞相在这里创建新连接。通过同步原语只保留其中一个处于活动状态。
        synchronized (clientPool.locks[clientIndex]) {
          cachedClient = clientPool.clients[clientIndex];
    
          if (cachedClient != null) {
            if (cachedClient.isActive()) {
              logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
              return cachedClient;
            } else {
              logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
            }
          }
          clientPool.clients[clientIndex] = createClient(resolvedAddress);
          return clientPool.clients[clientIndex];
        }
      }
    
  • createClient(InetSocketAddress address)

    • 通过Netty的根引导程序进行初始化配置
    • 通过回调函数初始化bootstrap的Pipeline,设置好客户端引用和管道引用。
    • 遍历客户端引导程序集clientBootstraps,执行其初始化的内容
      private TransportClient createClient(InetSocketAddress address)
          throws IOException, InterruptedException {
        logger.debug("Creating new connection to {}", address);
        // netty的连接创建的根引导程序
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
          .channel(socketChannelClass)
          // 禁用纳格尔算法,因为我们不想让数据包等待
          .option(ChannelOption.TCP_NODELAY, true)
          .option(ChannelOption.SO_KEEPALIVE, true)
          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
          .option(ChannelOption.ALLOCATOR, pooledAllocator);
    
        if (conf.receiveBuf() > 0) {
          bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
        }
    
        if (conf.sendBuf() > 0) {
          bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
        }
    
        final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
        final AtomicReference<Channel> channelRef = new AtomicReference<>();
    
        // 通过回调函数初始化bootstrap的Pipeline
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel(SocketChannel ch) {
            TransportChannelHandler clientHandler = context.initializePipeline(ch);
            clientRef.set(clientHandler.getClient());
            channelRef.set(ch);
          }
        });
    
        // 连接远程服务器
        long preConnect = System.nanoTime();
        ChannelFuture cf = bootstrap.connect(address);
        if (!cf.await(conf.connectionTimeoutMs())) {
          throw new IOException(
            String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
        } else if (cf.cause() != null) {
          throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
        }
    
        TransportClient client = clientRef.get();
        Channel channel = channelRef.get();
        assert client != null : "Channel future completed successfully with null client";
    
        // 在将客户端标记为成功之前,同步执行任何客户端引导。
        long preBootstrap = System.nanoTime();
        logger.debug("Connection to {} successful, running bootstraps...", address);
        try {
            // 遍历客户端引导程序集clientBootstraps,执行其初始化的内容
          for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
            clientBootstrap.doBootstrap(client, channel);
          }
        } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
          long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
          logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
          client.close();
          throw Throwables.propagate(e);
        }
        long postBootstrap = System.nanoTime();
    
        logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
          address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
    
        return client;
      }
    

TransportClient

用于向server端发送rpc请求和从server 端获取流的chunk块,旨在高效传输大量数据,这些数据被分成大小从几百 KB 到几 MB 不等的数据块。

典型流程

// 打开远程文件
client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
// 获取远程文件的chunk
client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
// 关闭远程文件
client.sendRPC(new CloseStream(100))

用于获取预协商数据流中连续数据块的客户端,处理的是从数据流(即数据平面)中获取数据块的过程,但数据流的实际设置是在传输层范围之外完成的。提供 "sendRPC "方便方法是为了在客户端和服务器之间进行控制平面通信,以执行此设置。使用 TransportClientFactory 构建一个 TransportClient 实例。单个 TransportClient 可用于多个流,但任何给定的流都必须仅限于单个客户端,以避免响应顺序混乱。注意:该类用于向服务器发出请求,而 TransportResponseHandler 则负责处理来自服务器的响应。并发性:线程安全,可由多个线程调用。

TransportServer

服务器,提供高效的底层流媒体服务。

消息的处理

消息处理类MessageHandler处理来自 Netty 的请求或响应信息。一个 MessageHandler 实例只与一个Netty通道相关联(尽管同一通道上可能有多个客户端)。以下是其定义的抽象方法。

  • abstract void handle(T message):对接收的单条信息的处理。
  • abstract void channelActive():当该消息处理程序所在的频道处于活动状态时调用。
  • abstract void exceptionCaught(Throwable cause):当通道上出现异常时调用。
  • abstract void channelInactive():当此 MessageHandler 所处的通道处于非活动状态时调用。

MessageHandler有两个继承类TransportRequestHandlerTransportResponseHandler分别用来进行Server端处理Client的请求信息和Client端处理Server的响应信息。

TransportRequestHandlerhandle(RequestMessage request)方法

  public void handle(RequestMessage request) {
    if (request instanceof RpcRequest) {
     // 处理RPC请求,依赖RpcHandler的receive()方法
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
     // 处理无需回复的RPC请求,依赖RpcHandler的receive()方法
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
     // 处理流请求,依赖StreamManager的openStream()方法获取流数据并封装成ManagedBuffer
      processStreamRequest((StreamRequest) request);
    } else {
     // 未知请求抛异常
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }

TransportResponseHandlerhandle(ResponseMessage message)方法

在client端发送消息时,根据发送消息的类型调用TransportResponseHandler中的方法注册回调函数,回调函数和请求信息放入相应的缓存中。

TransportResponseHandler收到server端的响应消息时,再调用主要的工作方法handle(),根据响应消息类型从对应缓存中取出回调函数并调用

  @Override
  public void handle(ResponseMessage message) throws Exception {
    if (message instanceof ChunkFetchSuccess) {
      ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel));
        resp.body().release();
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
        resp.body().release();
      }
    } else if (message instanceof ChunkFetchFailure) {
      ChunkFetchFailure resp = (ChunkFetchFailure) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
          "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
      }
    } else if (message instanceof RpcResponse) {
      RpcResponse resp = (RpcResponse) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.body().size());
      } else {
        outstandingRpcs.remove(resp.requestId);
        try {
          listener.onSuccess(resp.body().nioByteBuffer());
        } finally {
          resp.body().release();
        }
      }
    } else if (message instanceof RpcFailure) {
      RpcFailure resp = (RpcFailure) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingRpcs.remove(resp.requestId);
        listener.onFailure(new RuntimeException(resp.errorString));
      }
    } else if (message instanceof StreamResponse) {
      StreamResponse resp = (StreamResponse) message;
      Pair<String, StreamCallback> entry = streamCallbacks.poll();
      if (entry != null) {
        StreamCallback callback = entry.getValue();
        if (resp.byteCount > 0) {
          StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
            callback);
          try {
            TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
              channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
            frameDecoder.setInterceptor(interceptor);
            streamActive = true;
          } catch (Exception e) {
            logger.error("Error installing stream handler.", e);
            deactivateStream();
          }
        } else {
          try {
            callback.onComplete(resp.streamId);
          } catch (Exception e) {
            logger.warn("Error in stream handler onComplete().", e);
          }
        }
      } else {
        logger.error("Could not find callback for StreamResponse.");
      }
    } else if (message instanceof StreamFailure) {
      StreamFailure resp = (StreamFailure) message;
      Pair<String, StreamCallback> entry = streamCallbacks.poll();
      if (entry != null) {
        StreamCallback callback = entry.getValue();
        try {
          callback.onFailure(resp.streamId, new RuntimeException(resp.error));
        } catch (IOException ioe) {
          logger.warn("Error in stream failure handler.", ioe);
        }
      } else {
        logger.warn("Stream failure with unknown callback: {}", resp.error);
      }
    } else {
      throw new IllegalStateException("Unknown response type: " + message.type());
    }
  }

消息的分类

MessageHandler用来处理的消息都是继承或实现自Message接口的。

根据上面的类图可以看出,主要分类

  • AbstractMessage:抽象类,用于在单独的缓冲区中保存正文。其他消息类基本都继承该类。

  • RequestMessage:定义了从客户端到服务端的消息接口

    • ChunkFetchRequest:请求获取数据流中单个数据块的序列。这将对应一个响应信息(成功或失败)。
    • RpcRequest:由远程服务端 org.apache.spark.network.server.RpcHandler 处理的通用 RPC。这将对应一个响应信息(成功或失败)。
    • OneWayMessage:由远程服务端 org.apache.spark.network.server.RpcHandler 处理。不需要进行回复客户端。
    • StreamRequest:请求从远端流式传输数据。数据流 ID 是一个任意字符串,需要两个端点协商后才能流式传输数据
  • ResponseMessage:定义了从服务端到客户端的消息接口文章来源地址https://www.toymoban.com/news/detail-771913.html

    • AbstractResponseMessage:响应信息的抽象类。
      • ChunkFetchSuccess:处理ChunkFetchRequest成功后返回的消息。
      • RpcResponse:处理RpcRequest成功后返回的消息。
      • StreamResponse:处理StreamRequest成功后返回的消息。
    • ChunkFetchFailure:处理ChunkFetchRequest失败后返回的消息。
    • RpcFailure:处理RpcRequest失败后返回的消息。
    • StreamFailure:处理StreamRequest失败后返回的消息。

client端请求和响应的流程

server端处理请求和响应的流程

到了这里,关于【Spark源码分析】Spark的RPC通信二-初稿的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信

    RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer,且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类,基于Akka的ActorSystem进行封装,为不同的RpcEndpoint创建相应的ActorRef实例。   RpcService主要包含如下两个重要方法。 startServer():用于启动

    2024年02月22日
    浏览(36)
  • 远程通信-RPC

            在分布式微服务架构中,远程通信是最基本的需求。 常见的远程通信方式,有基于 REST 架构的 HTTP协议、RPC 框架。         下面,从三个维度了解一下 RPC。 1、什么是远程调用 2、什么是 RPC 3、RPC 的运用场景和优         RPC 的概念与技术其实是比较早的,

    2024年02月13日
    浏览(37)
  • rpc通信原理浅析

    rpc(remote procedure call),即远程过程调用,广泛用于分布式或是异构环境下的通信,数据格式一般采取protobuf。 protobuf(protocol buffer)是google 的一种数据交换的格式,它独立于平台语言。 google 提供了protobuf多种语言的实现:java、c#、c++、go 和 python,每一种实现都包含了相应语言

    2024年02月15日
    浏览(31)
  • RPC分布式通信框架

    在实际开发中单机服务器存在诸多问题: 1.受限于硬件资源无法提高并发量 2.任意模块的修改都将导致整个项目代码重新编译部署 3.在系统中,有些模块属于CPU密集型,有些属于I/O密集型,各模块对于硬件资源的需求不一样 什么是分布式?分布式是否可以解决以上问题? 分

    2024年04月28日
    浏览(41)
  • 自定义Dubbo RPC通信协议

    Dubbo 协议层的核心SPI接口是 org.apache.dubbo.rpc.Protocol ,通过扩展该接口和围绕的相关接口,就可以让 Dubbo 使用我们自定义的协议来通信。默认的协议是 dubbo,本文提供一个 Grpc 协议的实现。 Google 提供了 Java 的 Grpc 实现,所以我们站在巨人的肩膀上即可,就不用重复造轮子了。

    2024年01月19日
    浏览(52)
  • 微服务通信[HTTP|RPC同步通信、MQ异步通信]

    A服务调用B服务,B服务调C服务,C服务调D服务,即微服务之间的通信(也可以叫微服务之间的调用) 一种轻量级的通信协议,常用于在不同的微服务之间进行通信,也是最简单的通信方式 使用REST ful为开发规范,将服务对外暴露的HTTP调用方式为REST API(如GET、POST、PUT、DELETE等),已经成

    2024年02月09日
    浏览(37)
  • Spark Kubernetes 的源码分析系列 - scheduler

    这一块代码可以理解为 Spark 是如何实现一个基于 K8S 的调度器,来调度生成 Executor Pod 的。 由于上篇文章主要介绍了 Driver 的 Pod 是如何生成的,在讲 scheduler 之前,先补充一下 Executor 的配置步骤。重点代码在下面这个  features  里。步骤跟 Driver 类似,但是少了一些,剩下的

    2024年04月27日
    浏览(27)
  • Spark Kubernetes 的源码分析系列 - features

    features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML 文件。 看看 features 包里的代码。这里面都是 Spark 在 K8S 中构建各种资源的步骤。 还记得 Spark Kubernetes 的源码分析系列 - submit 文章里提到

    2024年04月10日
    浏览(35)
  • 10 - 网络通信优化之通信协议:如何优化RPC网络通信?

    微服务框架中 SpringCloud 和 Dubbo 的使用最为广泛,行业内也一直存在着对两者的比较,很多技术人会为这两个框架哪个更好而争辩。 我记得我们部门在搭建微服务框架时,也在技术选型上纠结良久,还曾一度有过激烈的讨论。当前 SpringCloud 炙手可热,具备完整的微服务生态,

    2024年02月11日
    浏览(32)
  • 聊聊分布式架构04——RPC通信原理

    目录 RPC通信的基本原理 RPC结构 手撸简陋版RPC 知识点梳理 1.Socket套接字通信机制 2.通信过程的序列化与反序列化 3.动态代理 4.反射 思维流程梳理 码起来 服务端时序图 服务端—Api与Provider模块 客户端时序图 RPC通信的基本原理 RPC(Remote Procedure Call)是一种远程过程调用协议,

    2024年02月07日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包