RPC教程 4.超时处理机制

这篇具有很好参考价值的文章主要介绍了RPC教程 4.超时处理机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

0.前言

对比原教程,这里使用context来处理子协程的泄露问题。

1.为什么需要超时处理机制

超时处理是 RPC 框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加入超时处理的能力。

纵观整个远程调用的过程,需要客户端处理超时的地方有:

与服务端建立连接,导致的超时
发送请求到服务端,写报文导致的超时
等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
从服务端接收响应时,读报文导致的超时
需要服务端处理超时的地方有:

读取客户端请求报文时,读报文导致的超时
发送响应报文时,写报文导致的超时
调用映射服务的方法时,处理报文导致的超时

其RPC 在 3 个地方添加超时处理机制。分别是:

  1. 客户端创建连接时
  2. 客户端 Client.Call() 整个过程导致的超时(不仅包含发送报文,还包括等待处理,接收报文所有阶段
  3. 服务端处理报文,即 Server.handleRequest 超时。

 2.客户端创建连接超时

为了实现简单,把一些超时时间设定放在Option结构体中。有两个超时时间,连接超时ConnectTimeout,服务端处理超时HandleTimeout。

type Option struct {
	MagicNumber    int            // MagicNumber marks this's a geerpc request
	CodecType      codec.CodeType // client may choose different Codec to encode body
	ConnectTimeout time.Duration    //0 表示没有限制
	HandleTimeout  time.Duration
}

var DefaultOption = &Option{
	MagicNumber:    MagicNumber,
	CodecType:      codec.GobType,
	ConnectTimeout: time.Second * 10,    //默认连接超时是10s
}

客户端连接时候是使用Dail方法,那我们就为 Dial 添加一层超时处理的外壳即可。

重点在dialTimeout函数。

  1. 将 net.Dial 替换为 net.DialTimeout,如果连接创建超时,将返回错误。
  2. 开启子协程执行 NewClient,执行完成后则通过信道 ch 发送结果,如果 time.After() 信道先接收到消息,则说明 NewClient 执行超时,返回错误。
  3. 这里使用了context来通知子协程进行退出。要是没有这个通知的话,假如NewClient要处理很久,而这时case<-time.After(opt.ConnectTimeout)已经到时间了,那执行dialTimeout的协程也就退出了,那子协程中的通道ch就会一直被阻塞(因为没有接收方),那该子协程就不能退出,会泄露。用contex的cancel()就可以通知子协程退出。

 需要讲下newClientFunc,为什么需要这个类型呢,大家应该明白就是直接是用NewClient函数就行的,为什么还要多此一举,要从函数参数中把该函数入参呢,为什么不直接在代码里把f(conn,opt)就写成NewClient(conn,opt)呢。

这是为了后面方便的,下一节会支持HTTP协议的,那就会有HTTP连接,而现在的是TCP连接,而HTTP连接对比TCP连接还有些操作的。这样我们把这个做成参数可以入参,这样我们就可以复用dialTimeout,我们把建立HTTP连接的函数传递给dialTimeout,不再需要为HTTP连接的再重新写dialTimeout。而这也方便了后面对该函数的测试。

type newClientFunc func(conn net.Conn, opt *Option) (client *Client, err error)

type clientResult struct {
	client *Client
	err    error
}

func dialTimeout(f newClientFunc, network, address string, opt *Option) (client *Client, err error) {
	//超时连接检测
	conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
	if err != nil {
		return nil, err
	}

	//设置超时时间的情况
	ch := make(chan clientResult)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func(ctx context.Context) {
		client, err = f(conn, opt) //在这一节,f(conn, opt)就是NewClient(conn, opt)

		select {
		case <-ctx.Done():
			return
		default:
			ch <- clientResult{client: client, err: err}
		}
	}(ctx)

	if opt.ConnectTimeout == 0 {
		result := <-ch
		return result.client, result.err
	}

	select {
	case <-time.After(opt.ConnectTimeout):
		cancel() //超时通知子协程结束退出
		return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
	case result := <-ch:
		return result.client, result.err
	}
}

func Dail(network, address string, opts ...*Option) (client *Client, err error) {
	opt, err := parseOptions(opts...)
	if err != nil {
		return nil, err
	}

	return dialTimeout(NewClient, network, address, opt)
}

3.Client.Call 超时

Client.Call 的超时处理机制,使用 context 包实现,控制权交给用户,控制更为灵活。

这里的超时处理,不止是客户端发送给服务端所需的时间,还包括了客户端等待服务端发送回复的这段时间。即是这个超时时间是要等接收完服务端的回复信息

代码case call := <-call.Done表示要等待收到服务端的回复信息,要是这时候先执行case <-ctx.Done(),那就表示超时了。

所以后面的测试Client.Call 超时要留意其超时。

func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply any) error {
	call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))
	select {
	case <-ctx.Done():
		client.removeCall(call.Seq)
		return errors.New("rpc client: call failed: " + ctx.Err().Error())
	case call := <-call.Done:
		return call.Error
	}
	//之前的写法
	// 	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	// 	return call.Error
}

