[Nacos] Nacos Server处理订阅请求 (九)

这篇具有很好参考价值的文章主要介绍了[Nacos] Nacos Server处理订阅请求 (九)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.InstanceController#list()

Nacos Server处理订阅请求

[Nacos] Nacos Server处理订阅请求 (九)

主要还是从请求中获取参数, 比如namespceId、serviceName、agent(指定提交请求的客户端是哪种类型)、clusters、clusterIP、udpPort(后续UDP通信会使用)、app、tenant, 最后调用方法对参数进行处理

2.InstanceController#doSrvIpxt()

对请求进行详细处理

    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
        // 不同agent,生成不同的clientInfo
        ClientInfo clientInfo = new ClientInfo(agent);
        // 创建一个JSON Node,其就是当前方法返回的结果。后续代码就是对这个Node的各种初始化
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        // 从注册表中获取当前服务
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();

        // now try to enable the push
        try {
            if (udpPort > 0 && pushService.canEnablePush(agent)) {

                // 创建当前发出订阅请求的Nacos client的UDP Client, PushClient
                // 注意,在Nacos的UDP通信中,Nacos Server充当的是UDP Client,Nacos Client充当的是UDP Server
                pushService
                        .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                                pushDataSource, tid, app);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }

        // 若注册表中没有该服务,则直接结束
        if (service == null) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.put("cacheMillis", cacheMillis);
            // 注意,hosts为空
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }

        // 代码直到这里,说明注册表中存在该服务
        // 检测该服务是否被禁。若是被禁的服务,直接抛出异常
        checkIfDisabled(service);

        List<Instance> srvedIPs;

        // 获取到当前服务的所有实例,包含所有持久/临时实例
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

        // filter ips using selector:
        // 若选择器不空,则根据选择算法选择可用的intance列表,默认情况下,选择器不做任何过滤
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIPs = service.getSelector().select(clientIP, srvedIPs);
        }

        // 若最终选择的结果为空,则直接结束
        if (CollectionUtils.isEmpty(srvedIPs)) {

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }

            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }

            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            // 注意,hosts为空
            result.set("hosts", JacksonUtils.createEmptyArrayNode());
            result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
            return result;
        }

        // 代码走到这里,说明具有可用的instance
        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        // 这个map只有两个key,True与False
        // key为true的value中存放的是所有健康的instance
        // key为false的value存放的是所有不健康的instance
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());

        // 根据instance的健康状态,将所有instance分流放入map的不同key的value中
        for (Instance ip : srvedIPs) {
            // 这个语句写的非常好
            // 健康加入健康的列表, 不健康的加入不健康的列表
            ipMap.get(ip.isHealthy()).add(ip);
        }

        // isCheck为true,表示需要检测instance的保护阈值
        if (isCheck) {
            // reachProtectThreshold 是否达到了保护阈值, false 为没有达到
            result.put("reachProtectThreshold", false);
        }

        // 获取服务的保护阈值
        double threshold = service.getProtectThreshold();

        // 若  "健康instance数量/instance总数" <= 保护阈值,则说明需要启动保护机制了
        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

            Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
            if (isCheck) {
                // true表示启动保护机制
                result.put("reachProtectThreshold", true);
            }

            // 健康数量小于阈值, 则从所有实例中调用, 可能会有不健康实例, 可以保证健康实例不被压崩溃
            // 将所有不健康的instance添加到的key为true的instance列表,
            // 即key为true的value中(instance列表)存放的是所有instance实例
            // 包含所有健康的与不健康的instance
            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            // 清空key为false的value(不健康的instance列表)
            ipMap.get(Boolean.FALSE).clear();
        }

        if (isCheck) {
            result.put("protectThreshold", service.getProtectThreshold());
            result.put("reachLocalSiteCallThreshold", false);

            return JacksonUtils.createEmptyJsonNode();
        }

        ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

        // 注意,这个ipMap中存放着所有健康与不健康的instance列表
        for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
            List<Instance> ips = entry.getValue();

            // 若客户端只要健康的instance,且当前遍历的map的key为false,则跳过
            if (healthyOnly && !entry.getKey()) {
                continue;
            }

            // 遍历的这个ips可能是所有不健康的instance列表,
            // 也可能是所有健康的instance列表,
            // 也可能是所有健康与不健康的instance列表总和
            for (Instance instance : ips) {

                // 跳过禁用的instance
                if (!instance.isEnabled()) {
                    continue;
                }

                ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

                // 将当前遍历的instance转换为JSON
                ipObj.put("ip", instance.getIp());
                ipObj.put("port", instance.getPort());
                // deprecated since nacos 1.0.0:
                ipObj.put("valid", entry.getKey());
                ipObj.put("healthy", entry.getKey());
                ipObj.put("marked", instance.isMarked());
                ipObj.put("instanceId", instance.getInstanceId());
                ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
                ipObj.put("enabled", instance.isEnabled());
                ipObj.put("weight", instance.getWeight());
                ipObj.put("clusterName", instance.getClusterName());
                if (clientInfo.type == ClientInfo.ClientType.JAVA
                        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    ipObj.put("serviceName", instance.getServiceName());
                } else {
                    ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                }

                ipObj.put("ephemeral", instance.isEphemeral());
                hosts.add(ipObj);

            }  // end-for
        } // end-for

        result.replace("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }
  1. 不同agent,生成不同的clientInfo, java、c、c++、go、nginx、dnsf
    [Nacos] Nacos Server处理订阅请求 (九)

  2. pushService.addClient(): 创建当前发出订阅请求的Nacos client的UDP Client, PushClient, Nacos Server充当的是UDP Client,Nacos Client充当的是UDP Server
    [Nacos] Nacos Server处理订阅请求 (九)
    [Nacos] Nacos Server处理订阅请求 (九)
    获取到了UDP通信客户端PushClient, 并写入到一个缓存map中

    public void addClient(PushClient client) {
        // client is stored by key 'serviceName' because notify event is driven by serviceName change
        String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
        // clientMap是一个缓存map,用于存放当前Nacos Server中所有instance对应的UDP Client
        // 其是一个双层map,外层map的key为  namespaceId##groupId@@微服务名称,value为内层map
        // 内层map的key为代表一个instance的字符串,value为该instance对应的UDP Client,即PushClient
        ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
        // 若当前服务的内层map为null,则创建一个并放入到缓存map
        if (clients == null) {
            clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
            clients = clientMap.get(serviceKey);
        }
        PushClient oldClient = clients.get(client.toString());
        // 从内层map中获取当前instance对应的的PushClient,
        // 若该PushClient不为null,则更新一个最后引用时间戳;
        // 若该PushClient为null,则将当前这个PushClient作为PushClient
        // 写入到内层map,即写入到了缓存map
        if (oldClient != null) {
            // 更新最后引用时间戳
            oldClient.refresh();
        } else {
            PushClient res = clients.putIfAbsent(client.toString(), client);
            if (res != null) {
                Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res.toString());
            }
            Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());
        }
    }
  1. 获取当前服务的所有实例, 包括持久和临时
    [Nacos] Nacos Server处理订阅请求 (九)
    public List<Instance> srvIPs(List<String> clusters) {
        if (CollectionUtils.isEmpty(clusters)) {
            clusters = new ArrayList<>();
            clusters.addAll(clusterMap.keySet());
        }
        // 获取到当前服务的所有cluster中的所有instance
        return allIPs(clusters);
    }

    public List<Instance> allIPs(List<String> clusters) {
        List<Instance> result = new ArrayList<>();
        for (String cluster : clusters) {
            Cluster clusterObj = clusterMap.get(cluster);
            if (clusterObj == null) {
                continue;
            }
            // 将当前遍历cluster的所有instance添加到result集合
            // 包含所有持久实例与临时实例
            result.addAll(clusterObj.allIPs());
        }
        return result;
    }

    public List<Instance> allIPs() {
        List<Instance> allInstances = new ArrayList<>();
        // 持久实例
        allInstances.addAll(persistentInstances);
        // 临时实例
        allInstances.addAll(ephemeralInstances);
        return allInstances;
    }

