kubebuilder+code-generator开发k8s的controller

这篇具有很好参考价值的文章主要介绍了kubebuilder+code-generator开发k8s的controller。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文记录用kubebuilder和code-generator开发k8s的crd控制器。

概览

和k8s.io/code-generator类似,是一个码生成工具,用于为你的CRD生成kubernetes-style API实现。区别在于:
Kubebuilder不会生成informers、listers、clientsets,而code-generator会。
Kubebuilder会生成Controller、Admission Webhooks,而code-generator不会。
Kubebuilder会生成manifests yaml,而code-generator不会。
Kubebuilder还带有一些其他便利性设施。

安装kubebuilder

https://github.com/kubernetes-sigs/kubebuilder/releases/tag/v3.13.0
在这里下载kubebuilder对应芯片版本:
kubebuilder+code-generator开发k8s的controller,kubernetes,容器
本身作为一个二进制可执行文件,放入到/usr/local/bin目录下即可。

使用go mod

mkdir example
go mod init gateway
kubebuilder init --domain example.com
kubebuilder edit --multigroup=true

创建API

kubebuilder create api --group app --version v1 --kind Gateway
Create Resource [y/n]
y
Create Controller [y/n]
n

在example目录下创建了,api目录,制定了group和版本以及资源类型。
kubebuilder+code-generator开发k8s的controller,kubernetes,容器
会自动生成apis/app/v1目录,里面有{crd}_types.go和zz_generated.deepcopy.go文件,如果需要配置和修改crd字段,可以修改{crd}_types.go中的crd spec结构体,见下图:
修改{crd}_types.go之后,需用重新执行make manifests重新生成crd,此时zz_generated.deepcopy.go也会同步更新。
kubebuilder+code-generator开发k8s的controller,kubernetes,容器

生成RBAC manifests

在api/app/v1/目录创建rbac.go文件,加入以下内容:

// +kubebuilder:rbac:groups=app.example.com,resources=gateways,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=app.example.com,resources=gateways/status,verbs=get;update;patch

package v1

生成crd manifests

make manifests

这一步会生成config/crd/bases目录,以及该目录下的crd yaml
kubebuilder+code-generator开发k8s的controller,kubernetes,容器

使用code-generator

先下载code-generator

go get -u -v k8s.io/code-generator/...

这里要下下载很多依赖包
kubebuilder+code-generator开发k8s的controller,kubernetes,容器
在后面的编译过程中,需要手动下载一些依赖包到$GOPATH下
kubebuilder+code-generator开发k8s的controller,kubernetes,容器
同时需要升级go到1.21版本,已解决如下问题:
kubebuilder+code-generator开发k8s的controller,kubernetes,容器

准备脚本

在hack目录下新增update-codegen.sh和verify-codegen.sh
其中update-codegen.sh如下
其中:
MODULE=gateway,这里和go.mod保持一致(go mod init gateway)
API_PKG=api,目前工程的api目录,有的可能是api
OUTPUT_PKG=generated/app:输出结果的目录(generated/{group},group是之前kubebuilder create api是指定的group参数)
GROUP_VERSION=app:v1,也是跟kubebuilder create api是的group和version保持一致

#!/usr/bin/env bash

set -o errexit
set -o nounset
set -o pipefail

# corresponding to go mod init <module>
MODULE=gateway
# api package
APIS_PKG=api
# generated output package
OUTPUT_PKG=generated/app
# group-version such as foo:v1alpha1
GROUP_VERSION=app:v1

SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 /Users/luoyi/go/src/k8s.io/code-generator 2>/dev/null || echo /Users/luoyi/go/src/k8s.io/code-generator)}
 

# generate the code with:
# --output-base    because this script should also be able to run inside the vendor dir of
#                  k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir
#                  instead of the $GOPATH directly. For normal projects this can be dropped.
bash "${CODEGEN_PKG}"/generate-groups.sh "client,lister,informer" \
  ${MODULE}/${OUTPUT_PKG} ${MODULE}/${APIS_PKG} \
  ${GROUP_VERSION} \
  --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt \
  --output-base "${SCRIPT_ROOT}"
#  --output-base "${SCRIPT_ROOT}/../../.." \

