深入源码分析kubernetes informer机制(二)Reflector

这篇具有很好参考价值的文章主要介绍了深入源码分析kubernetes informer机制(二)Reflector。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


[阅读指南]
这是该系列第二篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。


Reflector是什么

reflector在informer中就像是一个对外的窗口,它与api-server建立连接,监听和获取来自api-server的资源变化信息,并把这些信息放进deltaFIFO中,交给下一个环节处理。

整体结构

与api-server进行交互,通过list获取指定的全量资源,watch监听指定的资源变化事件,并将这些事件放入delta FIFO队列中。
结构与交互如下图
深入源码分析kubernetes informer机制(二)Reflector,kubernetes,容器,云原生,client-go,reflector

// 省略了部分字段,只留下我们关注的
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string

    // reflector对象需要监控的资源类型,比如上一节workqueue中的&v1.Pod{}
    expectedType reflect.Type
    
    // deltaFIFO 队列存储对象
    store Store
    
    // 实现list/watch
    listerWatcher ListerWatcher
    
    // 上次更新的资源版本号,用来判断当前的node的资源状况
    lastSyncResourceVersion string
    ......
}

工作流程

reflecter主函数比较简单,循环同步运行ListAndWatch直到收到stop信号。

func (r *Reflector) Run(stopCh <-chan struct{}) {
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
}

ListAndWatch主要做了这几件事:

  1. 通过stream或者chunk方式拉取全量list数据
  2. 开启一个协程进行缓存resync操作。
  3. 循环执行watch监听操作
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	...
    
	fallbackToList := !r.UseWatchList

    // stream式同步
	if r.UseWatchList {
		w, err = r.watchList(stopCh)
    	...
		if err != nil {
            ...
			fallbackToList = true
			w = nil
		}
	}

    // chunk式同步
	if fallbackToList {
		err = r.list(stopCh)
		if err != nil {
			return err
		}
	}

	...
	go r.startResync(stopCh, cancelCh, resyncerrc)
	return r.watch(w, stopCh, resyncerrc)
}

接下来咱一步步来看。

list拉取数据

ListAndWatch拉取全量数据时,出现了两种数据拉取的方式,list /watchstream list /watch

stream list是 kubernetes 1.27 引入的新方案,通过 ENABLE_CLIENT_GO_WATCH_LIST_ALPHA 变量可以启用stream list,默认会使用原有的list/watch。后续会单独开一篇介绍stream list方案,详情可以通过KEP-3157了解

前者在初始化时list拉取全量数据,通过watch更新增量变化。
后者可以通过watch 请求的方式获取list数据,从而减轻大规模集群初始化list数据时的资源消耗。
在建立watch连接时,携带如下两个参数即可告知服务器使用streaming list进行一致性读取。
sendInitialEvents=true
resourceVersionMatch=NotOlderThan

常规的list流程借用这个博主画的时序图来看下。
深入源码分析kubernetes informer机制(二)Reflector,kubernetes,容器,云原生,client-go,reflector

缓存resync操作

resync负责定期将本地的缓存重新加入deltaFIFO队列,确保本地缓存与controller的数据一致性。

国内太多博客没了解清楚就介绍这一部分是与api-server交互,进行relist。实际上resync完全没有涉及到服务端的部分,他就是一个本地缓存的同步机制。与服务端的交互使用list/watch已经完全可以确保资源一致性了,基本不怎么需要进行relist操作,并且对于节点非常多的大集群来说,list非常消耗资源,何况是定期relist呢。

关于resync机制的介绍,不在这里展开,详细看下一篇笔记。

watch监听操作

watch的实现非常巧妙,它利用了http的chunk编码传输机制建立长连接,来实现动态的数据监听,可以了解分块传输编码。
同样借用一张时序图来看下watch的流程
深入源码分析kubernetes informer机制(二)Reflector,kubernetes,容器,云原生,client-go,reflector

reflector通过Watcher监听api-server端的数据delta事件,并将这些事件放入deltaFIFO中统一处理。

