go-zero的服务发现源码阅读

这篇具有很好参考价值的文章主要介绍了go-zero的服务发现源码阅读。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

服务发现原理与grpc源码解析_wangxiaoangg的博客-CSDN博客

 

go-zero rpc demo官方文档:rpc编写与调用 | go-zero

目录

一 服务注册

1. 创建rpc服务

2. 启动rpc服务

3. registerEtcd做了什么

4. discov.NewPublisher 服务发布者

二 服务发现

1.定义&注册resolver

2.解析etcd地址&创建链接

3.update方法


一 服务注册

在看rpc服务端服务注册前,可以先看下go-zero的官方的 user rpc服务 demo。

在rpc的配置文件中配置了Etcd信息,以及服务对应的key,如下:user.yaml

Name: user.rpc
ListenOn: 127.0.0.1:8080
Etcd:
  Hosts:
    - $etcdHost
  Key: user.rpc

1. 创建rpc服务


创建rpc服务调用了 zrpc/internal/rpcpubserver.go 中 NewRpcPubServer方法。

该方法返回一个server对象,并将registerEtcd方法注入到该sever。

// NewRpcPubServer returns a Server.
func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, middlewares ServerMiddlewaresConf,
	opts ...ServerOption) (Server, error) {
	registerEtcd := func() error {
		pubListenOn := figureOutListenOn(listenOn)
		var pubOpts []discov.PubOption
		if etcd.HasAccount() {
			pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
		}
		if etcd.HasTLS() {
			pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
				etcd.CACertFile, etcd.InsecureSkipVerify))
		}
		if etcd.HasID() {
			pubOpts = append(pubOpts, discov.WithId(etcd.ID))
		}
		pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
		return pubClient.KeepAlive()
	}
	server := keepAliveServer{
		registerEtcd: registerEtcd,
		Server:       NewRpcServer(listenOn, middlewares, opts...),
	}

	return server, nil
}

2. 启动rpc服务

在启动Server的时候,调用Start方法,在Start方法中会调用registerEtcd进行真正的服务注册。
go-zerozrpc/internal/rpcpubserver.go


type keepAliveServer struct {
	registerEtcd func() error
	Server
}

func (s keepAliveServer) Start(fn RegisterFn) error {
	if err := s.registerEtcd(); err != nil {
		return err
	}

	return s.Server.Start(fn)
}

3. registerEtcd做了什么

	registerEtcd := func() error {
        //解析服务监听的地址
		pubListenOn := figureOutListenOn(listenOn)
		var pubOpts []discov.PubOption
		//etcd的链接方式
        if etcd.HasAccount() {
			pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
		}
		if etcd.HasTLS() {
			pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
				etcd.CACertFile, etcd.InsecureSkipVerify))
		}
		if etcd.HasID() {
			pubOpts = append(pubOpts, discov.WithId(etcd.ID))
		}

        //新建puslisher
		pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
	    //异步etcd 保活
    	return pubClient.KeepAlive()
	}

4. discov.NewPublisher 服务发布者

代码路径core/discov/publisher.go

在KeepAlive方法中,
1.首先创建etcd连接,
2.用register方法进行服务注册。
3.register创建租约,租约默认时间为10秒钟
4.最后通过Put方法进行注册。
5.调用 keepAliveAsync 进行租约的续期,保证服务一直是存活的状态,如果服务异常退出了,那么也就无法进行续期,服务发现也就能自动识别到该服务异常下线了。

// KeepAlive keeps key:value alive.
func (p *Publisher) KeepAlive() error {
	cli, err := p.doRegister()
	if err != nil {
		return err
	}

	proc.AddWrapUpListener(func() {
		p.Stop()
	})

	return p.keepAliveAsync(cli)
}


func (p *Publisher) doRegister() (internal.EtcdClient, error) {
    //链接etcd
	cli, err := internal.GetRegistry().GetConn(p.endpoints)
	if err != nil {
		return nil, err
	}

	p.lease, err = p.register(cli)
	return cli, err
}

func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
    //创建租约

	resp, err := client.Grant(client.Ctx(), TimeToLive)
	if err != nil {
		return clientv3.NoLease, err
	}

	lease := resp.ID
	if p.id > 0 {
		p.fullKey = makeEtcdKey(p.key, p.id)
	} else {
		p.fullKey = makeEtcdKey(p.key, int64(lease))
	}
    //put key 注册
	_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

	return lease, err
}


