RPC教程 2.支持并发与异步的客户端

这篇具有很好参考价值的文章主要介绍了RPC教程 2.支持并发与异步的客户端。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.客户端的使用例子

func main(){
	//1. 建立连接
	client, err := rpc.Dial("tcp", "localhost:1234")

	//2.调用调用指定的RPC方法
    var reply string //string有默认值
	err = client.Call("HelloService.Hello", "hi", &reply)    //即是一次请求
}

对 net/rpc 而言,一个函数需要能够被远程调用,它必须满足一定的条件,否则其会被忽略。

这些条件是:

  • 方法的类型是可输出的 (the method’s type is exported)
  • 方法本身也是可输出的 (the method is exported)
  • 方法必须由两个参数,必须是输出类型或者是内建类型 (the method has two arguments, both exported or builtin types)
  • 方法的第二个参数必须是指针类型 (the method’s second argument is a pointer)
  • 方法返回类型为 error (the method has return type error)

一个输出方法的格式如下: 

func (t *T) MethodName(argType T1, replyType *T2) error

这个方法的第一个参数代表调用者(client)提供的参数,第二个参数代表要返回给调用者的计算结果。

2.定义一个请求

封装结构体 Call 来承载一次 RPC 调用所需要的信息。

type Call struct {
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         error       // After completion, the error status.
	Done          chan *Call  // Receives *Call when Go is complete.
	Seq           uint64
}

func (call *Call) done() {
	call.Done <- call
}

请求内容至少需要包括:

  • 请求的服务以及方法名
  • 请求参数和请求的回复
  • 请求出错时返回的错误信息

为了支持异步调用,Call 结构体中添加了一个字段 Done,Done 的类型是 chan *Call,当调用结束时,会调用 call.done() 通知调用方。 

3.实现 Client

type Client struct {
	code     codec.Codec
	opt      *Option
	sending  sync.Mutex
	header   codec.Header
	mutex    sync.Mutex    //保护下面的变量
	seq      uint64
	pending  map[uint64]*Call
	closing  bool //user has called Close
	shutdown bool // server has told us to stop
}

var ErrShutdown = errors.New("connection is shut down")

func (client *Client) Close() error {
	client.mutex.Lock()
	defer client.mutex.Unlock()
	if client.closing {
		return ErrShutdown
	}
	client.closing = true
	return client.code.Close()
}

func (client *Client) IsAvailable() bool {
	client.mutex.Lock()
	defer client.mutex.Unlock()
	return !client.closing && !client.shutdown
}
  • code是消息的编解码器,和服务端类似的。
  • sending是互斥锁,和服务端类似,为了保证请求的有序发送,即防止出现多个请求报文混淆。
  • header 是每个请求的消息头,header 只有在请求发送时才需要,而请求发送是互斥的,因此每个客户端只需要一个,声明在 Client 结构体中可以复用。
  • seq是用于给请求进行编号,从1开始编号自增,每个请求有唯一的编号。
  • pending是存储未完成的请求,键是编号,值是 Call 实例。
  • closing 和 shutdown 任意一个值置为 true,则表示 Client 处于不可用的状态,但有些许的差别,closing 是用户主动关闭的,即调用 Close 方法,而 shutdown 置为 true 一般是有错误发生。

 需要存储未完成的请求,可以想象,一个用户发出10个不同的请求,要是客户端不存储这些请求,那收到回复的时候,就难知道如何处理了。

所以在发起请求的时候,需要注册这个请求(往pending中添加),得到回复后需要删除(从pending中delete)

由此,需要实现和Call相关的注册和删除方法。

而terminateCalls方法是服务端或客户端发生错误时调用,将 shutdown 设置为 true,且将错误信息通知所有 pending 状态的 call。该方法需要都获得sending锁和mutex锁。该方法的使用地方后面会讲到的。

func (client *Client) RegisterCall(call *Call) (uint64, error) {
	client.mutex.Lock()
	defer client.mutex.Unlock()

	if client.closing || client.shutdown {
		return 0, ErrShutdown
	}

	call.Seq = client.seq //设置Call的序号
	client.pending[call.Seq] = call
	client.seq++
	return call.Seq, nil
}

