【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信

这篇具有很好参考价值的文章主要介绍了【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

零. RpcService服务概述

RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer,且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类,基于Akka的ActorSystem进行封装,为不同的RpcEndpoint创建相应的ActorRef实例。

 

RpcService主要包含如下两个重要方法。

  1. startServer():用于启动RpcEndpoint中的RpcServer。RpcServer实际上就是对Actor进行封装,启动完成后,RpcEndpoint中的RpcServer就能够对外提供服务了。
  2. connect():用于连接远端RpcEndpoint并返回给调用方RpcGateway接口的方法,建立连接后RPC客户端就能像本地一样调用RpcServer提供的RpcGateway接口了。

例如在JobMaster组件中创建与ResourceManager组件之间的RPC连接时。此时可以通过Akka发送消息到ResourceManager的RpcServer中,这样就使得JobMaster像调用本地方法一样在ResourceManager中执行请求任务。

 

1. AkkaRpcService的创建和初始化

在创建和启动ClusterEntrypoint及TaskManagerRunner的过程中,会调用AkkaRpcServiceUtils.createRpcService()方法创建默认的AkkaRpcService,接着启动RpcServer。

例如管理节点中会使用AkkaRpcService实例创建并启动ResourceManager、Dispatcher以及JobMaster等RPC服务。

创建AkkaRpcService主要包括如下步骤。

  1. 在ClusterEntrypoint中创建RpcService。
  2. 启动ActorSystem服务。
  3. 创建RobustActorSystem。RobustActorSystem实际上是对Akka的ActorSystem进行了封装和拓展,相比于原生Akka
    ActorSystem,RobustActorSystem包含了UncaughtExceptionHandler组件,能够对ActorSystem抛出的异常进行处理。
  4. 使用RobustActorSystem创建AkkaRpcService实例。
  5. 将AkkaRpcService返回到ClusterEntrypoint中,用于启动集群中各个RpcEndpoint组件服务

【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信,# flink源码,flink

 

2.通过AkkaRpcService初始化RpcServer

在集群运行时中创建了共用的AkkaRpcService服务,相当于创建了Akka系统中的ActorSystem,接下来就是使用AkkaRpcService启动各个RpcEndpoint中的RpcServer实例。(AkkaRpcService服务作为共用的rpc服务,启动其他各个组件的RpcServer实例?)

 
这里先看通过AkkaRpcService初始化RpcEndpoint对应的RpcServer的逻辑。如下在org.apache.flink.runtime.rpc.RpcEndpoint的构造器中,执行了RpcServer的初始化

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
   this.rpcService = checkNotNull(rpcService, "rpcService");
   this.endpointId = checkNotNull(endpointId, "endpointId");
   // 初始化RpcEndpoint中的RpcServer
   this.rpcServer = rpcService.startServer(this);
   this.mainThreadExecutor = new MainThreadExecutor(rpcServer, 
   this::validateRunsInMainThread);
}

具体看下rpcService.startServer(this) 启动rpcServer的逻辑

  1. ActorSystem创建相应Actor的ActorRef引用类。创建完毕后会将RpcEndpoint和ActorRef信息存储在Actor键值对集合中。
  2. 启动RpcEndpoint对应的RPC服务,首先获取当前RpcEndpoint实现的RpcGateways接口。 RpcGateway接口最终通过RpcUtils.extractImplementedRpcGateways()方法从类定义抽取出来,例如JobMaster组件会抽取JobMasterGateway接口定义。
  3. 创建InvocationHandler代理类,根据InvocationHandler代理类提供的invoke()方法实现被代理类的具体方法。
  4. 根据RpcEndpoint是否为FencedRpcEndpoint,InvocationHandler分为FencedAkkaInvocationHandler和AkkaInvocationHandler两种类型。

FencedMainThreadExecutable代理的接口主要有FencedMainThreadExecutable和FencedRpcGateway两种。
AkkaInvocationHandler主要代理实现AkkaBasedEndpoint、RpcGateway、StartStoppable、MainThreadExecutable、RpcServer等接口。

  1. 创建好InvocationHandler代理类后,通过反射的方式(Proxy.newProxyInstance())创建代理类。创建的代理类会被转换为RpcServer实例,再返回给RpcEndpoint使用。

在RpcServer创建的过程中可以看出,实际上包含了创建RpcEndpoint中的Actor引用类ActorRef和AkkaInvocationHandler动态代理类。最后将动态代理类转换为RpcServer接口返回给RpcEndpoint实现类,此时实现的组件就能够获取到RpcServer服务,且通过RpcServer代理了所有的RpcGateways接口,提供了本地方法调用和远程方法调用两种模式。