3.总结

Nacos Server处理订阅请求的主要任务:文章来源地址https://www.toymoban.com/news/detail-462962.html

  1. 创建了Nacos Client对应的UDP通信客户端PushClient, 并写入一个缓存map
  2. 从注册表中获取到指定服务的所有可用的instance, 并封装为Json

到了这里,关于[Nacos] Nacos Server处理订阅请求 (九)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SQL server 数据库同步(发布与订阅)【解决办法】

    注意:所有连接数据库操作时注意连接的数据库是否在所选服务器上 ![在这里插入图片描述](https://img-blog.csdnimg.cn/38812acbb640420b89ba666dd016adb8.png) 在 发布服务器 和 订阅服务器 本地防火墙放通1433端口 参考 【操作方法】windows防火墙添加出入站规则方法 此处选择影响代理管理位

    2024年02月16日
    浏览(31)
  • Nacos——Nacos简介以及Nacos Server安装

    资料来源:02-Nacos配置管理-什么是配置中心_哔哩哔哩_bilibili nacos记得下载2.x版本的,负责以后新建配置的时候会出现“发布错误,请检查参数是否正确”错误!!!! 目录 一、Nacos简介 1.1  四大功能: 1.2   对比 二、配置中心 2.1 什么是配置   2.1.1 特点  2.2 什么是配置中

    2024年02月05日
    浏览(37)
  • [Nacos] Nacos Server主要类和接口 (五)

    InstanceController: 处理器, 处理服务实例的心跳和注册等请求。 core/Service: 在Nacos客户端的一个微服务名称定义的微服务, 在Nacos服务端是以Service实例的形式出现的。类似于ServiceInfo, ServiceInfo为客户端服务, Service为服务端服务。 RecordListener: Service类实现了RecordListener接口, 这个接口

    2024年02月06日
    浏览(56)
  • Spring Cloud Alibaba【什么是Nacos、Nacos Server下载安装 、Docker安装Nacos Server服务、微服务聚合父工程构建】(一)

       目录 Spring Cloud Alibaba简介 Spring Cloud Alibaba版本与兼容性   分布式服务治理_什么是Nacos 分布式服务治理_Nacos Server下载安装   分布式服务治理_Docker安装Nacos Server服务 分布式服务治理_微服务聚合父工程构建  什么是Spring Cloud Alibaba Spring Cloud Alibaba致力于提供微服务开发的

    2024年02月17日
    浏览(37)
  • Nacos Server 部署配置详解

    官方简介:一个更易于构建云原生应用的动态 服务发现(Nacos Discovery ) 、 服务配置(Nacos Config) 和服务管理平台。 Nacos的关键特性包括 : 服务发现和服务健康监测 动态配置服务 动态DNS服务 服务及其元数据管理 Nacos的核心功能 : 服务注册 :Nacos Client会通过发送REST请求的方式

    2024年02月05日
    浏览(42)
  • Nacos 版本不一致报错: Request nacos server failed

    在做微服务开发中,测试环境使用Nacos没有问题,但是生产环境服务启动一直报错: 代码没有改动,测试环境没问题,但是生产环境有问题呢?首先看一下两者不同的地方,大多数都是 环境配置 的问题。 查看 Nacos 服务的版本,查看 Nacos 控制台首页左上角就能看到版本号:

    2024年02月12日
    浏览(59)
  • NacosException: Request nacos server failed

    2.1.2 客户端可引用的版本不一致导致!

    2024年02月13日
    浏览(70)
  • Get请求传参List<Map>或List<对象>

    前端参数结构 后端参数结构(后端结构前端不用关心!!!!)

    2024年02月03日
    浏览(39)
  • nacos作为注册中心: Application failed to connect to Nacos server: “xxxx“

    1、首先排查nacos是否启动成功 2、排查nacos映射的地址是哪个 3、如果是服务之间互相引用,nacos有公共地址的,排查连接nacos配置文件的ip是否正确 4、如果nacos配置文件正确,则需要对公共的服务进行install,这样新服务才会引用到 上述问题中我遇到的是第四个情景,按照自己

    2024年01月21日
    浏览(60)
  • 多人开发共用一个nacos,怎样配置可以保证各自的请求不会请求到同事的电脑里,实现请求隔离

    1.在每个服务的配置文件里,给服务加上自己的后缀 2.配置gateway里,指向对应的带后缀服务名,如果是gateway是自动配置 可忽略此项

    2024年02月14日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包