grpc流式使用和注意事项

这篇具有很好参考价值的文章主要介绍了grpc流式使用和注意事项。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

stream server client

流式grpc

  • Server-side streaming RPC:服务器端流式 RPC
  • Client-side streaming RPC:客户端流式 RPC
  • Bidirectional streaming RPC:双向流式 RPC

1、proto

syntax = "proto3";
package stream;

service StreamService {
  rpc Eat(EatRequest) returns (stream EatResponse) {} //服务端流式
  rpc Work(stream EatRequest) returns (EatResponse) {}//客户端流式
  rpc Sleep(stream EatRequest) returns (stream EatResponse) {}//双向流

}

message Item{
  string value = 1;
  string value2 = 2;
}

message EatRequest{
  Item req = 1;
}

message EatResponse{
    Item resp = 1;
}

2、服务端流式

客户端代码

/**
 * @Author: zhangsan
 * @Description:
 * @File:  main
 * @Version: 1.0.0
 * @Date: 2021/5/26 下午5:48
 */

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	pb "grpc/test/src/proto"
	"io"
	"log"
)
const (
	PORT = "9002"
)
func main() {
	conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()
	client := pb.NewStreamServiceClient(conn)

	err = printEats(client, &pb.PublicRequest{
		Req:                  &pb.Item{
			Value:                "value",
			Value2:               "value1",

		},
	})
	if err != nil {
		log.Fatalf("printEats.err: %v", err)
	}

}

//服务端流式
func printEats(client pb.StreamServiceClient, r *pb.PublicRequest) error {
	var c context.Context

	c = context.WithValue(context.TODO(),"a","b")

	stream,err := client.Eat(c,r)
	if err != nil{
		return err
	}
	//接收server的header信息
	fmt.Println(stream.Header())//map[cc:[dd] content-type:[application/grpc]]

	for {
		resp ,err := stream.Recv()
		if err == io.EOF{
			break
		}
		if err != nil{
			return err
		}
		log.Printf("resp: value1 %s, value1 %s",resp.Resp.Value,resp.Resp.Value2)
	}
	//在一元rpc中header和trailer是一起到达的,在流式中是在接受消息后到达的
	fmt.Println(stream.Trailer())//map[cc1:[dd1]]

	return nil
}

服务端代码

/**
 * @Author: zhangsan
 * @Description:
 * @File:  main
 * @Version: 1.0.0
 * @Date: 2021/5/26 下午5:32
 */

package main

import (
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	pb "grpc/test/src/proto"
	"log"
	"net"
	"time"
)
type StreamService struct{}

const (
	PORT = "9002"
)
func main() {
	server := grpc.NewServer()
	pb.RegisterStreamServiceServer(server, &StreamService{})
	lis, err := net.Listen("tcp", ":"+PORT)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	server.Serve(lis)
}
//服务端流式
func (s *StreamService) Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer) error {
	//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个
	if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil != err{
		return err
	}
	//设置header信息
	//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{
	//	return err
	//}



	//设置metadata,注意一元和流式的区别
	stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})

	a := stream.Context().Value("a")
	fmt.Println(a)

	for i := 0; i < 10;i++{
		time.Sleep(1 *time.Second)
		err := stream.Send(&pb.PublicResponse{
			Resp:                 &pb.Item{
				Value:                "eat",
				Value2:               "服务端流式",

			},
		})

		if err != nil{
			return err
		}



	}

	return nil
}

func (s *StreamService) Work(stream pb.StreamService_WorkServer) error {
	return nil
}
func (s *StreamService) Sleep(stream pb.StreamService_SleepServer) error {
	return nil
}

验证

 go run src/client/stream_client/main.go
map[cc2:[dd2] content-type:[application/grpc]] <nil>
2021/05/27 10:05:08 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:09 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:10 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:11 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:12 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:13 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:14 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:15 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:16 resp: value1 eat, value1 服务端流式
go run src/server/stream_server/main.go

分析

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eRmLdnpt-1622633151060)(readme.assets/image-20210527111442672.png)]

server

Stream.Send 追后也是调用的SendMsg方法

  • 消息体(对象)序列化
  • 压缩序列化后的消息体
  • 对正在传输的消息体增加5个字节的header
  • 判断压缩+序列化后的消息体总字节是否大雨预设的maxSendMessageSize(math。MaxInt32),超出报错
  • 写入给流的数据集