func (client *Client) removeCall(seq uint64) *Call {
	client.mutex.Lock()
	defer client.mutex.Unlock()
	call := client.pending[seq]
	delete(client.pending, seq)
	return call
}

func (client *Client) terminateCalls(err error) {
	client.sending.Lock()
	defer client.sending.Unlock()
	client.mutex.Lock()
	defer client.mutex.Unlock()

	client.shutdown = true
	for _, call := range client.pending {
		call.Error = err
		call.done()
	}
}

创建客户端

按照前面的例子,创建客户端就

client, err := rpc.Dial("tcp", "localhost:1234")

那我们也按照这样来。

Dail函数通过 ...*Option 将 Option 实现为可选参数(...表示可以0个参数或多个参数),可以不填写opts参数,使用默认的option(即是gob编解码)

//使用例子 client, err := rpc.Dial("tcp", "localhost:1234")
func Dail(network, address string, opts ...*Option) (client *Client, err error) {
	opt, err := parseOptions(opts...)
	if err != nil {
		return nil, err
	}
	conn, err := net.Dial(network, address)
	if err != nil {
		return nil, err
	}
	return NewClient(conn, opt)
}

parseOption函数就是解析Option,判断其Option是否符合要求等。

NewClient函数,创建 Client 实例,首先需要完成一开始的协议交换,即发送 Option 信息给服务端,协商好消息的编解码方式。

func NewClient(conn net.Conn, opt *Option) (*Client, error) {
	// send options with server
	if err := json.NewEncoder(conn).Encode(opt); err != nil {
		log.Println("rpc client: options error: ", err)
		conn.Close()
		return nil, err
	}
	f := codec.NewCodeFuncMap[opt.CodecType]
	if f == nil { //没有符合条件的编解码器
		err := fmt.Errorf("invalid codec type %s", opt.CodecType)
		log.Println("rpc client: codec error:", err)
		return nil, err
	}

	return &Client{
		seq:     1,    //序号从1开始,序号0表示可以表示错误
		code:    f(conn),
		opt:     opt,
		pending: make(map[uint64]*Call),
	}, nil
}

func parseOptions(opts ...*Option) (*Option, error) {
	if len(opts) == 0 || opts[0] == nil {
		return DefaultOption, nil
	}
	if len(opts) != 1 {
		return nil, errors.New("number of options is more than 1")
	}
	opt := opts[0]
	opt.MagicNumber = DefaultOption.MagicNumber

	if opt.CodecType == "" {
		opt.CodecType = DefaultOption.CodecType
	}
	if _, ok := codec.NewCodeFuncMap[opt.CodecType]; !ok {
		return nil, fmt.Errorf("invalid codec type %s", opt.CodecType)
	}
	return opt, nil
}

请求和创建客户端完成后,那就是到关键的接收和发送请求了。

实现接收回复和发送请求

那先来看看发送请求。

    var reply string //string有默认值
	err = client.Call("HelloService.Hello", "hi", &reply) 

 先实现个send方法,其参数是*Call。内容是注册该Call,进行编码并发送给服务端。

