Go gRPC etcd实现服务注册发现与负载均衡

这篇具有很好参考价值的文章主要介绍了Go gRPC etcd实现服务注册发现与负载均衡。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、前置

如果不了解go + grpc 调用方式和实现细节,可以参考上一篇文章 golang grpc配置使用实战教程

涉及技术点

技术点 版本 描述
golang 1.19 基础版本
grpc v1.41.0 gRPC golang包
etcd server 3.5.0 注册中心
etcd client v3.5.8 客户端服务发现和负载均衡

服务注册

  • 服务注册依赖etcd的 key-value操作,key作为gRPC服务唯一标识,value存储元数据信息。
  • 服务自动续期依赖etcd的lease操作,实现自动服务续期

服务发现

使用watch命令监控指定前缀的key,处理新增和删除事件,实现服务上下线

负载均衡

使用etcd客户端实现负载均衡,获取服务器gRPC IP端口列表后,客户端进行负载均衡

二、代码实现

目录结构说明

.
├── client
│ └── client.go
├── etcd
│ └── naming.go
├── go.mod
├── go.sum
├── proto
│ ├── helloworld.pb.go
│ ├── helloworld.proto
│ └── helloworld_grpc.pb.go
└── server
└── server.go

  • client 客户端相关代码实现,根据注册中心服务列表和负载均衡算法,发起grpc调用。
  • etcd 使用etcd client 实现一个简易的命名服务,进行服务注册、发现、下线等操作
  • proto grpc服务接口定义,主要是使用 protoc 命令生成的模板代码。
  • server 服务端代码实现,启动服务并注册到etcd。

先给出 go.mod 依赖

module example.com/ggrpc

go 1.19

require (
	go.etcd.io/etcd/client/v3 v3.5.8
	google.golang.org/grpc v1.41.0 // 这里版本需要和etcd匹配,不然编译失败
	google.golang.org/protobuf v1.26.0
)

require (
	github.com/coreos/go-semver v0.3.0 // indirect
	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang/protobuf v1.5.2 // indirect
	go.etcd.io/etcd/api/v3 v3.5.8 // indirect
	go.etcd.io/etcd/client/pkg/v3 v3.5.8 // indirect
	go.uber.org/atomic v1.7.0 // indirect
	go.uber.org/multierr v1.6.0 // indirect
	go.uber.org/zap v1.17.0 // indirect
	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
	golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect
	golang.org/x/text v0.3.5 // indirect
	google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
)

细节实现

Naming 命名服务

package etcd

import (
	"context"
	"encoding/json"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/naming/endpoints"
	"go.etcd.io/etcd/client/v3/naming/resolver"
	gresolver "google.golang.org/grpc/resolver"
	"log"
)

const (
	nameServicePrefix = "my-grpc"
	// 默认的租赁时间
	defLeaseSecond    = 30
)

// 保存已经注册的服务
var serviceMap map[string]*EndpointUnit

// 服务元数据信息
type Endpoint struct {
	Name    string `json:"name"`
	Addr    string `json:"addr"`
	Port    int
	Version string `json:"version"`
}
// 服务元数据信息
type EndpointUnit struct {
	Name    string `json:"name"`
	LeaseID clientv3.LeaseID
	E       Endpoint
}
// 命名服务结构体
type NamingService struct {
	EtcdUrl string
	Name    string
	MTarget string
	Client  *clientv3.Client
	manager endpoints.Manager
}

// 创建一个新的命名服务
func NewNamingService(protocol, etcdHost string, etcdPort int, serviceName string) (*NamingService, error) {
	url := fmt.Sprintf("%s://%s:%d", protocol, etcdHost, etcdPort)
	client, err := clientv3.NewFromURL(url)
	if err != nil {
		return nil, err
	}
	target := fmt.Sprintf("%s/%s", nameServicePrefix, serviceName)
	// etcd的endpoints管理
	manager, err := endpoints.NewManager(client, target)
	if err != nil {
		return nil, err
	}
	ns := NamingService{
		EtcdUrl: url,
		Name:    serviceName,
		MTarget: target,
		manager: manager,
		Client:  client,
	}
	serviceMap = make(map[string]*EndpointUnit)
	return &ns, nil
}