//异步续租 保活
func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
	ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
	if err != nil {
		return err
	}

	threading.GoSafe(func() {
		for {
			select {
			case _, ok := <-ch:
				if !ok {
					p.revoke(cli)
					if err := p.doKeepAlive(); err != nil {
						logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
					}
					return
				}
			case <-p.pauseChan:
				logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
				p.revoke(cli)
				select {
				case <-p.resumeChan:
					if err := p.doKeepAlive(); err != nil {
						logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
					}
					return
				case <-p.quit.Done():
					return
				}
			case <-p.quit.Done():
				p.revoke(cli)
				return
			}
		}
	})

	return nil
}

二 服务发现

前面的已经介绍了,rpc服务启动时候是如何将服务注册到etcd中的。

在rpc的服务调用方 配置服务提供方的Etcd信息,以及服务对应的key,如下:user.yaml

Name: search-api
Host: 0.0.0.0
Port: 8889
Auth:
  AccessSecret: $AccessSecret
  AccessExpire: $AccessExpire
UserRpc:
  Etcd:
    Hosts:
      - $etcdHost
    Key: user.rpc

1.定义&注册resolver

go-zero的服务发现是在客户端实现的。在创建zRPC客户端的时候,通过init方法进行了自定义Resolver的注册。

go-zero/zrpc/internal/client.go

func init() {
	resolver.Register()
}

 zrpc/resolver/internal/resolver.go

// RegisterResolver registers the direct and discov schemes to the resolver.
func RegisterResolver() {
	resolver.Register(&directResolverBuilder)
	resolver.Register(&discovResolverBuilder)
	resolver.Register(&etcdResolverBuilder)
	resolver.Register(&k8sResolverBuilder)
}

gozero注册了四个revlover builder 这里我们只看etcd reslover。

2.解析etcd地址&创建链接

首先从target中解析出etcd的地址,和服务对应的key。然后创建etcd连接,接着执行update方法,在update方法中,通过调用cc.UpdateState方法进行服务状态的更新。 

zrpc/resolver/internal/discovbuilder.go

func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
	resolver.Resolver, error) {
	hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {
		return r == EndpointSepChar
	})
	sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
	if err != nil {
		return nil, err
	}

	update := func() {
		var addrs []resolver.Address
		for _, val := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{
				Addr: val,
			})
		}
		if err := cc.UpdateState(resolver.State{
			Addresses: addrs,
		}); err != nil {
			logx.Error(err)
		}
	}
	sub.AddListener(update)
	update()

	return &nopResolver{cc: cc}, nil
}

3.update方法

update方法会被添加到事件监听中,当有PUT和DELETE事件触发,都会调用update方法进行服务状态的更新,事件监听是通过etcd的Watch机制实现,代码如下:

func (c *cluster) watchStream(cli EtcdClient, key string) bool {
    rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
    for {
        select {
        case wresp, ok := <-rch:
            if !ok {
                logx.Error("etcd monitor chan has been closed")
                return false
            }
            if wresp.Canceled {
                logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
                return false
            }
            if wresp.Err() != nil {
                logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
                return false
            }

            c.handleWatchEvents(key, wresp.Events)
        case <-c.done:
            return true
        }
    }
}

go-zero的服务发现源码阅读文章来源地址https://www.toymoban.com/news/detail-459005.html