// 在这里向服务端发起watch请求,并接收和处理资源变更事件
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
	...
    
	for {
        ...

        // w == nil表示使用常规的list/watch方式,streaming 方式会创建特殊的watcher
		if w == nil {
			timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
			options := metav1.ListOptions{
                // 上次同步的资源版本,也就是本地的资源版本。以此来获取增量的数据
				ResourceVersion: r.LastSyncResourceVersion(),
                // watch 超时时间,长时间没有接受任务事件的watcher会被关掉,避免长时间挂起。
				TimeoutSeconds: &timeoutSeconds,
                // watch书签,避免watch重启时请求api-server导致的消耗。
				AllowWatchBookmarks: true,
			}
        	// 创建一个watch对象,监听api-server的资源变更事件,将接收到的事件丢进resultChan中
			w, err = r.listerWatcher.Watch(options)
        	...
		}

        // 将resultChan中的取出放入FIFO 队列
		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
		
        // 失败重试逻辑
        ...
		
	}
}

建立连接的逻辑在这一行
w, err = r.listerWatcher.Watch(options)

还是用上一篇workqueue来看看这个Watch实例的实现。
从Watch函数一路往上追溯,可以看到先是与server建立了http连接,再通过watch标记建立了watch连接,创建stream watcher对象,并拉起一个协程去处理监听到的事件信息。

  • 此后所有监听的delta事件都会经过receive协程进入到resultChan中。
// reflector调用的watch函数
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.WatchFunc(options)
}

// watchFunc函数的定义
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
	...
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true // 向服务端请求chunk连接
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch(context.TODO()) // 这里调用了getter的watch函数
                                    // getter是controller初始化时建立的http客户端: clientset.CoreV1().RESTClient()
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}


func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
    ...	
    
	url := r.URL().String()
	for {
		req, err := r.newHTTPRequest(ctx)

		resp, err := client.Do(req)
		if err == nil && resp.StatusCode == http.StatusOK {
			return r.newStreamWatcher(resp)
		}
		...
	}
}

func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
	sw := &StreamWatcher{
		source:   d,
		reporter: r,
		result: make(chan Event),
		done: make(chan struct{}),
	}
	go sw.receive() // 处理消息事件的协程
	return sw
}

// 解析接收到的事件,并放到resultChan中等待后续处理。
func (sw *StreamWatcher) receive() {
	for {
        // 解析数据
		action, obj, err := sw.source.Decode()
		select {
		case <-sw.done:
			return
        // 将事件发送到resultChan
		case sw.result <- Event{
			Type:   action,
			Object: obj,
		}:
		}
	}
}
  • 进入resultChan的事件,由watchHandler取出再分类添加到FIFO队列中。
func watchHandler(start time.Time,
	w watch.Interface,	// watch实例
	store Store, // 存储对象 比如delta FIFO queue
	...
) error {
	...
loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err

        // 从ResultChan中取出变更事件,并放进队列中,比如delta FIFO队列中
		case event, ok := <-w.ResultChan():
			// 省略了一些资源过滤和错误处理
            ...

            // 解析监听到的事件数据
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
				continue
			}

            // 解析资源事件的版本
			resourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added: 
				err := store.Add(event.Object) // 往队列添加add delta事件
            	... // err handle
			case watch.Modified: 
				err := store.Update(event.Object) // 往队列添加update delta事件
				... // err handle
			case watch.Deleted:	
				err := store.Delete(event.Object) // 往队列添加delete delta事件,在此之前会判断事件对应的资源对象是否存在
				... // err handle
			case watch.Bookmark:
            	...
			default:
				... // err handle
			}
            
            // 更新resourceVersion版本号,下一轮watch就不会再收到重复的更新事件
			setLastSyncResourceVersion(resourceVersion)
			if rvu, ok := store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(resourceVersion)
			}
            
			...
		}
	}

    ...
	return nil
}

总结

用一个图来回顾下reflector各个模块的关系~
深入源码分析kubernetes informer机制(二)Reflector,kubernetes,容器,云原生,client-go,reflector文章来源地址https://www.toymoban.com/news/detail-654070.html