// 从本地etcd创建
func NewLocalDefNamingService(serviceName string) (*NamingService, error) {
	return NewNamingService("http", "localhost", 2379, serviceName)
}

func (naming *NamingService) GetFullServerName(name string, leaseID clientv3.LeaseID) string {
	return fmt.Sprintf("%s/%s/%d", naming.MTarget, name, leaseID)
}

func (naming *NamingService) GetPathServerName(name string) string {
	return fmt.Sprintf("%s/%s", naming.MTarget, name)
}
// 添加/注册新的服务
func (naming *NamingService) AddEndpoint(e Endpoint) error {
	b, _ := json.Marshal(e)
	ep := endpoints.Endpoint{
		Addr:     fmt.Sprintf("%s:%d", e.Addr, e.Port),
		Metadata: string(b), // 这里有个坑,必须传入字符串,没来得及看原因
	}
	// 在etcd创建一个续期的lease对象
	lease, err := naming.Client.Grant(context.TODO(), defLeaseSecond)
	if err != nil {
		return err
	}
	key := naming.GetFullServerName(e.Name, lease.ID)
	// 向etcd注册一个Endpoint并绑定续期
	err = naming.manager.AddEndpoint(context.TODO(), key, ep, clientv3.WithLease(lease.ID))
	if err != nil {
		return err
	}
	log.Println("AddEndpoint success:", key)
	// 开启自动续期KeepAlive
	ch, err := naming.Client.KeepAlive(context.TODO(), lease.ID)
	if err != nil {
		return err
	}
	// 这个方法会异步打印出每次续期调用的日志
	go func() {
		for {
			ka := <-ch
			log.Println("ttl:", ka.ID, ka.TTL)
		}
	}()
	serviceMap[e.Name] = &EndpointUnit{Name: e.Name, LeaseID: lease.ID, E: e}
	return nil
}

// 移除一个服务
func (naming *NamingService) DelEndpoint(name string) error {
	eu := serviceMap[name]
	if eu == nil {
		return nil
	}
	err := naming.manager.DeleteEndpoint(context.TODO(), naming.GetFullServerName(name, eu.LeaseID))
	if err != nil {
		log.Fatalf("Delete endpoint error %v", err)
		return err
	}
	_, err = naming.Client.Revoke(context.TODO(), eu.LeaseID)
	if err != nil {
		log.Fatalf("Revoke lease error %v", err)
		return err
	}
	delete(serviceMap, name)
	log.Printf("DeleteEndpoint [%s] success\n", name)
	return nil
}
// 移除所有服务
func (naming *NamingService) DelAllEndpoint() {
	for k := range serviceMap {
		err := naming.DelEndpoint(k)
		if err != nil {
			log.Fatalln("Ignore Failure Continue...")
		}
	}
}
// 创建一个etcdResolver用于客户端发现服务
func (naming *NamingService) NewEtcdResolver() (gresolver.Builder, error) {
	etcdResolver, err := resolver.NewBuilder(naming.Client)
	if err != nil {
		log.Fatalf("Etcd resolver error %v", err)
		return nil, err
	}
	return etcdResolver, nil
}

服务端实现

package main

import (
	"context"
	"example.com/ggrpc/etcd"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	pb "example.com/ggrpc/proto"
	"google.golang.org/grpc"
)

var (
	port = flag.Int("port", 8000, "The server port")
)

const testServerName = "s1"

// 定义一个server实现UnimplementedGreeterServer
// UnimplementedGreeterServer 是第四步自动生成的,可以打开对应文件查看
type server struct {
	pb.UnimplementedGreeterServer
}

