Kubernetes operator(一)client-go篇【更新中】

这篇具有很好参考价值的文章主要介绍了Kubernetes operator(一)client-go篇【更新中】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

云原生学习路线导航页(持续更新中)

  • 本文是 Kubernetes operator学习 系列第一篇,主要对client-go进行学习,从源码阅读角度,学习client-go各个组件的实现原理、如何协同工作等
  • 参考视频:Bilibili 2022年最新k8s编程operator篇,UP主:白丁云原生
  • 本文参考资料
    • https://pan.baidu.com/s/1BibLAishAFJLeTyYCLnlbA 提取码: en2p
    • https://zhuanlan.zhihu.com/p/573982128
    • https://xinchen.blog.csdn.net/article/details/113753087
    • 并根据个人理解进行了汇总和修改

1.为什么要学习 client-go

  • 为了适应更多的业务场景,k8s提供了很多的扩展点,用于满足更复杂的需求
  • K8s的扩展点如下:
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
  • ① kubectl
    • 用户通过kubectl与ApiServer进行交互,kubectl提供了插件,可以扩展kubectl的行为,但是这些插件只能影响用户本地的环境
  • ② API Server
    • 处理所有的请求,可以对用户请求进行 身份认证、基于其内容阻止请求、编辑请求内容、处理删除操作等等。
    • 这个扩展点应该说的是:用户可以 自定义 API Server
  • ③ k8s提供的内置资源
    • 我们无法修改,只能通过 annotation、label 控制他们
  • ④ CRD
    • 自定义资源,配合 自定义控制器Custom Controller,扩展k8s的特定业务场景
  • ⑤ scheduler
    • 调度器,决定k8s把Pod放到哪个节点执行。k8s提供了多种方式扩展调度行为
  • ⑥ Controller Manager
    • 实际上也是k8s的一个客户端,通过与API Server交互。k8s的每种资源都有对应的控制器,都属于ControllerManager
    • client-go本质上就是一个与apiserver交互的库,所以Controller Manager也是通过 client-go 库与 API Server交互的
  • ⑦ Custom Controller
    • 自定义控制器,可以控制 内置资源,也可以控制自定义资源CRD
  • ⑧ kubelet
    • 使用CNI:使得k8s可以使用不同技术,连接Pod网络
    • 使用CSI:使得k8s可以支持不同的存储类型
    • 使用CRI:使得k8s可以支持不同的容器运行时
  • ⑨ client-go
    • 一个通用的Golang库,用于和 apiserver 交互
    • 不管是k8s的各个组件,还是我们自己为CRD开发Custom Controller,都需要使用 client-go 与 API Server 进行通信

综上所述,要想学习 Operator,使用 CRD + Custom Controller 扩展kubernetes功能,必须先学习 Client-go 库,学会如何与APIServer进行交互

2.client-go与kubernetes版本对应关系

我们假设前提:kubernetes版本为 v1.x.y

  • kubernetes版本 >= v1.17.0时,client-go 版本使用 v0.x.y
  • kubernetes版本 < v1.17.0时,client-go 版本使用 v1.x.y

3.client-go架构

3.1.client-go 源代码目录介绍

  • github地址:https://github.com/kubernetes/client-go
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
  • discovery:用于发现API Server都是支持哪些API。kubectl apiversions使用了同样的机制
  • dynamic:包含了kubernetes dynamic client的逻辑,可以操作任意的k8s资源API对象,包括内置的、自定义的资源对象
  • informers:包含了所有内置资源的informer,便于操作k8s的资源对象
  • kubernetes:包含了访问Kubernetes API的 所有ClientSet
  • listers:包含了所有内置资源的lister,用于读取缓存中k8s资源对象的信息
  • plugin/pkg/client/auth:包含所有可选的认证插件,用于从外部获取credential(凭证)
  • tools:包含一系列工具,编写控制器时会用到很多里面的工具方法
  • transport:包含了创建连接、认证的逻辑,会被上层的ClientSet使用

3.2.client-go 架构

图片参考来源:https://zhuanlan.zhihu.com/p/573982128
Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
下面先介绍各组件整体的运转流程,然后对 client-go 和 一个 CRDController 应该包含哪些组件进行详细介绍。

  • 整体流程简介
    • Reflector会持续监听k8s集群中指定资源类型的API,当发现变动和更新时,就会创建一个发生变动的 对象副本,并将其添加到队列DeltaFIFO中
    • Informer监听DeltaFIFO队列,取出对象,做两件事:
      • (1)将对象加入Indexer,Indexer 会将 [对象key, 对象] 存入一个线程安全的Cache中
      • (2)根据对象的 资源类型和操作,找到对应 Controller 预先提供的 Resource Event Handler,调用Handler,将对象的Key加入该 Controller 的 Workqueue
    • Controller 的循环函数 ProcessItem,监听到 Workqueue 有数据了,就会取出一个key,交给处理函数Worker,Worker 会根据 Key,使用 Indexer reference 从 Cache 中 获取 该key对应的 真实对象。然后就可以进行调谐了。
    • 注意点
      • DeltaFIFO 中 存的是 对象副本
      • Cache 中 存的是 [对象key, 对象] 的映射
      • Workqueue 中存的是 对象Key
      • CRDController 中,使用Informer对象,是为了向其中添加一些 Resource Event Handlers
      • CRDController 中,使用Indexer对象,是为了根据对象Key,获取对象实例
  • client-go组件
    • Reflector
      • reflector会一直监听kubernetes中指定资源类型的API,实现监听的函数就是ListAndWatch。这种监听机制既适用于k8s的内建资源,也适用于自定义资源。
      • 当reflector通过监听API发现资源对象实例存在新的 notification 时,它就会使用 listing API 获取这个新的实例对象,并将其放入 watchHandler 函数内的 DeltaFIFO 中;
    • Informer
      • Informer 会从 Delta FIFO 中取出对象。实现这个功能的方法对应源码中的 processLoop;
      • Informer 取出对象后,根据Resource类型,调用对应的 Resource Event Handler 回调函数,该函数实际上由某个具体的 Controller 提供,函数中会获取对象的 key,并将 key 放入到 该Controller 内部的 Workqueue 中,等候处理。
    • Indexer 和 Thread Safe Store
      • Indexer 会提供对象的索引功能,通常是基于对象Key来创建索引。默认索引函数是MetaNamespaceKeyFunc, 它生成的索引键为/格式。
      • Indexer 维护着一个线程安全的 Cache,即 Thread Safe Store。存储的是[对象key, 对象],用对象Key可以进行获取对象实例。
    • Resource Event Handlers reference
      • 这实际上是所有Controller的Resource Event Handlers的引用。
      • 这些 handlers 由具体的Controller提供,就是 Informer 的回调函数。Informer 会根据资源的类型,调用对应Controller的 handler 方法
      • handler 通常都是用于将资源对象的key放入到 该Controller 内部的 Workqueue 中,等候处理。
  • 自定义控制器组件
    • Informer reference
      • Informer reference 是 Informer 实例对象的引用,用于操作和处理自定义资源对象
      • 我们编写自定义控制器时,需要引用自己需要的Informer,向其中加入一系列 Resource Event Handlers
    • Indexer reference
      • Indexer reference 是 Indexer实例对象的引用,用于根据对象Key索引资源对象
      • 我们编写自定义控制器时,应该创建Indexer的引用,将对象Key传给它,就可以获取想要处理的对象
      • NewIndexerInformer函数
        • client-go中的基本控制器提供了 NewIndexerInformer 函数,用于创建Informer和Indexer。
        • 可以直接使用NewIndexerInformer 函数,或者也可以使用工厂方法来创建Informer
    • Resource Event Handlers
      • 由具体的 Controller 给 Client-go 的Informer 提供的回调函数,获取待处理对象的key,并将key放入到Workqueue中。
    • Workqueue
      • 此队列是 具体的Controller 内部创建的队列,用于暂时存储从Resource event handler 中 传递过来的,待处理对象的Key。
      • Resource event handler 函数通常会获取待处理对象的key,并将key放入到这个workqueue中。
    • Process Item
      • 这个函数为循环函数,它不断从 Work queue 中取出对象的key,并使用 Indexer Reference 获取这个key对应的具体资源对象,然后根据资源的变化,做具体的调谐 Reconcile 动作。

3.3.使用client-go编写Controller的步骤

  • 根据2.2中所述,编写一个 自定义Controller,需要实现如下功能。
    • 先从client-go中获取对应资源的 Informer
    • 提供 一系列的 Resource event handlers,并加入对应的Informer,供该informer回调
    • 提供一个 Workqueue 队列,存储待处理的对象的Key
    • 提供一个 循环函数 ProcessItem,不断从 Workqueue 中取出对象的key,交给 处理函数 Worker
      • 提供一个 处理函数 Worker,根据对象Key,使用对应资源的Indexer,获取到该对象的实例,根据对象的属性变化,做真正的调谐过程。

4.client-go的client组件

4.1.Client的4种类型

  • 我们知道,client-go 可以实现与 kubernetes 的通信。如何实现的呢?
  • client-go 主要提供了4种 client 组件:
    • RESTClient:最基础的客户端,提供最基本的封装,可以通过它组装与API Server即时通讯时 的 url
    • Clientset:是一个Client的集合,在Clientset中包含了所有K8S内置资源 的 Client,通过Clientset便可以很方便的操作如Pod、Service这些资源
    • dynamicClient:动态客户端,可以操作任意K8S的资源,包括CRD定义的资源
    • DiscoveryClient:用于发现K8S提供的资源组、资源版本和资源信息,比如:kubectl api-resources
  • 4种client分别对应源码目录:
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器

