go语言 grpc 拦截器

这篇具有很好参考价值的文章主要介绍了go语言 grpc 拦截器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

拦截器

gRPC拦截器(interceptor)是一种函数,它可以在gRPC调用之前和之后执行一些逻辑,例如认证、授权、日志记录、监控和统计等。拦截器函数是gRPC中非常重要的概念,它允许我们在服务端和客户端添加自定义逻辑,以满足业务需求和运维需求。

在gRPC中,拦截器函数通常通过实现grpc.UnaryServerInterceptor和grpc.StreamServerInterceptor接口来定义。UnaryServerInterceptor用于拦截一元RPC请求,而StreamServerInterceptor用于拦截流式RPC请求。在客户端中,我们可以使用grpc.UnaryClientInterceptor和grpc.StreamClientInterceptor来拦截gRPC调用。

在gRPC中,拦截器函数可以被链接起来,形成一个拦截器链。在这个拦截器链中,每个拦截器函数都可以处理请求并将其转发给下一个拦截器函数,或者直接返回响应。因此,我们可以在拦截器函数中编写不同的逻辑,例如实现认证、授权、监控和统计等。
以下是一些常见的gRPC拦截器:

  • 认证和授权拦截器:用于对gRPC调用进行身份验证和权限控制,例如检查token、验证用户名和密码、检查访问控制列表等;
  • 日志记录拦截器:用于记录gRPC调用的日志,例如记录请求的方法、参数、响应状态等;
  • 监控和统计拦截器:用于监控gRPC调用的性能和吞吐量,例如记录调用次数、响应时间、错误率等;
  • 缓存拦截器:用于在服务端或客户端缓存一些数据,例如缓存计算结果、缓存数据库查询结果等。

服务端拦截器

go语言 grpc 拦截器,go web开发框架,golang,开发语言,后端

一元拦截器
package main

import (
	"context"
	"flag"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "mygrpc/proto/hello" // 引入编译生成的包
)

const (
	defaultName = "world"
)

var (
	addr = flag.String("addr", "localhost:50051", "the address to connect to")
	name = flag.String("name", defaultName, "Name to greet")
)

func main() {
	flag.Parse()
	// 与服务建立连接.
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// 创建指定服务的客户端
	c := pb.NewGreeterClient(conn)

	// 连接服务器并打印出其响应。
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// 调用指定方法
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 14:52:55 ======= [Server Interceptor]  /hello.Greeter/SayHello
2023/12/07 14:52:55  Pre Proc Message : name:"world"
2023/12/07 14:52:55 Received: world
2023/12/07 14:52:55  Post Proc Message : message:"Hello world"

流拦截器

流式拦截器需要对grpc.ServerStream进行包装,重新实现RecvMsg和SendMsg方法。

func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {
	log.Printf("Recved %v", req.GetName())
	// 具体返回多少个response根据业务逻辑调整
	for i := 0; i < 10; i++ {
		// 通过 send 方法不断推送数据
		err := stream.Send(&pb.HelloReply{})
		if err != nil {
			log.Fatalf("Send error:%v", err)
			return err
		}
	}
	return nil
}

type wrappedStream struct {
	// 包装器流
	grpc.ServerStream
}
// 接受信息拦截器
func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Printf("====== [Server Stream Interceptor Wrapper] Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.RecvMsg(m)
}
// 发送消息拦截器
func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.SendMsg(m)
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
	return &wrappedStream{s}
}

func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// 前置处理
	log.Println("====== [Server Stream Interceptor] ", info.FullMethod)

	// 包装器流调用 流RPC
	err := handler(srv, newWrappedStream(ss))
	if err != nil {
		log.Printf("RPC failed with error %v", err)
	}
	return err
}
func main() {
	flag.Parse()
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// 开启rpc
	s := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))
	// 注册服务
	pb.RegisterGreeterServer(s, &server{})
	log.Printf("service listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

结果

GOROOT=D:\software\Go #gosetup
GOPATH=D:\software\golibrary #gosetup
D:\software\Go\bin\go.exe build -o C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe mygrpc/service/steamInterceptorservice #gosetup
C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe
2023/12/07 15:07:48 service listening at [::]:50051
2023/12/07 15:08:07 ====== [Server Stream Interceptor]  /hello.Greeter/searchOrders
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Receive a message (Type: *hello.HelloRequest) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 Recved 开始服务端rpc流测试
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)


客户端拦截器

go语言 grpc 拦截器,go web开发框架,golang,开发语言,后端

一元拦截器
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// 前置处理逻辑
	log.Println("Method : " + method)

	// 调用invoker 执行远程方法
	err := invoker(ctx, method, req, reply, cc, opts...)

	// 后置处理逻辑
	log.Println(reply)
	return err
}

