云原生学习路线导航页(持续更新中)
- 本文是 Kubernetes operator学习 系列第一篇,主要对client-go进行学习,从源码阅读角度,学习client-go各个组件的实现原理、如何协同工作等
- 参考视频:Bilibili 2022年最新k8s编程operator篇,UP主:白丁云原生
- 本文参考资料
- https://pan.baidu.com/s/1BibLAishAFJLeTyYCLnlbA 提取码: en2p
- https://zhuanlan.zhihu.com/p/573982128
- https://xinchen.blog.csdn.net/article/details/113753087
- 并根据个人理解进行了汇总和修改
1.为什么要学习 client-go
- 为了适应更多的业务场景,k8s提供了很多的扩展点,用于满足更复杂的需求
-
K8s的扩展点如下:
- ① kubectl
- 用户通过kubectl与ApiServer进行交互,kubectl提供了插件,可以扩展kubectl的行为,但是这些插件只能影响用户本地的环境
- ② API Server
- 处理所有的请求,可以对用户请求进行 身份认证、基于其内容阻止请求、编辑请求内容、处理删除操作等等。
- 这个扩展点应该说的是:用户可以 自定义 API Server
- ③ k8s提供的内置资源
- 我们无法修改,只能通过 annotation、label 控制他们
- ④ CRD
- 自定义资源,配合 自定义控制器Custom Controller,扩展k8s的特定业务场景
- ⑤ scheduler
- 调度器,决定k8s把Pod放到哪个节点执行。k8s提供了多种方式扩展调度行为
- ⑥ Controller Manager
- 实际上也是k8s的一个客户端,通过与API Server交互。k8s的每种资源都有对应的控制器,都属于ControllerManager
- client-go本质上就是一个与apiserver交互的库,所以Controller Manager也是通过 client-go 库与 API Server交互的
- ⑦ Custom Controller
- 自定义控制器,可以控制 内置资源,也可以控制自定义资源CRD
- ⑧ kubelet
- 使用CNI:使得k8s可以使用不同技术,连接Pod网络
- 使用CSI:使得k8s可以支持不同的存储类型
- 使用CRI:使得k8s可以支持不同的容器运行时
- ⑨ client-go
- 一个通用的Golang库,用于和 apiserver 交互
- 不管是k8s的各个组件,还是我们自己为CRD开发Custom Controller,都需要使用 client-go 与 API Server 进行通信
综上所述,要想学习 Operator,使用 CRD + Custom Controller 扩展kubernetes功能,必须先学习 Client-go 库,学会如何与APIServer进行交互
2.client-go与kubernetes版本对应关系
我们假设前提:kubernetes版本为 v1.x.y
- kubernetes版本 >= v1.17.0时,client-go 版本使用 v0.x.y
- kubernetes版本 < v1.17.0时,client-go 版本使用 v1.x.y
3.client-go架构
3.1.client-go 源代码目录介绍
- github地址:https://github.com/kubernetes/client-go
- discovery:用于发现API Server都是支持哪些API。kubectl apiversions使用了同样的机制
- dynamic:包含了kubernetes dynamic client的逻辑,可以操作任意的k8s资源API对象,包括内置的、自定义的资源对象
- informers:包含了所有内置资源的informer,便于操作k8s的资源对象
- kubernetes:包含了访问Kubernetes API的 所有ClientSet
- listers:包含了所有内置资源的lister,用于读取缓存中k8s资源对象的信息
- plugin/pkg/client/auth:包含所有可选的认证插件,用于从外部获取credential(凭证)
- tools:包含一系列工具,编写控制器时会用到很多里面的工具方法
- transport:包含了创建连接、认证的逻辑,会被上层的ClientSet使用
3.2.client-go 架构
图片参考来源:https://zhuanlan.zhihu.com/p/573982128
下面先介绍各组件整体的运转流程,然后对 client-go 和 一个 CRDController 应该包含哪些组件进行详细介绍。
-
整体流程简介:
- Reflector会持续监听k8s集群中指定资源类型的API,当发现变动和更新时,就会创建一个发生变动的 对象副本,并将其添加到队列DeltaFIFO中
- Informer监听DeltaFIFO队列,取出对象,做两件事:
- (1)将对象加入Indexer,Indexer 会将 [对象key, 对象] 存入一个线程安全的Cache中
- (2)根据对象的 资源类型和操作,找到对应 Controller 预先提供的 Resource Event Handler,调用Handler,将对象的Key加入该 Controller 的 Workqueue
- Controller 的循环函数 ProcessItem,监听到 Workqueue 有数据了,就会取出一个key,交给处理函数Worker,Worker 会根据 Key,使用 Indexer reference 从 Cache 中 获取 该key对应的 真实对象。然后就可以进行调谐了。
-
注意点
- DeltaFIFO 中 存的是 对象副本
- Cache 中 存的是 [对象key, 对象] 的映射
- Workqueue 中存的是 对象Key
- CRDController 中,使用Informer对象,是为了向其中添加一些 Resource Event Handlers
- CRDController 中,使用Indexer对象,是为了根据对象Key,获取对象实例
-
client-go组件
-
Reflector
- reflector会一直监听kubernetes中指定资源类型的API,实现监听的函数就是ListAndWatch。这种监听机制既适用于k8s的内建资源,也适用于自定义资源。
- 当reflector通过监听API发现资源对象实例存在新的 notification 时,它就会使用 listing API 获取这个新的实例对象,并将其放入 watchHandler 函数内的 DeltaFIFO 中;
-
Informer
- Informer 会从 Delta FIFO 中取出对象。实现这个功能的方法对应源码中的 processLoop;
- Informer 取出对象后,根据Resource类型,调用对应的 Resource Event Handler 回调函数,该函数实际上由某个具体的 Controller 提供,函数中会获取对象的 key,并将 key 放入到 该Controller 内部的 Workqueue 中,等候处理。
-
Indexer 和 Thread Safe Store
- Indexer 会提供对象的索引功能,通常是基于对象Key来创建索引。默认索引函数是MetaNamespaceKeyFunc, 它生成的索引键为/格式。
- Indexer 维护着一个线程安全的 Cache,即 Thread Safe Store。存储的是[对象key, 对象],用对象Key可以进行获取对象实例。
-
Resource Event Handlers reference
- 这实际上是所有Controller的Resource Event Handlers的引用。
- 这些 handlers 由具体的Controller提供,就是 Informer 的回调函数。Informer 会根据资源的类型,调用对应Controller的 handler 方法
- handler 通常都是用于将资源对象的key放入到 该Controller 内部的 Workqueue 中,等候处理。
-
Reflector
-
自定义控制器组件
-
Informer reference
- Informer reference 是 Informer 实例对象的引用,用于操作和处理自定义资源对象
- 我们编写自定义控制器时,需要引用自己需要的Informer,向其中加入一系列 Resource Event Handlers
-
Indexer reference
- Indexer reference 是 Indexer实例对象的引用,用于根据对象Key索引资源对象
- 我们编写自定义控制器时,应该创建Indexer的引用,将对象Key传给它,就可以获取想要处理的对象
-
NewIndexerInformer函数
- client-go中的基本控制器提供了 NewIndexerInformer 函数,用于创建Informer和Indexer。
- 可以直接使用NewIndexerInformer 函数,或者也可以使用工厂方法来创建Informer
-
Resource Event Handlers
- 由具体的 Controller 给 Client-go 的Informer 提供的回调函数,获取待处理对象的key,并将key放入到Workqueue中。
-
Workqueue
- 此队列是 具体的Controller 内部创建的队列,用于暂时存储从Resource event handler 中 传递过来的,待处理对象的Key。
- Resource event handler 函数通常会获取待处理对象的key,并将key放入到这个workqueue中。
-
Process Item
- 这个函数为循环函数,它不断从 Work queue 中取出对象的key,并使用 Indexer Reference 获取这个key对应的具体资源对象,然后根据资源的变化,做具体的调谐 Reconcile 动作。
-
Informer reference
3.3.使用client-go编写Controller的步骤
- 根据2.2中所述,编写一个 自定义Controller,需要实现如下功能。
- 先从client-go中获取对应资源的 Informer
- 提供 一系列的 Resource event handlers,并加入对应的Informer,供该informer回调
- 提供一个 Workqueue 队列,存储待处理的对象的Key
- 提供一个 循环函数 ProcessItem,不断从 Workqueue 中取出对象的key,交给 处理函数 Worker
- 提供一个 处理函数 Worker,根据对象Key,使用对应资源的Indexer,获取到该对象的实例,根据对象的属性变化,做真正的调谐过程。
4.client-go的client组件
4.1.Client的4种类型
- 我们知道,client-go 可以实现与 kubernetes 的通信。如何实现的呢?
- client-go 主要提供了4种 client 组件:
-
RESTClient
:最基础的客户端,提供最基本的封装,可以通过它组装与API Server即时通讯时 的 url -
Clientset
:是一个Client的集合,在Clientset中包含了所有K8S内置资源 的 Client,通过Clientset便可以很方便的操作如Pod、Service这些资源 -
dynamicClient
:动态客户端,可以操作任意K8S的资源,包括CRD定义的资源 -
DiscoveryClient
:用于发现K8S提供的资源组、资源版本和资源信息,比如:kubectl api-resources
-
- 4种client分别对应源码目录:
4.2.RESTClient详解
4.2.1.RESTClient 是什么
- RESTClient 是最基础的客户端,提供与APIServer通信的最基本封装,可以向APIServer发送 Restful 风格请求。
- 之所以称 RESTClient 是 最基础的客户端,是因为其他三种Client,其实都是 RESTClient 的再封装,内部都使用了 RESTClient。
4.2.2.RESTClient结构体
type RESTClient struct {
// base is the root URL for all invocations of the client
base *url.URL
// versionedAPIPath is a path segment connecting the base URL to the resource root
versionedAPIPath string
// content describes how a RESTClient encodes and decodes responses.
content ClientContentConfig
// creates BackoffManager that is passed to requests.
createBackoffMgr func() BackoffManager
// rateLimiter is shared among all requests created by this client unless specifically
// overridden.
rateLimiter flowcontrol.RateLimiter
// warningHandler is shared among all requests created by this client.
// If not set, defaultWarningHandler is used.
warningHandler WarningHandler
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
}
4.2.3.RESTClient常用方法
-
RESTClientFor()
- 位置:rest/config.go 文件
- 函数签名:
func RESTClientFor(config *Config) (*RESTClient, error)
,直接 rest点 调用 - 该方法是用于 创建一个 RESTClient 实例
- 接收一个
rest.Config
类型参数,Config中包含了 限速器、编解码器 等- RESTClientFor 方法内部会从 Config 中取出这些配置,设置给RESTClient 实例
- 这样RESTClient 实例就具有了 限速、编解码 等多种功能
- 因此,我们创建Config的时候,可以手动设置这些功能,下面的示例中会展示。
-
RESTClient实例的常用方法
- /rest/client.go 中有一个接口 Interface
// Interface captures the set of operations for generically interacting with Kubernetes REST apis. type Interface interface { GetRateLimiter() flowcontrol.RateLimiter Verb(verb string) *Request Post() *Request Put() *Request Patch(pt types.PatchType) *Request Get() *Request Delete() *Request APIVersion() schema.GroupVersion }
- RESTClient 实现了这个接口,因此具有所有的方法,用于发送各种类型的请求
- 另外,Interface 每个方法的返回值都是 Request 类型,Request 类型的各种方法,很多的返回值也是 Request,这样就可以实现 链式编程 了
- /rest/client.go 中有一个接口 Interface
4.2.4.RESTClient的一些其他知识点(建议看一遍)
4.2.4.1.Request 和 Result 常用方法
-
Request 位于 /rest/request.go
-
func (r *Request) Namespace(namespace string) *Request
:设置 当前Resquest 访问的 namespace -
func (r *Request) Resource(resource string) *Request
:设置 当前Resquest 想要访问的资源类型 -
func (r *Request) Name(resourceName string) *Request
:设置 当前Resquest 想要访问的资源的名称 -
func (r *Request) Do(ctx context.Context) Result
:格式化并执行请求。返回一个 Result 对象,以便于处理响应。
-
-
Result 也位于 /rest/request.go
4.2.4.2.rest.Config 结构体
- 位于 rest/config.go 中,用于描述 kubernetes客户端的通用属性
type Config struct { // API 服务器的主机地址,格式为 https://<hostname>:<port>。默认情况下,它为空字符串,表示使用当前上下文中的集群配置。 Host string // 指定 API 服务器的路径,目前只有两种取值:/api、/apis // - /api:访问core API 组资源时,其实group值为空 // - /apis:访问其他 API 组资源时,都是apis,他们都有group值 APIPath string // 对请求内容的配置,会影响对象在发送到服务器时的转换方式 // - ContentConfig中有两个重要属性: // - NegotiatedSerializer:用于序列化和反序列化请求和响应的接口 // - GroupVersion:请求资源的 API 组和版本 ContentConfig // 用于进行基本身份验证的用户名的字符串 Username string // 用于进行基本身份验证的密码的字符串 Password string `datapolicy:"password"` // 用于进行身份验证的令牌的字符串 BearerToken string `datapolicy:"token"` // 包含身份验证令牌的文件的路径 BearerTokenFile string // TLS 客户端配置,包括证书和密钥 TLSClientConfig // 每秒允许的请求数(Queries Per Second)。默认为 5.0。 QPS float32 // 突发请求数。默认为 10 Burst int // 速率限制器,用于控制向 API 服务器发送请求的速率 RateLimiter flowcontrol.RateLimiter // 与 API 服务器建立连接的超时时间 Timeout time.Duration // 用于创建网络连接的 Dial 函数 Dial func(ctx context.Context, network, address string) (net.Conn, error) // ...... }
4.2.4.3.tools/clientcmd 工具
- 源码位于 client-go/tools/clientcmd 包下
- clientcmd 是 Kubernetes Go 客户端库(client-go)中的一个包,用于加载和解析 Kubernetes 配置文件,并辅助创建与 Kubernetes API 服务器进行通信的客户端。
- clientcmd 提供了一些功能,使得在客户端应用程序中处理 Kubernetes 配置变得更加方便。主要包含以下几个方面的功能:
- 加载配置文件:clientcmd 可以根据指定的路径加载 Kubernetes 配置文件,例如 kubeconfig 文件。
- 解析配置文件:一旦加载了配置文件,clientcmd 提供了解析配置文件的功能,可以获取各种配置信息,如集群信息、认证信息、上下文信息等。
- 辅助创建客户端:clientcmd 可以使用配置文件中的信息,辅助创建与 Kubernetes API 服务器进行通信的客户端对象。这些客户端对象可以用来执行对 Kubernetes 资源的增删改查操作。
- 切换上下文:clientcmd 还支持在多个上下文之间进行切换。上下文表示一组命名空间、集群和用户的组合,用于确定客户端与哪个Kubernetes 环境进行通信。
4.2.5.RESTClient使用示例
- 需求:获取default命名空间下的所有pod,并打印所有pod的name
- 首先到kubernetes的官方API文档中,查看 请求url、响应
- https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.24/#pod-v1-core
- 或者:https://v1-24.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.24/#list-pod-v1-core
- 代码编写
package main import ( "context" v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) func main() { // 在你机器的homeDir下,放入集群的config文件,用于连接集群(可以直接从集群master的~/.kube/config拷贝过来) // clientcmd是位于client-go/tools/clientcmd目录下的工具 config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic(err) } // 设置默认 GroupVersion(我要操作的是pod,不属于任何的group,所以使用了SchemeGroupVersion。你要操作什么,就写什么GroupVersion即可) config.GroupVersion = &v1.SchemeGroupVersion // 设置序列化/反序列化器(后面的 Into方法 就是使用它完成 反序列化 的) config.NegotiatedSerializer = scheme.Codecs // 设置 API 根的子路径(我们操作的是pod,属于core资源,所以设置为/api) config.APIPath = "/api" // 创建一个 RESTClient restClient, err := rest.RESTClientFor(config) if err != nil { panic(err) } // 创建一个Pod,用于接收请求结果 pods := v1.PodList{} // 链式编程 发送请求,并反序列化结果到pod中 err = restClient.Get().Namespace(v1.NamespaceDefault).Resource("pods").Do(context.TODO()).Into(&pods) if err != nil { panic(err) } // 打印pod名称 for _, pod := range pods.Items { println(pod.Name) } }
- 输出结果
cassandra-5hbf7 liveness-exec mysql-87pgn myweb-7f8rh myweb-rjblc nginx-pod-node1
4.3.Clientset详解
4.3.1.Clientset是什么
- 结论:Clientset 是 一系列 RESTClient 的 集合。
- 从4.2.4的 RESTClient 使用示例来看,使用 RESTClient 操作kubernetes资源,太麻烦了
- 要操作 pods,需要指定config,给config设置 APIPath 为 “/api”、设置序列化器、设置 GroupVersion,最后还要调用 rest.RESTClientFor(config) 得到一个 用于操作pods的Clientset
- 而如果我要操作 deployment,这个过程又需要写一遍,然后又得到一个 用于操作deployment的Clientset
- 代码冗余,不优雅,而且到处创建Clientset,耗时又浪费资源
- 因此,就有了事先创建 各种资源的RESTClient,存起来备用的需求。Clientset就是这样封装起来的一个set集合。
4.3.2.Clientset的结构体
- 位于
/kubernetes/clientset.go
中 - Clientset结构体:
type Clientset struct { ...... appsV1 *appsv1.AppsV1Client appsV1beta1 *appsv1beta1.AppsV1beta1Client appsV1beta2 *appsv1beta2.AppsV1beta2Client authenticationV1 *authenticationv1.AuthenticationV1Client authenticationV1alpha1 *authenticationv1alpha1.AuthenticationV1alpha1Client authenticationV1beta1 *authenticationv1beta1.AuthenticationV1beta1Client authorizationV1 *authorizationv1.AuthorizationV1Client authorizationV1beta1 *authorizationv1beta1.AuthorizationV1beta1Client autoscalingV1 *autoscalingv1.AutoscalingV1Client autoscalingV2 *autoscalingv2.AutoscalingV2Client autoscalingV2beta1 *autoscalingv2beta1.AutoscalingV2beta1Client autoscalingV2beta2 *autoscalingv2beta2.AutoscalingV2beta2Client batchV1 *batchv1.BatchV1Client batchV1beta1 *batchv1beta1.BatchV1beta1Client certificatesV1 *certificatesv1.CertificatesV1Client certificatesV1beta1 *certificatesv1beta1.CertificatesV1beta1Client certificatesV1alpha1 *certificatesv1alpha1.CertificatesV1alpha1Client coordinationV1beta1 *coordinationv1beta1.CoordinationV1beta1Client coordinationV1 *coordinationv1.CoordinationV1Client coreV1 *corev1.CoreV1Client ...... }
- 以
appsv1
的类型*appsv1.AppsV1Client
举例:可以看到,内部包含了一个 restClient。这也进一步认证,Clientset 就是一系列 RESTClient 的集合。type AppsV1Client struct { restClient rest.Interface }
4.3.3.Clientset的常用方法
4.3.3.1.NewForConfig()方法
- 位于
/kubernetes/clientset.go
中,所以可以直接使用 kubernetes.NewForConfig() 使用 - 用于创建一个Clientset,传入一个rest.Config配置对象
func NewForConfig(c *rest.Config) (*Clientset, error) { configShallowCopy := *c if configShallowCopy.UserAgent == "" { configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent() } // share the transport between all clients httpClient, err := rest.HTTPClientFor(&configShallowCopy) if err != nil { return nil, err } // 这个方法,就完成了所有 RESTClient 的创建 return NewForConfigAndClient(&configShallowCopy, httpClient) } func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) { configShallowCopy := *c if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { if configShallowCopy.Burst <= 0 { return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0") } configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) } var cs Clientset var err error // 下面就是创建各种 RESTClient 了,创建结果,被保存到 cs 中 cs.admissionregistrationV1, err = admissionregistrationv1.NewForConfigAndClient(&configShallowCopy, httpClient) if err != nil { return nil, err } cs.admissionregistrationV1alpha1, err = admissionregistrationv1alpha1.NewForConfigAndClient(&configShallowCopy, httpClient) if err != nil { return nil, err } cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfigAndClient(&configShallowCopy, httpClient) if err != nil { return nil, err } ...... return &cs, nil }
4.3.3.2.Clientset的实例方法
- Clientset 实现了 /kubernetes/clientset.go 下的 Interface接口,将自己内部的 私有属性 供外部使用
- Interface 接口源码
type Interface interface { ...... AppsV1() appsv1.AppsV1Interface AppsV1beta1() appsv1beta1.AppsV1beta1Interface AppsV1beta2() appsv1beta2.AppsV1beta2Interface AuthenticationV1() authenticationv1.AuthenticationV1Interface AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface AuthorizationV1() authorizationv1.AuthorizationV1Interface AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface AutoscalingV1() autoscalingv1.AutoscalingV1Interface AutoscalingV2() autoscalingv2.AutoscalingV2Interface AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface BatchV1() batchv1.BatchV1Interface BatchV1beta1() batchv1beta1.BatchV1beta1Interface CertificatesV1() certificatesv1.CertificatesV1Interface CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface CoordinationV1() coordinationv1.CoordinationV1Interface CoreV1() corev1.CoreV1Interface ...... }
- 以
AppsV1()
方法为例,返回值是接口appsv1.AppsV1Interface
的实现类appsv1.AppsV1Client
的对象// 接口 type AppsV1Interface interface { RESTClient() rest.Interface ControllerRevisionsGetter DaemonSetsGetter DeploymentsGetter ReplicaSetsGetter StatefulSetsGetter } // 实现类 type AppsV1Client struct { restClient rest.Interface } // AppsV1Client 实现 AppsV1Interface 接口的方法 func (c *AppsV1Client) RESTClient() rest.Interface { if c == nil { return nil } return c.restClient }
-
appsv1.AppsV1Client
的其他实例方法
- 以 appsv1.AppsV1Client.Deployments() 方法举例
- Deployments() 方法源码
// 返回值是DeploymentInterface func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface { // 实际上,返回值是 DeploymentInterface 的实现类 deployments 的对象 return newDeployments(c, namespace) } // 构造一个 deployments 的对象 func newDeployments(c *AppsV1Client, namespace string) *deployments { return &deployments{ client: c.RESTClient(), ns: namespace, } }
- 返回值:
DeploymentInterface
接口源码,可以看到包含操作Deployment的各种方法type DeploymentInterface interface { Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error) Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error) UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error) List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error) Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error) ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error) GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error) UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error) ApplyScale(ctx context.Context, deploymentName string, scale *applyconfigurationsautoscalingv1.ScaleApplyConfiguration, opts metav1.ApplyOptions) (*autoscalingv1.Scale, error) DeploymentExpansion }
- Deployments() 方法源码
- 挑选 DeploymentInterface.Create 方法,查看 实现类 deployments 的 Create实现
- 可以看出,Create方法的内容,就跟我们4.2.4中使用 RESTClient 的方式差不多
- 这更加印证了,Clientset 就是 对各种 GroupVersion 的 RESTClient 的封装
func (c *deployments) Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (result *v1.Deployment, err error) { result = &v1.Deployment{} err = c.client.Post(). Namespace(c.ns). Resource("deployments"). VersionedParams(&opts, scheme.ParameterCodec). Body(deployment). Do(ctx). Into(result) return }
- 实际上,这些方法都不是人工写的,都是 code-generator 自动生成的
- code-generator 提供了很多工具用于为k8s中的资源生成相关代码,其中包括一个 client-gen
- client-gen:可以为资源生成标准的操作方法(get;list;watch;create;update;patch;delete)
- 比如,在kuberentes源码
staging/src/k8s.io/api/core/v1/types.go
中,可以看到 type Pod struct 注释上,就使用了 genclient 的标记// +genclient // +genclient:method=UpdateEphemeralContainers,verb=update,subresource=ephemeralcontainers // ...... type Pod struct { ...... }
- client-gen 常用标记
// +genclient - 生成默认的客户端动作函数(create, update, delete, get, list, update, patch, watch以及 是否生成updateStatus取决于.Status字段是否存在)。 // +genclient:nonNamespaced - 所有动作函数都是在没有名称空间的情况下生成 // +genclient:onlyVerbs=create,get - 指定的动作函数被生成. // +genclient:skipVerbs=watch - 生成watch以外所有的动作函数. // +genclient:noStatus - 即使.Status字段存在也不生成updateStatus动作函数
4.3.4.Clientset使用示例
- 需求:获取default命名空间下的pod列表,并获取kube-system命名空间下的deploy列表
- 从下面代码来看,创建了一个 clientset,就可以操作不同 GroupVersion 下的 不同资源,也无需再去手动指定 APIPath 等值了
- 代码编写
package main import ( "context" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) func main() { // 同样是先 创建一个客户端配置config config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic(err) } // 使用 kubernetes.NewForConfig(),创建一个ClientSet对象 clientSet, err := kubernetes.NewForConfig(config) if err != nil { panic(err) } // 1、从 clientSet 中调用操作pod的 RESTClient,获取default命名空间下的pod列表 pods, err := clientSet.CoreV1().Pods(v1.NamespaceDefault).List(context.TODO(), v1.ListOptions{}) if err != nil { panic(err) } // 打印pod名称 for _, pod := range pods.Items { println(pod.Name) } println("------") // 2、从 clientSet 中调用操作 deploy 的 RESTClient,获取kube-system命名空间下的deploy列表 deploys, err := clientSet.AppsV1().Deployments("kube-system").List(context.TODO(), v1.ListOptions{}) if err != nil { panic(err) } // 打印 deploy 名称 for _, deploy := range deploys.Items { println(deploy.Name) } }
- 输出结果
cassandra-5hbf7 liveness-exec mysql-87pgn myweb-7f8rh myweb-rjblc nginx-pod-node1 ------ coredns default-http-backend metrics-server
4.4.DynamicClient详解
4.4.1.DynamicClient是什么
- 由4.3.1中可知,Clientset是一系列RESTClient的集合,创建一个 Clientset 实例,其中已经包含了所有kubernetes内置资源的RESTClient,可是对于CustomResource(CR)资源,Clientset就没有操作它们的RESTClient了。
- 因此,client-go特意提供了一种 客户端,即 DynamicClient,可以操作任意的kubernetes资源,包括 内置资源 + CR 资源
4.4.2.DynamicClient结构体分析
- 位于
/client-go/dynamic/simple.go
- DynamicClient结构体
- 可以看到,DynamicClient 中只包含一个RESTClient的字段 client
- client 类型为
rest.Interface
,即为 RESTClient实现的那个接口,可以在4.2.2中看到
type DynamicClient struct { client rest.Interface }
4.4.3.DynamicClient的常用方法
4.4.3.1.dynamic.NewForConfig()方法
- 位于
/client-go/dynamic/simple.go
,是一个函数,位于dynamic包下,所以直接点就可以使用 - 该方法用于创建一个 DynamicClient 实例,入参也是
*rest.Config
类型func NewForConfig(inConfig *rest.Config) (*DynamicClient, error) { config := ConfigFor(inConfig) httpClient, err := rest.HTTPClientFor(config) if err != nil { return nil, err } return NewForConfigAndClient(config, httpClient) }
4.4.3.2.DynamicClient 的 实例方法
- 结论:
-
DynamicClient 只有一个实例方法:
Resource(resource schema.GroupVersionResource)
,用于指定当前 DynamicClient 要操作的究竟是什么类型。
-
DynamicClient 只有一个实例方法:
- 下面进行源码分析。
- Resource()其实是 DynamicClient 实现了接口
dynamic.Interface
- 接口中只有一个 Resource()
- 参数是一个 GVR,指定group、version、resource,就可以确定到底是哪个资源了
- 返回值为 NamespaceableResourceInterface 接口类型
type Interface interface { Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface }
- DynamicClient 实现了这个接口,自然也实现了Resource这个方法
- 下面代码返回的是 NamespaceableResourceInterface 接口的实现类型 dynamicResourceClient 的对象
- 这个对象就是我们最终可以用于操作
你所指定资源
的 Client 了
func (c *DynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface { return &dynamicResourceClient{client: c, resource: resource} }
- dynamicResourceClient 类型
- 可见,dynamicResourceClient 包含了DynamicClient对象、要操作的类型的GVR、还有资源的ns
type dynamicResourceClient struct { client *DynamicClient namespace string resource schema.GroupVersionResource }
- 可以调用它的Namespace方法,为namespace字段赋值
- 这个方法就是 NamespaceableResourceInterface 接口提供的
func (c *dynamicResourceClient) Namespace(ns string) ResourceInterface { ret := *c ret.namespace = ns return &ret }
4.4.4.认识kubrenetes 的 Unstructured
4.4.4.1.为什么需要 Unstructured
- 前面我们使用 RESTClient 或 Clientset,调用的是某个资源的 Get、List 等方法,返回值都是确定的。比如 调用Pod资源的List,我们能够确定返回的一定是PodList,所以Pod的List方法,就写死了,返回值就是PodList
- 可当使用 DynamicClient 时,DynamicClient 没有确定的资源,资源类型是我们使用的时候才会去指定GVR的,所以 开发DynamicClient 的时候,Get 方法自然无法确定返回值类型。
- 那么kubernetes人员应该如何开发这个Get方法?
- 我们自然而然的能够想到,如果有一个通用的数据结构,可以表示所有类型的资源,问题就解决了
- Unstructured 应运而生。它就是一个 可以表示所有资源的类型。
4.4.4.2.Unstructured源码
- 位于
staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unsructured/unsructured.go
中 - Unstructured 结构
type Unstructured struct { // Object is a JSON compatible map with string, float, int, bool, []interface{}, or // map[string]interface{} // children. Object map[string]interface{} }
- Unstructured 包含一个 map,键为string,value为interface{},因此value可以为任意类型
4.4.4.3.dynamicResourceClient对于 Unstructured 的应用
- 在
4.4.3.2.DynamicClient 的 实例方法
中我们知道,我们最终操作资源,其实使用的是dynamicResourceClient
- dynamicResourceClient 实现了 ResourceInterface 接口的所有方法,这些方法就是我们最终操作资源所调用的。下面我们看一下 ResourceInterface 接口的所有方法签名
type ResourceInterface interface { Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error) UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error) Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error) ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error) }
- 可以看到:
- Create方法因为不知道要创建什么资源,所以参数接收的是
obj *unstructured.Unstructured
- Get方法因为不知道返回的是什么资源,所以返回的是
*unstructured.Unstructured
- List方法因为不知道返回的是什么资源列表,所以返回的是
*unstructured.UnstructuredList
。这个结构里面包含一个[]unstructured.Unstructured
- Create方法因为不知道要创建什么资源,所以参数接收的是
4.4.4.4.Unstructured 与 资源对象的相互转换
- 既然 dynamicResourceClient 的方法 接收和返回的,很多都是 Unstructured 类型,那么我们就需要实现 真正的资源对象 与 Unstructured 的 相互转换
- runtime包下,给我们提供了一个 UnstructuredConverter 接口,接口中提供了两个方法,分别用于
资源对象-->Unstructured
和Unstructured-->资源对象
- 位于
staging/src/k8s.io/apimachinery/pkg/runtime/converter.go
type UnstructuredConverter interface { ToUnstructured(obj interface{}) (map[string]interface{}, error) FromUnstructured(u map[string]interface{}, obj interface{}) error }
- 位于
- UnstructuredConverter 接口只有一个实现类 unstructuredConverter
- 位于
staging/src/k8s.io/apimachinery/pkg/runtime/converter.go
type unstructuredConverter struct { // If true, we will be additionally running conversion via json // to ensure that the result is true. // This is supposed to be set only in tests. mismatchDetection bool // comparison is the default test logic used to compare comparison conversion.Equalities }
- unstructuredConverter 实现了UnstructuredConverter 接口的 两个方法
-
func (c *unstructuredConverter) ToUnstructured(obj interface{}) (map[string]interface{}, error)
:资源对象–>Unstructured -
func (c *unstructuredConverter) FromUnstructured(u map[string]interface{}, obj interface{}) error
:Unstructured–>资源对象
-
- 位于
- 不过,unstructuredConverter 类型开头小写,我们无法直接使用,kubernetes 创建了一个全局变量 DefaultUnstructuredConverter,类型就是unstructuredConverter,用以供外界使用
- 位于
staging/src/k8s.io/apimachinery/pkg/runtime/converter.go
var ( ...... // DefaultUnstructuredConverter performs unstructured to Go typed object conversions. DefaultUnstructuredConverter = &unstructuredConverter{ mismatchDetection: parseBool(os.Getenv("KUBE_PATCH_CONVERSION_DETECTOR")), comparison: conversion.EqualitiesOrDie( func(a, b time.Time) bool { return a.UTC() == b.UTC() }, ), } )
- 位于
- 总结:我们直接使用
runtime.DefaultUnstructuredConverter
,调用它的 ToUnstructured 或 FromUnstructured 方法,就可以实现 Unstructured 与 资源对象的相互转换 了
4.4.5.DynamicClient使用示例
- 需求:获取 kube-system 命名空间下 name=coredns 的 deploy 对象
- 步骤
- 同样是先 创建一个客户端配置config
- 使用 dynamic.NewForConfig(),创建一个 DynamicClient 对象
- 使用 DynamicClient.Resource(),指定要操作的资源对象,获取到该资源的 Client
- 先为该Client指定ns,然后调用 Client 的 Get() 方法,获取到该资源对象
- 调用 runtime.DefaultUnstructuredConverter.FromUnstructured(),将 unstructured 反序列化成 Deployment 对象
- 代码编写
package main import ( "context" "fmt" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd" ) func main() { // 同样是先 创建一个客户端配置config config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic(err) } // 使用 dynamic.NewForConfig(),创建一个 DynamicClient 对象 dynamicClient, err := dynamic.NewForConfig(config) if err != nil { panic(err) } // 使用 DynamicClient.Resource(),指定要操作的资源对象,获取到该资源的 Client dynamicResourceClient := dynamicClient.Resource(schema.GroupVersionResource{ Group: "apps", Version: "v1", Resource: "deployments", }) // 先为该Client指定ns,然后调用 Client 的 Get() 方法,获取到该资源对象 unstructured, err := dynamicResourceClient. Namespace("kube-system"). Get(context.TODO(), "coredns", metav1.GetOptions{}) if err != nil { panic(err) } // 调用 runtime.DefaultUnstructuredConverter.FromUnstructured(),将 unstructured 反序列化成 Deployment 对象 deploy := &appsv1.Deployment{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.UnstructuredContent(), deploy) if err != nil { panic(err) } // 打印 deploy 名称和命名空间 fmt.Printf("deploy.Name: %s\ndeploy.namespace: %s", deploy.Name, deploy.Namespace) }
- 输出结果
deploy.Name: coredns deploy.namespace: kube-system
4.5.DiscoveryClient详解
4.5.1.DiscoveryClient是什么
- 我们前面学习的3种Client:RESTClient、Clientset、DynamicClient,都是用来操作 kubernetes 资源的,我们目前还缺少一个用于检索 kuberentes资源的 Client
- DiscoveryClient 就是这样一个 Client。用于检索 kuberentes集群中支持的 API 资源的相关信息,例如版本、组、资源类型等。
- DiscoveryClient 提供了一组方法来查询和获取这些信息,以便在编写 Controller 或 Operator 时,能够动态地了解集群中可用的资源。
4.5.2.DiscoveryClient结构体
- 位于
staging/src/k8s.io/client-go/discovery/discovery_client.go
- DiscoveryClient 结构体很简单
- LegacyPrefix 字段表示旧版本资源的访问前缀,一般值都是/api。Kubernetes 1.16 以前,资源的访问前缀都是 /api,1.16及之后,全面改成 /apis,为了兼容旧资源,这里特意保存了一个常量字符串 /api
type DiscoveryClient struct { restClient restclient.Interface LegacyPrefix string // Forces the client to request only "unaggregated" (legacy) discovery. UseLegacyDiscovery bool }
4.5.3.DiscoveryClient的常用方法
4.5.3.1.discovery.NewDiscoveryClientForConfig()方法
- 位于
/client-go/discovery/discovery_client.go
,是一个函数,位于discovery包下,所以直接点就可以使用 - 该方法用于创建一个 DiscoveryClient 实例,入参也是
*rest.Config
类型func NewDiscoveryClientForConfig(c *restclient.Config) (*DiscoveryClient, error) { config := *c if err := setDiscoveryDefaults(&config); err != nil { return nil, err } httpClient, err := restclient.HTTPClientFor(&config) if err != nil { return nil, err } return NewDiscoveryClientForConfigAndClient(&config, httpClient) }
4.5.3.2.DiscoveryClient的实例方法
- DiscoveryClient 实现了接口的所有方法,用于获取API资源的各种信息
- ServerGroupsInterface等这些接口,内部也有很多方法
type DiscoveryInterface interface { RESTClient() restclient.Interface ServerGroupsInterface ServerResourcesInterface ServerVersionInterface OpenAPISchemaInterface OpenAPIV3SchemaInterface // Returns copy of current discovery client that will only // receive the legacy discovery format, or pointer to current // discovery client if it does not support legacy-only discovery. WithLegacy() DiscoveryInterface }
4.5.4.CachedDiscoveryClient 和 DiscoveryClient 的区别
- 在查看 DiscoveryInterface 的时候,除了
DiscoveryClient
,还有一个实现类CachedDiscoveryClient
- 顾名思义,CachedDiscoveryClient 就是实现了缓存机制的 DiscoveryClient 的封装
- CachedDiscoveryClient 在 DiscoveryClient 的基础上增加了一层缓存,用于缓存获取的资源信息,以减少对 API Server 的频繁请求。
- 在首次调用时,CachedDiscoveryClient 会从 API Server 获取资源信息,并将其缓存在本地。之后的调用会直接从缓存中获取资源信息,而不需要再次向 API Server 发送请求。
- 因为 集群部署完成后,API 资源基本很少变化,所以缓存下来可以很好的提高请求效率。
- kubectl 工具内部,kubectl api-versions命令其实就是使用了这个CachedDiscoveryClient,所以多次执行kubectl api-versions命令,其实只有第一次请求了API Server,后续都是直接使用的 本地缓存。
4.5.5.DiscoveryClient 使用示例
- 需求:获取 当前kubernetes集群 的 所有资源列表
- 代码编写
func main() { // 1、先创建一个客户端配置config config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic(err.Error()) } // 2、使用 discovery.NewDiscoveryClientForConfig(),创建一个 DiscoveryClient 对象 discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { panic(err.Error()) } // 3、使用 DiscoveryClient.ServerGroupsAndResources(),获取所有资源列表 _, resourceLists, err := discoveryClient.ServerGroupsAndResources() if err != nil { panic(err.Error()) } // 4、遍历资源列表,打印出资源组和资源名称 for _, resource := range resourceLists { fmt.Printf("resource groupVersion: %s\n", resource.GroupVersion) for _, resource := range resource.APIResources { fmt.Printf("resource name: %s\n", resource.Name) } fmt.Println("--------------------------") } }
- 输出结果
resource groupVersion: v1 resource name: bindings resource name: componentstatuses resource name: configmaps resource name: endpoints resource name: events resource name: limitranges resource name: namespaces resource name: namespaces/finalize resource name: namespaces/status resource name: nodes resource name: nodes/proxy resource name: nodes/status resource name: persistentvolumeclaims resource name: persistentvolumeclaims/status resource name: persistentvolumes resource name: persistentvolumes/status resource name: pods resource name: pods/attach resource name: pods/binding resource name: pods/eviction resource name: pods/exec resource name: pods/log resource name: pods/portforward resource name: pods/proxy resource name: pods/status resource name: podtemplates resource name: replicationcontrollers resource name: replicationcontrollers/scale resource name: replicationcontrollers/status resource name: resourcequotas resource name: resourcequotas/status resource name: secrets resource name: serviceaccounts resource name: services resource name: services/proxy resource name: services/status -------------------------- resource groupVersion: apiregistration.k8s.io/v1 resource name: apiservices resource name: apiservices/status -------------------------- resource groupVersion: apiregistration.k8s.io/v1beta1 resource name: apiservices resource name: apiservices/status -------------------------- resource groupVersion: extensions/v1beta1 resource name: ingresses resource name: ingresses/status -------------------------- resource groupVersion: apps/v1 resource name: controllerrevisions resource name: daemonsets resource name: daemonsets/status resource name: deployments resource name: deployments/scale resource name: deployments/status resource name: replicasets resource name: replicasets/scale resource name: replicasets/status resource name: statefulsets resource name: statefulsets/scale resource name: statefulsets/status -------------------------- resource groupVersion: events.k8s.io/v1beta1 resource name: events -------------------------- resource groupVersion: authentication.k8s.io/v1 resource name: tokenreviews -------------------------- resource groupVersion: authentication.k8s.io/v1beta1 resource name: tokenreviews -------------------------- resource groupVersion: authorization.k8s.io/v1 resource name: localsubjectaccessreviews resource name: selfsubjectaccessreviews resource name: selfsubjectrulesreviews resource name: subjectaccessreviews -------------------------- resource groupVersion: authorization.k8s.io/v1beta1 resource name: localsubjectaccessreviews resource name: selfsubjectaccessreviews resource name: selfsubjectrulesreviews resource name: subjectaccessreviews -------------------------- resource groupVersion: autoscaling/v1 resource name: horizontalpodautoscalers resource name: horizontalpodautoscalers/status -------------------------- resource groupVersion: autoscaling/v2beta1 resource name: horizontalpodautoscalers resource name: horizontalpodautoscalers/status -------------------------- resource groupVersion: autoscaling/v2beta2 resource name: horizontalpodautoscalers resource name: horizontalpodautoscalers/status -------------------------- resource groupVersion: batch/v1 resource name: jobs resource name: jobs/status -------------------------- resource groupVersion: batch/v1beta1 resource name: cronjobs resource name: cronjobs/status -------------------------- resource groupVersion: certificates.k8s.io/v1beta1 resource name: certificatesigningrequests resource name: certificatesigningrequests/approval resource name: certificatesigningrequests/status -------------------------- resource groupVersion: networking.k8s.io/v1 resource name: networkpolicies -------------------------- resource groupVersion: networking.k8s.io/v1beta1 resource name: ingresses resource name: ingresses/status -------------------------- resource groupVersion: policy/v1beta1 resource name: poddisruptionbudgets resource name: poddisruptionbudgets/status resource name: podsecuritypolicies -------------------------- resource groupVersion: rbac.authorization.k8s.io/v1 resource name: clusterrolebindings resource name: clusterroles resource name: rolebindings resource name: roles -------------------------- resource groupVersion: rbac.authorization.k8s.io/v1beta1 resource name: clusterrolebindings resource name: clusterroles resource name: rolebindings resource name: roles -------------------------- resource groupVersion: storage.k8s.io/v1 resource name: csinodes resource name: storageclasses resource name: volumeattachments resource name: volumeattachments/status -------------------------- resource groupVersion: storage.k8s.io/v1beta1 resource name: csidrivers resource name: csinodes resource name: storageclasses resource name: volumeattachments -------------------------- resource groupVersion: admissionregistration.k8s.io/v1 resource name: mutatingwebhookconfigurations resource name: validatingwebhookconfigurations -------------------------- resource groupVersion: admissionregistration.k8s.io/v1beta1 resource name: mutatingwebhookconfigurations resource name: validatingwebhookconfigurations -------------------------- resource groupVersion: apiextensions.k8s.io/v1 resource name: customresourcedefinitions resource name: customresourcedefinitions/status -------------------------- resource groupVersion: apiextensions.k8s.io/v1beta1 resource name: customresourcedefinitions resource name: customresourcedefinitions/status -------------------------- resource groupVersion: scheduling.k8s.io/v1 resource name: priorityclasses -------------------------- resource groupVersion: scheduling.k8s.io/v1beta1 resource name: priorityclasses -------------------------- resource groupVersion: coordination.k8s.io/v1 resource name: leases -------------------------- resource groupVersion: coordination.k8s.io/v1beta1 resource name: leases -------------------------- resource groupVersion: node.k8s.io/v1beta1 resource name: runtimeclasses -------------------------- resource groupVersion: discovery.k8s.io/v1beta1 resource name: endpointslices --------------------------
5.client-go 完整informer机制 原理
5.1.client-go informer机制概述(重要)
5.1.1.informer是我们使用client-go的基本单位
文章来源:https://www.toymoban.com/news/detail-819630.html
- 需要明确一件事:informer才是client-go的基本单位
- 从上面架构图中可以看出,client-go中包含的组件主要有:Reflector、DeltaFIFO、Controller、Indexer、Processer。
- 这个 架构图会迷惑我们,让我们觉得 client-go 只维护了这么一套组件,实现功能。在阅读源码后,我发现,实际上client-go并不是只有一套这种组件,而是以informer作为基本单位,每个informer有自己的一套组件。
- 每种GVR资源,都有自己对应的informer,我们可以选择性的创建informer并启动,进而仅从apiserver ListAndWatch 自己需要的资源就可以了
- 每创建一种资源的informer对象,该对象就携带了自己需要的 Reflector、DeltaFIFO、Controller、Indexer、Processer,启动之后,该 informer 作为一个单独的协程启动,负责自己资源的 ListAndWatch、缓存、事件处理 等。
- 下面我从 informer 的使用角度,印证上面的结论
- 代码来自 概要分析
func main() { config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { klog.Fatalf("Failed to create config: %v", err) } // 初始化与apiserver通信的clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { klog.Fatalf("Failed to create client: %v", err) } // 初始化shared informer factory以及pod informer factory := informers.NewSharedInformerFactory(clientset, 30*time.Second) podInformer := factory.Core().V1().Pods() informer := podInformer.Informer() // 注册informer的自定义ResourceEventHandler informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: xxx, UpdateFunc: xxx, DeleteFunc: xxx, }) // 启动shared informer factory,开始informer的list & watch操作 stopper := make(chan struct{}) go factory.Start(stopper) // 等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中 // 或者调用factory.WaitForCacheSync(stopper) if !cache.WaitForCacheSync(stopper, informer.HasSynced) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } // 创建lister podLister := podInformer.Lister() // 从informer中的indexer本地缓存中获取对象 podList, err := podLister.List(labels.Everything()) if err != nil { fmt.Println(err) } }
- 我们重点关注这几句
// 初始化shared informer factory以及pod informer factory := informers.NewSharedInformerFactory(clientset, 30*time.Second) podInformer := factory.Core().V1().Pods() informer := podInformer.Informer() ...... go factory.Start(stopper) ...... podLister := podInformer.Lister() podList, err := podLister.List(labels.Everything())
- SharedInformerFactory是什么,等会再说。只需要知道他是一个工厂,可以创建 informer
-
factory.Core().V1().Pods()
和podInformer.Informer()
,最终就是创建了一个 pod 资源的 informer,该 informer 会自动保存在 factory 中 -
factory.Start
就是把 factory 中已经创建的所有 informer,都启动起来,每个informer就是一个单独的协程,互不影响,各自进行 ListAndWatch -
podInformer.Lister()
和podLister.List()
就是从pod的这个informer中,获取缓存数据。可以看到获取缓存的时候,确定到了某一个具体的 informer
- 代码来自 概要分析
5.1.2.informer 机制的一些重要数据结构
5.1.2.1.informers.SharedInformerFactory
- 从 5.1.1 中可知,我们要使用client-go操作一种资源,需要创建该资源对应的informer,并启动起来。可创建之后,下次再想使用,再创建一次的话,未免太浪费资源了,因此就推出了这么一种结构 SharedInformerFactory
- SharedInformerFactory 主要有两部分功能:
- 作为创建informer的工厂,用于创建所有类型的informer
- SharedInformerFactory 内部 还会缓存所有已经创建过的informer,下次再创建,就会直接使用缓存,不会再创建新的informer,节省了资源
- 结构体源码
type sharedInformerFactory struct { // 这个client,是clientset类型的客户端,用于与apiserver交互 client kubernetes.Interface // 限制 当前SharedInformerFactory 所创建的 Informer 只关注指定命名空间中的资源变化 namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration // 缓存已经创建的全部informer informers map[reflect.Type]cache.SharedIndexInformer // 缓存已经启动的 informer,只存 类型:是否启动 startedInformers map[reflect.Type]bool }
- SharedInformerFactory 缓存了创建的所有 informer,方便一次性启动所有已创建的 informer。SharedInformerFactory 提供了一个 Start 方法,用于启动所有的 informer
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() if f.shuttingDown { return } for informerType, informer := range f.informers { if !f.startedInformers[informerType] { f.wg.Add(1) // We need a new variable in each loop iteration, // otherwise the goroutine would use the loop variable // and that keeps changing. informer := informer // 可以看到,每个informer,都使用一个单独的协程启动的 go func() { defer f.wg.Done() informer.Run(stopCh) }() f.startedInformers[informerType] = true } } }
5.1.2.2.cache.SharedIndexInformer
- cache.SharedIndexInformer 接口,SharedInformerFactory中保存的就是这个类型
type SharedIndexInformer interface { SharedInformer // AddIndexers add indexers to the informer before it starts. AddIndexers(indexers Indexers) error GetIndexer() Indexer }
- sharedIndexInformer 类型,是 SharedIndexInformer 接口的实现类,这个就是我们说的 client-go 的基本单位,里面存储着informer的 indexer、controller、processor、listerWatcher 等
type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector MutationDetector listerWatcher ListerWatcher // objectType is an example object of the type this informer is // expected to handle. Only the type needs to be right, except // that when that is `unstructured.Unstructured` the object's // `"apiVersion"` and `"kind"` must also be right. objectType runtime.Object // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call // shouldResync to check if any of our listeners need a resync. resyncCheckPeriod time.Duration // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default // value). defaultEventHandlerResyncPeriod time.Duration // clock allows for testability clock clock.Clock started, stopped bool startedLock sync.Mutex // blockDeltas gives a way to stop all event distribution so that a late event handler // can safely join the shared informer. blockDeltas sync.Mutex // Called whenever the ListAndWatch drops the connection with an error. watchErrorHandler WatchErrorHandler }
5.1.2.3.XxxInformer
- 在 5.1.1 中的示例中,我们写了这么两句代码
podInformer := factory.Core().V1().Pods() informer := podInformer.Informer()
- 其中
factory.Core().V1().Pods()
返回的是PodInformer
接口类型,位于informers/core/v1/pod.go
中type PodInformer interface { Informer() cache.SharedIndexInformer Lister() v1.PodLister }
- informers包下,给每一种GVR,都提供了一个接口类型,用于表示这种 GVR 资源的informer。比如还有 NodeInformer、NamespaceInformer、SecretInformer等
- 不过这个 PodInformer 并不是我们直接使用的,我们最终使用的还是 cache.SharedIndexInformer 类型,所以 这些资源 xxxInformer 接口,都提供了一个
Informer() cache.SharedIndexInformer
方法,用于获取一个为该资源工作的cache.SharedIndexInformer
- 因此,这些资源 xxxInformer 接口,其实就是对 cache.SharedIndexInformer 类型的封装。
- client-go源码中,/client-go/informers 包下,就是factory结构+所有GVR资源的 informer结构
5.2.优质博客推荐
看完 5.1 的概述,其他源码分析可以先直接看 下面的一些博客。我有时间也会继续分析文章来源地址https://www.toymoban.com/news/detail-819630.html
- 一图读懂k8s informer client-go
- 概要分析
- 初始化与启动分析
-
Reflector源码分析
- 补充:reflector源码分析
- DeltaFIFO源码分析
- Controller&Processor源码分析
- Indexer源码分析
6.实战:client-go开发自定义控制器
6.1.需求说明
- 我们希望实现这个效果:
- 创建或更新Service的时候,如果这个Service的Annotations中,包含了"ingress/http:true",那么在创建或更新这个Service的时候,会自动为它创建一个Ingress。
- 删除Service的时候,如果这个Service的Annotations中,包含了"ingress/http:true",那么同时也要删除它的 Ingress。
- 需求分析:
- 这个效果需要编写三个事件处理方法,addService、updateService、deleteIngress
- addService/updateService:用户创建或更新service的时候,kubernetes的ServiceController已经完成service的创建或更新了。我们要做的是拿到已存在的service对象,看是否包含 “ingress/http:true”。如果包含,则保证有一个对应的ingress;如果不包含,则保证不能有对应的ingress
- deleteIngress:用于删除ingress的时候,也触发addService/updateService一样的逻辑,保证service和ingress的对应关系是正确的。
- 你可能会疑问,为什么没有 deleteService?
- 因为我们会使用 OwnerReferences 将service+ingress关联起来。因此删除service,会由kubernetes的ControllerManager中的特殊Controller,自动完成ingress的gc,所以删除service时我们无需特殊处理。
6.2.代码编写
- 源码仓库:https://github.com/graham924/share-code-operator-study
- 位于 addingress 目录下
- 位于 addingress 目录下
- main.go 代码如下
package main import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "log" "share-code-operator-study/addingress/pkg" ) func main() { // 创建一个 集群客户端配置 config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { inClusterConfig, err := rest.InClusterConfig() if err != nil { log.Fatalln("can't get config") } config = inClusterConfig } // 创建一个 clientset 客户端,用于创建 informerFactory clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } // 创建一个 informerFactory factory := informers.NewSharedInformerFactory(clientset, 0) // 使用 informerFactory 创建Services资源的 informer对象 serviceInformer := factory.Core().V1().Services() // 使用 informerFactory 创建Ingresses资源的 informer对象 ingressInformer := factory.Networking().V1().Ingresses() // 创建一个自定义控制器 controller := pkg.NewController(clientset, serviceInformer, ingressInformer) // 创建 停止channel信号 stopCh := make(chan struct{}) // 启动 informerFactory,会启动已经创建的 serviceInformer、ingressInformer factory.Start(stopCh) // 等待 所有informer 从 etcd 实现全量同步 factory.WaitForCacheSync(stopCh) // 启动自定义控制器 controller.Run(stopCh) }
- controller.go文件如下
package pkg import ( "context" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" informercorev1 "k8s.io/client-go/informers/core/v1" informernetv1 "k8s.io/client-go/informers/networking/v1" "k8s.io/client-go/kubernetes" listercorev1 "k8s.io/client-go/listers/core/v1" listernetv1 "k8s.io/client-go/listers/networking/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "reflect" "time" ) const ( // worker 数量 workNum = 5 // service 指定 ingress 的 annotation key annoKey = "ingress/http" // 调谐失败的最大重试次数 maxRetry = 10 ) // 自定义控制器 type controller struct { client kubernetes.Interface serviceLister listercorev1.ServiceLister ingressLister listernetv1.IngressLister queue workqueue.RateLimitingInterface } // NewController 创建一个自定义控制器 func NewController(clientset *kubernetes.Clientset, serviceInformer informercorev1.ServiceInformer, ingressInformer informernetv1.IngressInformer) *controller { // 控制器中,包含一个clientset、service和ingress的缓存监听器、一个workqueue c := controller{ client: clientset, serviceLister: serviceInformer.Lister(), ingressLister: ingressInformer.Lister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"), } // 为 serviceInformer 添加 ResourceEventHandler serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ // 添加service时触发 AddFunc: c.addService, // 修改service时触发 UpdateFunc: c.updateService, // 这里没有删除service的逻辑,因为我们会使用 OwnerReferences 将service+ingress关联起来。 // 因此删除service,会由kubernetes的ControllerManager中的特殊Controller,自动完成ingress的gc }) // 为 ingressInformer 添加 ResourceEventHandler ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ // 删除ingress时触发 DeleteFunc: c.deleteIngress, }) return &c } // 添加service时触发 func (c *controller) addService(obj interface{}) { // 将 添加service 的 key 加入 workqueue c.enqueue(obj) } // 修改service时触发 func (c *controller) updateService(oldObj interface{}, newObj interface{}) { // 如果两个对象一致,就无需触发修改逻辑 if reflect.DeepEqual(oldObj, newObj) { return } // todo 比较annotation // 将 修改service 的 key 加入 workqueue c.enqueue(newObj) } // 删除ingress时触发 func (c *controller) deleteIngress(obj interface{}) { // 将对象转成ingress,并获取到它的 ownerReference ingress := obj.(*netv1.Ingress) ownerReference := metav1.GetControllerOf(ingress) // 如果ingress的 ownerReference 没有绑定到service,则无需处理 if ownerReference == nil || ownerReference.Kind != "Service" { return } // 如果ingress的 ownerReference 已经绑定到service,则需要处理 c.enqueue(obj) } // enqueue 将 待添加service 的 key 加入 workqueue func (c *controller) enqueue(obj interface{}) { // 调用工具方法,获取 kubernetes资源对象的 key(默认是 ns/name,或 name) key, err := cache.MetaNamespaceKeyFunc(obj) // 获取失败,不加入队列,即本次事件不予处理 if err != nil { runtime.HandleError(err) return } // 将 key 加入 workqueue c.queue.Add(key) } // dequeue 将处理完成的 key 出队 func (c *controller) dequeue(item interface{}) { c.queue.Done(item) } // Run 启动controller func (c *controller) Run(stopCh chan struct{}) { // 启动多个worker,同时对workqueue中的事件进行处理,效率提升5倍 for i := 0; i < workNum; i++ { // 每个worker都是一个协程,使用同一个停止信号 go wait.Until(c.worker, time.Minute, stopCh) } // 启动完成后,Run函数就停止在这里,等待停止信号 <-stopCh } // worker方法 func (c *controller) worker() { // 死循环,worker处理完一个,再去处理下一个 for c.processNextItem() { } } // processNextItem 处理下一个 func (c *controller) processNextItem() bool { // 从 workerqueue 取出一个key item, shutdown := c.queue.Get() // 如果已经收到停止信号了,则返回false,worker就会停止处理 if shutdown { return false } // 处理完成后,将这个key出队 defer c.dequeue(item) // 转成string类型的key key := item.(string) // 处理service逻辑的核心方法 err := c.syncService(key) // 处理过程出错,进入错误统一处理逻辑 if err != nil { c.handleError(key, err) } // 处理结束,返回true return true } // handleError 错误统一处理逻辑 func (c *controller) handleError(key string, err error) { // 如果当前key的处理次数,还不到最大重试次数,则再次加入队列 if c.queue.NumRequeues(key) < maxRetry { c.queue.AddRateLimited(key) return } // 运行时统一处理错误 runtime.HandleError(err) // 不再处理这个key c.queue.Forget(key) } // syncService 处理service逻辑的核心方法 func (c *controller) syncService(key string) error { // 将 key 切割为 ns 和 name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } // 从indexer中,获取service service, err := c.serviceLister.Services(namespace).Get(name) // 没有service,直接返回 if errors.IsNotFound(err) { return nil } if err != nil { return err } // 检查service的annotation,是否包含 key: "ingress/http" _, ok := service.Annotations[annoKey] // 从indexer缓存中,获取ingress ingress, err := c.ingressLister.Ingresses(namespace).Get(name) if ok && errors.IsNotFound(err) { // ingress不存在,但是service有"ingress/http",需要创建ingress // 创建ingress ig := c.createIngress(service) // 调用controller中的client,完成ingress的创建 _, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metav1.CreateOptions{}) if err != nil { return err } } else if !ok && ingress != nil { // ingress存在,但是service没有"ingress/http",需要删除ingress // 调用controller中的client,完成ingress的删除 err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) if err != nil { return err } } return nil } // createIngress 创建ingress func (c *controller) createIngress(service *corev1.Service) *netv1.Ingress { icn := "ingress" pathType := netv1.PathTypePrefix return &netv1.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: service.Name, Namespace: service.Namespace, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(service, corev1.SchemeGroupVersion.WithKind("Service")), }, }, Spec: netv1.IngressSpec{ IngressClassName: &icn, Rules: []netv1.IngressRule{ { Host: "example.com", IngressRuleValue: netv1.IngressRuleValue{ HTTP: &netv1.HTTPIngressRuleValue{ Paths: []netv1.HTTPIngressPath{ { Path: "/", PathType: &pathType, Backend: netv1.IngressBackend{ Service: &netv1.IngressServiceBackend{ Name: service.Name, Port: netv1.ServiceBackendPort{ Number: 80, }, }, }, }, }, }, }, }, }, }, } }
6.3.kubernetes内置控制器源码学习
- 上面的代码我写了详细注释,建议仔细阅读一遍,注意其中的 NewController()、Run()、worker()、processNextItem()、handleError()、syncService() 这些方法的命名和处理逻辑。
- 你会发现 kubernetes源码中,内置的控制器和他们是一样的, 即 kubernetes内置控制器 也是使用client-go的这种方式,实现控制器开发的。
- 下面以 DeploymentController 为例,带大家看一下
- 可以看到,基本都是一样的写法,学到这里,大家就可以自行去看kubernetes各种控制器的源码了
到了这里,关于Kubernetes operator(一)client-go篇【更新中】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!