4.2.RESTClient详解

4.2.1.RESTClient 是什么

  • RESTClient 是最基础的客户端,提供与APIServer通信的最基本封装,可以向APIServer发送 Restful 风格请求。
  • 之所以称 RESTClient 是 最基础的客户端,是因为其他三种Client,其实都是 RESTClient 的再封装,内部都使用了 RESTClient。

4.2.2.RESTClient结构体

type RESTClient struct {
	// base is the root URL for all invocations of the client
	base *url.URL
	
	// versionedAPIPath is a path segment connecting the base URL to the resource root
	versionedAPIPath string

	// content describes how a RESTClient encodes and decodes responses.
	content ClientContentConfig

	// creates BackoffManager that is passed to requests.
	createBackoffMgr func() BackoffManager

	// rateLimiter is shared among all requests created by this client unless specifically
	// overridden.
	rateLimiter flowcontrol.RateLimiter

	// warningHandler is shared among all requests created by this client.
	// If not set, defaultWarningHandler is used.
	warningHandler WarningHandler

	// Set specific behavior of the client.  If not set http.DefaultClient will be used.
	Client *http.Client
}

4.2.3.RESTClient常用方法

  • RESTClientFor()
    • 位置:rest/config.go 文件
    • 函数签名:func RESTClientFor(config *Config) (*RESTClient, error),直接 rest点 调用
    • 该方法是用于 创建一个 RESTClient 实例
    • 接收一个 rest.Config 类型参数,Config中包含了 限速器、编解码器 等
      • RESTClientFor 方法内部会从 Config 中取出这些配置,设置给RESTClient 实例
      • 这样RESTClient 实例就具有了 限速、编解码 等多种功能
      • 因此,我们创建Config的时候,可以手动设置这些功能,下面的示例中会展示。
  • RESTClient实例的常用方法
    • /rest/client.go 中有一个接口 Interface
      // Interface captures the set of operations for generically interacting with Kubernetes REST apis.
      type Interface interface {
      	GetRateLimiter() flowcontrol.RateLimiter
      	Verb(verb string) *Request
      	Post() *Request
      	Put() *Request
      	Patch(pt types.PatchType) *Request
      	Get() *Request
      	Delete() *Request
      	APIVersion() schema.GroupVersion
      }
      
    • RESTClient 实现了这个接口,因此具有所有的方法,用于发送各种类型的请求
    • 另外,Interface 每个方法的返回值都是 Request 类型,Request 类型的各种方法,很多的返回值也是 Request,这样就可以实现 链式编程

4.2.4.RESTClient的一些其他知识点(建议看一遍)

4.2.4.1.Request 和 Result 常用方法
  • Request 位于 /rest/request.go
    • func (r *Request) Namespace(namespace string) *Request:设置 当前Resquest 访问的 namespace
    • func (r *Request) Resource(resource string) *Request:设置 当前Resquest 想要访问的资源类型
    • func (r *Request) Name(resourceName string) *Request:设置 当前Resquest 想要访问的资源的名称
    • func (r *Request) Do(ctx context.Context) Result:格式化并执行请求。返回一个 Result 对象,以便于处理响应。
  • Result 也位于 /rest/request.go
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
4.2.4.2.rest.Config 结构体
  • 位于 rest/config.go 中,用于描述 kubernetes客户端的通用属性
    type Config struct {
    	// API 服务器的主机地址,格式为 https://<hostname>:<port>。默认情况下,它为空字符串,表示使用当前上下文中的集群配置。
    	Host string
    	
    	// 指定 API 服务器的路径,目前只有两种取值:/api、/apis
    	// - /api:访问core API 组资源时,其实group值为空
    	// - /apis:访问其他 API 组资源时,都是apis,他们都有group值
    	APIPath string
    
    	// 对请求内容的配置,会影响对象在发送到服务器时的转换方式
    	// - ContentConfig中有两个重要属性:
    	//   - NegotiatedSerializer:用于序列化和反序列化请求和响应的接口
    	//   - GroupVersion:请求资源的 API 组和版本
    	ContentConfig
    
    	// 用于进行基本身份验证的用户名的字符串
    	Username string
    	
    	// 用于进行基本身份验证的密码的字符串
    	Password string `datapolicy:"password"`
    
    	// 用于进行身份验证的令牌的字符串
    	BearerToken string `datapolicy:"token"`
    
    	// 包含身份验证令牌的文件的路径
    	BearerTokenFile string
    
    	// TLS 客户端配置,包括证书和密钥
    	TLSClientConfig
    
    	// 每秒允许的请求数(Queries Per Second)。默认为 5.0。
    	QPS float32
    
    	// 突发请求数。默认为 10
    	Burst int
    
    	// 速率限制器,用于控制向 API 服务器发送请求的速率
    	RateLimiter flowcontrol.RateLimiter
    
    	// 与 API 服务器建立连接的超时时间
    	Timeout time.Duration
    
    	// 用于创建网络连接的 Dial 函数
    	Dial func(ctx context.Context, network, address string) (net.Conn, error)
    
    	// ......
    }
    
4.2.4.3.tools/clientcmd 工具
  • 源码位于 client-go/tools/clientcmd 包下
  • clientcmd 是 Kubernetes Go 客户端库(client-go)中的一个包,用于加载和解析 Kubernetes 配置文件,并辅助创建与 Kubernetes API 服务器进行通信的客户端。
  • clientcmd 提供了一些功能,使得在客户端应用程序中处理 Kubernetes 配置变得更加方便。主要包含以下几个方面的功能:
    • 加载配置文件:clientcmd 可以根据指定的路径加载 Kubernetes 配置文件,例如 kubeconfig 文件。
    • 解析配置文件:一旦加载了配置文件,clientcmd 提供了解析配置文件的功能,可以获取各种配置信息,如集群信息、认证信息、上下文信息等。
    • 辅助创建客户端:clientcmd 可以使用配置文件中的信息,辅助创建与 Kubernetes API 服务器进行通信的客户端对象。这些客户端对象可以用来执行对 Kubernetes 资源的增删改查操作。
    • 切换上下文:clientcmd 还支持在多个上下文之间进行切换。上下文表示一组命名空间、集群和用户的组合,用于确定客户端与哪个Kubernetes 环境进行通信。

4.2.5.RESTClient使用示例

  • 需求:获取default命名空间下的所有pod,并打印所有pod的name
  • 首先到kubernetes的官方API文档中,查看 请求url、响应
    • https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.24/#pod-v1-core
    • 或者:https://v1-24.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.24/#list-pod-v1-core
      Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
  • 代码编写
    package main
    
    import (
    	"context"
    	v1 "k8s.io/api/core/v1"
    	"k8s.io/client-go/kubernetes/scheme"
    	"k8s.io/client-go/rest"
    	"k8s.io/client-go/tools/clientcmd"
    )
    
    func main() {
    	// 在你机器的homeDir下,放入集群的config文件,用于连接集群(可以直接从集群master的~/.kube/config拷贝过来)
    	// clientcmd是位于client-go/tools/clientcmd目录下的工具
    	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    	if err != nil {
    		panic(err)
    	}
    	// 设置默认 GroupVersion(我要操作的是pod,不属于任何的group,所以使用了SchemeGroupVersion。你要操作什么,就写什么GroupVersion即可)
    	config.GroupVersion = &v1.SchemeGroupVersion
    
    	// 设置序列化/反序列化器(后面的 Into方法 就是使用它完成 反序列化 的)
    	config.NegotiatedSerializer = scheme.Codecs
    
    	// 设置 API 根的子路径(我们操作的是pod,属于core资源,所以设置为/api)
    	config.APIPath = "/api"
    	
    	// 创建一个 RESTClient
    	restClient, err := rest.RESTClientFor(config)
    	if err != nil {
    		panic(err)
    	}
    
    	// 创建一个Pod,用于接收请求结果
    	pods := v1.PodList{}
    	
    	// 链式编程 发送请求,并反序列化结果到pod中
    	err = restClient.Get().Namespace(v1.NamespaceDefault).Resource("pods").Do(context.TODO()).Into(&pods)
    	if err != nil {
    		panic(err)
    	}
    	
    	// 打印pod名称
    	for _, pod := range pods.Items {
    		println(pod.Name)
    	}
    }
    
  • 输出结果
    cassandra-5hbf7
    liveness-exec
    mysql-87pgn
    myweb-7f8rh
    myweb-rjblc
    nginx-pod-node1
    

4.3.Clientset详解

4.3.1.Clientset是什么

  • 结论:Clientset 是 一系列 RESTClient 的 集合。
  • 从4.2.4的 RESTClient 使用示例来看,使用 RESTClient 操作kubernetes资源,太麻烦了
    • 要操作 pods,需要指定config,给config设置 APIPath 为 “/api”、设置序列化器、设置 GroupVersion,最后还要调用 rest.RESTClientFor(config) 得到一个 用于操作pods的Clientset
    • 而如果我要操作 deployment,这个过程又需要写一遍,然后又得到一个 用于操作deployment的Clientset
    • 代码冗余,不优雅,而且到处创建Clientset,耗时又浪费资源
  • 因此,就有了事先创建 各种资源的RESTClient,存起来备用的需求。Clientset就是这样封装起来的一个set集合。