用户可以使用 context.WithTimeout 创建具备超时检测能力的 context 对象来控制。

	var reply int
	ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
	client.Call(ctx, "My.Sum", args, &reply);

4.服务端处理超时

和客户端连接超时处理相似,也是使用context来控制子协程的退出。

开启一个新协程去执行call方法。通道called用来表示消息处理发送是否完毕。超时没有限制时,主协程就会阻塞在timeout==0的<-called中,等到发送完毕后,通道called有数据了,子协程结束,主协程也解除阻塞,退出。

超时有限制情况,假如超时了,就会执行在 case <-time.After(timeout)处调用cancel(),那子协程中的case <-ctx.Done()就会处理,发送超时处理的信息给客户端并退出子协程。

func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
	defer wg.Done()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	called := make(chan struct{})
	go func(ctx context.Context) {
		err := req.svc.call(req.mtype, req.argv, req.replyv)
		select {
		case <-ctx.Done():
			req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
			server.sendResponse(cc, req.h, invalidRequest, sending)
		default:
			if err != nil {
				fmt.Println("call err:", err)
				req.h.Error = err.Error()
				server.sendResponse(cc, req.h, invalidRequest, sending)
			} else {
				server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
			}
			called <- struct{}{}
		}
	}(ctx)
	if timeout == 0 {
		<-called
		return
	}
	select {
	case <-time.After(timeout):
		cancel()
	case <-called:
		return
	}
}

上一节的handleRequest中是没有timeout这个参数的,所以在使用该方法时候,需要加上timeout。需要修改下(Server).Serveconn方法。

func (server *Server) ServeConn(conn io.ReadWriteCloser) {
    //其余的没有改变
	server.servCode(f(conn), &opt)  //之前是server.servCode(f(conn))
}
//使用handleRequest的地方
func (server *Server) servCode(cc codec.Codec, opt *Option) {
    //...................
	for {
		go server.handleRequest(cc, req, sending, &wg, opt.HandleTimeout)
	}
}

5.测试

第一个测试用例,用于测试连接超时。NewClient 函数耗时 3s,ConnectionTimeout 分别设置为 1s 和 0 两种场景。

这里newClientFunc类型就派上用场了,这里就可以设置该函数耗时,方便测试。

func TestClient_dialTimeout(t *testing.T) {
	t.Parallel() //表示该测试将与(并且仅与)其他并行测试并行运行。

	l, _ := net.Listen("tcp", "localhost:10000")

	f := func(conn net.Conn, opt *Option) (*Client, error) {
		conn.Close()
		time.Sleep(time.Second * 2)
		return nil, nil
	}
    //命令行执行 go test -run TestClient_dialTimeout/timeout 测试
	t.Run("timeout", func(t *testing.T) {
		_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: time.Second})
		_assert(err != nil && strings.Contains(err.Error(), "connect timeout"), "expect a timeout error")
	})
//命令行执行 go test -run TestClient_dialTimeout/0 测试
	t.Run("0", func(t *testing.T) {
		_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: 0})
		_assert(err == nil, "0 means no limit")
	})
}

func _assert(condition bool, msg string, v ...interface{}) {
	if !condition {
		panic(fmt.Sprintf("assertion failed: "+msg, v...))
	}
}

第二个测试用例,用于测试处理超时。Bar.Timeout 耗时 2s。

场景一:客户端设置超时时间为 1s,服务端无限制(这个就是Client.Call超时的情况,需要注意)

场景二:服务端设置超时时间为1s,客户端无限制。

type Bar int

func (b *Bar) Timeout(argv int, reply *int) error {
	time.Sleep(time.Second * 3)    // 模拟3s的工作
	return nil
}

func startServer(addr chan string) {
	var b Bar
	_ = Register(&b)
	l, _ := net.Listen("tcp", "localhost:10000")
	addr <- l.Addr().String()
	Accept(l)
}

func TestClient_Call(t *testing.T) {
	t.Parallel()
	addrCh := make(chan string)
	go startServer(addrCh)
	addr := <-addrCh

	time.Sleep(time.Second)

	t.Run("client_timeout", func(t *testing.T) {
		client, _ := Dail("tcp", addr)
		ctx, _ := context.WithTimeout(context.Background(), time.Second*1)
		var reply int
		err := client.Call(ctx, "Bar.Timeout", 1, &reply)
		_assert(err != nil && strings.Contains(err.Error(), ctx.Err().Error()), "expect a timeout error")
	})

	t.Run("server_hander_timeout", func(t *testing.T) {
		client, _ := Dail("tcp", addr, &Option{
			HandleTimeout: time.Second,
		})
		var reply int
		err := client.Call(context.Background(), "Bar.Timeout", 1, &reply)
		_assert(err != nil && strings.Contains(err.Error(), "handle timeout"), "expect a timeout error")
	})
}

