Rpc服务消费者(Rpc服务调用者)实现思路

这篇具有很好参考价值的文章主要介绍了Rpc服务消费者(Rpc服务调用者)实现思路。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Rpc服务消费者(Rpc服务调用者)实现思路

前面几节说到Rpc消费者主要通过UserServiceRPc_Stub这个protobuf帮我们生成的类来实现,上代码回顾一下

class UserServiceRpc_Stub : public UserServiceRpc {
 public:
  UserServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);
  UserServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel,
                   ::PROTOBUF_NAMESPACE_ID::Service::ChannelOwnership ownership);
  ~UserServiceRpc_Stub();

  inline ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel() { return channel_; }

  // implements UserServiceRpc ------------------------------------------

  void Login(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                       const ::fixbug::LoginRequest* request,
                       ::fixbug::LoginResponse* response,
                       ::google::protobuf::Closure* done);
 private:
  ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel_;
  bool owns_channel_;
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(UserServiceRpc_Stub);

//xxx.pb.cc
void UserServiceRpc_Stub::Login(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                              const ::fixbug::LoginRequest* request,
                              ::fixbug::LoginResponse* response,
                              ::google::protobuf::Closure* done) {
  channel_->CallMethod(descriptor()->method(0),
                       controller, request, response, done);
}
};

UserServiceRpc_Stub可以看做是一个给用户提供rpc远程调用的代理类,这里面有rpcclient和rpcserver约定好的远程方法Login,Login方法是调用了一个channel_的callMethod方法,那么联想到其他服务方法应该也是底层调用了这个函数(底层根据method的索引来区分具体的method),那么这个方法具体如何工作的?这个类需要接受一个RpcChannel类作为参数,看看RpcChannel类:

class PROTOBUF_EXPORT RpcChannel {
 public:
  inline RpcChannel() {}
  virtual ~RpcChannel();

  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};

很简单就是一个抽象类,有个纯虚函数CallMethod需要rpc_stub来实现。要实现rpcService_Stub,显然先需要具体化一个MyRpcChannel类,这个MyRpcChannel类继承自RpcChannel 类,最后通过多态调用在callMethod方法里面完成rpc远程调用的过程。那么具体怎么进行?

void UserServiceRpc_Stub::Login(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                              const ::fixbug::LoginRequest* request,
                              ::fixbug::LoginResponse* response,
                              ::google::protobuf::Closure* done) {
  channel_->CallMethod(descriptor()->method(0),
                       controller, request, response, done);
}

Login方法接收4个参数,第一个controller先不用管,显然第四个参数done也不需要管因为作为rpc消费者不需要在将什么结果反馈给rpc提供者。那么就只需要传递两个参数,request即是rpc_stub封装用户端请求的参数,response即是在远程调用完成之后rpcserver帮rpcclient填写的,然后rpc_stub只需要反序列化出来结果反馈给用户即可,那么基本实现思路就有了:
step1实现MyRpcchannel类,继承自RpcChannel ,并重写其CallMethod方法,CallMethod实现有如下几小步
step1.1:获取具体服务名和方法名以及将用户传入的request序列化,最终封装成一个字符流(header_size + service_name method_name args_size + args_str)

//获取服务名和方法名
    const google::protobuf::ServiceDescriptor* service = method->service();
    std::string service_name = service->name();   //service_name
    std::string method_name = method->name();     //method_name


//获取参数的序列化字符串长度 args_size
    std::string args_str;
    uint32_t args_size = 0;
    if(request->SerializeToString(&args_str))
    {
        args_size = args_str.size();
    }
    else
    {
        std::cout << "request serlize error" << std::endl;
        return;
    }

//定义rpc请求的Header
    RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    std::string rpc_header_str;
    uint32_t header_size = 0;
    if(rpcHeader.SerializeToString(&rpc_header_str))
    {
        header_size = rpc_header_str.size();
    }
    else
    {
        std::cout << "rpcheader_str serlize error" << std::endl;
        return;
    }

    //组织待发送的rpc请求的字符串
    std::string send_rpc_str;
    send_rpc_str.insert(0, std::string((char*)&header_size, 4)); // header_size
    send_rpc_str += rpc_header_str; //rpcHeader
    send_rpc_str += args_str; //args