到了这里,关于深入源码分析kubernetes informer机制(二)Reflector的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入分析Spring的IoC容器:从底层源码探索

    前言: 博主在最近的几次面试中,大中小厂都问到了Spring的ioc容器相关问题,这块知识确实是面试中的重点内容,因此结合所看的书籍,在这篇文章中总结下。该专栏比较适合刚入坑Java的小白以及准备秋招的大佬阅读,感谢大佬的关注。 如果文章有什么需要改进的地方欢迎

    2024年02月12日
    浏览(40)
  • 【云原生-深入理解 Kubernetes 系列 3】深入理解容器进程的文件系统

    【云原生-深入理解Kubernetes-1】容器的本质是进程 【云原生-深入理解Kubernetes-2】容器 Linux Cgroups 限制 大家好,我是秋意零。 😈 CSDN作者主页 😎 博客主页 👿 简介 👻 普通本科生在读 在校期间参与众多计算机相关比赛,如:🌟 “省赛”、“国赛” ,斩获多项奖项荣誉证书

    2024年02月06日
    浏览(39)
  • Kubernetes日志查看指南:深入了解容器日志管理技术

    简介: Kubernetes(简称K8s)已成为现代容器化应用程序管理的主要平台之一。了解如何有效地查看和管理Kubernetes集群中的容器日志对于故障排除、性能优化和安全监控至关重要。本文将向您介绍一些基本的技术和工具,帮助您在Kubernetes环境中查看和分析容器日志。 查看单个

    2024年02月12日
    浏览(26)
  • 【云原生-深入理解Kubernetes-1】容器的本质是进程

    大家好,我是秋意零。 😈 CSDN作者主页 😎 博客主页 👿 简介 👻 普通本科生在读 在校期间参与众多计算机相关比赛,如:🌟 “省赛”、“国赛” ,斩获多项奖项荣誉证书 🔥 各个平台, 秋意零/秋意临 账号创作者 🔥 云社区 创建者 点赞、收藏+关注下次不迷路! 欢迎加

    2024年02月02日
    浏览(46)
  • 云原生之深入解析Kubernetes中如何使用临时容器进行故障排查

    容器及其周围的生态系统改变了工程师部署、维护和排查工作负载故障的方式。但是,在 Kubernetes 集群上调试应用程序有时可能会很困难,因为可能在容器中找不到所需的调试工具。许多工程师使用基于精简、发行版构建无发行版的基础镜像,其中甚至没有包管理器或shell,

    2024年02月05日
    浏览(39)
  • 云原生之深入解析如何正确计算Kubernetes容器CPU使用率

    使用 Prometheus 配置 kubernetes 环境中 Container 的 CPU 使用率时,会经常遇到 CPU 使用超出 100%,现在来分析一下: container_spec_cpu_period:当对容器进行 CPU 限制时,CFS 调度的时间窗口,又称容器 CPU 的时钟周期通常是 100000 微秒 container_spec_cpu_quota:是指容器的使用 CPU 时间周期总量

    2024年02月10日
    浏览(56)
  • kubernetes--分析容器系统调用:Sysdig

    目录 Sysdig介绍: sysdig工作流程 安装Sysdig sysdig常用参数: sysdig过滤: sysdig之Chisels(工具箱): 其他常用命令        Sysdig是一个非常强大的系统监控分析和故障排查工具。汇聚strace+tcpdump+iftop+lsof工具功能为一身。        sysdig除了能获取系统资源利用率、进程、网络连

    2024年02月05日
    浏览(31)
  • 源码分析——ArrayList源码+扩容机制分析

    ArrayList 的底层是数组队列,相当于动态数组。与 Java 中的数组相比,它的容量能动态增长。在添加大量元素前,应用程序可以使用 ensureCapacity 操作来增加 ArrayList 实例的容量。这可以减少递增式再分配的数量。 ArrayList 继承于 AbstractList ,实现了 List , RandomAccess , Cloneable , ja

    2024年02月14日
    浏览(35)
  • 深入源码解析ArrayList:探秘Java动态数组的机制与性能

    1.1 介绍ArrayList的基本概念和作用 在Java中,ArrayList是一个实现了List接口的动态数组。它可以根据需要自动增加大小,因此可以存储任意数量的元素。 基本概念: ArrayList是Java中常用的集合类之一,它可以存储对象,并且可以根据索引访问和操作这些对象。 ArrayList是基于数组

    2024年02月04日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包