spring eureka集群相关问题

这篇具有很好参考价值的文章主要介绍了spring eureka集群相关问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、集群节点信息如何更新?

EurekaServer节点启动的时候,DefaultEurekaServerContext.init()方法调用PeerEurekaNodes.start()方法,start方法中resolvePeerUrls()会从配置文件读取serviceUrl属性值获得集群最新节点信息,通过updatePeerEurekaNodes()方法将最新节点信息更新到PeerEurekaNodes类的属性peerEurekaNodes和peerEurekaNodeUrls中。

PeerEurekaNodes.start()还提供了一个更新节点信息的定时任务,每隔PeerEurekaNodesUpdateIntervalMs (默认10min) 时间执行一次。

spring eureka集群相关问题,spring,eureka,java,集群同步,微服务

针对上述这个过程,需要注意的是(网上很多文章表述有误):peerEurekaNodesUpdateIntervalMs这个参数并不是集群节点注册信息的同步间隔时间。为什么这么说呢?我们可以看上图中的定时任务peersUpdateTask做了什么事情即可,其实只要看updatePeerEurekaNodes(resolvePeerUrls())做了什么即可。

通过源码跟踪:resolvePeerUrls()方法默认即执行了EndpointUtils.getServiceUrlsFromConfig(),即从配置文件读取服务节点urls信息。updatePeerEurekaNodes()方法则把获取得到的urls与之前urls信息进行对比,该新增的放到新增list,该删除的放到删除list,如果既没有新增的节点也没有待删除的节点,则返回不做任何处理。下面是代码:

spring eureka集群相关问题,spring,eureka,java,集群同步,微服务

针对要删除的节点,处理方式是从节点列表移除,同时调用该节点的shutDown方法;针对要新增的节点,则加入到节点列表中,并创建新的节点,调用方法createPeerEurekaNode(String peerEurekaNodeUrl);创建节点最终调用了构造方法:

new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);

上述构造方法会从服务其他节点获取注册服务信息同步到新创建的服务节点中。

那么接下来就要问第二个问题了。

二、注册服务信息如何在集群节点中同步?

注册服务信息在集群节点中的同步有两种场景:一种是新增服务节点如何从其他服务节点同步服务注册信息;另一种是稳定的集群中,各节点之间是如何同步服务注册信息。

我们先来看第一种场景,新增节点如何从其他服务节点同步服务注册信息。

(1)Eureka Server节点启动时的服务同步

spring eureka集群相关问题,spring,eureka,java,集群同步,微服务

Eureka Server启动时,EurekaServerBootStrap调用PeerAwareInstanceRegistryImpl.syncUp(),同步注册服务信息。

// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);

syncUp()进行注册服务信息的同步:

    /**
     * Populates the registry information from a peer eureka node. This
     * operation fails over to other nodes until the list is exhausted if the
     * communication fails.
     */
    @Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
 
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

首先会从EurekaClient.getApplications获取所有的注册服务信息,然后调用register()进行服务注册。

如果同步失败,则会sleep serverConfig.getRegistrySyncRetryWaitMs()后,再次进行同步。

同步完成后,调用PeerAwareInstanceRegistryImpl.openForTraffic()方法,进行自我保护阀值的计算:

   public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        this.expectedNumberOfClientsSendingRenews = count;
        updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
    }

至此,Eureka Server启动时的节点复制就进行完了。

备注

最大同步重试次数=serverConfig.getRegistrySyncRetries(),默认5次。

同步失败后每次同步的间隔时间=serverConfig.getRegistrySyncRetryWaitMs(),默认30s。

如果启动时同步服务注册信息失败,一段时间不对外提供服务注册功能,waitTimeInMsWhenSyncEmpty,默认5min。

第二种场景:

(2)Eureka Server接收到Register、renew、cancel时的服务同步

spring eureka集群相关问题,spring,eureka,java,集群同步,微服务

处理Register、renew、cancel同步时比较复杂,包括好几个队列的转换处理以及异常处理。这里算是个简图吧,省略了队列的转换和异常处理。

Eureka Server接收到Register、renew或cancel事件后,执行向其他节点同步:

 /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
 
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

以Register为例(逻辑是一样的),Eureka Server循环遍历其他节点,然后调用node.register(info):

/**
     * Sends the registration information of {@link InstanceInfo} receiving by
     * this node to the peer node represented by this class.
     *
     * @param info
     *            the instance information {@link InstanceInfo} of any instance
     *            that is send to this instance.
     * @throws Exception
     */
    public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

这里执行TaskDispatcher的process,实际底层转到了AcceptorExecutor.process():

acceptorExecutor.process(id, task, expiryTime);

然后acceptorExecutor里会开启内部线程AcceptorRunner进行处理,处理逻辑有点复杂(包括队列的转换和异常处理),最终将处理好的结果放到BlockingQueue> batchWorkQueue中。

TaskExecutors中会在内部开启WokerRunnable线程组(ThreadGroup),循环poll batchWorkQueue队列中的Task,然后调用InstanceReplicationTask的execute方法,将register事件推送到其他Eureka Server节点。