step1.2:连接远程rpcServer服务器(可以是不同机器上的不同进程,也可以是在不同的机器上)
connect to rpcServer
step1.2:将封装的rpc字符流通过socket发送给RpcServer(远程rpc调用请求)
send----->send_rpc_str
step1.3:阻塞等待rpcServer的响应,并将响应饭序列化出来给用户端。
recv------>recv_buf

//反序列化rpc调用的响应数据
    std::string response_str(recv_buf, 0, recv_size);
    if(!response->ParseFromString(response_str))
    {
        std::cout<< "parse error ! response_str:" << response_str << std::endl;
        return;
    }

step2:用户创建UserServiceRpc_Stub类对象,并将MyRpcchannel实例对象传入进行构造。最后通过UserServiceRpc_Stub对象实例调用对应的rpc服务。(实际调用的就是上述实现的CallMethod方法)

	fixbug::UserServiceRpc_Stub stub(new MyRpcChannel());
    
    //rpc方法的请求参数
    fixbug::LoginRequest request;
    request.set_name("zhang san");
    request.set_pwd("123");
    //rpc方法的响应,同步的rpc调用过程
    fixbug::LoginResponse response;
    stub.Login(nullptr, &request, &response, nullptr);   //RpcChannel-->Rpchannel::callMethod 集中来做所有rpc方法调用的参数序列化和网络发送

step3:处理阻塞的响应消息response做相应的业务处理。

 //一次rpc调用完成,读调用的结果
    if(0 == response.result().errcode())
    {
        std::cout << "rpc login response success: " << response.success() << std::endl;
    }
    else
    {
        std::cout << "rpc login error msg : " << response.result().errmsg() << std::endl;
    }

至此RpcConsumer基本实现。文章来源地址https://www.toymoban.com/news/detail-624287.html

到了这里,关于Rpc服务消费者(Rpc服务调用者)实现思路的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(36)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(32)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(27)
  • Spring Cloud Alibaba【OpenFeign实现服务降级、Dubbo实现服务生产者、 Dubbo消费者调用接口 】(三)

    目录 服务调用_OpenFeign实现服务降级 服务调用_Dubbo实现服务生产者 

    2024年02月17日
    浏览(34)
  • [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

    网络层与传输层 内置于操作系统的内核中 ,网络层一般使用 ip 协议,传输层常用协议为 Tcp 协议和 Udp 协议, Tcp 协议和 Udp 协议拥有各自的特点和应用场景: sockaddr_in 结构体用于存储网络通信主机进程的 ip 和端口号等信息 小项目的完整文件的gittee链接 Tcp 服务器架构: 序列反序列

    2024年02月22日
    浏览(27)
  • 三,创建订单微服务消费者 第三章

          因为  支付提供者端的代码如下      

    2024年02月15日
    浏览(29)
  • Eureka(F版本)教程二 服务消费者

    service-ribbon 0.0.1-SNAPSHOT service-ribbon jar Demo project for Spring Boot java.version1.8/java.version spring-cloud.versionGreenwich.SR2/spring-cloud.version org.springframework.cloud spring-cloud-starter-netflix-eureka-client org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-starter-netflix-ribbon org.springframewor

    2024年04月12日
    浏览(28)
  • (18)不重启服务动态停止、启动RabbitMQ消费者

            我们在消费RabbitMQ消息的过程中,有时候可能会想先暂停消费一段时间,然后过段时间再启动消费者,这个需求怎么实现呢?我们可以借助RabbitListenerEndpointRegistry这个类来实现,它的全类名是org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry,通过这个类可以

    2024年02月09日
    浏览(27)
  • 服务提供者 Eureka + 服务消费者(Rest + Ribbon)实战

    Ribbon是Netflix发布的开源项目,主要功能是提供客户端的软件负载均衡算法,将Netflix的中间层服务连接在一起。Ribbon客户端组件提供一系列完善的配置项如连接超时,重试等。简单来说,就是在配置文件中列出Load Balancer(简称LB)后面所有的机器,Ribbon会自动的帮助你基于某

    2024年02月04日
    浏览(29)
  • 实现 Kafka 分区内消费者多线程顺序消费

    生产者在写的时候,可以指定一个 key,被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。 消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是没有错乱的。 但是消费者里可能会有多个线程来并发处理消息,而多个线程并发

    2024年02月07日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包