//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个
	if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil != err{
		return err
	}
	//设置header信息
	//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{
	//	return err
	//}



	//设置metadata,注意一元和流式的区别
	stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})
  1. SetHeader 和 SendHeader ,SendHeader会覆盖之前的setheader信息,尽量只使用一个

  2. SetTrailer 和 SetHeader 区别,SetTrailer在一元的时候会和SetHeader一起返回,在流式rpc的时候会在send 和rev之后才会被发送和接收

    setTrailer和setheader区别

Client

RecvMsg 会从流中读取完整的 gRPC 消息体,另外通过阅读源码可得知:

(1)RecvMsg 是阻塞等待的

(2)RecvMsg 当流成功/结束(调用了 Close)时,会返回 io.EOF

(3)RecvMsg 当流出现任何错误时,流会被中止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现如下错误:

  • io.EOF
  • io.ErrUnexpectedEOF
  • transport.ConnectionError
  • google.golang.org/grpc/codes

同时需要注意,默认的 MaxReceiveMessageSize 值为 1024 * 1024 * 4,建议不要超出 4M

  1. 在ServerBuilder中,通过SetMaxReceiveMessageSize(int)设置这个最大允许字节长度,因为这里的参数为Int型,所以其最大的字节允许长度也就是INT_MAX=2147483647 (2G)。
  2. 流式的grpc传输4M的大小是可以的,流式传输哈

Server

SendMsg 方法,该方法涉及以下过程:

  • 消息体(对象)序列化
  • 压缩序列化后的消息体
  • 对正在传输的消息体增加 5 个字节的 header
  • 判断压缩+序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为 math.MaxInt32),若超出则提示错误
  • 写入给流的数据集

3、客户端流式

客户端代码

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	pb "grpc/test/src/proto"
	"io"
	"log"
)
const (
	PORT = "9002"
)
func main() {
	conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()
	client := pb.NewStreamServiceClient(conn)

	err = printWork(client, &pb.PublicRequest{
		Req: &pb.Item{
			Value:                "valueWork",
			Value2:               "value1Work",

		},
	})
	if err != nil {
		log.Fatalf("printWork.err: %v", err)
	}
}

func printWork(client pb.StreamServiceClient, r *pb.PublicRequest) error {
	stream,err := client.Work(context.Background())
	if err != nil{
		return err
	}

	for i := 0 ;i < 6;i++{
		fmt.Println(r)
		err := stream.Send(r)
		if err == io.EOF{
			break
		}
		if err != nil{
			return err
		}
	}

	//注意这个header是设置不了的
	//fmt.Println(stream.Header())

	resp ,err := stream.CloseAndRecv()
	if err != nil{
		return err
	}

	log.Printf("resp: value1 %s, value1 %s",resp.Resp.Value,resp.Resp.Value2)

	//在一元rpc中header和trailer是一起到达的,在流式中是在接受消息后到达的
	fmt.Println(stream.Trailer())//map[cc1:[dd1]]
	return nil
}

服务端代码

package main

import (
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	pb "grpc/test/src/proto"
	"io"
	"log"
	"net"
)
type StreamService struct{}

const (
	PORT = "9002"
)
func main() {

	//设置客户端最大接收值
	//opt := grpc.MaxRecvMsgSize()

	server := grpc.NewServer()
	pb.RegisterStreamServiceServer(server, &StreamService{})
	lis, err := net.Listen("tcp", ":"+PORT)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	server.Serve(lis)
}
//客户端流事rpc
func (s *StreamService) Work(stream pb.StreamService_WorkServer) error {

	//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个
	if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil != err{
		return err
	}
	//设置header信息
	//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{
	//	return err
	//}



	//设置metadata,注意一元和流式的区别
	stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})

	a := stream.Context().Value("a")
	fmt.Println(a)
	for {
		r ,err := stream.Recv()
		if err == io.EOF{
			return stream.SendAndClose(&pb.PublicResponse{
				Resp:                &pb.Item{
					Value:                "client-stream-server",
					Value2:               "client-stream-server-v2",
				} ,
			})
		}
		if err != nil{
			return err
		}
		log.Printf("stream.Recv value: %s,value2: %s", r.Req.Value, r.Req.Value2)
	}
}