// server 重写SayHello方法,做业务处理
func (s *server) SayHello(c context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("接收到客户端的消息: %v", req.GetName())
	time.Sleep(time.Second)
	ms := fmt.Sprintf("好的[%d]收到,%s %s", *port, req.GetName(), time.Now())
	log.Printf("回复客户端的消息: %s", ms)
	return &pb.HelloReply{Message: ms}, nil
}

func main() {
	// 解析命令行参数
	flag.Parse()
	// 监听本地tcp端口
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// 创建一个grpc Server服务对象,Handler非必传
	// s := grpc.NewServer() // 可以直接创建对象
	s := grpc.NewServer()
	// 注册服务
	pb.RegisterGreeterServer(s, &server{})

	// 注册ETCD
	service, err := etcd.NewLocalDefNamingService("my1")
	if err != nil {
		log.Fatalf("failed to create NamingService: %v", err)
	}
	err = service.AddEndpoint(etcd.Endpoint{
		Addr:    "localhost",
		Name:    testServerName,
		Port:    *port,
		Version: "1.0.0",
	})
	if err != nil {
		log.Fatalf("failed to reg etcd: %v", err)
	}

	// 启动RPC并监听
	log.Printf("server listening at %v", lis.Addr())
	go func() {
		if err := s.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
			service.DelAllEndpoint()
		}
	}()

	// 等待关闭信号
	quit := make(chan os.Signal)
	// kill (no param) default send syscanll.SIGTERM
	// kill -2 is syscall.SIGINT
	// kill -9 is syscall. SIGKILL but can"t be catch, so don't need add it
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	fmt.Printf("Shutdown Server ... \r\n")
	// 停止grpc服务
	s.GracefulStop()
	// 删除etcd注册信息
	service.DelAllEndpoint()
	fmt.Printf("Graceful Shutdown Server success\r\n")

}

客户端实现

package main

import (
	"context"
	"example.com/ggrpc/etcd"
	pb "example.com/ggrpc/proto"
	"flag"
	"google.golang.org/grpc"
	"log"
	"time"
)

//var serAddr = flag.String("addr", "localhost:8000", "the address to connect to")

const etcdDialPrefix = "etcd://localhost:2379/"