这个脚本主要就是制定了一些模块名和运行脚本的参数。本质上与进入到$GOPATH/src/k8s.io,执行如下命令一样:

./generate-groups.sh all /Users/luoyi/k8s/example/generated/app /Users/luoyi/k8s/example/api app:v1

其中,/Users/luoyi/k8s/example/generated/app指定了生成代码的路径,/Users/luoyi/k8s/example/api指定了作用路径,最后一个参数指定了group和版本。all表示client、informer和listener都生成。
在生成代码前还需要做些配置:
在{crd}_type.go上配置tag // +genclient,用于生成clienset

// +genclient
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// Gateway is the Schema for the gateways API
type Gateway struct {
        metav1.TypeMeta   `json:",inline"`
        metav1.ObjectMeta `json:"metadata,omitempty"`

        Spec GatewaySpec `json:"spec,omitempty"`

在apis/app/v1下新建doc.go,其中groupName要根据kubebuilder init和kubebuilder create api参数对应修改

// +groupName=app.example.com
package v1

在apis/app/v1下新建register.go,代码如下,无需修改

package v1

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemeGroupVersion is group version used to register these objects.
var SchemeGroupVersion = GroupVersion

func Resource(resource string) schema.GroupResource {
    return SchemeGroupVersion.WithResource(resource).GroupResource()
}

执行hack/update-codegen.sh可以得到clientset/lister/informer

./hack/update-codegen.sh

此时会得到example.com/gateway/generated/app目录,在目录下有clientset、liseters、informers
看到如下创建成功:

mv example.com/gateway/generated generated

kubebuilder+code-generator开发k8s的controller,kubernetes,容器
将example.com/gateway/generated移到项目根目录即可
kubebuilder+code-generator开发k8s的controller,kubernetes,容器

编写main.go

package main

import (
	"flag"
	"time"

	"github.com/golang/glog"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
	// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

	clientset "gateway/generated/app/clientset/versioned"
	informers "gateway/generated/app/informers/externalversions"
	"gateway/signals"
)

var (
	masterURL  string
	kubeconfig string
)

func main() {
	flag.Parse()

	// 处理信号量
	stopCh := signals.SetupSignalHandler()

	// 处理入参
	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		glog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	studentClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		glog.Fatalf("Error building example clientset: %s", err.Error())
	}

	studentInformerFactory := informers.NewSharedInformerFactory(studentClient, time.Second*30)

	//得到controller
	controller := NewController(kubeClient, studentClient,
		studentInformerFactory.App().V1().Gateways())

	//启动informer
	go studentInformerFactory.Start(stopCh)

	//controller开始处理消息
	if err = controller.Run(2, stopCh); err != nil {
		glog.Fatalf("Error running controller: %s", err.Error())
	}
}

func init() {
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}

编写controller.go

package main

import (
	"fmt"
	"time"

	"github.com/golang/glog"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"

	bolingcavalryv1 "gateway/api/app/v1"
	clientset "gateway/generated/app/clientset/versioned"
	studentscheme "gateway/generated/app/clientset/versioned/scheme"
	informers "gateway/generated/app/informers/externalversions/app/v1"
	listers "gateway/generated/app/listers/app/v1"
)

const controllerAgentName = "student-controller"

const (
	SuccessSynced = "Synced"

	MessageResourceSynced = "Student synced successfully"
)

// Controller is the controller implementation for Student resources
type Controller struct {
	// kubeclientset is a standard kubernetes clientset
	kubeclientset kubernetes.Interface
	// studentclientset is a clientset for our own API group
	studentclientset clientset.Interface

	studentsLister listers.GatewayLister
	studentsSynced cache.InformerSynced

	workqueue workqueue.RateLimitingInterface

	recorder record.EventRecorder
}