//服务端流式
func (s *StreamService) Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer) error {

	return nil
}

func (s *StreamService) Sleep(stream pb.StreamService_SleepServer) error {
	return nil
}

验证

-> % go run src/client/client-stream-client/main.go
req:<value:"valueWork" value2:"value1Work" > 
req:<value:"valueWork" value2:"value1Work" > 
req:<value:"valueWork" value2:"value1Work" > 
req:<value:"valueWork" value2:"value1Work" > 
req:<value:"valueWork" value2:"value1Work" > 
req:<value:"valueWork" value2:"value1Work" > 
2021/06/02 13:51:54 resp: value1 client-stream-server, value1 client-stream-server-v2
map[cc1:[dd1]]

-> % go run src/server/client-stream_server/mian.go
<nil>
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work

分析

客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PyN9Izz2-1622633151074)(readme.assets/image-20210602135900704.png)]

  1. 服务端是可以设置grpc.MaxRecvMsgSize()的接收大小的 默认是102410244= 4m大小
    还可以设置grpc.MaxSendMsgSize() 发送的大小,默认是int32,超出会报错
  2. stream.SendAndClose 当发现client的流关闭之后,需要将最终的结果响应给客户端,同时关闭在另一侧的recv
  3. stream.CloseAndRecv 就是和上面的一起使用的

4、客户端、服务端流式

客户端

package main

import (
	"context"
	"google.golang.org/grpc"
	pb "grpc/test/src/proto"
	"io"
	"log"
)
const (
	PORT = "9002"
)
func main() {
	conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()
	client := pb.NewStreamServiceClient(conn)

	err = printSleep(client, &pb.PublicRequest{
		Req: &pb.Item{
			Value:                "valueSleep",
			Value2:               "value1Sleep",
		},
	})
	if err != nil {
		log.Fatalf("printSleep.err: %v", err)
	}
}

//双向流
func printSleep(client pb.StreamServiceClient, r *pb.PublicRequest) error {
	stream, err := client.Sleep(context.Background())
	if err != nil {
		return err
	}
	for n := 0; n <= 6; n++ {
		err = stream.Send(r)
		if err != nil {
			return err
		}
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		log.Printf("resp: value1: %s, value2: %s", resp.Resp.Value, resp.Resp.Value2)
	}
	if err = stream.CloseSend();nil != err{
		log.Println(err)
	}
	return nil
}

服务端

package main

import (
	"google.golang.org/grpc"
	pb "grpc/test/src/proto"
	"io"
	"log"
	"net"
)
type StreamService struct{}

const (
	PORT = "9002"
)
func main() {

	//设置客户端最大接收值
	//opt := grpc.MaxRecvMsgSize()
	//grpc.MaxSendMsgSize()

	server := grpc.NewServer()
	pb.RegisterStreamServiceServer(server, &StreamService{})
	lis, err := net.Listen("tcp", ":"+PORT)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	server.Serve(lis)
}
//客户端流事rpc
func (s *StreamService) Work(stream pb.StreamService_WorkServer) error {
	return nil
}

//服务端流式
func (s *StreamService) Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer) error {

	return nil
}

//双向流
func (s *StreamService) Sleep(stream pb.StreamService_SleepServer) error {
	n := 0
	for {
		err := stream.Send(&pb.PublicResponse{
			Resp: &pb.Item{
				Value:  "gPRC Stream Client: Sleep",
				Value2: "双向stream-value2",
			},
		})
		if err != nil {
			return err
		}
		r, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		n++
		log.Printf("stream.Recv req.value: %s, pt.value2: %s", r.Req.Value, r.Req.Value2)
	}

}

验证

-> % go run src/client/clientServer-stream_client/main.go
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2

-> % go run src/server/clientServer-stream_server/main.go 
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep

简单的介绍了几种rpc的流的使用,大家根据需求,合理使用文章来源地址https://www.toymoban.com/news/detail-409340.html