4.3.2.Clientset的结构体

  • 位于 /kubernetes/clientset.go
  • Clientset结构体:
    type Clientset struct {
    	......
    	appsV1                        *appsv1.AppsV1Client
    	appsV1beta1                   *appsv1beta1.AppsV1beta1Client
    	appsV1beta2                   *appsv1beta2.AppsV1beta2Client
    	authenticationV1              *authenticationv1.AuthenticationV1Client
    	authenticationV1alpha1        *authenticationv1alpha1.AuthenticationV1alpha1Client
    	authenticationV1beta1         *authenticationv1beta1.AuthenticationV1beta1Client
    	authorizationV1               *authorizationv1.AuthorizationV1Client
    	authorizationV1beta1          *authorizationv1beta1.AuthorizationV1beta1Client
    	autoscalingV1                 *autoscalingv1.AutoscalingV1Client
    	autoscalingV2                 *autoscalingv2.AutoscalingV2Client
    	autoscalingV2beta1            *autoscalingv2beta1.AutoscalingV2beta1Client
    	autoscalingV2beta2            *autoscalingv2beta2.AutoscalingV2beta2Client
    	batchV1                       *batchv1.BatchV1Client
    	batchV1beta1                  *batchv1beta1.BatchV1beta1Client
    	certificatesV1                *certificatesv1.CertificatesV1Client
    	certificatesV1beta1           *certificatesv1beta1.CertificatesV1beta1Client
    	certificatesV1alpha1          *certificatesv1alpha1.CertificatesV1alpha1Client
    	coordinationV1beta1           *coordinationv1beta1.CoordinationV1beta1Client
    	coordinationV1                *coordinationv1.CoordinationV1Client
    	coreV1                        *corev1.CoreV1Client
    	......
    }
    
  • appsv1 的类型 *appsv1.AppsV1Client 举例:可以看到,内部包含了一个 restClient。这也进一步认证,Clientset 就是一系列 RESTClient 的集合。
    type AppsV1Client struct {
    	restClient rest.Interface
    }
    

4.3.3.Clientset的常用方法

4.3.3.1.NewForConfig()方法
  • 位于 /kubernetes/clientset.go 中,所以可以直接使用 kubernetes.NewForConfig() 使用
  • 用于创建一个Clientset,传入一个rest.Config配置对象
    func NewForConfig(c *rest.Config) (*Clientset, error) {
    	configShallowCopy := *c
    
    	if configShallowCopy.UserAgent == "" {
    		configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
    	}
    
    	// share the transport between all clients
    	httpClient, err := rest.HTTPClientFor(&configShallowCopy)
    	if err != nil {
    		return nil, err
    	}
    
    	// 这个方法,就完成了所有 RESTClient 的创建
    	return NewForConfigAndClient(&configShallowCopy, httpClient)
    }
    
    func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) {
    	configShallowCopy := *c
    	if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
    		if configShallowCopy.Burst <= 0 {
    			return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
    		}
    		configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
    	}
    
    	var cs Clientset
    	var err error
    	// 下面就是创建各种 RESTClient 了,创建结果,被保存到 cs 中
    	cs.admissionregistrationV1, err = admissionregistrationv1.NewForConfigAndClient(&configShallowCopy, httpClient)
    	if err != nil {
    		return nil, err
    	}
    	cs.admissionregistrationV1alpha1, err = admissionregistrationv1alpha1.NewForConfigAndClient(&configShallowCopy, httpClient)
    	if err != nil {
    		return nil, err
    	}
    	cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfigAndClient(&configShallowCopy, httpClient)
    	if err != nil {
    		return nil, err
    	}
    	......
    	return &cs, nil
    }
    
4.3.3.2.Clientset的实例方法
  • Clientset 实现了 /kubernetes/clientset.go 下的 Interface接口,将自己内部的 私有属性 供外部使用
  • Interface 接口源码
    type Interface interface {
    	......
    	AppsV1() appsv1.AppsV1Interface
    	AppsV1beta1() appsv1beta1.AppsV1beta1Interface
    	AppsV1beta2() appsv1beta2.AppsV1beta2Interface
    	AuthenticationV1() authenticationv1.AuthenticationV1Interface
    	AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
    	AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
    	AuthorizationV1() authorizationv1.AuthorizationV1Interface
    	AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
    	AutoscalingV1() autoscalingv1.AutoscalingV1Interface
    	AutoscalingV2() autoscalingv2.AutoscalingV2Interface
    	AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
    	AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
    	BatchV1() batchv1.BatchV1Interface
    	BatchV1beta1() batchv1beta1.BatchV1beta1Interface
    	CertificatesV1() certificatesv1.CertificatesV1Interface
    	CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
    	CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
    	CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
    	CoordinationV1() coordinationv1.CoordinationV1Interface
    	CoreV1() corev1.CoreV1Interface
    	......
    }
    
  • AppsV1() 方法为例,返回值是接口 appsv1.AppsV1Interface 的实现类 appsv1.AppsV1Client 的对象
    // 接口
    type AppsV1Interface interface {
    	RESTClient() rest.Interface
    	ControllerRevisionsGetter
    	DaemonSetsGetter
    	DeploymentsGetter
    	ReplicaSetsGetter
    	StatefulSetsGetter
    }
    
    // 实现类
    type AppsV1Client struct {
    	restClient rest.Interface
    }
    
    // AppsV1Client 实现 AppsV1Interface 接口的方法
    func (c *AppsV1Client) RESTClient() rest.Interface {
    	if c == nil {
    		return nil
    	}
    	return c.restClient
    }
    
  • appsv1.AppsV1Client 的其他实例方法
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
  • 以 appsv1.AppsV1Client.Deployments() 方法举例
    • Deployments() 方法源码
      // 返回值是DeploymentInterface 
      func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface {
      	// 实际上,返回值是 DeploymentInterface 的实现类 deployments 的对象
      	return newDeployments(c, namespace)
      }
      
      // 构造一个 deployments 的对象
      func newDeployments(c *AppsV1Client, namespace string) *deployments {
      	return &deployments{
      		client: c.RESTClient(),
      		ns:     namespace,
      	}
      }
      
    • 返回值:DeploymentInterface 接口源码,可以看到包含操作Deployment的各种方法
      type DeploymentInterface interface {
      	Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
      	Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
      	UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
      	Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
      	DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
      	Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
      	List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
      	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
      	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
      	Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
      	ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
      	GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
      	UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
      	ApplyScale(ctx context.Context, deploymentName string, scale *applyconfigurationsautoscalingv1.ScaleApplyConfiguration, opts metav1.ApplyOptions) (*autoscalingv1.Scale, error)
      
      	DeploymentExpansion
      }
      
  • 挑选 DeploymentInterface.Create 方法,查看 实现类 deployments 的 Create实现
    • 可以看出,Create方法的内容,就跟我们4.2.4中使用 RESTClient 的方式差不多
    • 这更加印证了,Clientset 就是 对各种 GroupVersion 的 RESTClient 的封装
    func (c *deployments) Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (result *v1.Deployment, err error) {
    	result = &v1.Deployment{}
    	err = c.client.Post().
    		Namespace(c.ns).
    		Resource("deployments").
    		VersionedParams(&opts, scheme.ParameterCodec).
    		Body(deployment).
    		Do(ctx).
    		Into(result)
    	return
    }
    
  • 实际上,这些方法都不是人工写的,都是 code-generator 自动生成的
    • code-generator 提供了很多工具用于为k8s中的资源生成相关代码,其中包括一个 client-gen
    • client-gen:可以为资源生成标准的操作方法(get;list;watch;create;update;patch;delete)
    • 比如,在kuberentes源码 staging/src/k8s.io/api/core/v1/types.go 中,可以看到 type Pod struct 注释上,就使用了 genclient 的标记
      // +genclient
      // +genclient:method=UpdateEphemeralContainers,verb=update,subresource=ephemeralcontainers
      // ......
      type Pod struct {
      	......
      }
      
    • client-gen 常用标记
      // +genclient - 生成默认的客户端动作函数(create, update, delete, get, list, update, patch, watch以及 是否生成updateStatus取决于.Status字段是否存在)。
      // +genclient:nonNamespaced - 所有动作函数都是在没有名称空间的情况下生成
      // +genclient:onlyVerbs=create,get - 指定的动作函数被生成.
      // +genclient:skipVerbs=watch - 生成watch以外所有的动作函数.
      // +genclient:noStatus - 即使.Status字段存在也不生成updateStatus动作函数
      

