Nacos 1.4.x 服务发现源码阅读

这篇具有很好参考价值的文章主要介绍了Nacos 1.4.x 服务发现源码阅读。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

客户端

关键属性

HostReactor
  • Map<String, ScheduledFuture<?>> futureMap:缓存向服务端请求ServiceInfo的定时任务
  • Map<String, ServiceInfo> serviceInfoMap:缓存从服务端获取的Service信息
  • Map<String, Object> updatingMap:用来标记是是否存在其他请求向服务端以相同的条件请求ServiceInfo
ServiceManager
  • Map<String, Map<String, Service>> serviceMap:服务端缓存注册信息的Map:Map(namespace, Map(group::serviceName, Service)).

NacosNamingService.getAllInstances

获取所有的Instance集合

  • boolean subscribe:true,请求结果需要进行缓存并设置定时任务更新缓存;false:直接从服务端查询并返回结果
	public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
            boolean subscribe) throws NacosException {
        
        ServiceInfo serviceInfo;
        if (subscribe) {
        	//请求结果需要进行缓存并设置定时任务更新缓存
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
        	// 直接向服务端请求并返回结果,客户端不做缓存,不设置定时任务
            serviceInfo = hostReactor
                    .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }
HostReactor.getServiceInfoDirectlyFromServer

直接向服务端请求并返回结果,客户端不做缓存,不设置定时任务

    public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)
            throws NacosException {
        String result = serverProxy.queryList(serviceName, clusters, 0, false);
        if (StringUtils.isNotEmpty(result)) {
            return JacksonUtils.toObj(result, ServiceInfo.class);
        }
        return null;
    }
HostReactor.getServiceInfo

根据serviceName和cluster集合获取实例集合,请求结果需要进行缓存并设置定时任务更新缓存
1.从客户端缓存(HostReactor.serviceInfoMap)中获取ServiceInfo
2.本地缓存中不存在对应的值,新建ServiceInfo实例并加入serviceInfoMap中,同时将serviceName加入updatingMap中并向服务端请求获取Instance集合并更新ServiceInfo,更新完成后从updatingMap中删除这个serviceName
3.本地缓存中存在对应的ServiceInfo,且updatingMap中有serviceName时,说明上一个请求正在请求服务端获取信息,调用wait方法,上一个请求更新完内存后会调用notifyAll方法
4.调用scheduleUpdateIfAbsent方法定时向服务端请求并更新serviceInfoMap的值

	public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            
            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
            
        // 说明存在请求在向服务端请求Instance
        } else if (updatingMap.containsKey(serviceName)) {
            
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // 添加定时任务,向服务端请求并更新serviceInfoMap的值
        scheduleUpdateIfAbsent(serviceName, clusters);
        
        return serviceInfoMap.get(serviceObj.getKey());
    }
    
HostReactor.updateServiceNow

