前言
在看下面时,我们先来 分析下入口方法,trpc.NewServer都做了拿一些事情
1.读取配置文件,这里会读取用户设置的配置文件路径在(./trpc_go.yaml)和设置默认配置(网络类型tcp,协议类型trpc),然后设置到Config对象中
2.把配置通过localstore的方式设置到全局变量中
3.开始设置用户自定义插件,亮点功能
4.初始化服务和rpc连接的最大并发数
5.关闭插件方法,前提是插件必须实现了closes接口
- 根据以上,我们对trpc框架进行三个方面的讲解
1.trpc框架配置文件加载方式
trpc配置文件采用yaml格式,文件默认目录在 ./trpc_go.yaml下,所有自定义配置都需要写在这个yaml文件中,所有支持用户自定义的配置都可以参考结构体config
// Config is the configuration for trpc, which can be divided into 4 parts:
// 1. Global config.
// 2. Server config.
// 3. Client config.
// 4. Plugins config.
type Config struct {
Global struct {
Namespace string `yaml:"namespace"` // Namespace for the configuration.
EnvName string `yaml:"env_name"` // Environment name.
ContainerName string `yaml:"container_name"` // Container name.
LocalIP string `yaml:"local_ip"` // Local IP address.
EnableSet string `yaml:"enable_set"` // Y/N. Whether to enable Set. Default is N.
// Full set name with the format: [set name].[set region].[set group name].
FullSetName string `yaml:"full_set_name"`
// Size of the read buffer in bytes. <=0 means read buffer disabled. Default value will be used if not set.
ReadBufferSize *int `yaml:"read_buffer_size,omitempty"`
}
Server struct {
App string `yaml:"app"` // Application name.
Server string `yaml:"server"` // Server name.
BinPath string `yaml:"bin_path"` // Binary file path.
DataPath string `yaml:"data_path"` // Data file path.
ConfPath string `yaml:"conf_path"` // Configuration file path.
Admin struct {
IP string `yaml:"ip"` // NIC IP to bind, e.g., 127.0.0.1.
Nic string `yaml:"nic"` // NIC to bind.
Port uint16 `yaml:"port"` // Port to bind, e.g., 80. Default is 9028.
ReadTimeout int `yaml:"read_timeout"` // Read timeout in milliseconds for admin HTTP server.
WriteTimeout int `yaml:"write_timeout"` // Write timeout in milliseconds for admin HTTP server.
EnableTLS bool `yaml:"enable_tls"` // Whether to enable TLS.
RPCZ *RPCZConfig `yaml:"rpcz"` // RPCZ configuration.
}
Transport string `yaml:"transport"` // Transport type.
Network string `yaml:"network"` // Network type for all services. Default is tcp.
Protocol string `yaml:"protocol"` // Protocol type for all services. Default is trpc.
Filter []string `yaml:"filter"` // Filters for all services.
StreamFilter []string `yaml:"stream_filter"` // Stream filters for all services.
Service []*ServiceConfig `yaml:"service"` // Configuration for each individual service.
// Minimum waiting time in milliseconds when closing the server to wait for deregister finish.
CloseWaitTime int `yaml:"close_wait_time"`
// Maximum waiting time in milliseconds when closing the server to wait for requests to finish.
MaxCloseWaitTime int `yaml:"max_close_wait_time"`
Timeout int `yaml:"timeout"` // Timeout in milliseconds.
}
Client ClientConfig `yaml:"client"` // Client configuration.
Plugins plugin.Config `yaml:"plugins"` // Plugins configuration.
}
1.1 trpc配置文件加载流程和干了哪些事情
简单来说干了以下几件事情
-
1.获取配置文件的默认目录,并且解析目录到 ServerConfigPath 中
-
2.读取yaml文件,把用户自定义配置读取到config结构体对象中,用于后续的使用和初始化
-
3.初始化用户配置
1.读取配置文件中的ip地址,如果是网卡名称,则获取网卡对应的ip地址,然后设置成服务的ip地址
2.读取用户自定义配置 设置网络协议类型和ip:prot地址,设置连接最大存活时间和请求超时时间
3.读取自定义客户端网络配置,并且设置到config对象中
- 4.采用atomic,Value的local store 写时复制技术(线程安全)把config配置文件加载到全局对象 globalConfig 之中
2.插件化实现原理和简单demo
trpc-go实现插件化简单来说就分为三步
1.加载插件,设置一个插件队列,按照用户自定义的顺序去加载读取插件,并且把插件默认状态变成不可用,使用方法 loadPlugins()
2.从插件队列中取出插件,进行启动,使用方法setupPlugins()
2.1.首先判断插件的依赖关系,把先依赖的插件先启动
2.2.一个一个的去启动插件并且执行,然后把插件状态变成可用
3.执行插件执行结束关闭方法,onFinish() 如果你不想插件结束可以不用实现这个方法,该方法不是必选
2.1 加载插件方法分析 loadPlugins
1.首先设置默认支持最大插件个数1000、初始化channel插件队列,设置插件默认状态为false
2.读取用户配置文件,获取到用户自定义插件配置
3.通过用户配置的yaml文件,在factory中获取到具体初始化完成的插件
4.把初始化完成的插件状态变成 false
注意:根据代码可以分析出来 可以注册多个类型的插件,每个类型又可以绑定多个插件,最小唯独是插件类型+插件名称,也就是一个具体的插件
2.2 插件启动和插件启动校验 setupPlugins()
1.按照顺序读取插件队列中的插件,插件队列是利用channel实现的,先进先出队列
2.检查当前插件依赖的插件是否已经初始化完成,如果没有完成,则会把当前插件取出来放在channel队尾
如果想自定义依赖实现接口 Depender 即可,注意这里分为强依赖和弱依赖
强依赖:如果插件a强依赖插件b,那么插件[]b们必须存在,插件a会在插件b初始化完成后再初始化
弱依赖:如果插件a弱依赖插件[]b们,只需要有一个插件存在就可以
3.如果插件检查完毕通过后,trpc会调用 插件实现的 setup()方法去运行插件,而且把用户自定义的配置信息传递过去
4.最后检验插件是否初始化和执行完毕
如果没有完毕,会返回一个错误 cycle depends, not plugin is setup,服务将不会启动
如果完毕,则会返回插件队列和插件关闭函数,前提是插件实现了插件关闭函数
2.3. 如果插件执行完毕,实现 OnFinish()方法,关闭插件;当然这个方法可以不用实现
2.4 流程总结
2.1 首先实现 trpc提供的 Factory接口,完成自定义插件设计
2.2 插件中init() 方法里面调用 trpc提供的 plugins,Register() 方法 把插件注册到 局部变量 var plugins = make(map[string]map[string]Factory)中
2.3 trpc框架在程序启动的时候会调用 loadPlugins()方法和plugins中的 get()方法,把插件从factory中获取出来
- 1.首先实现 trpc提供的 Factory接口,完成自定义插件设计
Factory接口有两个方法 Type()和SetUp()
Type()的作用是设置插件的类型,也就是插件的名称,要保持唯一
SetUp()的作用是实现插件具体的业务逻辑,trpc框架会进行调用
- 2.在插件的init()方法中调用 trpc提供的 plugins,Register() 方法 把插件注册到 局部变量 var plugins = make(map[string]map[string]Factory)中
- 3.trpc框架在程序启动的时候会调用 loadPlugins()方法和plugins中的 get()方法,把插件从factory中获取出来,放入到channel队列中依次执行
2.5 实现插件demo,这里拿日志上报插件举例子
插件demo
package main
import (
"context"
"trpc.group/trpc-go/trpc-go"
"trpc.group/trpc-go/trpc-go/filter"
"trpc.group/trpc-go/trpc-go/plugin"
)
const (
pluginName = "metric_log"
pluginType = "tracing"
)
// Config 插件配置
type MetricLogConfig struct {
ServiceName string `yaml:"service_name"`
}
var metricLogConfigPluginCfg = MetricLogConfig{}
func init() {
plugin.Register(pluginName, &MetricLogPlugin{})
}
// EduTracingPlugin
type MetricLogPlugin struct {
}
// PluginType 返回插件类型
func (e *MetricLogPlugin) Type() string {
return pluginType
}
// Setup 装载tracer实现
func (e *MetricLogPlugin) Setup(name string, decoder plugin.Decoder) error {
cfg := MetricLogConfig{}
//1。解析在trpc.yaml中自定义的插件配置文件
if err := decoder.Decode(&cfg); err != nil {
return err
}
//2.设置插件相关的配置参数和网络类型
if cfg.ServiceName == "" {
cfg.ServiceName = pluginName
}
metricLogConfigPluginCfg = cfg
//3.创建拦截器ServerFilterRaw 和ClientFilterRaw,拦截rpc请求,然后注册到trpc拦截器中
filter.Register(name, ServerFilterRaw(e), ClientFilterRaw(e))
return nil
}
func ServerFilterRaw(e *MetricLogPlugin) filter.ServerFilter {
return func(ctx context.Context, req interface{}, handler filter.ServerHandleFunc) (rsp interface{}, err error) {
// 设置日志中的上下文处理.
msg := trpc.Message(ctx)
// 业务处理.
rsp, err = handler(ctx, req)
// 日志上报
UploadLog(ctx, msg)
return rsp, err
}
}
func ClientFilterRaw(e *MetricLogPlugin) filter.Filter {
return func(ctx context.Context, req, rsp interface{}, handler filter.HandleFunc) (err error) {
msg := trpc.Message(ctx)
// 业务处理.
err = handler(ctx, req, rsp)
// 日志上报
UploadLog(ctx, msg)
return err
}
}
使用方式在自己服务的main,go中引入即可
3.trpc-go rpc注册、调用流程
trpc-go rpc 分成了三个部分
1.在 trpc.NewServer()中调用 NewServerWithConfig()方法,把用户自定义服务端配置加载到service对象中
2.调用trpc工具生成的pb文件 如 pb.RegisterGoTrpcTestService(s.Service("trpc.test.svr"), service)把服务名称和服务队员的rpc方法注册到server中
3.调用s.serve()方法,创建tpc连接进行 rpc方法调用
3.1 初始化sevice NewServerWithConfig()
1.读取用户自定义拦截器,进行拦截器去重
拦截器支持普通拦截器Filter和流式拦截器StreamFilter
2. 把网络相关、拦截器相关等参数设置到service中
3.初始化Transport和ServeOptions对象
1.这里特别注意下 Transport对象,
该对象如果用户在配置文件中没有自定义,那么trpc框架会给他一个默认值,后续会使用该默认值进行rpc调用
2.这里特别注意下 s.opts.ServeOptions
把serve中的handler地址给了s.opts.ServeOptions,当在进行rpc方法注册时,也会在s.opts.ServeOptions中注册一份rpc方法表
3.2 注册服务名称和服务对应的rpc方法到server中 service.Register()方法
Register(serviceDesc interface{}, serviceImpl interface{})
-
1.register方法有两个入参数,一个是serviceDesc代表注册的服务名称,一个是serviceImpl代表服务的rpc方法列表
-
2.register方法逻辑,主要就是初始化拦截器和注册rpc方法到handler中
-
2.1 首先初始化流式拦截器
-
2.2 然后把serviceImpl中的方法依次注册到server的Handlers中,注意这一步也会把对象注册到service的opts.ServeOptions中
-
2.3 初始化普通拦截器
3.3 建立rpc连接,进行rpc方法调用 s.serve()
建立rpc连接,也是按照传统的net包进行的,分别分为
1.创建监听器
2.等待客户端连接
3.调用rpc方法处理数据,然后把数据写入到连接中
4.关闭连接
- 1.获取初始化时的sevice对象,建立tcp连接,一个对象单独建立一个连接
-
- 调用第一步初始化 Transport对象的ListenAndServe()方法,如果没有设置默认的listener,就会根据用户设置的networker类型来创建监听器
-
- 创建一个tcp的监听器,内部用的就是net包的listne方法
- 创建一个tcp的监听器,内部用的就是net包的listne方法
-
- 监听客户端连接,如果没有客户端连接,Accept方法会阻塞
- 监听客户端连接,如果没有客户端连接,Accept方法会阻塞
-
5.把rpc方法列表从opts.ServeOptions设置到conn.handler中去
文章来源:https://www.toymoban.com/news/detail-771135.html -
6.调用rpc方法处理请求,然后再把数据通过conn连接写会到客户端,写完后关闭连接,整个流程结束
文章来源地址https://www.toymoban.com/news/detail-771135.html
到了这里,关于go语言:腾讯终于开源trpc框架——对trpc-go源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!