4.3.4.Clientset使用示例

  • 需求:获取default命名空间下的pod列表,并获取kube-system命名空间下的deploy列表
  • 从下面代码来看,创建了一个 clientset,就可以操作不同 GroupVersion 下的 不同资源,也无需再去手动指定 APIPath 等值了
  • 代码编写
    package main
    
    import (
    	"context"
    	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/tools/clientcmd"
    )
    
    func main() {
    	// 同样是先 创建一个客户端配置config
    	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    	if err != nil {
    		panic(err)
    	}
    
    	// 使用 kubernetes.NewForConfig(),创建一个ClientSet对象
    	clientSet, err := kubernetes.NewForConfig(config)
    	if err != nil {
    		panic(err)
    	}
    
    	// 1、从 clientSet 中调用操作pod的 RESTClient,获取default命名空间下的pod列表
    	pods, err := clientSet.CoreV1().Pods(v1.NamespaceDefault).List(context.TODO(), v1.ListOptions{})
    	if err != nil {
    		panic(err)
    	}
    	// 打印pod名称
    	for _, pod := range pods.Items {
    		println(pod.Name)
    	}
    
    	println("------")
    
    	// 2、从 clientSet 中调用操作 deploy 的 RESTClient,获取kube-system命名空间下的deploy列表
    	deploys, err := clientSet.AppsV1().Deployments("kube-system").List(context.TODO(), v1.ListOptions{})
    	if err != nil {
    		panic(err)
    	}
    	// 打印 deploy 名称
    	for _, deploy := range deploys.Items {
    		println(deploy.Name)
    	}
    }
    
  • 输出结果
    cassandra-5hbf7
    liveness-exec
    mysql-87pgn
    myweb-7f8rh
    myweb-rjblc
    nginx-pod-node1
    ------
    coredns
    default-http-backend
    metrics-server
    

4.4.DynamicClient详解

4.4.1.DynamicClient是什么

  • 由4.3.1中可知,Clientset是一系列RESTClient的集合,创建一个 Clientset 实例,其中已经包含了所有kubernetes内置资源的RESTClient,可是对于CustomResource(CR)资源,Clientset就没有操作它们的RESTClient了。
  • 因此,client-go特意提供了一种 客户端,即 DynamicClient,可以操作任意的kubernetes资源,包括 内置资源 + CR 资源

4.4.2.DynamicClient结构体分析

  • 位于 /client-go/dynamic/simple.go
  • DynamicClient结构体
    • 可以看到,DynamicClient 中只包含一个RESTClient的字段 client
    • client 类型为 rest.Interface,即为 RESTClient实现的那个接口,可以在4.2.2中看到
    type DynamicClient struct {
       client rest.Interface
    }
    

4.4.3.DynamicClient的常用方法

4.4.3.1.dynamic.NewForConfig()方法
  • 位于 /client-go/dynamic/simple.go,是一个函数,位于dynamic包下,所以直接点就可以使用
  • 该方法用于创建一个 DynamicClient 实例,入参也是 *rest.Config 类型
    func NewForConfig(inConfig *rest.Config) (*DynamicClient, error) {
    	config := ConfigFor(inConfig)
    
    	httpClient, err := rest.HTTPClientFor(config)
    	if err != nil {
    		return nil, err
    	}
    	return NewForConfigAndClient(config, httpClient)
    }
    
4.4.3.2.DynamicClient 的 实例方法
  • 结论:
    • DynamicClient 只有一个实例方法: Resource(resource schema.GroupVersionResource),用于指定当前 DynamicClient 要操作的究竟是什么类型
  • 下面进行源码分析。
  • Resource()其实是 DynamicClient 实现了接口 dynamic.Interface
    • 接口中只有一个 Resource()
    • 参数是一个 GVR,指定group、version、resource,就可以确定到底是哪个资源了
    • 返回值为 NamespaceableResourceInterface 接口类型
    type Interface interface {
    	Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface
    }
    
  • DynamicClient 实现了这个接口,自然也实现了Resource这个方法
    • 下面代码返回的是 NamespaceableResourceInterface 接口的实现类型 dynamicResourceClient 的对象
    • 这个对象就是我们最终可以用于操作 你所指定资源 的 Client 了
    func (c *DynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
    	return &dynamicResourceClient{client: c, resource: resource}
    }
    
  • dynamicResourceClient 类型
    • 可见,dynamicResourceClient 包含了DynamicClient对象、要操作的类型的GVR、还有资源的ns
    type dynamicResourceClient struct {
    	client    *DynamicClient
    	namespace string
    	resource  schema.GroupVersionResource
    }
    
    • 可以调用它的Namespace方法,为namespace字段赋值
      • 这个方法就是 NamespaceableResourceInterface 接口提供的
    func (c *dynamicResourceClient) Namespace(ns string) ResourceInterface {
    	ret := *c
    	ret.namespace = ns
    	return &ret
    }
    

4.4.4.认识kubrenetes 的 Unstructured

4.4.4.1.为什么需要 Unstructured
  • 前面我们使用 RESTClient 或 Clientset,调用的是某个资源的 Get、List 等方法,返回值都是确定的。比如 调用Pod资源的List,我们能够确定返回的一定是PodList,所以Pod的List方法,就写死了,返回值就是PodList
  • 可当使用 DynamicClient 时,DynamicClient 没有确定的资源,资源类型是我们使用的时候才会去指定GVR的,所以 开发DynamicClient 的时候,Get 方法自然无法确定返回值类型。
  • 那么kubernetes人员应该如何开发这个Get方法?
  • 我们自然而然的能够想到,如果有一个通用的数据结构,可以表示所有类型的资源,问题就解决了
  • Unstructured 应运而生。它就是一个 可以表示所有资源的类型。
4.4.4.2.Unstructured源码
  • 位于 staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unsructured/unsructured.go
  • Unstructured 结构
    type Unstructured struct {
    	// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
    	// map[string]interface{}
    	// children.
    	Object map[string]interface{}
    }
    
  • Unstructured 包含一个 map,键为string,value为interface{},因此value可以为任意类型
4.4.4.3.dynamicResourceClient对于 Unstructured 的应用
  • 4.4.3.2.DynamicClient 的 实例方法 中我们知道,我们最终操作资源,其实使用的是 dynamicResourceClient
  • dynamicResourceClient 实现了 ResourceInterface 接口的所有方法,这些方法就是我们最终操作资源所调用的。下面我们看一下 ResourceInterface 接口的所有方法签名
    type ResourceInterface interface {
    	Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
    	Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
    	UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
    	Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
    	DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error
    	Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
    	List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
    	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
    	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
    	Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error)
    	ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error)
    }
    
  • 可以看到:
    • Create方法因为不知道要创建什么资源,所以参数接收的是 obj *unstructured.Unstructured
    • Get方法因为不知道返回的是什么资源,所以返回的是 *unstructured.Unstructured
    • List方法因为不知道返回的是什么资源列表,所以返回的是 *unstructured.UnstructuredList。这个结构里面包含一个 []unstructured.Unstructured
4.4.4.4.Unstructured 与 资源对象的相互转换
  • 既然 dynamicResourceClient 的方法 接收和返回的,很多都是 Unstructured 类型,那么我们就需要实现 真正的资源对象Unstructured 的 相互转换
  • runtime包下,给我们提供了一个 UnstructuredConverter 接口,接口中提供了两个方法,分别用于 资源对象-->UnstructuredUnstructured-->资源对象
    • 位于 staging/src/k8s.io/apimachinery/pkg/runtime/converter.go
    type UnstructuredConverter interface {
       ToUnstructured(obj interface{}) (map[string]interface{}, error)
       FromUnstructured(u map[string]interface{}, obj interface{}) error
    }
    
  • UnstructuredConverter 接口只有一个实现类 unstructuredConverter
    • 位于 staging/src/k8s.io/apimachinery/pkg/runtime/converter.go
    type unstructuredConverter struct {
    	// If true, we will be additionally running conversion via json
    	// to ensure that the result is true.
    	// This is supposed to be set only in tests.
    	mismatchDetection bool
    	// comparison is the default test logic used to compare
    	comparison conversion.Equalities
    }
    
    • unstructuredConverter 实现了UnstructuredConverter 接口的 两个方法
      • func (c *unstructuredConverter) ToUnstructured(obj interface{}) (map[string]interface{}, error):资源对象–>Unstructured
      • func (c *unstructuredConverter) FromUnstructured(u map[string]interface{}, obj interface{}) error:Unstructured–>资源对象
  • 不过,unstructuredConverter 类型开头小写,我们无法直接使用,kubernetes 创建了一个全局变量 DefaultUnstructuredConverter,类型就是unstructuredConverter,用以供外界使用
    • 位于 staging/src/k8s.io/apimachinery/pkg/runtime/converter.go
    var (
    	......
    	// DefaultUnstructuredConverter performs unstructured to Go typed object conversions.
    	DefaultUnstructuredConverter = &unstructuredConverter{
    		mismatchDetection: parseBool(os.Getenv("KUBE_PATCH_CONVERSION_DETECTOR")),
    		comparison: conversion.EqualitiesOrDie(
    			func(a, b time.Time) bool {
    				return a.UTC() == b.UTC()
    			},
    		),
    	}
    )
    
  • 总结:我们直接使用 runtime.DefaultUnstructuredConverter,调用它的 ToUnstructured 或 FromUnstructured 方法,就可以实现 Unstructured 与 资源对象的相互转换 了