@Override  
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {  
    checkNotNull(rpcEndpoint, "rpc endpoint");  
  
    final SupervisorActor.ActorRegistration actorRegistration =  
            registerAkkaRpcActor(rpcEndpoint);  
    final ActorRef actorRef = actorRegistration.getActorRef();  
    final CompletableFuture<Void> actorTerminationFuture =  
            actorRegistration.getTerminationFuture();  
    //启动RpcEndpoint对应的RPC服务
    LOG.info(  
            "Starting RPC endpoint for {} at {} .",  
            rpcEndpoint.getClass().getName(),  
            actorRef.path());  
  
    final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);  
    final String hostname;  
    Option<String> host = actorRef.path().address().host();  
    if (host.isEmpty()) {  
        hostname = "localhost";  
    } else {  
        hostname = host.get();  
    }  
    //解析EpcEndpoint实现的所有RpcGateway接口
    Set<Class<?>> implementedRpcGateways =  
            new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));  
    //额外添加RpcServer和AkkaBasedEndpoint类
    implementedRpcGateways.add(RpcServer.class);  
    implementedRpcGateways.add(AkkaBasedEndpoint.class);  
  
    final InvocationHandler akkaInvocationHandler;  
    //根据是否是FencedRpcEndpoint创建不同的动态代理对象
    if (rpcEndpoint instanceof FencedRpcEndpoint) {  
        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler  
        akkaInvocationHandler =  
                new FencedAkkaInvocationHandler<>(  
                        akkaAddress,  
                        hostname,  
                        actorRef,  
                        configuration.getTimeout(),  
                        configuration.getMaximumFramesize(),  
                        actorTerminationFuture,  
                        ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,  
                        captureAskCallstacks);  
  
        implementedRpcGateways.add(FencedMainThreadExecutable.class);  
    } else {  
        akkaInvocationHandler =  
                new AkkaInvocationHandler(  
                        akkaAddress,  
                        hostname,  
                        actorRef,  
                        configuration.getTimeout(),  
                        configuration.getMaximumFramesize(),  
                        actorTerminationFuture,  
                        captureAskCallstacks);  
    }  
  
    // Rather than using the System ClassLoader directly, we derive the ClassLoader  
    // from this class . That works better in cases where Flink runs embedded and all Flink    // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader    ClassLoader classLoader = getClass().getClassLoader();  
  
    @SuppressWarnings("unchecked")  
    RpcServer server =  
            (RpcServer)  
                    Proxy.newProxyInstance(  
                            classLoader,  
                            implementedRpcGateways.toArray(  
                                    new Class<?>[implementedRpcGateways.size()]),  
                            akkaInvocationHandler);  
  
    return server;  
}

 

3. ResourceManager中RPC服务的启动

RpcServer在RpcEndpoint的构造器中完成初始化后,接下来就是启动RpcEndpoint和RpcServer,这里以ResourceManager为例进行说明。

在启动集群时,看下如何启动ResourceManager的RPC服务的。如下调用链

ClusterEntrypoint.startCluster->runCluster
->dispatcherResourceManagerComponentFactory.create
->resourceManager.start();
=>
public final void start() {  
    rpcServer.start();  
}

继续探索RPC服务是如何启动的

首先在DefaultDispatcherResourceManagerComponentFactory中调用ResourceManager.start()方法启动ResourceManager实例,此时在ResourceManager.start()方法中会同步调用RpcServer.start()方法,启动ResourceManager所在RpcEndpoint中的RpcServer,如下。

【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信,# flink源码,flink

  1. 调用ResourceManager.start()方法,此时会调用RpcEndpoint.start()父方法,启动ResourceManager组件的RpcServer。
  2. 通过动态代理AkkaInvocationHandler.invoke()方法执行流程,发现调用的是StartStoppable.start()方法,此时会直接调用AkkaInvocationHandler.start()本地方法。
  3. 在AkkaInvocationHandler.start()方法中,实际上会调用rpcEndpoint.tell(ControlMessages.START,ActorRef.noSender())方法向ResourceManager对应的Actor发送控制消息,表明当前Actor实例可以正常启动并接收来自远端的RPC请求。
  4. AkkaRpcActor调用handleControlMessage()方法处理ControlMessages.START控制消息。
  5. 将AkkaRpcActor中的状态更新为StartedState,此时ResourceManager的RpcServer启动完成,ResourceManager组件能够接收来自其他组件的RPC请求。