// 从服务端获取信息并保存到客户端内存中

    private void updateServiceNow(String serviceName, String clusters) {
        try {
            updateService(serviceName, clusters);
        } catch (NacosException e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }

	// 从服务端获取信息并保存到客户端内存中
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            // 请求服务端获取信息
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            // 处理从服务端获得的消息(将结果保存到HostReactor.serviceInfoMap中)
            if (StringUtils.isNotEmpty(result)) {
                processServiceJson(result);
            }
        } finally {
        	// 唤醒HostReactor.getServiceInfo中调用wait的线程
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
    // 处理从服务端获取的消息
    // 1.先将json字符串转换为ServiceInfo
    // 2.之前在serviceInfoMap存在值,将新得到的ServiceInfo保存到serviceInfoMap中,并将现在的和之前的进行比较,如果内容发生变化,写日志,并发送InstancesChangeEvent事件
    // 3.之前在serviceInfoMap不存在值,将新得到的ServiceInfo保存到serviceInfoMap中,写日志,并发送InstancesChangeEvent事件
    // 4.返回第一步的ServiceInfo
	public ServiceInfo processServiceJson(String json) {
        ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        ServiceInfo oldService = serviceInfoMap.get(serviceKey);
        
        if (pushEmptyProtection && !serviceInfo.validate()) {
            //empty or error push, just ignore
            return oldService;
        }
        
        boolean changed = false;
        
        if (oldService != null) {
            
            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                        + serviceInfo.getLastRefTime());
            }
            
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            
            Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
            for (Instance host : oldService.getHosts()) {
                oldHostMap.put(host.toInetAddr(), host);
            }
            
            Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
            for (Instance host : serviceInfo.getHosts()) {
                newHostMap.put(host.toInetAddr(), host);
            }
            
            Set<Instance> modHosts = new HashSet<Instance>();
            Set<Instance> newHosts = new HashSet<Instance>();
            Set<Instance> remvHosts = new HashSet<Instance>();
            
            List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                    newHostMap.entrySet());
            for (Map.Entry<String, Instance> entry : newServiceHosts) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (oldHostMap.containsKey(key) && !StringUtils
                        .equals(host.toString(), oldHostMap.get(key).toString())) {
                    modHosts.add(host);
                    continue;
                }
                
                if (!oldHostMap.containsKey(key)) {
                    newHosts.add(host);
                }
            }
            
            for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (newHostMap.containsKey(key)) {
                    continue;
                }
                
                if (!newHostMap.containsKey(key)) {
                    remvHosts.add(host);
                }
                
            }
            
            if (newHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(newHosts));
            }
            
            if (remvHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(remvHosts));
            }
            
            if (modHosts.size() > 0) {
                changed = true;
                updateBeatInfo(modHosts);
                NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(modHosts));
            }
            
            serviceInfo.setJsonFromServer(json);
            
            if (changed) {
                NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
                        serviceInfo.getClusters(), serviceInfo.getHosts()));
                DiskCache.write(serviceInfo, cacheDir);
            }
            
        } else {
            changed = true;
            NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, cacheDir);
        }
        
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        
        if (changed) {
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
        }
        
        return serviceInfo;
    }
NamingProxy.queryList

向服务端发送http请求来获取该请求条件下的Instance集合

  • String serviceName:服务名称
  • String clusters:用“,”隔开的cluster集合的字符串
  • boolean healthyOnly:返回结果里的实例是否健康,默认为false
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
            throws NacosException {
        
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }

服务端

InstanceController.list

	@GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {
        
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        String agent = WebUtils.getUserAgent(request);
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
        
        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
        
        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
        
        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
        
        return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                healthyOnly);
    }
InstanceController.doSrvIpxt

在服务端中根据条件查询Instance集合,方法太长了,这里只显示部分关键代码
healthyOnly为true时:默认情况下返回的所有满足条件且健康且能接受请求的Instance,如果不存在健康状态的Instance,返回所有满足条件且能接受请求的Instance。
healthyOnly为false时:返回所有满足条件的能接受请求的Instance,不管健康状态。文章来源地址https://www.toymoban.com/news/detail-523583.html

	public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
        
        ClientInfo clientInfo = new ClientInfo(agent);
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        // 根据namespaceId和serviceName从ServiceManager.serviceMap中获取Service
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();
        
		// 不存在Service时,返回给客户端空集合,下面if逻辑中省略了部分参数组装代码
        if (service == null) {
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }
     
        List<Instance> srvedIPs;
        // 如果clusters为空,获取所有的Instance,否则获取clusters对应的Cluster集合里的所有Instance
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
        
        // 使用Service中的selector对上面获得的Instance集合进行过滤
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIPs = service.getSelector().select(clientIP, srvedIPs);
        }
        // 不存在Instance时,返回给客户端空集合,下面if逻辑中省略了部分参数组装代码
        if (CollectionUtils.isEmpty(srvedIPs)) {
            result.set("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }
        
        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());
        // 将Instance根据健康状态进行分类
        for (Instance ip : srvedIPs) {
            ipMap.get(ip.isHealthy()).add(ip);
        }
        double threshold = service.getProtectThreshold();
        // 如果集合中健康状态的Instance的占比<=Serice.protectThreshold,会将不健康的Instance加入健康的集合中,即默认所有的Instance都是健康的,等效于healthyOnly为false,Serice.protectThreshold的默认值为0,即healthyOnly为true时,返回的所有Instance要么都是健康的,要么都是不健康的
        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            ipMap.get(Boolean.FALSE).clear();
        }
        // 保存需要返回给客户端的Instance信息
        ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
        
        for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
            List<Instance> ips = entry.getValue();
            // healthyOnly=true,说明客户端只要健康状态的Instance,会过滤掉不健康的的Instance,即key为false的时不处理
            if (healthyOnly && !entry.getKey()) {
                continue;
            }
            for (Instance instance : ips) {
                // 如果Instance不能接收外部的请求,跳过当前Instance
                if (!instance.isEnabled()) {
                    continue;
                }
                ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
                // 省略了将instance的内从组装到ipObj里的过程
                hosts.add(ipObj);
            }
        }
        
        result.replace("hosts", hosts);
        ....
        return result;
    }