4.4.5.DynamicClient使用示例

  • 需求:获取 kube-system 命名空间下 name=coredns 的 deploy 对象
  • 步骤
    • 同样是先 创建一个客户端配置config
    • 使用 dynamic.NewForConfig(),创建一个 DynamicClient 对象
    • 使用 DynamicClient.Resource(),指定要操作的资源对象,获取到该资源的 Client
    • 先为该Client指定ns,然后调用 Client 的 Get() 方法,获取到该资源对象
    • 调用 runtime.DefaultUnstructuredConverter.FromUnstructured(),将 unstructured 反序列化成 Deployment 对象
  • 代码编写
    package main
    
    import (
    	"context"
    	"fmt"
    	appsv1 "k8s.io/api/apps/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/runtime"
    	"k8s.io/apimachinery/pkg/runtime/schema"
    	"k8s.io/client-go/dynamic"
    	"k8s.io/client-go/tools/clientcmd"
    )
    
    func main() {
    	// 同样是先 创建一个客户端配置config
    	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    	if err != nil {
    		panic(err)
    	}
    
    	// 使用 dynamic.NewForConfig(),创建一个 DynamicClient 对象
    	dynamicClient, err := dynamic.NewForConfig(config)
    	if err != nil {
    		panic(err)
    	}
    
    	// 使用 DynamicClient.Resource(),指定要操作的资源对象,获取到该资源的 Client
    	dynamicResourceClient := dynamicClient.Resource(schema.GroupVersionResource{
    		Group:    "apps",
    		Version:  "v1",
    		Resource: "deployments",
    	})
    
    	// 先为该Client指定ns,然后调用 Client 的 Get() 方法,获取到该资源对象
    	unstructured, err := dynamicResourceClient.
    		Namespace("kube-system").
    		Get(context.TODO(), "coredns", metav1.GetOptions{})
    	if err != nil {
    		panic(err)
    	}
    
    	// 调用 runtime.DefaultUnstructuredConverter.FromUnstructured(),将 unstructured 反序列化成 Deployment 对象
    	deploy := &appsv1.Deployment{}
    	err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.UnstructuredContent(), deploy)
    	if err != nil {
    		panic(err)
    	}
    	
    	// 打印 deploy 名称和命名空间
    	fmt.Printf("deploy.Name: %s\ndeploy.namespace: %s", deploy.Name, deploy.Namespace)
    }
    
  • 输出结果
    deploy.Name: coredns
    deploy.namespace: kube-system
    

4.5.DiscoveryClient详解

4.5.1.DiscoveryClient是什么

  • 我们前面学习的3种Client:RESTClient、Clientset、DynamicClient,都是用来操作 kubernetes 资源的,我们目前还缺少一个用于检索 kuberentes资源的 Client
  • DiscoveryClient 就是这样一个 Client。用于检索 kuberentes集群中支持的 API 资源的相关信息,例如版本、组、资源类型等。
  • DiscoveryClient 提供了一组方法来查询和获取这些信息,以便在编写 Controller 或 Operator 时,能够动态地了解集群中可用的资源。

4.5.2.DiscoveryClient结构体

  • 位于 staging/src/k8s.io/client-go/discovery/discovery_client.go
  • DiscoveryClient 结构体很简单
    • LegacyPrefix 字段表示旧版本资源的访问前缀,一般值都是/api。Kubernetes 1.16 以前,资源的访问前缀都是 /api,1.16及之后,全面改成 /apis,为了兼容旧资源,这里特意保存了一个常量字符串 /api
    type DiscoveryClient struct {
    	restClient restclient.Interface
    
    	LegacyPrefix string
    	// Forces the client to request only "unaggregated" (legacy) discovery.
    	UseLegacyDiscovery bool
    }
    

4.5.3.DiscoveryClient的常用方法

4.5.3.1.discovery.NewDiscoveryClientForConfig()方法
  • 位于 /client-go/discovery/discovery_client.go,是一个函数,位于discovery包下,所以直接点就可以使用
  • 该方法用于创建一个 DiscoveryClient 实例,入参也是 *rest.Config 类型
    func NewDiscoveryClientForConfig(c *restclient.Config) (*DiscoveryClient, error) {
    	config := *c
    	if err := setDiscoveryDefaults(&config); err != nil {
    		return nil, err
    	}
    	httpClient, err := restclient.HTTPClientFor(&config)
    	if err != nil {
    		return nil, err
    	}
    	return NewDiscoveryClientForConfigAndClient(&config, httpClient)
    }
    
4.5.3.2.DiscoveryClient的实例方法
  • DiscoveryClient 实现了接口的所有方法,用于获取API资源的各种信息
    • ServerGroupsInterface等这些接口,内部也有很多方法
    type DiscoveryInterface interface {
    	RESTClient() restclient.Interface
    	ServerGroupsInterface
    	ServerResourcesInterface
    	ServerVersionInterface
    	OpenAPISchemaInterface
    	OpenAPIV3SchemaInterface
    	// Returns copy of current discovery client that will only
    	// receive the legacy discovery format, or pointer to current
    	// discovery client if it does not support legacy-only discovery.
    	WithLegacy() DiscoveryInterface
    }
    
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器

4.5.4.CachedDiscoveryClient 和 DiscoveryClient 的区别

  • 在查看 DiscoveryInterface 的时候,除了 DiscoveryClient,还有一个实现类 CachedDiscoveryClient
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
  • 顾名思义,CachedDiscoveryClient 就是实现了缓存机制的 DiscoveryClient 的封装
  • CachedDiscoveryClient 在 DiscoveryClient 的基础上增加了一层缓存,用于缓存获取的资源信息,以减少对 API Server 的频繁请求。
  • 在首次调用时,CachedDiscoveryClient 会从 API Server 获取资源信息,并将其缓存在本地。之后的调用会直接从缓存中获取资源信息,而不需要再次向 API Server 发送请求。
  • 因为 集群部署完成后,API 资源基本很少变化,所以缓存下来可以很好的提高请求效率。
  • kubectl 工具内部,kubectl api-versions命令其实就是使用了这个CachedDiscoveryClient,所以多次执行kubectl api-versions命令,其实只有第一次请求了API Server,后续都是直接使用的 本地缓存。

4.5.5.DiscoveryClient 使用示例

  • 需求:获取 当前kubernetes集群 的 所有资源列表
  • 代码编写
    func main() {
    	// 1、先创建一个客户端配置config
    	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    	if err != nil {
    		panic(err.Error())
    	}
    
    	// 2、使用 discovery.NewDiscoveryClientForConfig(),创建一个 DiscoveryClient 对象
    	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
    	if err != nil {
    		panic(err.Error())
    	}
    
    	// 3、使用 DiscoveryClient.ServerGroupsAndResources(),获取所有资源列表
    	_, resourceLists, err := discoveryClient.ServerGroupsAndResources()
    	if err != nil {
    		panic(err.Error())
    	}
    
    	// 4、遍历资源列表,打印出资源组和资源名称
    	for _, resource := range resourceLists {
    		fmt.Printf("resource groupVersion: %s\n", resource.GroupVersion)
    		for _, resource := range resource.APIResources {
    			fmt.Printf("resource name: %s\n", resource.Name)
    		}
    		fmt.Println("--------------------------")
    	}
    }
    
  • 输出结果
    resource groupVersion: v1
    resource name: bindings
    resource name: componentstatuses
    resource name: configmaps
    resource name: endpoints
    resource name: events
    resource name: limitranges
    resource name: namespaces
    resource name: namespaces/finalize
    resource name: namespaces/status
    resource name: nodes
    resource name: nodes/proxy
    resource name: nodes/status
    resource name: persistentvolumeclaims
    resource name: persistentvolumeclaims/status
    resource name: persistentvolumes
    resource name: persistentvolumes/status
    resource name: pods
    resource name: pods/attach
    resource name: pods/binding
    resource name: pods/eviction
    resource name: pods/exec
    resource name: pods/log
    resource name: pods/portforward
    resource name: pods/proxy
    resource name: pods/status
    resource name: podtemplates
    resource name: replicationcontrollers
    resource name: replicationcontrollers/scale
    resource name: replicationcontrollers/status
    resource name: resourcequotas
    resource name: resourcequotas/status
    resource name: secrets
    resource name: serviceaccounts
    resource name: services
    resource name: services/proxy
    resource name: services/status
    --------------------------
    resource groupVersion: apiregistration.k8s.io/v1
    resource name: apiservices
    resource name: apiservices/status
    --------------------------
    resource groupVersion: apiregistration.k8s.io/v1beta1
    resource name: apiservices
    resource name: apiservices/status
    --------------------------
    resource groupVersion: extensions/v1beta1
    resource name: ingresses
    resource name: ingresses/status
    --------------------------
    resource groupVersion: apps/v1
    resource name: controllerrevisions
    resource name: daemonsets
    resource name: daemonsets/status
    resource name: deployments
    resource name: deployments/scale
    resource name: deployments/status
    resource name: replicasets
    resource name: replicasets/scale
    resource name: replicasets/status
    resource name: statefulsets
    resource name: statefulsets/scale
    resource name: statefulsets/status
    --------------------------
    resource groupVersion: events.k8s.io/v1beta1
    resource name: events
    --------------------------
    resource groupVersion: authentication.k8s.io/v1
    resource name: tokenreviews
    --------------------------
    resource groupVersion: authentication.k8s.io/v1beta1
    resource name: tokenreviews
    --------------------------
    resource groupVersion: authorization.k8s.io/v1
    resource name: localsubjectaccessreviews
    resource name: selfsubjectaccessreviews
    resource name: selfsubjectrulesreviews
    resource name: subjectaccessreviews
    --------------------------
    resource groupVersion: authorization.k8s.io/v1beta1
    resource name: localsubjectaccessreviews
    resource name: selfsubjectaccessreviews
    resource name: selfsubjectrulesreviews
    resource name: subjectaccessreviews
    --------------------------
    resource groupVersion: autoscaling/v1
    resource name: horizontalpodautoscalers
    resource name: horizontalpodautoscalers/status
    --------------------------
    resource groupVersion: autoscaling/v2beta1
    resource name: horizontalpodautoscalers
    resource name: horizontalpodautoscalers/status
    --------------------------
    resource groupVersion: autoscaling/v2beta2
    resource name: horizontalpodautoscalers
    resource name: horizontalpodautoscalers/status
    --------------------------
    resource groupVersion: batch/v1
    resource name: jobs
    resource name: jobs/status
    --------------------------
    resource groupVersion: batch/v1beta1
    resource name: cronjobs
    resource name: cronjobs/status
    --------------------------
    resource groupVersion: certificates.k8s.io/v1beta1
    resource name: certificatesigningrequests
    resource name: certificatesigningrequests/approval
    resource name: certificatesigningrequests/status
    --------------------------
    resource groupVersion: networking.k8s.io/v1
    resource name: networkpolicies
    --------------------------
    resource groupVersion: networking.k8s.io/v1beta1
    resource name: ingresses
    resource name: ingresses/status
    --------------------------
    resource groupVersion: policy/v1beta1
    resource name: poddisruptionbudgets
    resource name: poddisruptionbudgets/status
    resource name: podsecuritypolicies
    --------------------------
    resource groupVersion: rbac.authorization.k8s.io/v1
    resource name: clusterrolebindings
    resource name: clusterroles
    resource name: rolebindings
    resource name: roles
    --------------------------
    resource groupVersion: rbac.authorization.k8s.io/v1beta1
    resource name: clusterrolebindings
    resource name: clusterroles
    resource name: rolebindings
    resource name: roles
    --------------------------
    resource groupVersion: storage.k8s.io/v1
    resource name: csinodes
    resource name: storageclasses
    resource name: volumeattachments
    resource name: volumeattachments/status
    --------------------------
    resource groupVersion: storage.k8s.io/v1beta1
    resource name: csidrivers
    resource name: csinodes
    resource name: storageclasses
    resource name: volumeattachments
    --------------------------
    resource groupVersion: admissionregistration.k8s.io/v1
    resource name: mutatingwebhookconfigurations
    resource name: validatingwebhookconfigurations
    --------------------------
    resource groupVersion: admissionregistration.k8s.io/v1beta1
    resource name: mutatingwebhookconfigurations
    resource name: validatingwebhookconfigurations
    --------------------------
    resource groupVersion: apiextensions.k8s.io/v1
    resource name: customresourcedefinitions
    resource name: customresourcedefinitions/status
    --------------------------
    resource groupVersion: apiextensions.k8s.io/v1beta1
    resource name: customresourcedefinitions
    resource name: customresourcedefinitions/status
    --------------------------
    resource groupVersion: scheduling.k8s.io/v1
    resource name: priorityclasses
    --------------------------
    resource groupVersion: scheduling.k8s.io/v1beta1
    resource name: priorityclasses
    --------------------------
    resource groupVersion: coordination.k8s.io/v1
    resource name: leases
    --------------------------
    resource groupVersion: coordination.k8s.io/v1beta1
    resource name: leases
    --------------------------
    resource groupVersion: node.k8s.io/v1beta1
    resource name: runtimeclasses
    --------------------------
    resource groupVersion: discovery.k8s.io/v1beta1
    resource name: endpointslices
    --------------------------
    