func (client *Client) send(call *Call) {
    // make sure that the client will send a complete request
	client.sending.Lock()
	defer client.sending.Unlock()

	//注册,添加到pending中
	seq, err := client.RegisterCall(call)
	if err != nil {
		call.Error = err
		call.done()
		return
	}
    
    //复用同一个header
	client.header.ServiceMethod = call.ServiceMethod
	client.header.Seq = seq
	client.header.Error = ""

	// encode and send the request
	if err := client.code.WriteResponse(&client.header, call.Args); err != nil {
		call := client.removeCall(seq)
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

代码中经常出现call.done(),done方法是为了支持异步调用的,当调用结束时,会调用 call.done() 通知调用方。 那就会有个异步调用的Go方法。

异步调用的Go方法中,会先判断chan是否符合条件,之后根据函数参数来创建Call,之后调用send方法。

func (client *Client) Go(serviceMethod string, args, reply any, done chan *Call) *Call {
	if done == nil {
		done = make(chan *Call, 10) //10或1或其他的也可以的,大于0即可
	} else if cap(done) == 0 {
		log.Panic("rpc client: done channel is unbuffered")
	}

	call := &Call{
		ServiceMethod: serviceMethod,
		Args:          args,
		Reply:         reply,
		Done:          done,
	}
	client.send(call)
	return call
}

func (client *Client) Call(serviceMethod string, args, reply any) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}

而Call方法中,其是对 Go 的封装,阻塞 call.Done,等待响应返回,是一个同步接口。

发送解决后,如何进行接收信息呢?

调用Call方法,这是个同步接口,会一直阻塞在call := <-client.Go(...).Done这里,之后当使用call.done()时候,才会解除阻塞。但是按照目前的正常情况,是不会调用call.done()的。这时我们可以新启一个协程去接收信息,处理完信息后就调用call.done()即可。

接收功能,接收到的响应有三种情况:

  • call 不存在,可能是请求没有发送完整,或者因为其他原因被取消,但是服务端仍旧处理了。
  • call 存在,但服务端处理出错,即 h.Error 不为空。
  • call 存在,服务端处理正常,那么需要从 body 中读取 Reply 的值。
func (client *Client) receive() {
	var err error
	for err == nil {
		var h codec.Header
		if err = client.code.ReadHeader(&h); err != nil {
			break
		}

		call := client.removeCall(h.Seq)
		switch {
		case call == nil:
			err = client.code.ReadBody(nil)
		case h.Error != "":
			call.Error = fmt.Errorf(h.Error)
			err = client.code.ReadBody(nil)
			call.done()
		default:
			err = client.code.ReadBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}
	client.terminateCalls(err)
}

在recieve中就使用了terminateCalls方法。在读取Header失败break,就执行该方法。

那么这个新的协程在哪里开启好呢?那可以在创建客户端的时候就开启这个协程。

func NewClient(conn net.Conn, opt *Option) (*Client, error) {
    //......
	f := codec.NewCodeFuncMap[opt.CodecType]

    //前面代码没有变化,就下面封装成一个函数,其内部就使用go client.receive()
	return newClientCodec(f(conn), opt), nil
}

func newClientCodec(code codec.Codec, opt *Option) *Client {
	client := &Client{
		seq:     1,
		code:    code,
		opt:     opt,
		pending: make(map[uint64]*Call),
	}
	go client.receive()
	return client
}

这样,接收和发送也都处理好了。至此,一个支持异步和并发的 GeeRPC 客户端已经完成。

4.测试

上一章节只实现了服务端,我们在 main 函数中手动模拟了整个通信过程。因此,这一章节我们就将 main 函数中通信部分替换为今天的客户端。

startServer 没有发生变化。

func main() {
	addr := make(chan string)
	go startServer(addr)

	// in fact, following code is like a simple geerpc client
	client, _ := geerpc.Dail("tcp", <-addr) //上一节是使用net.Dail
	defer client.Close()
	time.Sleep(time.Second * 1)
	num := 3
	var wg sync.WaitGroup
	wg.Add(num)

	for i := 0; i < num; i++ {
		go func(i int) {
			defer wg.Done()
			args := uint64(i)
			var reply string
			if err := client.Call("foo.sum", args, &reply); err != nil {
				log.Fatal("call Foo.Sum error:", err)
			}
			log.Println("reply: ", reply)
		}(i)
	}
	wg.Wait()
}

func startServer(addr chan string) {
	l, err := net.Listen("tcp", "localhost:10000")
	if err != nil {
		log.Fatal("network error:", err)
	}
	log.Println("start rpc server on", l.Addr())
	addr <- l.Addr().String()
	geerpc.Accept(l)
}

完整代码:https://github.com/liwook/Go-projects/tree/main/geerpc/2-client文章来源地址https://www.toymoban.com/news/detail-821418.html

到了这里,关于RPC教程 2.支持并发与异步的客户端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • elasaticsearch新版java客户端ElasticsearchClient详细教程,支持响应式编程,Lambda表达式,兼容旧版High Level Rest Client

    elasaticsearch新版java客户端详细教程,支持响应式编程,Lambda表达式。兼容旧版High Level Rest Client。网上相关教程少,我在这里出一个。elasaticsearch相关安装这里不介绍了 有几种方式,这里介绍两种,如果不考虑之前旧版High Level Rest Client的客户端采用第一种就行 阻塞和异步客户

    2023年04月15日
    浏览(38)
  • php-golang-rpc jsonrpc和php客户端tivoka/tivoka包实践

    golang 代码: package main import (     \\\"fmt\\\"     \\\"net\\\"     \\\"net/rpc\\\"     \\\"net/rpc/jsonrpc\\\" ) type App struct{} type Res struct {     Code int    `json:\\\"code\\\"`     Msg  string `json:\\\"msg\\\"`     Data any    `json:\\\"data\\\"` } func (*App) Hi(mp map[string]any, res *Res) error {     res.Code = 200     res.Msg = \\\"成功\\\"     var rmp = mak

    2024年02月15日
    浏览(57)
  • C/S客户端核服务端-并发服务器

    1、新建两个程序,分别引用两个函数,先执行server端的程序,再执行client端的程序 2、实现功能:当client和sever连接成功后,从client输入什么都会传输给server端,当输入第一个字母为q时 两端程序都会退出 3、特别注意:需要修改SERVER_HOST 为自己主机地址 4、本程序编写的环境

    2024年02月09日
    浏览(36)
  • Python中websockets服务端从客户端接收消息并发送给多线程

    目录 一、消息队列 二、服务端 三、设备功能 四、主线程 五、客户端 六、更新 思路: 1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息

    2024年04月25日
    浏览(72)
  • C# Socket通信从入门到精通(14)——多个异步UDP客户端C#代码实现

    在之前的文章C# Socket通信从入门到精通(13)——单个异步UDP客户端C#代码实现我介绍了单个异步Udp客户端的c#代码实现,但是有的时候,我们需要连接多个服务器,并且对于每个服务器,我们都有一些比如异步发送、异步接收的操作,那么这时候我们使用之前单个异步Udp客户

    2024年02月03日
    浏览(80)
  • 使用Go语言的HTTP客户端进行负载均衡

    负载均衡是分布式系统中的重要概念,它用于将流量分散到多个服务器或服务上,以实现更好的性能、可靠性和可扩展性。在Go语言中,可以使用HTTP客户端进行负载均衡,确保请求被均匀地分配到多个服务器或服务上。 下面是一个使用Go语言HTTP客户端进行负载均衡的示例:

    2024年01月21日
    浏览(49)
  • client-go源码结构及客户端对象

    G  Goup 资源组,包含一组资源操作的集合 V Version 资源版本,用于区分不同API的稳定程度及兼容性 R Resource 资源信息,用于区分不同的资源API K Kind 资源对象类型,每个资源对象都需要Kind来区分它自身代表的资源类型 (1)通过 GVR 可以构造 REST Api  进行接口调用,而 GVK 可以

    2024年04月26日
    浏览(42)
  • SocketTools 11在所有HTTP客户端组件支持

    在所有HTTP客户端组件中添加了对HTTP/2.0协议的支持。 更新了TLS 1.2(及更高版本)和SSH 2.0的安全选项,以使用Microsoft Windows 11和Windows Server 2022中提供的密码套件。较旧、安全性较低的密码套件已被弃用,在建立连接时将不会使用。回退选项可用于使用TLS 1.0连接到旧版服务器。

    2024年02月05日
    浏览(49)
  • MYSQL连接报错:客户端不支持服务器请求的身份验证协议;考虑升级MYSQL客户端数据库

    在进行MYSQL数据库连接时,有时候可能会遇到如上所述的错误:“客户端不支持服务器请求的身份验证协议;考虑升级MYSQL客户端数据库”。这个错误通常发生在客户端使用的MYSQL版本与服务器所要求的身份验证协议不兼容的情况下。幸运的是,您可以通过升级MYSQL客户端来解

    2024年02月03日
    浏览(56)
  • 【六、http】go的http的客户端重定向

    重定向过程 :客户浏览器发送http请求----》web服务器接受后发送302状态码响应及对应新的location给客户浏览器–》客户浏览器发现是302响应,则自动再发送一个新的http请求,请求url是新的location地址----》服务器根据此请求寻找资源并发送给客户。在这里location可以重定向到任

    2024年02月05日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包