在flink1.12中省略了AkkaInvocationHandler的干预。

经过以上步骤,指定组件的RpcEndpoint节点就正常启动,此时RpcServer会作为独立的线程运行在JobManager或TaskManager进程中,处理本地和远程提交的RPC请求

 

4. 实现相互通讯能力

当AkkaRpcService启动RpcEndpoint中的RpcServer后,RpcEndpoint组件仅能对外提供处理RPC请求的能力,RpcEndpoint组件需要在启动后向其他组件注册自己的RpcEndpoint信息,并完成组件之间的RpcConnection注册,才能相互访问和通信。而创建RPC连接需要调用RpcService.connect()方法。

如代码所示,在AkkaRpcService.connect()方法中,完成了RpcConnection对象的创建。

@Override  
public <C extends RpcGateway> CompletableFuture<C> connect(  
        final String address, final Class<C> clazz) {  
  
    return connectInternal(  
            address,  
            clazz,  
            (ActorRef actorRef) -> {  
                Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);  
  
                return new AkkaInvocationHandler(  
                        addressHostname.f0,  
                        addressHostname.f1,  
                        actorRef,  
                        configuration.getTimeout(),  
                        configuration.getMaximumFramesize(),  
                        null,  
                        captureAskCallstacks);  
            });  
}

具体看AkkaRpcService.connectInternal()方法逻辑。

  1. 获取ActorRef引用对象。
  2. 调用Patterns.ask()方法,向actorRef对应的RpcEndpoint节点发送RemoteHandshakeMessage消息,确保连接的RpcEndpoint节点正常,如果成功,则RpcEndpoint会返回HandshakeSuccessMessage消息。
  3. 调用invocationHandlerFactory创建invocationHandler动态代理类,此时可以看到传递的接口列表为new Class<?>[]{clazz},也就是当前RpcEndpoint需要访问的RpcGateway接口。例如JobMaster访问ResourceManager时,这里就是ResourceManagerGateway接口。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(  
        final String address,  
        final Class<C> clazz,  
        Function<ActorRef, InvocationHandler> invocationHandlerFactory) {  
    checkState(!stopped, "RpcService is stopped");  
  
    LOG.debug(  
            "Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",  
            address,  
            clazz.getName());  
        
    //获取actorRef实例  
    final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);  
    //进行handshake操作,确保需要连接的RpcEndpoint节点正常  
    final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =  
            actorRefFuture.thenCompose(  
                    (ActorRef actorRef) ->  
                            FutureUtils.toJava( 
                            //调用Patterns.ask()方法,向actorRef对应的
                            //RpcEndpoint节点发送RemoteHandshakeMessage消息,
                            //确保连接的RpcEndpoint节点正常,如果成功,则
                            //RpcEndpoint会返回HandshakeSuccessMessage消息。 
                                    Patterns.ask(  
                                                    actorRef,  
                                                    new RemoteHandshakeMessage(  
                                                            clazz, getVersion()),  
                                                    configuration.getTimeout().toMilliseconds())  
                                            .<HandshakeSuccessMessage>mapTo(  
                                                    ClassTag$.MODULE$  
                                                            .<HandshakeSuccessMessage>apply(  
                                                                    HandshakeSuccessMessage  
                                                                            .class))));  
    //创建RPC动态代理类  
    return actorRefFuture.thenCombineAsync(  
            handshakeFuture,  
            (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {  
                InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);  
  
                // Rather than using the System ClassLoader directly, we derive the ClassLoader  
                // from this class . That works better in cases where Flink runs embedded and                // all Flink                // code is loaded dynamically (for example from an OSGI bundle) through a custom                // ClassLoader                ClassLoader classLoader = getClass().getClassLoader();  
  
                @SuppressWarnings("unchecked")  
                C proxy =  
                        (C)  
                                Proxy.newProxyInstance(  
                                        classLoader, new Class<?>[] {clazz}, invocationHandler);  
  
                return proxy;  
            },  
            actorSystem.dispatcher());  
}

经过以上步骤,实现了创建RpcEndpoint组件之间的RPC连接,此时集群RPC组件之间可以进行相互访问,例如JobMaster可以向ResourceManager发送Slot资源请求。
RPC 服务启动的 Akka actor 能接收来自RpcGateway RPC 调用。

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵文章来源地址https://www.toymoban.com/news/detail-835270.html