5.client-go 完整informer机制 原理

5.1.client-go informer机制概述(重要)

5.1.1.informer是我们使用client-go的基本单位

Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器

  • 需要明确一件事:informer才是client-go的基本单位
    • 从上面架构图中可以看出,client-go中包含的组件主要有:Reflector、DeltaFIFO、Controller、Indexer、Processer。
    • 这个 架构图会迷惑我们,让我们觉得 client-go 只维护了这么一套组件,实现功能。在阅读源码后,我发现,实际上client-go并不是只有一套这种组件,而是以informer作为基本单位,每个informer有自己的一套组件。
    • 每种GVR资源,都有自己对应的informer,我们可以选择性的创建informer并启动,进而仅从apiserver ListAndWatch 自己需要的资源就可以了
    • 每创建一种资源的informer对象,该对象就携带了自己需要的 Reflector、DeltaFIFO、Controller、Indexer、Processer,启动之后,该 informer 作为一个单独的协程启动,负责自己资源的 ListAndWatch、缓存、事件处理 等。
  • 下面我从 informer 的使用角度,印证上面的结论
    • 代码来自 概要分析
      func main() {
          config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
      	if err != nil {
      		klog.Fatalf("Failed to create config: %v", err)
      	}
      	
          // 初始化与apiserver通信的clientset
          clientset, err := kubernetes.NewForConfig(config)
      	if err != nil {
      		klog.Fatalf("Failed to create client: %v", err)
      	}
      	
      	// 初始化shared informer factory以及pod informer
      	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
      	podInformer := factory.Core().V1().Pods()
      	informer := podInformer.Informer()
      	
      	// 注册informer的自定义ResourceEventHandler
      	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
      		AddFunc:    xxx,
      		UpdateFunc: xxx,
      		DeleteFunc: xxx,
      	})
      	
      	// 启动shared informer factory,开始informer的list & watch操作
      	stopper := make(chan struct{})
      	go factory.Start(stopper)
      	
      	// 等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中 
      	// 或者调用factory.WaitForCacheSync(stopper)
      	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
      		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
      		return
      	}
      	
      	// 创建lister
      	podLister := podInformer.Lister()
      	// 从informer中的indexer本地缓存中获取对象
      	podList, err := podLister.List(labels.Everything())
      	if err != nil {
      		fmt.Println(err)
      	}
      }
      
    • 我们重点关注这几句
      // 初始化shared informer factory以及pod informer
      factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
      podInformer := factory.Core().V1().Pods()
      informer := podInformer.Informer()
      ......
      go factory.Start(stopper)
      ......
      podLister := podInformer.Lister()
      podList, err := podLister.List(labels.Everything())
      
    • SharedInformerFactory是什么,等会再说。只需要知道他是一个工厂,可以创建 informer
    • factory.Core().V1().Pods()podInformer.Informer(),最终就是创建了一个 pod 资源的 informer,该 informer 会自动保存在 factory 中
    • factory.Start 就是把 factory 中已经创建的所有 informer,都启动起来,每个informer就是一个单独的协程,互不影响,各自进行 ListAndWatch
    • podInformer.Lister()podLister.List() 就是从pod的这个informer中,获取缓存数据。可以看到获取缓存的时候,确定到了某一个具体的 informer

5.1.2.informer 机制的一些重要数据结构

5.1.2.1.informers.SharedInformerFactory
  • 从 5.1.1 中可知,我们要使用client-go操作一种资源,需要创建该资源对应的informer,并启动起来。可创建之后,下次再想使用,再创建一次的话,未免太浪费资源了,因此就推出了这么一种结构 SharedInformerFactory
  • SharedInformerFactory 主要有两部分功能:
    • 作为创建informer的工厂,用于创建所有类型的informer
    • SharedInformerFactory 内部 还会缓存所有已经创建过的informer,下次再创建,就会直接使用缓存,不会再创建新的informer,节省了资源
  • 结构体源码
    type sharedInformerFactory struct {
    	// 这个client,是clientset类型的客户端,用于与apiserver交互
    	client           kubernetes.Interface
    	// 限制 当前SharedInformerFactory 所创建的 Informer 只关注指定命名空间中的资源变化
    	namespace        string
    	tweakListOptions internalinterfaces.TweakListOptionsFunc
    	lock             sync.Mutex
    	defaultResync    time.Duration
    	customResync     map[reflect.Type]time.Duration
    
    	// 缓存已经创建的全部informer
    	informers map[reflect.Type]cache.SharedIndexInformer
    	// 缓存已经启动的 informer,只存 类型:是否启动
    	startedInformers map[reflect.Type]bool
    }
    
  • SharedInformerFactory 缓存了创建的所有 informer,方便一次性启动所有已创建的 informer。SharedInformerFactory 提供了一个 Start 方法,用于启动所有的 informer
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    
    	if f.shuttingDown {
    		return
    	}
    
    	for informerType, informer := range f.informers {
    		if !f.startedInformers[informerType] {
    			f.wg.Add(1)
    			// We need a new variable in each loop iteration,
    			// otherwise the goroutine would use the loop variable
    			// and that keeps changing.
    			informer := informer
    			// 可以看到,每个informer,都使用一个单独的协程启动的
    			go func() {
    				defer f.wg.Done()
    				informer.Run(stopCh)
    			}()
    			f.startedInformers[informerType] = true
    		}
    	}
    }
    