完整代码:https://githubfast.com/liwook/Go-projects/tree/main/geerpc/4-timeout文章来源地址https://www.toymoban.com/news/detail-819678.html

到了这里,关于RPC教程 4.超时处理机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink集群RPC通讯机制(二)】创建AkkaRpcService、启动RPC服务、实现相互通信

    RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer,且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类,基于Akka的ActorSystem进行封装,为不同的RpcEndpoint创建相应的ActorRef实例。   RpcService主要包含如下两个重要方法。 startServer():用于启动

    2024年02月22日
    浏览(38)
  • Go七天实现RPC

    本文是学习自7天用Go从零实现RPC框架GeeRPC | 极客兔兔 在此基础上,加入自己的学习过程与理解。 自己实现过程的完整代码:https://github.com/liwook/Go-projects/tree/main/geerpc RPC(Remote Procedure Call,远程过程调用)是一种计算机通信协议,允许调用不同进程空间的程序。RPC 的客户端和服

    2024年01月19日
    浏览(39)
  • golang实现rpc方法二:使用jsonrpc库【跨平台】

    首先在golang实现rpc方法一net/rpc库中实现了RPC方法,但是那个方法不是跨平台的,没法在其他语言中调用这个实现的RPC方法,接下来我们可以通过jsonroc库实现跨语言的RPC方法。俩种实现方式的代码其实也是差不多的,大差不差,只是调用的库不同。 serverrpc.go实现代码如下 cl

    2024年01月17日
    浏览(61)
  • Go微服务: 基于net/rpc/jsonrpc模块实现微服务跨语言调用

    概述 Golang 提供 net/rpc/jsonrpc 库来实现rpc方法 采用 json 方式进行数据编解码,支持跨语言调用 这里实现跨语言示例 1 ) go 服务端 2 ) nodejs 客户端1 3 ) nodejs 客户端2 4 ) go 客户端3 总结 这里演示了,基于go语言为服务端,nodejs 和 golang 为客户端的3种示范 注意,上面 nodejs版本

    2024年03月17日
    浏览(43)
  • Go语言网络编程入门:TCP、HTTP、JSON序列化、Gin、WebSocket、RPC、gRPC示例

    在本文中,我们将介绍Go语言中的网络编程的不同方式,包括TCP、HTTP、Gin框架、WebSocket、RPC、gRPC的介绍与连接实例,并对所有示例代码都给出了详细的注释,最后对每种模式进行了总结。 TCP(传输控制协议)是一种面向连接的、可靠的、基于字节流的传输层通信协议,提供

    2024年02月16日
    浏览(59)
  • RPC原理与Go RPC详解

    RPC(Remote Procedure Call),即远程过程调用。它允许像调用本地服务一样调用远程服务。 RPC是一种服务器-客户端(Client/Server)模式,经典实现是一个通过发送请求-接受回应进行信息交互的系统。 首先与RPC(远程过程调用)相对应的是本地调用。 本地调用 将上述程序编译成二

    2024年02月14日
    浏览(33)
  • 实现跨语言通信的便捷之道:RPC在Java和Go中的使用案例

    导语: 在现代软件开发中,构建分布式系统和跨语言通信变得越来越常见。RPC(远程过程调用)作为一种通信协议,提供了一种简单而高效的方式来实现不同语言之间的通信。本文将探讨RPC的使用案例,并展示如何在Java和Go之间实现跨语言通信。 什么是RPC? RPC是一种允许不

    2024年01月19日
    浏览(44)
  • 深度思考rpc框架面经之五:rpc熔断限流、rpc复用连接机制

    推荐文章:RPC实现原理之核心技术-限流熔断 限流是一种常见的系统保护手段。在分布式系统和微服务架构中, 一个接口的过度使用可能会导致资源的过载,例如CPU、内存、带宽等都可能成为瓶颈。为了避免系统崩溃,确保系统的可用性,并为所有用户提供公平和合理的服务

    2024年02月11日
    浏览(43)
  • C++ 简单实现RPC网络通讯

            RPC是远程调用系统简称,它允许程序调用运行在另一台计算机上的过程,就像调用本地的过程一样。RPC 实现了网络编程的“过程调用”模型,让程序员可以像调用本地函数一样调用远程函数。最近在做的也是远程调用过程,所以通过重新梳理RPC来整理总结一下。  

    2023年04月08日
    浏览(44)
  • 【Go】四、rpc跨语言编程基础与rpc的调用基础原理

    早期 Go 语言不使用 go module 进行包管理,而是使用 go path 进行包管理,这种管理方式十分老旧,两者最显著的区别就是:Go Path 创建之后没有 go.mod 文件被创建出来,而 go module 模式会创建出一个 go.mod 文件用于管理包信息 现在就是:尽量使用 Go Modules 模式 另外,我们在引入包

    2024年02月19日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包