到了这里,关于【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • C++ 简单实现RPC网络通讯

            RPC是远程调用系统简称,它允许程序调用运行在另一台计算机上的过程,就像调用本地的过程一样。RPC 实现了网络编程的“过程调用”模型,让程序员可以像调用本地函数一样调用远程函数。最近在做的也是远程调用过程,所以通过重新梳理RPC来整理总结一下。  

    2023年04月08日
    浏览(43)
  • Flink源码之RPC

    Flink是一个典型的Master/Slave分布式实时处理系统,分布式系统组件之间必然涉及通信,也即RPC,以下图展示Flink组件之间的关系: RPCGateWay 一般RPC框架可根据用户业务类生成客户端和服务器端通信底层代码,此时只需定义业务类接口以及实现接口的业务逻辑,网络通信以及序列

    2024年02月14日
    浏览(33)
  • 深度思考rpc框架面经之五:rpc熔断限流、rpc复用连接机制

    推荐文章:RPC实现原理之核心技术-限流熔断 限流是一种常见的系统保护手段。在分布式系统和微服务架构中, 一个接口的过度使用可能会导致资源的过载,例如CPU、内存、带宽等都可能成为瓶颈。为了避免系统崩溃,确保系统的可用性,并为所有用户提供公平和合理的服务

    2024年02月11日
    浏览(43)
  • RPC教程 4.超时处理机制

    对比原教程,这里使用context来处理子协程的泄露问题。 超时处理是 RPC 框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加

    2024年01月24日
    浏览(41)
  • 模仿Activiti工作流自动建表机制,实现Springboot项目启动后自动创建多表关联的数据库与表的方案

    文/朱季谦 熬夜写完,尚有不足,但仍在努力学习与总结中,而您的点赞与关注,是对我最大的鼓励! 在一些本地化项目开发当中,存在这样一种需求,即开发完成的项目,在第一次部署启动时,需能自行构建系统需要的数据库及其对应的数据库表。 若要解决这类需求,其实

    2024年01月24日
    浏览(57)
  • Flink中RPC实现原理简介

    Akka是一套可扩展、弹性和快速的系统,为此Flink基于Akka实现了一套内部的RPC通信框架;为此先对Akka进行了解 Akka Akka是使用Scala语言编写的库,基于Actor模型提供一个用于构建可扩展、弹性、快速响应的系统;并被应用到Flink中,基于Akka实现了集群组件之间的RPC通信框架 Acto

    2024年02月10日
    浏览(35)
  • 快速了解 RPC & Replication 机制(纯问题版)

    1、本次讨论不会明晰底层原理。 2、官方基础知识 : https://docs.unrealengine.com/4.27/zh-CN/InteractiveExperiences/Networking/Actors/ 3、底层原理解析参考文章 : https://zhuanlan.zhihu.com/p/587136954 : https://zhuanlan.zhihu.com/p/590990669 : http://www.aclockworkberry.com/custom-struct-serialization-for-networking-in-unreal-engin

    2024年02月05日
    浏览(25)
  • 集群部署专题之二:超高性能RPC框架Zeroc-ICE集群部署简易教程

    Zeroc ICE在简中互联网的资料十分匮乏,以至于大家线上使用时可能会有所顾虑。其实大家尽可放心,ZerocICE是一款性能和稳定性都非常优秀的RPC组件,这也是我当时选择ZerocICE作为XL-LightHouse的RPC组件的唯一原因。为便于大家快速了解ZerocICE,本文以v3.7版本为例介绍其部署和使

    2024年02月10日
    浏览(37)
  • SignalR WebSocket通讯机制

    1、什么是SignalR ASP.NET SignalR 是一个面向 ASP.NET 开发人员的库,可简化向应用程序添加实时 Web 功能的过程。 实时 Web 功能是让服务器代码在可用时立即将内容推送到连接的客户端,而不是让服务器等待客户端请求新数据。 SignalR使用的三种底层传输技术分别是Web Socket, Server

    2024年02月06日
    浏览(46)
  • 警惕看不见的重试机制:为什么使用RPC必须考虑幂等性

    在RPC场景中因为重试或者没有实现幂等机制而导致的重复数据问题,必须引起大家重视,有可能会造成例如一次购买创建多笔订单,一条通知信息被发送多次等问题,这是技术人员必须面对和解决的问题。 有人可能会说:当调用失败时程序并没有显示重试,为什么还会产生重

    2024年02月06日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包