// NewController returns a new student controller
func NewController(
	kubeclientset kubernetes.Interface,
	studentclientset clientset.Interface,
	studentInformer informers.GatewayInformer) *Controller {

	utilruntime.Must(studentscheme.AddToScheme(scheme.Scheme))
	glog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

	controller := &Controller{
		kubeclientset:    kubeclientset,
		studentclientset: studentclientset,
		studentsLister:   studentInformer.Lister(),
		studentsSynced:   studentInformer.Informer().HasSynced,
		workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Students"),
		recorder:         recorder,
	}

	glog.Info("Setting up event handlers")
	// Set up an event handler for when Student resources change
	studentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueStudent,
		UpdateFunc: func(old, new interface{}) {
			oldStudent := old.(*bolingcavalryv1.Gateway)
			newStudent := new.(*bolingcavalryv1.Gateway)
			if oldStudent.ResourceVersion == newStudent.ResourceVersion {
				//版本一致,就表示没有实际更新的操作,立即返回
				return
			}
			controller.enqueueStudent(new)
		},
		DeleteFunc: controller.enqueueStudentForDelete,
	})

	return controller
}

// 在此处开始controller的业务
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()

	glog.Info("开始controller业务,开始一次缓存数据同步")
	if ok := cache.WaitForCacheSync(stopCh, c.studentsSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	glog.Info("worker启动")
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	glog.Info("worker已经启动")
	<-stopCh
	glog.Info("worker已经结束")

	return nil
}

func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}

// 取数据处理
func (c *Controller) processNextWorkItem() bool {

	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		defer c.workqueue.Done(obj)
		var key string
		var ok bool

		if key, ok = obj.(string); !ok {

			c.workqueue.Forget(obj)
			runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// 在syncHandler中处理业务
		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
		}

		c.workqueue.Forget(obj)
		glog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		runtime.HandleError(err)
		return true
	}

	return true
}

// 处理
func (c *Controller) syncHandler(key string) error {
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// 从缓存中取对象
	student, err := c.studentsLister.Gateways(namespace).Get(name)
	if err != nil {
		// 如果Student对象被删除了,就会走到这里,所以应该在这里加入执行
		if errors.IsNotFound(err) {
			glog.Infof("Student对象被删除,请在这里执行实际的删除业务: %s/%s ...", namespace, name)

			return nil
		}

		runtime.HandleError(fmt.Errorf("failed to list student by: %s/%s", namespace, name))

		return err
	}

	glog.Infof("这里是student对象的期望状态: %#v ...", student)
	glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)")

	c.recorder.Event(student, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}

// 数据先放入缓存,再入队列
func (c *Controller) enqueueStudent(obj interface{}) {
	var key string
	var err error
	// 将对象放入缓存
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
		return
	}

	// 将key放入队列
	c.workqueue.AddRateLimited(key)
}

// 删除操作
func (c *Controller) enqueueStudentForDelete(obj interface{}) {
	var key string
	var err error
	// 从缓存中删除指定对象
	key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	//再将key放入队列
	c.workqueue.AddRateLimited(key)
}

信号处理

package signals

import (
	"os"
	"os/signal"
)

var onlyOneSignalHandler = make(chan struct{})

func SetupSignalHandler() (stopCh <-chan struct{}) {
	close(onlyOneSignalHandler) // panics when called twice

	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, shutdownSignals...)
	go func() {
		<-c
		close(stop)
		<-c
		os.Exit(1) // second signal. Exit directly.
	}()

	return stop
}
//go:build !windows
// +build !windows

package signals

import (
	"os"
	"syscall"
)

var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}

最后运行:

./gateway -kubeconfig=$HOME/.kube/config -alsologtostderr=true

kubeconfig是k8s集群的默认配置路径。
kubebuilder+code-generator开发k8s的controller,kubernetes,容器文章来源地址https://www.toymoban.com/news/detail-796383.html

