名字服务Polaris中服务发现详解

这篇具有很好参考价值的文章主要介绍了名字服务Polaris中服务发现详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

源码地址:https://github.com/polarismesh/polaris-controller/blob/main/README-zh.md

通过mesh配置文件设置controller的配置管理对象
https://fankangbest.github.io/2017/10/12/kubernetes-client%E5%88%86%E6%9E%90(%E4%B8%80)-kubeconfig-v1-5-2/

下面就从源码开始分析polaris是怎么通过进行服务发现的

polaris通过k8s的扩展api机制自定义了controller实现,下面选取一些关键代码进行分析

初始化controller

对每个资源增加创建、更新、删除操作的监控回调方法:

p := PolarisController{
   client: client,
   queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
      polarisControllerName),
   workerLoopPeriod: time.Second,
}

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: p.onServiceAdd,
   UpdateFunc: p.onServiceUpdate,
   DeleteFunc: p.onServiceDelete,
})

endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: p.onEndpointAdd,
   UpdateFunc: p.onEndpointUpdate,
   DeleteFunc: p.onEndpointDelete,
})

namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: p.onNamespaceAdd,
   UpdateFunc: p.onNamespaceUpdate,
})

上面是典型的k8s资源监听代码

以service创建为例:

当k8s中一个service被创建成功之后,就会调用polaris的这个onServiceAdd方法:

func (p *PolarisController) onServiceAdd(obj interface{}) {

   service := obj.(*v1.Service)

   if !util.IsPolarisService(service, p.config.PolarisController.SyncMode) {
      return
   }

   key, err := util.GenServiceQueueKey(service)
   if err != nil {
      log.Errorf("generate queue key for %s/%s error, %v", service.Namespace, service.Name, err)
      return
   }

   p.enqueueService(key, service, "Add")
}

逻辑如下:

  • 判断是否可以注册为北极星服务

  • 根据该service生成相应的key

  • 将该key放入workqueue

启动controller

func (p *PolarisController) Run(workers int, stopCh <-chan struct{}) {
   defer runtime.HandleCrash()
   defer p.queue.ShutDown()
   defer p.consumer.Destroy()
   defer p.provider.Destroy()

   defer log.Infof("Shutting down polaris controller")

   if !cache.WaitForCacheSync(stopCh, p.podsSynced, p.servicesSynced, p.endpointsSynced, p.namespaceSynced) {
      return
   }

   p.CounterPolarisService()

   for i := 0; i < workers; i++ {
      go wait.Until(p.worker, p.workerLoopPeriod, stopCh)
   }

   //定时任务
   go p.MetricTracker(stopCh)

   <-stopCh
}

逻辑如下:

  • 等待k8s资源cache同步完成

  • 统计k8s服务资源能够注册为北极星服务的数量:通过k8s接口获取所有k8s服务,对每个service判断是否可以转换为北极星service

  • 启动多个work协程,每个协程处理流程如下:

    • 从workqueue中获取元素key

    • 从key中解析出namespace、service名等信息

    • 根据namespace和service名从informer的cache中获取service

    • 根据service是否存在做不同的处理

    • 如果不存在,则调用北极星接口创建相应的namespace、service等

  • work工作协程会一直轮询中,直到收到stop信号,也就是说会一直轮询取消workqueue中的元素进行上述的处理工作

informer回调

从上面我们知道,在controller的初始化和启动方法中,分别对workqueue进行push和pop元素,那么workqueue的push操作所在的回调方法是什么时候触发的呢?

我们看到在polaris-controller-manager的run方法中,还有这样的逻辑:

controllerContext.InformerFactory.Start(controllerContext.Stop)
controllerContext.GenericInformerFactory.Start(controllerContext.Stop)