到了这里,关于grpc流式使用和注意事项的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Unity之Addressable使用注意事项

    Profile文件中配置资源构建路径和资源首次加载路径,资源如何设置了缓存,在首次加载之后会将再用缓存在缓存目录,后面将直接从缓存目录中读取,方便项目发包时候进行使用 AddressableAssetSettings文件 DisableCatalogUpdateOnStartup 勾选改选项,禁止自动更新,项目资源下载一般需

    2024年02月08日
    浏览(41)
  • 6 使用强制类型转换的注意事项

    概述         在C语言中,强制类型转换是通过直接转换为特定类型的方式来实现的,类似于下面的代码。         这种方式可以在任意两个类型间进行转换,太过随意和武断,很容易带来一些难以发现的隐患和问题。C++为了向下兼容,保留了这种方式,但新增了四个

    2023年04月08日
    浏览(38)
  • C++使用类的一些注意事项

    目录 前言: 1.再谈构造函数 2.(c++98)隐式类型转换中的编译器的优化 3.explicit 4.static成员 5.匿名对象 6.友元函数 7.内部类 8.编译器的一些场上的优化 总结: 若有歧义,请指出,感谢阅读! 1.再谈构造函数 我们在构造函数体中,给成员变量赋值能叫做成员变量的初始

    2024年04月10日
    浏览(47)
  • mysql中使用IN的注意事项

    在写sql语句过程中,难免会使用IN条件查询,那你知道使用IN要注意那些事项呢?下面我们就来一列举 使用IN查询是否会使用索引 答:有时会使用,有时就不会使用。当IN 的范围小时会使用索引查询,当IN的范围大的时候,就会全表扫描。 IN和EXISTS那个效率高 答:1、如果查询的

    2024年02月01日
    浏览(41)
  • 【Visual Leak Detector】使用注意事项

    使用 VLD 内存泄漏检测工具辅助开发时整理的学习笔记。本篇介绍使用 VLD 时的注意事项。同系列文章目录可见 《内存泄漏检测工具》目录 目录 说明 1. 官网文档 2. 注意事项 可以在 Using-Visual-Leak-Detector 官方文档里看到如何使用 VLD,里面介绍了如何在 Visual C++ 2003/2005/2008/201

    2023年04月11日
    浏览(38)
  • 相机拍摄3要素及其使用注意事项(全)

    1、原文持续更新中:https://www.cnblogs.com/MrFlySand/p/17897031.html 2、PS修图插件,一键美颜,你知道吗?(戳我),后台回复“230707PS插件”获取相关插件应用,回复“230708PS插件教程”获取教学链接;回复“230730camera快捷键”获取快捷键链接。 感光度(ISO) : 数字越大,感光度越大(对光

    2024年02月03日
    浏览(43)
  • Vue中data变量使用的注意事项

    因为在Vue中,data中的属性往往都是用于双向绑定,所以Vue会对其有劫持,所以我们在对data属性进行操作时,尽量不要对其直接操作,比如下面代码: 最终打印结果如下: 可以看到包含一个Observer属性,这是Vue自动加上的。 上面代码不断向 this.list 中添加数据,这样会造成过

    2024年02月13日
    浏览(47)
  • RabbitMQ基本使用及企业开发中注意事项

    目录 一、基本使用 二、使用注意事项 1. 生产者重连机制 - 保证mq服务是通的 2. 生产者确认机制 - 回调机制 3. MQ的可靠性 4. Lazy Queue模式 5. 消费者确认机制 部署完RabbitMQ有两种使用方式: 网页客户端 Java代码 MQ组成部分: 虚拟主机 - 进行数据隔离的,好比mysql中的不同数据库

    2024年04月26日
    浏览(43)
  • java中interface的使用以及注意事项

    一、接口(interface)基本概念 接口(interface):是java中一种引用数据类型,可以看做方法的集合,其内部主要就是封装了方法,包含抽象方法(JDK 7及以前),默认方法和静态方法(JDK 8),私有方法(JDK 9). 二、使用格式   1.定义格式:   public interface 接口名称{     //抽象方法    

    2024年02月06日
    浏览(36)
  • 使用物理机服务器应该注意的事项

    使用物理机服务器应该注意的事项 如今云计算的发展已经遍布各大领域,尽管现在的云服务器火遍全网,但是仍有一些大型企业依旧选择使用独立物理服务器,你知道这是为什么吗?壹基比小鑫来告诉你吧。 独立物理服务器托管业务适合大中型企业及新兴网络业务如网络视

    2024年02月09日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包