到了这里,关于kubebuilder+code-generator开发k8s的controller的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • k8s拉取私有仓库镜像失败:rpc error: code = Unknown desc = failed to pull and unpack image【20221121】

    k8s拉取镜像并不是通过docker拉取,而是通过 crictl拉取的。 失败原因如下: 解决方法: 1、先拷贝一份 2、修改/etc/containerd/config.toml 找到plugins.“io.containerd.grpc.v1.cri”.registry的位置 修改之前: 修改之后: 3、 内容: 4、重启 5、再次拉取

    2024年02月11日
    浏览(53)
  • K8S管理系统项目实战[API开发]-1

    前端: Vue+element plus 后端: go+gin kubernetes v1.24.2 golang v1.18.3 后端代码地址GitHub - yunixiangfeng/k8s-platform: K8s管理系统后端: go+gin Go 快速入门  Gin Web框架 K8s管理系统项目实战[API开发]  项目背景,整体设计,Client-go,框架搭建 随着容器技术的广泛应用,kubernetes逐渐成为业内的核心技

    2024年02月12日
    浏览(45)
  • 【系统开发】尚硅谷 - 谷粒商城项目笔记(十一):K8S

    Kubernetes(k8s)中文文档 Kubernetes概述_Kubernetes中文社区 Kubernetes官网 Kubernetes 是一个可移植的、可扩展的开源平台,用于管理容器化的工作负载和服务,可促进声明式配置和自动化。 Kubernetes 拥有一个庞大且快速增长的生态系统。Kubernetes 的服务、支持和工具广泛可用。 Kuber

    2024年02月11日
    浏览(59)
  • k8s scheduler开发实战(二):带插件参数版

    如果文章对你有用,可以点赞、好评、转发。 文章是在公司写的,代码都被加密了,也不能上传github,所以源代码就没上传了,不过按照文章来一步一步实现就行,如果有问题,评论区交流 建议全屏阅读,因为重要内容都写在代码里,代码行比较长 二:第一版的基础上,在

    2024年02月21日
    浏览(33)
  • 自建K8S一年多没用,忽然想使用下。kubelet启动失败,报错:main process exited, code=exited, status=255/n/a

    通过system status kubelet 查看报错,基本没有什么有效信息,所以使用 就可以看到以下报错:Jan 16 20:49:17 master kubelet[3824]: I0116 20:49:17.402577    3824 server.go:425] Version: v1.15.0 Jan 16 20:49:17 master kubelet[3824]: I0116 20:49:17.402770    3824 plugins.go:103] No cloud provider specified. Jan 16 20:49:17 master k

    2024年01月24日
    浏览(57)
  • 将Go语言开发的Web程序部署到K8S

    如果已经有K8S环境的同学可以跳过,如果没有,推荐你看看我的《Ubuntu22加Minikue搭建K8S环境》,课程目录如下: 下载:https://code.visualstudio.com/Download 安装命令: 下载:https://studygolang.com/dl 将其上传到Downloads目录下。 解压: 将其移动到特定目录: 配置环境变量: 激活环境变

    2024年02月03日
    浏览(64)
  • 高阶k8s二次开发教程 -- 通过阅读Istio源码习得

    本篇文章全网几乎找不到,在做深层次的k8s二次开发时非常管用。那就是使用Client-go去访问自定义CRD资源。 我们先使用kubebuilder生成一个CRD,论生成CRD这些,还是kubebuilder更加方便。 Kubernetes API(在本例中为Project和ProjectList)提供的每种类型都需要实现该k8s.io/apimachinery/pkg/r

    2024年02月15日
    浏览(40)
  • python开发基础篇1——后端操作K8s API方式

    操作K8s资源api方式: 原生api 客户端库,python客户端库 K8s支持三种客户端身份认证: HTTPS 证书认证:基于CA证书签名的数字证书认证(kubeconfig文件,默认路径~/.kube/config) HTTP Token认证:通过一个Token来识别用户(ServiceAccount) HTTP Base认证:用户名+密码的方式认证(1.19+已经弃

    2024年02月09日
    浏览(29)
  • Spring Cloud 使用 k8s 作为注册中心 开发环境 和 生产环境

    因为 k8s 本身就有拥有注册中心,和配置中心的功能。如果还是用 Nacos、Eureka、Consul 之类的注册中心组件,就有点冗余了。当然这些组件还是可以继续用的。 所以,本教程,教授 Spring Cloud 使用 k8s 的注册中心。在开发环境和生产环境 的教程! 下面以一个最简单的 服务消费

    2024年02月15日
    浏览(61)
  • 云原生之深入解析如何使用Devtron简化K8S应用开发

    ① 什么是 NeuVector ? NeuVector 是业界首个端到端的开源容器安全平台,唯一为容器化工作负载提供企业级零信任安全的解决方案。NeuVector 是业界领先的安全和合规解决方案,已被全球知名企业广泛采用;其代码库的开源不仅使 NeuVector 成为开源社区的首选技术,还为受严格监

    2024年02月13日
    浏览(67)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包