到了这里,关于Nacos 1.4.x 服务发现源码阅读的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Nacos服务注册或发现、Nacos服务分级模型、Nacos负载均衡策略、加权负载均衡、Nacos环境隔离

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

    2024年01月16日
    浏览(41)
  • 微服务 – Spring Cloud – Nacos服务注册、发现

    1、引入依赖 父pom依赖 子pom依赖 2、配置文件 3、主启动类 第三部完成 打开nacos 在服务列表即可看到注册进来的服务. 4、业务类 写一个接口供服务发现者使用 1、引入依赖 2、配置文件 3、主启动类 打开nacos 在服务列表即可看到注册进来的服务. 4、发现第一个服务 并调用第一

    2024年02月11日
    浏览(91)
  • 微服务·架构组件之服务注册与发现-Nacos

    Nacos服务注册与发现流程 服务注册:Nacos 客户端会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。 Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中。 服务心跳:在服务注册后,Nacos Client会维

    2024年02月02日
    浏览(46)
  • 传统DNS、负载均衡服务发现框架与专业服务发现框架(Eurek、nacos)分析

    DNS 服务器可以在一定程度上用作服务发现的机制,以下是其冲动服务发现的一些利弊 优势 广泛性 : DNS是互联网的标准协议之一,已经广泛地被支持和使用。因此,使用DNS作为服务发现的机制可以借助现有的网络基础设施,无需引入新的工具。 简单性 : DNS的域名解析机制

    2024年02月12日
    浏览(38)
  • 【探索SpringCloud】服务发现-Nacos服务端数据结构和模型

    上一文中,我们从官方的图示了解到Nacos的服务数据结构。但我关心的是,Nacos2.x不是重构了吗?怎么还是这种数据结构?我推测,必然是为了对Nacos1.x的兼容,实际存储应该不是这样的。于是,沿着这个问题出发我们一起来翻一下源码。 在扎入源码之前,我们需要回忆一下,

    2024年02月10日
    浏览(44)
  • Nacos:服务的注册、发现和配置中心(注册篇)

    Nacos 是阿里巴巴推出来的一个新开源项目,这是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。 Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。 N

    2024年02月15日
    浏览(39)
  • springCloudAlibaba组件-Nacos-服务发现与负载均衡(三)

    如果项目使用微服务架构,如果A微服务需要访问B微服务,需要http请求进行调用,当然需要B微服务的地址与端口号,微服务可以向之前提到的服务中心进行获取B服务的ip地址和端口号,这就是服务发现 1.客户端主动获取 客户端: 流程: 1.先是故障转移机制判断是否去本地文

    2024年02月10日
    浏览(44)
  • 微服务架构的服务注册和发现究竟采用Nacos还是Eureka ?

    微服务架构已经成为了构建分布式应用程序的主要方式之一,而服务注册与发现在微服务架构中扮演着至关重要的角色。在这个领域,有两个非常流行的工具,它们分别是Nacos和Eureka。我们来深入探讨这两者之间的区别,以帮助您在选择适合您项目的服务注册与发现工具时提

    2024年02月02日
    浏览(49)
  • SpringCloudAlibaba微服务实战系列(一)Nacos服务注册发现

    实战前先做一个背景了解。 单体架构:近几年技术的飞速发展,各种各样的服务已经进入到网络化。单体架构发布时只需要打成一个war或jar包发布即可;而随着业务量激增或网站流量的增加,必会暴露致命缺陷。 SOA:Service Oriented Architecture 面向服务的体系结构。旨在提升代

    2024年02月15日
    浏览(38)
  • Spring Cloud Alibaba - 服务注册与发现(Nacos)

    ✅作者简介:热爱Java后端开发的一名学习者,大家可以跟我一起讨论各种问题喔。 🍎个人主页:Hhzzy99 🍊个人信条:坚持就是胜利! 💞当前专栏:微服务 🥭本文内容:Spring Cloud Alibaba - 服务注册与发现(Nacos)。 在微服务架构中,服务注册与发现是其中的重要一环。服务

    2024年02月07日
    浏览(90)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包