stream server client
流式grpc
- Server-side streaming RPC:服务器端流式 RPC
- Client-side streaming RPC:客户端流式 RPC
- Bidirectional streaming RPC:双向流式 RPC
1、proto
syntax = "proto3";
package stream;
service StreamService {
rpc Eat(EatRequest) returns (stream EatResponse) {} //服务端流式
rpc Work(stream EatRequest) returns (EatResponse) {}//客户端流式
rpc Sleep(stream EatRequest) returns (stream EatResponse) {}//双向流
}
message Item{
string value = 1;
string value2 = 2;
}
message EatRequest{
Item req = 1;
}
message EatResponse{
Item resp = 1;
}
2、服务端流式
客户端代码
/**
* @Author: zhangsan
* @Description:
* @File: main
* @Version: 1.0.0
* @Date: 2021/5/26 下午5:48
*/
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc/test/src/proto"
"io"
"log"
)
const (
PORT = "9002"
)
func main() {
conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc.Dial err: %v", err)
}
defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err = printEats(client, &pb.PublicRequest{
Req: &pb.Item{
Value: "value",
Value2: "value1",
},
})
if err != nil {
log.Fatalf("printEats.err: %v", err)
}
}
//服务端流式
func printEats(client pb.StreamServiceClient, r *pb.PublicRequest) error {
var c context.Context
c = context.WithValue(context.TODO(),"a","b")
stream,err := client.Eat(c,r)
if err != nil{
return err
}
//接收server的header信息
fmt.Println(stream.Header())//map[cc:[dd] content-type:[application/grpc]]
for {
resp ,err := stream.Recv()
if err == io.EOF{
break
}
if err != nil{
return err
}
log.Printf("resp: value1 %s, value1 %s",resp.Resp.Value,resp.Resp.Value2)
}
//在一元rpc中header和trailer是一起到达的,在流式中是在接受消息后到达的
fmt.Println(stream.Trailer())//map[cc1:[dd1]]
return nil
}
服务端代码
/**
* @Author: zhangsan
* @Description:
* @File: main
* @Version: 1.0.0
* @Date: 2021/5/26 下午5:32
*/
package main
import (
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
pb "grpc/test/src/proto"
"log"
"net"
"time"
)
type StreamService struct{}
const (
PORT = "9002"
)
func main() {
server := grpc.NewServer()
pb.RegisterStreamServiceServer(server, &StreamService{})
lis, err := net.Listen("tcp", ":"+PORT)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
server.Serve(lis)
}
//服务端流式
func (s *StreamService) Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer) error {
//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个
if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil != err{
return err
}
//设置header信息
//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{
// return err
//}
//设置metadata,注意一元和流式的区别
stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})
a := stream.Context().Value("a")
fmt.Println(a)
for i := 0; i < 10;i++{
time.Sleep(1 *time.Second)
err := stream.Send(&pb.PublicResponse{
Resp: &pb.Item{
Value: "eat",
Value2: "服务端流式",
},
})
if err != nil{
return err
}
}
return nil
}
func (s *StreamService) Work(stream pb.StreamService_WorkServer) error {
return nil
}
func (s *StreamService) Sleep(stream pb.StreamService_SleepServer) error {
return nil
}
验证
go run src/client/stream_client/main.go
map[cc2:[dd2] content-type:[application/grpc]] <nil>
2021/05/27 10:05:08 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:09 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:10 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:11 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:12 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:13 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:14 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:15 resp: value1 eat, value1 服务端流式
2021/05/27 10:05:16 resp: value1 eat, value1 服务端流式
go run src/server/stream_server/main.go
分析
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eRmLdnpt-1622633151060)(readme.assets/image-20210527111442672.png)]
server
Stream.Send 追后也是调用的SendMsg方法
- 消息体(对象)序列化
- 压缩序列化后的消息体
- 对正在传输的消息体增加5个字节的header
- 判断压缩+序列化后的消息体总字节是否大雨预设的maxSendMessageSize(math。MaxInt32),超出报错
- 写入给流的数据集
//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个
if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil != err{
return err
}
//设置header信息
//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{
// return err
//}
//设置metadata,注意一元和流式的区别
stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})
-
SetHeader 和 SendHeader ,SendHeader会覆盖之前的setheader信息,尽量只使用一个
-
SetTrailer 和 SetHeader 区别,SetTrailer在一元的时候会和SetHeader一起返回,在流式rpc的时候会在send 和rev之后才会被发送和接收
setTrailer和setheader区别
Client
RecvMsg 会从流中读取完整的 gRPC 消息体,另外通过阅读源码可得知:
(1)RecvMsg 是阻塞等待的
(2)RecvMsg 当流成功/结束(调用了 Close)时,会返回 io.EOF
(3)RecvMsg 当流出现任何错误时,流会被中止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现如下错误:
- io.EOF
- io.ErrUnexpectedEOF
- transport.ConnectionError
- google.golang.org/grpc/codes
同时需要注意,默认的 MaxReceiveMessageSize 值为 1024 * 1024 * 4,建议不要超出 4M
- 在ServerBuilder中,通过SetMaxReceiveMessageSize(int)设置这个最大允许字节长度,因为这里的参数为Int型,所以其最大的字节允许长度也就是INT_MAX=2147483647 (2G)。
- 流式的grpc传输4M的大小是可以的,流式传输哈
Server
SendMsg
方法,该方法涉及以下过程:
- 消息体(对象)序列化
- 压缩序列化后的消息体
- 对正在传输的消息体增加 5 个字节的 header
- 判断压缩+序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为
math.MaxInt32
),若超出则提示错误 - 写入给流的数据集
3、客户端流式
客户端代码
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc/test/src/proto"
"io"
"log"
)
const (
PORT = "9002"
)
func main() {
conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc.Dial err: %v", err)
}
defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err = printWork(client, &pb.PublicRequest{
Req: &pb.Item{
Value: "valueWork",
Value2: "value1Work",
},
})
if err != nil {
log.Fatalf("printWork.err: %v", err)
}
}
func printWork(client pb.StreamServiceClient, r *pb.PublicRequest) error {
stream,err := client.Work(context.Background())
if err != nil{
return err
}
for i := 0 ;i < 6;i++{
fmt.Println(r)
err := stream.Send(r)
if err == io.EOF{
break
}
if err != nil{
return err
}
}
//注意这个header是设置不了的
//fmt.Println(stream.Header())
resp ,err := stream.CloseAndRecv()
if err != nil{
return err
}
log.Printf("resp: value1 %s, value1 %s",resp.Resp.Value,resp.Resp.Value2)
//在一元rpc中header和trailer是一起到达的,在流式中是在接受消息后到达的
fmt.Println(stream.Trailer())//map[cc1:[dd1]]
return nil
}
服务端代码
package main
import (
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
pb "grpc/test/src/proto"
"io"
"log"
"net"
)
type StreamService struct{}
const (
PORT = "9002"
)
func main() {
//设置客户端最大接收值
//opt := grpc.MaxRecvMsgSize()
server := grpc.NewServer()
pb.RegisterStreamServiceServer(server, &StreamService{})
lis, err := net.Listen("tcp", ":"+PORT)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
server.Serve(lis)
}
//客户端流事rpc
func (s *StreamService) Work(stream pb.StreamService_WorkServer) error {
//设置header信息 sendHeader不可同时用,否则SendHeader会覆盖前一个
if err := stream.SetHeader(metadata.MD{"cc2":[]string{"dd2"}});nil != err{
return err
}
//设置header信息
//if err := stream.SendHeader(metadata.MD{"cc":[]string{"dd"}});err != nil{
// return err
//}
//设置metadata,注意一元和流式的区别
stream.SetTrailer(metadata.MD{"cc1":[]string{"dd1"}})
a := stream.Context().Value("a")
fmt.Println(a)
for {
r ,err := stream.Recv()
if err == io.EOF{
return stream.SendAndClose(&pb.PublicResponse{
Resp: &pb.Item{
Value: "client-stream-server",
Value2: "client-stream-server-v2",
} ,
})
}
if err != nil{
return err
}
log.Printf("stream.Recv value: %s,value2: %s", r.Req.Value, r.Req.Value2)
}
}
//服务端流式
func (s *StreamService) Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer) error {
return nil
}
func (s *StreamService) Sleep(stream pb.StreamService_SleepServer) error {
return nil
}
验证
-> % go run src/client/client-stream-client/main.go
req:<value:"valueWork" value2:"value1Work" >
req:<value:"valueWork" value2:"value1Work" >
req:<value:"valueWork" value2:"value1Work" >
req:<value:"valueWork" value2:"value1Work" >
req:<value:"valueWork" value2:"value1Work" >
req:<value:"valueWork" value2:"value1Work" >
2021/06/02 13:51:54 resp: value1 client-stream-server, value1 client-stream-server-v2
map[cc1:[dd1]]
-> % go run src/server/client-stream_server/mian.go
<nil>
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
2021/06/02 13:51:54 stream.Recv value: valueWork,value2: value1Work
分析
客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PyN9Izz2-1622633151074)(readme.assets/image-20210602135900704.png)]文章来源:https://www.toymoban.com/news/detail-409340.html
- 服务端是可以设置grpc.MaxRecvMsgSize()的接收大小的 默认是102410244= 4m大小
还可以设置grpc.MaxSendMsgSize() 发送的大小,默认是int32,超出会报错 - stream.SendAndClose 当发现client的流关闭之后,需要将最终的结果响应给客户端,同时关闭在另一侧的recv
- stream.CloseAndRecv 就是和上面的一起使用的
4、客户端、服务端流式
客户端
package main
import (
"context"
"google.golang.org/grpc"
pb "grpc/test/src/proto"
"io"
"log"
)
const (
PORT = "9002"
)
func main() {
conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
if err != nil {
log.Fatalf("grpc.Dial err: %v", err)
}
defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err = printSleep(client, &pb.PublicRequest{
Req: &pb.Item{
Value: "valueSleep",
Value2: "value1Sleep",
},
})
if err != nil {
log.Fatalf("printSleep.err: %v", err)
}
}
//双向流
func printSleep(client pb.StreamServiceClient, r *pb.PublicRequest) error {
stream, err := client.Sleep(context.Background())
if err != nil {
return err
}
for n := 0; n <= 6; n++ {
err = stream.Send(r)
if err != nil {
return err
}
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Printf("resp: value1: %s, value2: %s", resp.Resp.Value, resp.Resp.Value2)
}
if err = stream.CloseSend();nil != err{
log.Println(err)
}
return nil
}
服务端
package main
import (
"google.golang.org/grpc"
pb "grpc/test/src/proto"
"io"
"log"
"net"
)
type StreamService struct{}
const (
PORT = "9002"
)
func main() {
//设置客户端最大接收值
//opt := grpc.MaxRecvMsgSize()
//grpc.MaxSendMsgSize()
server := grpc.NewServer()
pb.RegisterStreamServiceServer(server, &StreamService{})
lis, err := net.Listen("tcp", ":"+PORT)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
server.Serve(lis)
}
//客户端流事rpc
func (s *StreamService) Work(stream pb.StreamService_WorkServer) error {
return nil
}
//服务端流式
func (s *StreamService) Eat(r *pb.PublicRequest, stream pb.StreamService_EatServer) error {
return nil
}
//双向流
func (s *StreamService) Sleep(stream pb.StreamService_SleepServer) error {
n := 0
for {
err := stream.Send(&pb.PublicResponse{
Resp: &pb.Item{
Value: "gPRC Stream Client: Sleep",
Value2: "双向stream-value2",
},
})
if err != nil {
return err
}
r, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
n++
log.Printf("stream.Recv req.value: %s, pt.value2: %s", r.Req.Value, r.Req.Value2)
}
}
验证
-> % go run src/client/clientServer-stream_client/main.go
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
2021/06/02 14:11:55 resp: value1: gPRC Stream Client: Sleep, value2: 双向stream-value2
-> % go run src/server/clientServer-stream_server/main.go
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:39 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
2021/06/02 14:11:55 stream.Recv req.value: valueSleep, pt.value2: value1Sleep
简单的介绍了几种rpc的流的使用,大家根据需求,合理使用文章来源地址https://www.toymoban.com/news/detail-409340.html
到了这里,关于grpc流式使用和注意事项的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!