func main() {
	// 解析命令行参数
	flag.Parse()

	service, err := etcd.NewLocalDefNamingService("my1")
	if err != nil {
		log.Fatalf("Create naming service error: %v", err)
	}
	resolver, err := service.NewEtcdResolver()
	if err != nil {
		log.Fatalf("Create etcd resolver error: %v", err)
	}
	// 连接服务端
	conn, err := grpc.Dial(etcdDialPrefix+service.GetPathServerName("s1"), grpc.WithInsecure(), grpc.WithResolvers(resolver),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
	if err != nil {
		log.Fatalf("Conn server error: %v", err)
	}
	log.Printf("Conn success: %v", conn.GetState())
	// 执行完方法自动关闭资源
	defer func() {
		err := conn.Close()
		if err != nil {
			log.Fatalf("Close conn error: %v", err)
			return
		}
		log.Println("Close conn success")
	}()

	// 创建客户端
	c := pb.NewGreeterClient(conn)

	log.Println("5秒中之后调用SayHello方法")
	time.Sleep(time.Second * 5)
	num := 10
	for i := 0; i < num; i++ {
		// 创建2秒超时ctx
		ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
		// 发起RPC请求
		log.Println("开始调用SayHello方法")
		res, err := c.SayHello(ctx, &pb.HelloRequest{Name: "一号"})
		if err != nil {
			log.Fatalf("请求失败: %v", err)
		}
		log.Printf("请求结果: %s", res.GetMessage())
	}

	// 睡眠一会再结束
	log.Println("3秒后结束,客户端自动断开连接")
	time.Sleep(time.Second * 3)

}

proto 代码

proto 代码可以自己生成,这里只是给出本项目的测试代码

helloworld.pb.go
// 使用的语法协议版本 proto3

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// 	protoc-gen-go v1.28.1
// 	protoc        v3.18.2
// source: helloworld.proto

package proto

import (
	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
	reflect "reflect"
	sync "sync"
)

const (
	// Verify that this generated code is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
	// Verify that runtime/protoimpl is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

// 定义请求对象
type HelloRequest struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

func (x *HelloRequest) Reset() {
	*x = HelloRequest{}
	if protoimpl.UnsafeEnabled {
		mi := &file_helloworld_proto_msgTypes[0]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *HelloRequest) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*HelloRequest) ProtoMessage() {}

func (x *HelloRequest) ProtoReflect() protoreflect.Message {
	mi := &file_helloworld_proto_msgTypes[0]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use HelloRequest.ProtoReflect.Descriptor instead.
func (*HelloRequest) Descriptor() ([]byte, []int) {
	return file_helloworld_proto_rawDescGZIP(), []int{0}
}

func (x *HelloRequest) GetName() string {
	if x != nil {
		return x.Name
	}
	return ""
}

// 定义返回对象
type HelloReply struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}

func (x *HelloReply) Reset() {
	*x = HelloReply{}
	if protoimpl.UnsafeEnabled {
		mi := &file_helloworld_proto_msgTypes[1]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *HelloReply) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*HelloReply) ProtoMessage() {}

func (x *HelloReply) ProtoReflect() protoreflect.Message {
	mi := &file_helloworld_proto_msgTypes[1]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use HelloReply.ProtoReflect.Descriptor instead.
func (*HelloReply) Descriptor() ([]byte, []int) {
	return file_helloworld_proto_rawDescGZIP(), []int{1}
}

func (x *HelloReply) GetMessage() string {
	if x != nil {
		return x.Message
	}
	return ""
}

var File_helloworld_proto protoreflect.FileDescriptor

var file_helloworld_proto_rawDesc = []byte{
	0x0a, 0x10, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x2e, 0x70, 0x72, 0x6f,
	0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x22, 0x0a, 0x0c, 0x48, 0x65, 0x6c,
	0x6c, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d,
	0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x26, 0x0a,
	0x0a, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x6d,
	0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65,
	0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x3f, 0x0a, 0x07, 0x47, 0x72, 0x65, 0x65, 0x74, 0x65, 0x72,
	0x12, 0x34, 0x0a, 0x08, 0x53, 0x61, 0x79, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x13, 0x2e, 0x70,
	0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
	0x74, 0x1a, 0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52,
	0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x3b, 0x70, 0x72, 0x6f,
	0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}

var (
	file_helloworld_proto_rawDescOnce sync.Once
	file_helloworld_proto_rawDescData = file_helloworld_proto_rawDesc
)

func file_helloworld_proto_rawDescGZIP() []byte {
	file_helloworld_proto_rawDescOnce.Do(func() {
		file_helloworld_proto_rawDescData = protoimpl.X.CompressGZIP(file_helloworld_proto_rawDescData)
	})
	return file_helloworld_proto_rawDescData
}

var file_helloworld_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_helloworld_proto_goTypes = []interface{}{
	(*HelloRequest)(nil), // 0: proto.HelloRequest
	(*HelloReply)(nil),   // 1: proto.HelloReply
}
var file_helloworld_proto_depIdxs = []int32{
	0, // 0: proto.Greeter.SayHello:input_type -> proto.HelloRequest
	1, // 1: proto.Greeter.SayHello:output_type -> proto.HelloReply
	1, // [1:2] is the sub-list for method output_type
	0, // [0:1] is the sub-list for method input_type
	0, // [0:0] is the sub-list for extension type_name
	0, // [0:0] is the sub-list for extension extendee
	0, // [0:0] is the sub-list for field type_name
}

func init() { file_helloworld_proto_init() }
func file_helloworld_proto_init() {
	if File_helloworld_proto != nil {
		return
	}
	if !protoimpl.UnsafeEnabled {
		file_helloworld_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*HelloRequest); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
		file_helloworld_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*HelloReply); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
	}
	type x struct{}
	out := protoimpl.TypeBuilder{
		File: protoimpl.DescBuilder{
			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
			RawDescriptor: file_helloworld_proto_rawDesc,
			NumEnums:      0,
			NumMessages:   2,
			NumExtensions: 0,
			NumServices:   1,
		},
		GoTypes:           file_helloworld_proto_goTypes,
		DependencyIndexes: file_helloworld_proto_depIdxs,
		MessageInfos:      file_helloworld_proto_msgTypes,
	}.Build()
	File_helloworld_proto = out.File
	file_helloworld_proto_rawDesc = nil
	file_helloworld_proto_goTypes = nil
	file_helloworld_proto_depIdxs = nil
}

helloworld_grpc.pb.go

// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc             v3.18.2
// source: helloworld.proto

package proto

import (
	context "context"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

// GreeterClient is the client API for Greeter service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type GreeterClient interface {
	// 定义SayHello方法,接受HelloRequest消息, 并返回HelloReply消息
	SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

type greeterClient struct {
	cc grpc.ClientConnInterface
}

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
	return &greeterClient{cc}
}

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	out := new(HelloReply)
	err := c.cc.Invoke(ctx, "/proto.Greeter/SayHello", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// GreeterServer is the server API for Greeter service.
// All implementations must embed UnimplementedGreeterServer
// for forward compatibility
type GreeterServer interface {
	// 定义SayHello方法,接受HelloRequest消息, 并返回HelloReply消息
	SayHello(context.Context, *HelloRequest) (*HelloReply, error)
	mustEmbedUnimplementedGreeterServer()
}

// UnimplementedGreeterServer must be embedded to have forward compatible implementations.
type UnimplementedGreeterServer struct {
}

func (UnimplementedGreeterServer) SayHello(context.Context, *HelloRequest) (*HelloReply, error) {
	return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented")
}
func (UnimplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {}

// UnsafeGreeterServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to GreeterServer will
// result in compilation errors.
type UnsafeGreeterServer interface {
	mustEmbedUnimplementedGreeterServer()
}

func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) {
	s.RegisterService(&Greeter_ServiceDesc, srv)
}

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(HelloRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(GreeterServer).SayHello(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/proto.Greeter/SayHello",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
	}
	return interceptor(ctx, in, info, handler)
}

// Greeter_ServiceDesc is the grpc.ServiceDesc for Greeter service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Greeter_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "proto.Greeter",
	HandlerType: (*GreeterServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "SayHello",
			Handler:    _Greeter_SayHello_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "helloworld.proto",
}

三、测试

启动etcd

Go gRPC etcd实现服务注册发现与负载均衡

启动两个服务端

 go run .\server\server.go
 023/05/13 13:46:20 AddEndpoint success: my-grpc/my1/s1/7587870565126837256
 2023/05/13 13:46:20 server listening at [::]:8000
 2023/05/13 13:46:21 ttl: 7587870565126837256 30
go run .\server\server.go -port 8001
2023/05/13 13:47:03 AddEndpoint success: my-grpc/my1/s1/7587870565126837259
2023/05/13 13:47:03 server listening at [::]:8001
2023/05/13 13:47:03 ttl: 7587870565126837259 30

查看etcd中注册的服务

lizheng@lz-x:/mnt/d/dev/go/workspace/my-grpc$ etcdctl get my-grpc --prefix
my-grpc/my1/s1/7587870565126837256
{"Op":0,"Addr":"localhost:8000","Metadata":"{\"name\":\"s1\",\"addr\":\"localhost\",\"Port\":8000,\"version\":\"1.0.0\"}"}
my-grpc/my1/s1/7587870565126837259
{"Op":0,"Addr":"localhost:8001","Metadata":"{\"name\":\"s1\",\"addr\":\"localhost\",\"Port\":8001,\"version\":\"1.0.0\"}"}

启动客户端进行测试

PS D:\dev\go\workspace\my-grpc> go run .\client\client.go
2023/05/13 13:49:41 Conn success: IDLE
2023/05/13 13:49:41 5秒中之后调用SayHello方法
2023/05/13 13:49:46 开始调用SayHello方法
2023/05/13 13:49:47 请求结果: 好的[8001]收到,一号 2023-05-13 13:49:47.8763026 +0800 CST m=+164.396410901
2023/05/13 13:49:47 开始调用SayHello方法
2023/05/13 13:49:48 请求结果: 好的[8000]收到,一号 2023-05-13 13:49:48.8924889 +0800 CST m=+208.647296501
2023/05/13 13:49:48 开始调用SayHello方法
2023/05/13 13:49:49 请求结果: 好的[8001]收到,一号 2023-05-13 13:49:49.8945126 +0800 CST m=+166.414620901
2023/05/13 13:49:49 开始调用SayHello方法
2023/05/13 13:49:50 请求结果: 好的[8000]收到,一号 2023-05-13 13:49:50.9095842 +0800 CST m=+210.664391801
2023/05/13 13:49:50 开始调用SayHello方法
2023/05/13 13:49:51 请求结果: 好的[8001]收到,一号 2023-05-13 13:49:51.9276213 +0800 CST m=+168.447729601
2023/05/13 13:49:51 开始调用SayHello方法
2023/05/13 13:49:52 请求结果: 好的[8000]收到,一号 2023-05-13 13:49:52.9444446 +0800 CST m=+212.699252201
2023/05/13 13:49:52 开始调用SayHello方法
2023/05/13 13:49:53 请求结果: 好的[8001]收到,一号 2023-05-13 13:49:53.9605415 +0800 CST m=+170.480649801
2023/05/13 13:49:53 开始调用SayHello方法
2023/05/13 13:49:54 请求结果: 好的[8000]收到,一号 2023-05-13 13:49:54.9625634 +0800 CST m=+214.717371001
2023/05/13 13:49:54 开始调用SayHello方法
2023/05/13 13:49:55 请求结果: 好的[8001]收到,一号 2023-05-13 13:49:55.9772376 +0800 CST m=+172.497345901
2023/05/13 13:49:55 开始调用SayHello方法
2023/05/13 13:49:56 请求结果: 好的[8000]收到,一号 2023-05-13 13:49:56.9792537 +0800 CST m=+216.734061301
2023/05/13 13:49:56 3秒后结束,客户端自动断开连接
2023/05/13 13:49:59 Close conn success

可以看出实现了自动负载均衡,如果此时停止一个服务可以自动切换识别

四、结语

项目是demo实例工程,提供实现思路和简单方案,有很多不足之处和潜在问题,多多指出!文章来源地址https://www.toymoban.com/news/detail-447129.html

到了这里,关于Go gRPC etcd实现服务注册发现与负载均衡的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringCloud学习笔记(上):服务注册与发现:Eureka、Zookeeper、Consul+负载均衡服务调用:Ribbon

    SpringCloud=分布式微服务架构的一站式解决方案,是多种微服务架构落地技术的集合体,俗称微服务全家桶。 springboot版本选择: git源码地址:https://github.com/spring-projects/spring-boot/releases/ SpringBoot2.0新特性:https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.0-Release springcloud版本选

    2024年02月08日
    浏览(43)
  • 【Spring Cloud】深入探索 Nacos 注册中心的原理,服务的注册与发现,服务分层模型,负载均衡策略,微服务的权重设置,环境隔离

    在微服务架构中,服务注册中心是整个体系中的关键组件之一。它负责服务的注册、发现和管理,为微服务之间的通信提供了基础设施。在这方面,Nacos(Namespace Aware Clustered Object Storage)作为一种服务发现和配置管理系统,提供了丰富的功能,旨在简化微服务架构中的服务注

    2024年02月06日
    浏览(168)
  • 【云原生】k8s Service 实现服务发现和负载均衡

    在容器编排系统中,如 Kubernetes,Pod 是最小的部署单元。而一组 Pod 通常对外提供某种服务。在 Kubernetes 中,Service 就是用来对外暴露一组 Pod 的服务的资源对象。Service 可以通过 IP 地址和端口号访问,从而对外提供服务。 Service 是 Kubernetes 中一个非常重要的概念,它可以将一

    2023年04月16日
    浏览(39)
  • golang云原生项目之:etcd服务注册与发现

    1直接调包 kitex-contrib: 上面有实现的案例,直接cv。下面是具体的理解 2 相关概念 EtcdResolver: etcd resolver是一种DNS解析器,用于将域名转换为etcd集群中的具体地址,以便应用程序可以与etcd集群进行通信。etcd是一个分布式键值存储系统,常用于服务发现、配置共享和分布式锁等

    2024年02月11日
    浏览(35)
  • 微服务-服务的注册与发现(Consul、zookeeper、etcd、eureka、Nacos)

    一. 对比常用的注册中心 Consul、zookeeper、etcd、eureka、Nacos Feature Consul Zookeeper Etcd Eureka Nacos 服务健康检查  服务状态,内存,硬盘等  (弱)长连接,keepalive  连接心跳  可配支持 传输层 (PING 或 TCP)和应用层 (如 HTTP、MySQL、用户自定义)的健康检查 多数据中心  支持  —  

    2023年04月19日
    浏览(49)
  • 【go-zero】docker镜像直接部署go-zero的API与RPC服务 如何实现注册发现?docker network 实现 go-zero 注册发现

    使用docker直接部署go-zero微服务会发现API无法找到RPC服务 用docker直接部署 我们会发现API无法注册发现RPC服务 原因是我们缺少了docker的network网桥 RPC服务运行正常 API服务启动,通过docker logs 查看日志还是未发现RPC API的yaml配置 RPC服务的IP是 127.0.0.1 与对应的端口 下图为改成了定

    2024年02月13日
    浏览(45)
  • springCloudAlibaba组件-Nacos-服务发现与负载均衡(三)

    如果项目使用微服务架构,如果A微服务需要访问B微服务,需要http请求进行调用,当然需要B微服务的地址与端口号,微服务可以向之前提到的服务中心进行获取B服务的ip地址和端口号,这就是服务发现 1.客户端主动获取 客户端: 流程: 1.先是故障转移机制判断是否去本地文

    2024年02月10日
    浏览(41)
  • 传统DNS、负载均衡服务发现框架与专业服务发现框架(Eurek、nacos)分析

    DNS 服务器可以在一定程度上用作服务发现的机制,以下是其冲动服务发现的一些利弊 优势 广泛性 : DNS是互联网的标准协议之一,已经广泛地被支持和使用。因此,使用DNS作为服务发现的机制可以借助现有的网络基础设施,无需引入新的工具。 简单性 : DNS的域名解析机制

    2024年02月12日
    浏览(34)
  • 如何从eureka-server上进行服务发现,负载均衡远程调用服务

    在spring cloud的maven的pom文件中添加eureka-client的依赖坐标 添加运行服务的名称以及eureka-server的地址 需要在RestTemplate这个Bean添加一个@LoadBalanced注解,实现负载均衡策略 在调用远程服务的方法处自动注入RestTemplate,用远程调用的对象服务名代替服务IP地址以及端口号,比如当前

    2024年02月04日
    浏览(50)
  • brpc负载均衡load balance和服务发现name servicing

    1.SharedLoadBalancer(load_balancer.h):包含LoadBalancer指针_lb, AddServersInBatch 2. LoadBalancerWithNaming :继承SharedLoadBalancer和NamingServiceWatcher 2.1Init函数:SharedLoadBalancer::Init, new一个load balance对象(服务发现线程,也是操作actions-owner是NamingServiceThread线程对象-watchers-LoadBalanceWithNamingServi

    2024年01月22日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包