func main() {
	flag.Parse()
	// 与服务建立连接.
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) //添加拦截器
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// 创建指定服务的客户端
	c := pb.NewGreeterClient(conn)

	// 连接服务器并打印出其响应。
	ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 设置超时时间为一秒
	defer cancel()
	// 调用指定方法
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 16:37:28 Method : /hello.Greeter/SayHello
2023/12/07 16:37:28 message:"Hello world"
2023/12/07 16:37:28 Greeting: Hello worl
流拦截`
type wrappedStream struct {
	grpc.ClientStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Printf("====== [Client Stream Interceptor] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.SendMsg(m)
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
	return &wrappedStream{s}
}

func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	
  // 前置处理逻辑
	log.Println("======= [Client Interceptor] ", method)
  
  // 调用streamer 来获取客户端流
	s, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return nil, err
	}
	return newWrappedStream(s), nil
}

func main(){
   // 注册拦截器到客户端流
 conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithStreamInterceptor(clientStreamInterceptor))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
                     
	c := pb.NewOrderManagementClient(conn)
  ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
  
  // 调用客户端流RPC方法
  searchStream, _ := c.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})
	for {
		searchOrder, err := searchStream.Recv()
		if err == io.EOF {
			log.Print("EOF")
			break
		}
		if err == nil {
			log.Print("Search Result : ", searchOrder)
		}
	}
}

结果

2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 客户端流传输结束

多个拦截器

在grpc中默认的拦截器不可以传多个,因为在源码中,存在一些问题

func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

当存在多个拦截器时,取的就是第一个拦截器。因此结论是允许传多个,但并没有用。

如果真的需要多个拦截器,可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor 和 grpc.StreamInterceptor 链式方法。核心方法如下

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
	n := len(interceptors)
	if n > 1 {
		lastI := n - 1
		return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
			var (
				chainHandler grpc.UnaryInvoker
				curI         int
			)

			chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
				if curI == lastI {
					return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
				}
				curI++
				err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
				curI--
				return err
			}

			return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
		}
	}
    ...
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。文章来源地址https://www.toymoban.com/news/detail-757107.html

到了这里,关于go语言 grpc 拦截器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringCloud微服务实战——搭建企业级开发框架:微服务安全加固—自定义Gateway拦截器实现防止SQL注入/XSS攻击

     SQL注入是常见的系统安全问题之一,用户通过特定方式向系统发送SQL脚本,可直接自定义操作系统数据库,如果系统没有对SQL注入进行拦截,那么用户甚至可以直接对数据库进行增删改查等操作。   XSS全称为Cross Site Script跨站点脚本攻击,和SQL注入类似,都是通过特定方

    2024年02月03日
    浏览(65)
  • 分布式项目 16 购物车系统,dubbo框架(重点是拦截器),优化userId,配合拦截器

    01.创建jt-cart项目 第一步: 第二步: 第三步: 第四步: 在pom.xml文件中添加jt-common的依赖,如图所示: 第五步: 添加插件 第六步:创建pojo实体类对象 说明:在jt-common项目下的com.jt.pojo创建Cart实体类 第七步:创建Dubbo接口 说明:在jt-common项目com.jt.service包下创建DubboCartSer

    2024年02月09日
    浏览(49)
  • 【Java Web】用拦截器的方式获取用户信息

    流程:从cookie中获取凭证,根据凭证查询用户,并在本次请求中持有用户,在视图模板上显示登录用户的信息。 1. 定义拦截器 2. 配置拦截器

    2024年02月10日
    浏览(42)
  • Java开发 - 拦截器初体验

    目录 前言  拦截器 什么是拦截器 拦截器和过滤器 Spring MVC的拦截器

    2023年04月17日
    浏览(42)
  • 学习 [Spring MVC] 的JSR 303和拦截器,提高开发效率

                                                   🎬 艳艳耶✌️ :个人主页                                                  🔥 个人专栏 : 《推荐】Spring与Mybatis集成整合》                                                  ⛺️    生活的理想,不断更

    2024年02月09日
    浏览(46)
  • SpringBoot -05 SpringBoot web相关配置(静态资源访问、统一异常处理、文件上传、拦截器、统一跨域请求处理)

    小总结 SpringBoot是一个基于Spring的工具集,去帮我们完成了大量的配置。在SpringBoot中有一个约定大于配置的概念,就是他把我们很多第三方框架帮我们写好了,而且把我们整个第三方框架所需要的依赖全都通过起步依赖加进去了。开发中只需要加入起步依赖就可以实现某个场

    2024年02月01日
    浏览(45)
  • 数据权限拦截器,多租户拦截器

    WEB类型软件产品,在Java(SpringBoot)+MybatisPlus架构场景下,本文针对下面两个问题,提供解决方案: 多租户的产品,想在表内级别上,实现租户数据隔离(分表、分库方案不在本文讨论范围内)。 ToB、ToG类型的软件产品,需要实现数据权限鉴权。例如用户数据、部门数据、租户

    2024年02月02日
    浏览(45)
  • VUE3 请求拦截器 响应拦截器

    1,导入axios  (使用axios进行接口的请求,页面发送http请求,很多情况我们要对请求和其响应进行特定的处理,如:判断token,设置请求头。如果请求数非常多,单独对每一个请求进行处理会变得非常麻烦,程序的优雅性也会大打折扣。所以axios为开发者提供了这样一个API:拦

    2024年02月16日
    浏览(49)
  • SpringBoot加入拦截器——登录拦截器的实现

            拦截器 Interceptor 在 Spring MVC 中的地位等同于 Servlet 规范中的过滤器 Filter,拦截的是处理器的执行,由于是全局行为,因此常用于做一些通用的功能,如请求日志打印、权限控制等。         核心原理:AOP思想 preHandle:  预先处理,在目标的controller方法执行之前,进行

    2024年02月15日
    浏览(43)
  • 自定义注解与拦截器实现不规范sql拦截(拦截器实现篇)

    最近考虑myBatis中sql语句使用规范的问题,如果漏下条件或者写一些不规范语句会对程序性能造成很大影响。最好的方法就是利用代码进行限制,通过拦截器进行sql格式的判断在自测环节就能找到问题。写了个简单情景下的demo,并通过idea插件来将myBatis的mapper方法都打上拦截器

    2024年01月22日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包