public void run() {
     try {
                while (!isShutdown.get()) {
                    List<TaskHolder<ID, T>> holders = getWork();
                    metrics.registerExpiryTimes(holders);
 
                    List<T> tasks = getTasksOf(holders);
                    ProcessingResult result = processor.process(tasks);
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }
                    metrics.registerTaskResult(result, tasks.size());
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery WorkerThread error", e);
            }
        }

register、renew、cancel的处理逻辑一样,只不过调用的同步接口不同罢了。文章来源地址https://www.toymoban.com/news/detail-823095.html

到了这里,关于spring eureka集群相关问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Eureka(服务注册和发现)——Eureka的简介和原理 & Eureka的使用和分析 & 心跳续约策略,服务的下线和剔除,自我保护 & Eureka集群的搭建

    Eureka:服务注册与发现组件,用于实现服务的自动注册与发现,Spring Cloud Eureka 是对Netflix公司的Eureka的二次封装,它实现了服务治理的功能,Spring Cloud Eureka提供服务端与客户端,服务端即是Eureka服务注册中心,客户端完成微服务向Eureka服务的注册与发现。服务端和客户端均采

    2024年02月05日
    浏览(43)
  • k8s发布eureka集群,创建微服务项目

    1.1创建父级项目,父级项目pom.xml文件中的打包类型为pom 1.2创建eureka服务 1.1.2配置pom.xml依赖 1.1.3创建eureka的application.yml文件 1.2构建项目推送到docke仓库 此步骤省略可参考以下连接 1.2.1-docker安装配置 1.2.2-Dockerfile 文件编写 1.2.3 -jenkins构建项目 1.2.4-Harbor仓库搭建 2.1 由于eureka是

    2024年02月02日
    浏览(33)
  • 【Spring Boot Admin】客户端服务无法注册到监控平台的相关问题及解决方案

    1、客户端服务整合了Spring Security 通过URL注册,需在客户端服务中添加如下配置 通过注册中心注册,需在客户端服务中添加如下配置 2、客户端服务配置了server.port.context-path参数,并且客户端服务通过注册中心注册 需在客户端服务中添加如下配置 3、Spring Boot Admin 监控平台使

    2024年02月16日
    浏览(54)
  • Nacos(一):简介 如何安装 服务注册与发现 集群 权重 与Eureka区别

    当服务调用越来越多,服务的地址需要管理起来,并实现动态调用而不是硬编码在接口中。此时需要一个注册中心来帮助我们管理服务。    流程如下: 商品微服务注册IP和端口到注册中心 订单微服务先从注册中心获取到商品微服务的IP和端口 订单微服务中使用获取到的IP和

    2024年02月13日
    浏览(55)
  • 2-Spring cloud之Eureka快速剔除失效服务 以及 Eureka原理

    添加如下配置: 每个服务的yml配置如下: 如下: 更多可以参考下面的文章,说的不错 Eureka服务端挂了,为什么微服务还能调通?(原理分析).

    2024年02月13日
    浏览(80)
  • 【Spring Cloud 三】Eureka服务注册与服务发现

    【Spring Cloud一】微服务基本知识 目前公司项目使用的注册中心主要是Spring Cloud Alibaba的Nacos做的注册中心和配置中心。之前也是对Nacos的基本原理通过手写代码的方式进行了实现。出于对于Eureka的好奇所以就对Spring Cloud Neflix的Eureka进行理论学习和实践。 Eureka是一个 注册发现中

    2024年02月14日
    浏览(159)
  • Spring Cloud Eureka:服务注册与发现

    💗wei_shuo的个人主页 💫wei_shuo的学习社区 🌐Hello World ! Spring Cloud Eureka是Spring Cloud生态系统中的一个组件,它是用于实现服务注册与发现的服务治理组件。在微服务架构中,服务之间存在复杂的依赖关系,而Spring Cloud Eureka可以帮助解决服务之间相互查找和通信的问题 Eurek

    2024年02月09日
    浏览(53)
  • Eureka:Spring Cloud服务注册与发现组件

    Eureka 一词来源于古希腊词汇,是“发现了”的意思。在软件领域,Eureka 是 Netflix 公司开发的一款开源的服务注册与发现组件。 Spring Cloud 将 Eureka 与 Netflix 中的其他开源服务组件(例如 Ribbon、Feign 以及 Hystrix 等)一起整合进 Spring Cloud Netflix 模块中,整合后的组件全称为 Spr

    2024年02月03日
    浏览(57)
  • Spring Cloud Netflix微服务组件-Eureka

    目录 CAP理论 注册中心对比 为什么注册中心更适合用AP? 分布式系统AP和CP如何取舍? Eureka核心功能点 Euraka server启动的主线流程 总体流程图 @EnableEurekaServer 流程图 EurekaServerAutoConfiguration EurekaServerInitializerConfiguration Euraka client启动的主线流程 总体流程图 EurekaClientAutoConfigurat

    2024年02月01日
    浏览(248)
  • 2-Spring cloud之Eureka快速剔除失效服务

    添加如下配置: 每个服务的yml配置如下: 如下: 更多可以参考下面的文章,说的不错 Eureka服务端挂了,为什么微服务还能调通?(原理分析).

    2024年02月12日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包