到了这里,关于go-zero的服务发现源码阅读的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • go-zero学习 第三章 微服务

    1.1 API服务模块 goctl 使用 api 文件生成 api服务 命令: 1.2 RPC服务模块 goctl 使用 protoc 文件生成 rpc服务 命令: 注意: --go_out 、 --go-grpc_out 、 --zrpc_out 三者配置的路径需要完全一致,否则会报下列错误。 基础代码:已生成基本的API服务、RPC服务。 这里以API服务调用RPC服务的登

    2024年02月16日
    浏览(72)
  • go-zero的rpc服务案例解析

    go-zero的远程调用服务是基于gRpc的gRPC教程与应用。 zero使用使用gRpc需要安装 protoc 插件,因为gRpc基于protoc插件使用protocol buffers文件生成rpc服务器和api的代码的。 gRPC 的代码生成还依赖 protoc-gen-go,protoc-gen-go-grpc 插件来配合生成 Go 语言的 gRPC 代码。 也可以使用go get命令安装

    2024年02月13日
    浏览(76)
  • 【go-zero】go-zero阿里云oss 前端上传文件到go-zero API服务 并在k8s pod中创建文件 并推送到阿里云oss 最佳实践

    问题:在本地通过上传文件,然后将文件推送到aliyun的oss中,是没问题的 但是部署到了k8s中,则出现了问题,一直报错没有创建的权限 思路:开始认为应该将该文件挂载到configmap中,然后通过这种方式修改了deployment和dockerfile。最终发现应该是go的创建文件路径方式搞错了,

    2024年02月13日
    浏览(48)
  • go-zero数据库连接池 database/sql 源码学习

    database/sql 中接口的层级关系 https://draveness.me/golang/docs/part4-advanced/ch09-stdlib/golang-database-sql/ database/sql源码地址 : https://github.com/golang/go/tree/release-branch.go1.17/src/database/sql go-zero数据库连接池源码地址 https://github.com/zeromicro/go-zero/blob/master/core/stores/sqlx/sqlmanager.go 滑动验证页面 go

    2024年02月06日
    浏览(46)
  • Go-Zero微服务快速入门和最佳实践(一)

    并发编程和分布式微服务 是我们Gopher升职加薪的关键。 毕竟Go基础很容易搞定,不管你是否有编程经验,都可以比较快速的入门Go语言进行简单项目的开发。 虽说好上手,但是想和别人拉开差距,提高自己的竞争力, 搞懂分布式微服务和并发编程还是灰常重要的,这也是我

    2024年04月28日
    浏览(41)
  • 微服务框架 go-zero logx 日志组件剖析

    上一篇我们说到咱们还剩下 addTenant 功能还未实现,不知道有没有兄弟感兴趣去实验一波的,本篇文章进行简要补充 根据上一篇文章分析,其实我们只需要执行如下几步即可: 编写 tenant.api,提供外部 addTenant 的 http 接口 编写 tenant.api 提供一个 POST http 的接口 / api /tenant/addt

    2024年02月11日
    浏览(50)
  • 微服务架构|go-zero 的自适应熔断器

    原文链接: go-zero 的自适应熔断器 上篇文章我们介绍了微服务的限流,详细分析了计数器限流和令牌桶限流算法,这篇文章来说说熔断。 熔断和限流还不太一样,限流是控制请求速率,只要还能承受,那么都会处理,但熔断不是。 在一条调用链上,如果发现某个服务异常,

    2024年02月10日
    浏览(52)
  • go-zero/grpc的rpc服务间传递额外数据

    go-zero/grpc的rpc服务间传递额外数据 2024/02/18 客户端: 初始化 md 也可如下方式: 追加新的如下: 也可使用 md 的 Set 和 Append 方法追加: 服务端: 注意 key 都会被转为小写,即使客户端为大写: 而且 key 只能由 数字、字母和三个特殊字符“-_.”组成,大写字母会自动被转为小写

    2024年02月19日
    浏览(62)
  • GoZero微服务个人探究之路(二)Go-Zero官方api demo示例探究

    api demo 代码生成 | go-zero Documentation 编辑 demo-api.yaml 编辑 服务名称:demo-api HOST地址:0.0.0.0监听所有可用网络接口 Port地址:服务运行在8888端口 config/config.go 编辑 存储config信息,这里rest.RestConf是RESTful API的结构体,此外还可以添加数据库,缓存配置信息 handler/demohandler.go 编辑

    2024年01月18日
    浏览(48)
  • GoZero微服务个人探索之路(三)Go-Zero官方rpc demo示例探究

    两个文件均为protoc-gen-go-grpc自动生成 构成一个完整的 gRPC 服务的定义和实现 demo.go goctl生成的客户端代码 Request 和 Response 别名: 定义了 Request 和 Response 两个别名,实际上是从 demo 包中导入的对应的消息类型。 Demo 接口: 定义了一个 Demo 接口,其中包含了调用 gRPC 服务中 P

    2024年01月18日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包