5.1.2.2.cache.SharedIndexInformer
  • cache.SharedIndexInformer 接口,SharedInformerFactory中保存的就是这个类型
    type SharedIndexInformer interface {
    	SharedInformer
    	// AddIndexers add indexers to the informer before it starts.
    	AddIndexers(indexers Indexers) error
    	GetIndexer() Indexer
    }
    
  • sharedIndexInformer 类型,是 SharedIndexInformer 接口的实现类,这个就是我们说的 client-go 的基本单位,里面存储着informer的 indexer、controller、processor、listerWatcher 等
    type sharedIndexInformer struct {
    	indexer    Indexer
    	controller Controller
    
    	processor             *sharedProcessor
    	cacheMutationDetector MutationDetector
    
    	listerWatcher ListerWatcher
    
    	// objectType is an example object of the type this informer is
    	// expected to handle.  Only the type needs to be right, except
    	// that when that is `unstructured.Unstructured` the object's
    	// `"apiVersion"` and `"kind"` must also be right.
    	objectType runtime.Object
    
    	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    	// shouldResync to check if any of our listeners need a resync.
    	resyncCheckPeriod time.Duration
    	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    	// value).
    	defaultEventHandlerResyncPeriod time.Duration
    	// clock allows for testability
    	clock clock.Clock
    
    	started, stopped bool
    	startedLock      sync.Mutex
    
    	// blockDeltas gives a way to stop all event distribution so that a late event handler
    	// can safely join the shared informer.
    	blockDeltas sync.Mutex
    
    	// Called whenever the ListAndWatch drops the connection with an error.
    	watchErrorHandler WatchErrorHandler
    }
    
5.1.2.3.XxxInformer
  • 在 5.1.1 中的示例中,我们写了这么两句代码
    podInformer := factory.Core().V1().Pods()
    informer := podInformer.Informer()
    
  • 其中 factory.Core().V1().Pods() 返回的是 PodInformer 接口类型,位于 informers/core/v1/pod.go
    type PodInformer interface {
    	Informer() cache.SharedIndexInformer
    	Lister() v1.PodLister
    }
    
  • informers包下,给每一种GVR,都提供了一个接口类型,用于表示这种 GVR 资源的informer。比如还有 NodeInformer、NamespaceInformer、SecretInformer等
  • 不过这个 PodInformer 并不是我们直接使用的,我们最终使用的还是 cache.SharedIndexInformer 类型,所以 这些资源 xxxInformer 接口,都提供了一个 Informer() cache.SharedIndexInformer 方法,用于获取一个为该资源工作的 cache.SharedIndexInformer
  • 因此,这些资源 xxxInformer 接口,其实就是对 cache.SharedIndexInformer 类型的封装。
  • client-go源码中,/client-go/informers 包下,就是factory结构+所有GVR资源的 informer结构

5.2.优质博客推荐

看完 5.1 的概述,其他源码分析可以先直接看 下面的一些博客。我有时间也会继续分析文章来源地址https://www.toymoban.com/news/detail-819630.html

  • 一图读懂k8s informer client-go
  • 概要分析
  • 初始化与启动分析
  • Reflector源码分析
    • 补充:reflector源码分析
  • DeltaFIFO源码分析
  • Controller&Processor源码分析
  • Indexer源码分析

6.实战:client-go开发自定义控制器

6.1.需求说明

  • 我们希望实现这个效果:
    • 创建或更新Service的时候,如果这个Service的Annotations中,包含了"ingress/http:true",那么在创建或更新这个Service的时候,会自动为它创建一个Ingress。
    • 删除Service的时候,如果这个Service的Annotations中,包含了"ingress/http:true",那么同时也要删除它的 Ingress。
  • 需求分析:
    • 这个效果需要编写三个事件处理方法,addService、updateService、deleteIngress
    • addService/updateService:用户创建或更新service的时候,kubernetes的ServiceController已经完成service的创建或更新了。我们要做的是拿到已存在的service对象,看是否包含 “ingress/http:true”。如果包含,则保证有一个对应的ingress;如果不包含,则保证不能有对应的ingress
    • deleteIngress:用于删除ingress的时候,也触发addService/updateService一样的逻辑,保证service和ingress的对应关系是正确的。
    • 你可能会疑问,为什么没有 deleteService
      • 因为我们会使用 OwnerReferences 将service+ingress关联起来。因此删除service,会由kubernetes的ControllerManager中的特殊Controller,自动完成ingress的gc,所以删除service时我们无需特殊处理。

6.2.代码编写

  • 源码仓库:https://github.com/graham924/share-code-operator-study
    • 位于 addingress 目录下
      Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
  • main.go 代码如下
    package main
    
    import (
    	"k8s.io/client-go/informers"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/rest"
    	"k8s.io/client-go/tools/clientcmd"
    	"log"
    	"share-code-operator-study/addingress/pkg"
    )
    
    func main() {
    	// 创建一个 集群客户端配置
    	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    	if err != nil {
    		inClusterConfig, err := rest.InClusterConfig()
    		if err != nil {
    			log.Fatalln("can't get config")
    		}
    		config = inClusterConfig
    	}
    
    	// 创建一个 clientset 客户端,用于创建 informerFactory
    	clientset, err := kubernetes.NewForConfig(config)
    	if err != nil {
    		panic(err.Error())
    	}
    
    	// 创建一个 informerFactory
    	factory := informers.NewSharedInformerFactory(clientset, 0)
    	// 使用 informerFactory 创建Services资源的 informer对象
    	serviceInformer := factory.Core().V1().Services()
    	// 使用 informerFactory 创建Ingresses资源的 informer对象
    	ingressInformer := factory.Networking().V1().Ingresses()
    
    	// 创建一个自定义控制器
    	controller := pkg.NewController(clientset, serviceInformer, ingressInformer)
    
    	// 创建 停止channel信号
    	stopCh := make(chan struct{})
    	// 启动 informerFactory,会启动已经创建的 serviceInformer、ingressInformer
    	factory.Start(stopCh)
    	// 等待 所有informer 从 etcd 实现全量同步
    	factory.WaitForCacheSync(stopCh)
    
    	// 启动自定义控制器
    	controller.Run(stopCh)
    }
    
  • controller.go文件如下
    package pkg
    
    import (
    	"context"
    	corev1 "k8s.io/api/core/v1"
    	netv1 "k8s.io/api/networking/v1"
    	"k8s.io/apimachinery/pkg/api/errors"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/util/runtime"
    	"k8s.io/apimachinery/pkg/util/wait"
    	informercorev1 "k8s.io/client-go/informers/core/v1"
    	informernetv1 "k8s.io/client-go/informers/networking/v1"
    	"k8s.io/client-go/kubernetes"
    	listercorev1 "k8s.io/client-go/listers/core/v1"
    	listernetv1 "k8s.io/client-go/listers/networking/v1"
    	"k8s.io/client-go/tools/cache"
    	"k8s.io/client-go/util/workqueue"
    	"reflect"
    	"time"
    )
    
    const (
    	// worker 数量
    	workNum = 5
    	// service 指定 ingress 的 annotation key
    	annoKey = "ingress/http"
    	// 调谐失败的最大重试次数
    	maxRetry = 10
    )
    
    // 自定义控制器
    type controller struct {
    	client        kubernetes.Interface
    	serviceLister listercorev1.ServiceLister
    	ingressLister listernetv1.IngressLister
    	queue         workqueue.RateLimitingInterface
    }
    
    // NewController 创建一个自定义控制器
    func NewController(clientset *kubernetes.Clientset, serviceInformer informercorev1.ServiceInformer, ingressInformer informernetv1.IngressInformer) *controller {
    	// 控制器中,包含一个clientset、service和ingress的缓存监听器、一个workqueue
    	c := controller{
    		client:        clientset,
    		serviceLister: serviceInformer.Lister(),
    		ingressLister: ingressInformer.Lister(),
    		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),
    	}
    
    	// 为 serviceInformer 添加 ResourceEventHandler
    	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    		// 添加service时触发
    		AddFunc: c.addService,
    		// 修改service时触发
    		UpdateFunc: c.updateService,
    		// 这里没有删除service的逻辑,因为我们会使用 OwnerReferences 将service+ingress关联起来。
    		// 因此删除service,会由kubernetes的ControllerManager中的特殊Controller,自动完成ingress的gc
    	})
    
    	// 为 ingressInformer 添加 ResourceEventHandler
    	ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    		// 删除ingress时触发
    		DeleteFunc: c.deleteIngress,
    	})
    
    	return &c
    }
    
    // 添加service时触发
    func (c *controller) addService(obj interface{}) {
    	// 将 添加service 的 key 加入 workqueue
    	c.enqueue(obj)
    }
    
    // 修改service时触发
    func (c *controller) updateService(oldObj interface{}, newObj interface{}) {
    	// 如果两个对象一致,就无需触发修改逻辑
    	if reflect.DeepEqual(oldObj, newObj) {
    		return
    	}
    	// todo 比较annotation
    	// 将 修改service 的 key 加入 workqueue
    	c.enqueue(newObj)
    }
    
    // 删除ingress时触发
    func (c *controller) deleteIngress(obj interface{}) {
    	// 将对象转成ingress,并获取到它的 ownerReference
    	ingress := obj.(*netv1.Ingress)
    	ownerReference := metav1.GetControllerOf(ingress)
    	// 如果ingress的 ownerReference 没有绑定到service,则无需处理
    	if ownerReference == nil || ownerReference.Kind != "Service" {
    		return
    	}
    	// 如果ingress的 ownerReference 已经绑定到service,则需要处理
    	c.enqueue(obj)
    }
    
    // enqueue 将 待添加service 的 key 加入 workqueue
    func (c *controller) enqueue(obj interface{}) {
    	// 调用工具方法,获取 kubernetes资源对象的 key(默认是 ns/name,或 name)
    	key, err := cache.MetaNamespaceKeyFunc(obj)
    	// 获取失败,不加入队列,即本次事件不予处理
    	if err != nil {
    		runtime.HandleError(err)
    		return
    	}
    	// 将 key 加入 workqueue
    	c.queue.Add(key)
    }
    
    // dequeue 将处理完成的 key 出队
    func (c *controller) dequeue(item interface{}) {
    	c.queue.Done(item)
    }
    
    // Run 启动controller
    func (c *controller) Run(stopCh chan struct{}) {
    	// 启动多个worker,同时对workqueue中的事件进行处理,效率提升5倍
    	for i := 0; i < workNum; i++ {
    		// 每个worker都是一个协程,使用同一个停止信号
    		go wait.Until(c.worker, time.Minute, stopCh)
    	}
    	// 启动完成后,Run函数就停止在这里,等待停止信号
    	<-stopCh
    }
    
    // worker方法
    func (c *controller) worker() {
    	// 死循环,worker处理完一个,再去处理下一个
    	for c.processNextItem() {
    
    	}
    }
    
    // processNextItem 处理下一个
    func (c *controller) processNextItem() bool {
    	// 从 workerqueue 取出一个key
    	item, shutdown := c.queue.Get()
    	// 如果已经收到停止信号了,则返回false,worker就会停止处理
    	if shutdown {
    		return false
    	}
    	// 处理完成后,将这个key出队
    	defer c.dequeue(item)
    
    	// 转成string类型的key
    	key := item.(string)
    
    	// 处理service逻辑的核心方法
    	err := c.syncService(key)
    	// 处理过程出错,进入错误统一处理逻辑
    	if err != nil {
    		c.handleError(key, err)
    	}
    	// 处理结束,返回true
    	return true
    }
    
    // handleError 错误统一处理逻辑
    func (c *controller) handleError(key string, err error) {
    	// 如果当前key的处理次数,还不到最大重试次数,则再次加入队列
    	if c.queue.NumRequeues(key) < maxRetry {
    		c.queue.AddRateLimited(key)
    		return
    	}
    
    	// 运行时统一处理错误
    	runtime.HandleError(err)
    	// 不再处理这个key
    	c.queue.Forget(key)
    }
    
    // syncService 处理service逻辑的核心方法
    func (c *controller) syncService(key string) error {
    	// 将 key 切割为 ns 和 name
    	namespace, name, err := cache.SplitMetaNamespaceKey(key)
    	if err != nil {
    		return err
    	}
    
    	// 从indexer中,获取service
    	service, err := c.serviceLister.Services(namespace).Get(name)
    	// 没有service,直接返回
    	if errors.IsNotFound(err) {
    		return nil
    	}
    	if err != nil {
    		return err
    	}
    
    	// 检查service的annotation,是否包含 key: "ingress/http"
    	_, ok := service.Annotations[annoKey]
    	// 从indexer缓存中,获取ingress
    	ingress, err := c.ingressLister.Ingresses(namespace).Get(name)
    
    	if ok && errors.IsNotFound(err) {
    		// ingress不存在,但是service有"ingress/http",需要创建ingress
    		// 创建ingress
    		ig := c.createIngress(service)
    		// 调用controller中的client,完成ingress的创建
    		_, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metav1.CreateOptions{})
    		if err != nil {
    			return err
    		}
    	} else if !ok && ingress != nil {
    		// ingress存在,但是service没有"ingress/http",需要删除ingress
    		// 调用controller中的client,完成ingress的删除
    		err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
    		if err != nil {
    			return err
    		}
    	}
    
    	return nil
    }
    
    // createIngress 创建ingress
    func (c *controller) createIngress(service *corev1.Service) *netv1.Ingress {
    	icn := "ingress"
    	pathType := netv1.PathTypePrefix
    	return &netv1.Ingress{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      service.Name,
    			Namespace: service.Namespace,
    			OwnerReferences: []metav1.OwnerReference{
    				*metav1.NewControllerRef(service, corev1.SchemeGroupVersion.WithKind("Service")),
    			},
    		},
    		Spec: netv1.IngressSpec{
    			IngressClassName: &icn,
    			Rules: []netv1.IngressRule{
    				{
    					Host: "example.com",
    					IngressRuleValue: netv1.IngressRuleValue{
    						HTTP: &netv1.HTTPIngressRuleValue{
    							Paths: []netv1.HTTPIngressPath{
    								{
    									Path:     "/",
    									PathType: &pathType,
    									Backend: netv1.IngressBackend{
    										Service: &netv1.IngressServiceBackend{
    											Name: service.Name,
    											Port: netv1.ServiceBackendPort{
    												Number: 80,
    											},
    										},
    									},
    								},
    							},
    						},
    					},
    				},
    			},
    		},
    	}
    }
    