答案就是在这里,具体我们来分析下:
经过层层调用,最终在这里,我们看到,

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()

   fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

   cfg := &amp;Config{
      Queue: fifo,
      ListerWatcher: s.listerWatcher,
      ObjectType: s.objectType,
      FullResyncPeriod: s.resyncCheckPeriod,
      RetryOnError: false,
      ShouldResync: s.processor.shouldResync,

      Process: s.HandleDeltas,
   }

   func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()

      s.controller = New(cfg)
      s.controller.(*controller).clock = s.clock
      s.started = true
   }()

   // Separate stop channel because Processor should be stopped strictly after controller
   processorStopCh := make(chan struct{})
   var wg wait.Group
   defer wg.Wait() // Wait for Processor to stop
   defer close(processorStopCh) // Tell Processor to stop
   wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
   wg.StartWithChannel(processorStopCh, s.processor.run)

   defer func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()
      s.stopped = true // Don't want any new listeners
   }()
   s.controller.Run(stopCh)
}

上面的主要逻辑如下:

(1)调用NewDeltaFIFO,初始化DeltaFIFO;
(2)构建Config结构体,这里留意下Process属性,赋值了s.HandleDeltas,后面会分析到该方法;
(3)调用New,利用Config结构体来初始化controller;
(4)调用s.processor.run,启动processor;
(5)调用s.controller.Run,启动controller;

启动procesor

func (p *processorListener) run() {
   // this call blocks until the channel is closed. When a panic happens during the notification
   // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
   // the next notification will be attempted. This is usually better than the alternative of never
   // delivering again.
   stopCh := make(chan struct{})
   wait.Until(func() {
      // this gives us a few quick retries before a long pause and then a few more quick retries
      err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
         for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
               p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
               p.handler.OnAdd(notification.newObj)
            case deleteNotification:
               p.handler.OnDelete(notification.oldObj)
            default:
               utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
         }
         // the only way to get here is if the p.nextCh is empty and closed
         return true, nil
      })

      // the only way to get here is if the p.nextCh is empty and closed
      if err == nil {
         close(stopCh)
      }
   }, 1*time.Minute, stopCh)
}

启动informer的controller

func (c *controller) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   go func() {
      <-stopCh
      c.config.Queue.Close()
   }()
   r := NewReflector(
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      c.config.FullResyncPeriod,
   )
   r.ShouldResync = c.config.ShouldResync
   r.clock = c.clock

   c.reflectorMutex.Lock()
   c.reflector = r
   c.reflectorMutex.Unlock()

   var wg wait.Group
   defer wg.Wait()

   wg.StartWithChannel(stopCh, r.Run)

   wait.Until(c.processLoop, time.Second, stopCh)
}

上面的主要逻辑是:

(1)调用NewReflector,初始化Reflector;
(2)调用r.Run,实际上是调用了Reflector的启动方法来启动Reflector;
(3)调用c.processLoop,开始controller的核心处理;

启动reflector
func (r *Reflector) Run(stopCh <-chan struct{}) {
   klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
   wait.Until(func() {
      if err := r.ListAndWatch(stopCh); err != nil {
         utilruntime.HandleError(err)
      }
   }, r.period, stopCh)
}

主要就是从kube-apiserver处做list&watch操作,然后将得到的对象封装存储进DeltaFIFO中。

调用自定义eventhandler
func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {
            // This is the safe way to re-enqueue.
            c.config.Queue.AddIfNotPresent(obj)
         }
      }
   }
}

controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来,然后调用c.config.Process方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent将对象重新加入到DeltaFIFO中去。

c.config.Process其实就是sharedIndexInformer.HandleDeltas。
HandleDeltas方法中,将从DeltaFIFO中pop出来的对象以及类型,相应的在indexer中做添加、更新、删除操作,并调用s.processor.distribute通知自定义的ResourceEventHandler。

https://cloudsre.me/2020/03/client-go-0-informer/
https://qiankunli.github.io/2020/07/20/client_go.html
https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html
http://dockerone.com/article/2434596
https://www.cnblogs.com/lianngkyle/p/16244872.html文章来源地址https://www.toymoban.com/news/detail-572552.html