6.3.kubernetes内置控制器源码学习

  • 上面的代码我写了详细注释,建议仔细阅读一遍,注意其中的 NewController()、Run()、worker()、processNextItem()、handleError()、syncService() 这些方法的命名和处理逻辑。
  • 你会发现 kubernetes源码中,内置的控制器和他们是一样的, 即 kubernetes内置控制器 也是使用client-go的这种方式,实现控制器开发的。
  • 下面以 DeploymentController 为例,带大家看一下
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
    Kubernetes operator(一)client-go篇【更新中】,云原生学习专栏,kubernetes,golang,云原生,容器
  • 可以看到,基本都是一样的写法,学到这里,大家就可以自行去看kubernetes各种控制器的源码了

到了这里,关于Kubernetes operator(一)client-go篇【更新中】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • K8S的client-go与Informer机制

    K8S的client-go与Informer机制 client-go是一个包含KubernetesAPI的SDK,它在整个k8s源码中发挥着不可或缺的作用。 2.1 规范 2.1.1 RESTful REST,即Representational State Transfer的缩写。这个词组可以翻译为\\\"表现层状态转化\\\"。 每一个URI代表一种资源; 客户端和服务器之间,传递这种资源的某种表

    2024年02月20日
    浏览(34)
  • client-go实战之十二:选主(leader-election)

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 本篇概览 本文是《client-go实战》系列的第十二篇,又有一个精彩的知识点在本章呈现:选主(leader-election) 在解释什么是选主之前,咱们先来看一个场景(有真实适用场景的技术,

    2024年02月13日
    浏览(35)
  • 云原生|详解Kubernetes Operator在项目中的开发应用

    目录 一、使用场景 (一)client-go中处理逻辑 (二)controller-runtime中处理逻辑 二、使用controller-runtime开发operator项目 (一)生成框架代码 (二)定义crd字段 (三)生成crd文件 (四)初始化manager (五)配置controller (六)配置webhook controller-runtime是基于kubernetes控制器模式衍

    2024年02月04日
    浏览(33)
  • kubernetes|云原生| 如何优雅的重启和更新pod---pod生命周期管理实务

    kubernetes的管理维护的复杂性体现在了方方面面,例如,pod的管理,服务的管理,用户的管理(RBAC),网络的管理等等,因此,kubernetes安装部署完毕仅仅是万里长征的第一步,后面的运营和维护工作才是更为关键的东西。 那么,pod的生命周期是什么概念呢?这

    2024年02月04日
    浏览(51)
  • kubernetes operator解析

    您是否想过站点可靠性工程 (SRE) 团队如何有效地成功管理复杂的应用程序? 在 Kubernetes 生态中,只有一个答案:Kubernetes Operators! 在本文中,我们将研究它们是什么以及它们是如何工作的。 Kubernetes Operator 概念由 CoreOS 的工程师于 2016 年开发,作为一种在 Kubernetes 集群上构建

    2024年02月10日
    浏览(29)
  • Flink Kubernetes Operator 介绍

    Flink Kubernetes Operator是针对在Kubernetes上运行Apache Flink应用程序而设计的工具。它充分利用了Kubernetes的优势,实现了对Flink集群的弹性管理和自动化操作,通过扩展Kubernetes API的方式,提供了管理和操作Flink部署的功能。 1.部署和监控 Flink 应用程序和会话部署:Flink Kubernetes Ope

    2024年01月21日
    浏览(50)
  • Flink On Kubernetes(三)Flink Kubernetes Operator安装

    前面讲了Flink On Kubernetes如何进行技术的选型,这边的话目前选用的是Flink On Kubernetes Application模式,这种模式最好的是资源隔离,就是说如果这个任务出现CPU和内存的突然彪高不会占用其他任务的资源,不会影响其他任务。 很多人可能有疑问,使用Flink On Kubernetes为啥需要先安

    2024年02月02日
    浏览(32)
  • 构建 Kubernetes Operator 的原则是什么?

    Kubernetes(简称K8s)上数据服务的自动化越来越受欢迎。在K8s上运行有状态的工作负载意味着使用Operator。然而,它发展演化到今天已经变得非常复杂,像Operator这样的应用模式和扩展方式对于开发者与运维者而言愈发受到欢迎。 但工程师们经常对编写K8s Operator的复杂性感到吃

    2024年02月03日
    浏览(33)
  • 【大数据】-- 本地部署 Flink kubernetes operator

    目录 1.说明 1.1 版本 1.2 kubernetes 环境 1.3 参考 2.安装步骤 2.1 安装本地 kubernetes 环境

    2024年02月13日
    浏览(31)
  • 【flink进阶】-- Flink kubernetes operator 版本升级

    目录 1、检查当前 flink kubernetes operator 版本  2、停止生产上正在运行的 flink job 3、升级 CRD

    2024年02月07日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包