到了这里,关于名字服务Polaris中服务发现详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • eureka服务注册和服务发现

    我们要在orderservice中根据查询到的userId来查询user,将user信息封装到查询到的order中。 一个微服务,既可以是服务提供者,又可以是服务消费者,因此eureka将服务注册、服务发现等功能统一封装到了eureka-client端

    2024年02月10日
    浏览(36)
  • 微服务之SpringCloud服务注册与发现

    目录 一,前言 二,Eureka实现服务注册与发现 单机Eureka服务注册流程 集群模式搭建模拟 Eureka自我保护机制 三,Zookeeper实现服务注册与发现 四,Consul是实现服务注册与发现 基本简介 安装 服务注册测试 ​五,三个注册中心的异同点 什么是服务治理 Spring Cloud 封装了Netflix公司

    2024年02月02日
    浏览(42)
  • nodejs微服务:服务发现与Consul

    服务发现 我们在做微服务开发的时候,客户端的一个接口可能需要调用N个服务,客户端必须知道所有服务的网络位置(ip+port),如下所示 客户端 Client 服务A (IP 1, PROT 1) 服务B (IP 2, PROT 2) 服务C (IP 3, PROT 3) … 服务N (IP N, PROT N) 客户端需要知道目前所有可用的服务端的ip和端口

    2023年04月09日
    浏览(32)
  • 基于 Zookeeper 实现服务注册和服务发现

    无论是采用SOA还是微服务架构,都需要使用服务注册和服务发现组件。我刚开始接触 Dubbo 时一直对服务注册/发现以及 Zookeeper 的作用感到困惑,现在看来是因为对分布式系统的理解不够深入,对 Dubbo 和 Zookeeper 的工作原理不够清楚。 本文将基于 Zookeeper 实现服务注册和服务发

    2024年02月10日
    浏览(40)
  • .net core微服务之服务发现

    一:nacos https://nacos.io/docs/latest/what-is-nacos/ https://github.com/alibaba/nacos 二:consul https://developer.hashicorp.com/consul/docs?product_intent=consul https://github.com/hashicorp/consul 服务发现的框架常用的还有zookeeper eureka等,这里准备使用nacos 前置条件准备 docker,yaml View Code 使用docker启动mysql View C

    2024年02月20日
    浏览(27)
  • 服务发现:在区块链中的应用:如何通过服务发现来提高区块链应用效率和客户满意度

    作者:禅与计算机程序设计艺术 随着数字货币、区块链技术的普及,以及互联网公司对其功能的整合,传统的单体应用模式已不能适应这一趋势,需要逐渐向分布式架构转型。分布式架构通常使用微服务架构的方式,将一个系统分解成多个小模块,各自独立运行且互相通信。

    2024年02月11日
    浏览(38)
  • 【微服务】Nacos:发现、配置和管理微服务

    Nacos 在阿里巴巴起源于 2008 2008 2008 年五彩石项目(完成微服务拆分和业务中台建设),成长于十年双十一的洪峰考验,沉淀了简单易用、稳定可靠、性能卓越的核心竞争力。 随着云计算兴起, 2018 2018 2018 年 Nacos(阿里内部 Configserver/Diamond/ Vipserver 内核)开源,作为阿里十年

    2024年02月07日
    浏览(33)
  • 【】Syncthing搭建自己的中继服务和发现服务

    syncthing除了客户端,还有中继服务器和发现服务器,如果单纯的只安装客户端,也不是不能用,只不过你的文件要走别人的服务器,才能会进行同步,而且,同步的前提,是你的两台客户端主机都开机。那么怎样把syncthing搞成类似于ondriver或者百度网盘的同步文件夹呢,往下

    2024年02月16日
    浏览(36)
  • 【微服务】服务发现和管理技术框架选型调研

    结合实际业务和开发需要,着重考虑性能可靠性、功能和社区支持程度三方面,认为 Nacos更适合作为服务发现和管理的技术框架 。具体理由如下: 性能更好,可靠性更高  经过阿里、APISIX、SpringCloudAlibaba,阿里内部的钉钉、考拉、饿了么、优酷等业务验证 Nacos 在开源版本中,

    2024年02月10日
    浏览(43)
  • 第五章 商品服务发现与客户服务通信

    1、多个docker共享同一目录如何区分服务         这里我们可以利用一个shell脚本设置. env 内容处理 2、增加consul监控检查         正常来说consul是会向服务去做心跳检测的;这个操作很简单只需要基于consul的参数进行配置即可处理好,当然在这之前我们需要了解当前rp

    2024年02月13日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包