Spring-Cloud-Gateway-09-动态路由与自动刷新

这篇具有很好参考价值的文章主要介绍了Spring-Cloud-Gateway-09-动态路由与自动刷新。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。



系列文章

Spring-Cloud-Gateway-01-网关基本概念

Spring-Cloud-Gateway-02-请求调用基本流程

Spring-Cloud-Gateway-03-网关自动装配

Spring-Cloud-Gateway-04-HttpWebHandlerAdapter到DispatcherHandler调用流程

Spring-Cloud-Gateway-05-请求到HttpWebHandlerAdapter的调用链路

Spring-Cloud-Gateway-06-DispatcherHandler调用解析

Spring-Cloud-Gateway-07-GatewayFilterChain的执行过程

Spring-Cloud-Gateway-08-路由的自动装配与加载流程

Spring-Cloud-Gateway-09-动态路由与自动刷新




事件发布与监听

ApplicationEventPublisherAware事件发布详解

上面这篇文章讲的很通俗易通,建议大家去看看。大体的意思就是说,事件的发布者发布事件,事件的监听这对对应的事件进行监听,当监听到对应的事件时,就会触发调用相关的方法。因此,在事件处理中,事件是核心,是事件发布者和事件监听者的桥梁。

事件,关联到代码里就是ApplicationEvent抽象类,我们创建一个事件就需要继承这个抽象类。

事件监听者,关联到代码里就是ApplicationListener接口,其中onApplicationEvent方法就是在事件发布的时候触发。实现该接口,其中泛型就是需要监听的事件,然后在重写方法实现逻辑即可。

事件发布者,关联到代码里就是ApplicationEventPublisherAware接口,会给我们提供一个ApplicationEventPublisher对象,其中publishEvent方法表示发布一个事件。


RefreshRoutesEvent事件监听

回到具体的代码,看SpringCloudGateway是如何实现的

public class CachingRouteLocator
		implements Ordered, RouteLocator, ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {
    private ApplicationEventPublisher applicationEventPublisher;
    
    @Override
	public void onApplicationEvent(RefreshRoutesEvent event) {
		try {
			fetch().collect(Collectors.toList()).subscribe(
					list -> Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe(signals -> {
						applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
						cache.put(CACHE_KEY, signals);
					}, this::handleRefreshError), this::handleRefreshError);
		}
		catch (Throwable e) {
			handleRefreshError(e);
		}
	}
    
    @Override
	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
		this.applicationEventPublisher = applicationEventPublisher;
	}

可以看到SpringCloudGateway定义了RefreshRoutesEvent这个刷新路由的事件,然后实现了ApplicationListener接口,重写了onApplicationEvent方法,并且实现了ApplicationEventPublisherAware接口用来发布事件。

我们接着来看onApplicationEvent方法具体做了什么

首先调用fetch方法

    CachingRouteLocator.java	

	//this.delegate之前说过就是CompositeRouteLocator
	private Flux<Route> fetch() {
		return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
	}
	CompositeRouteLocator.java	

	@Override
	public Flux<Route> getRoutes() {
        //this.delegates就是RouteLocator的实现bean,分别调用它们的getRoutes方法返回所有的Route
        //目前来说就只有RouteDefinitionRouteLocator
		return this.delegates.flatMapSequential(RouteLocator::getRoutes);
	}
	RouteDefinitionRouteLocator.java

	@Override
	public Flux<Route> getRoutes() {
         //this.routeDefinitionLocator之前说过就是CompositeRouteDefinitionLocator,用来组合所有途径的RouteDefinition
         //通过convertToRoute方法转换为Route
		Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);

		if (!gatewayProperties.isFailOnRouteDefinitionError()) {
			// instead of letting error bubble up, continue
			routes = routes.onErrorContinue((error, obj) -> {
				if (logger.isWarnEnabled()) {
					logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId()
							+ " will be ignored. Definition has invalid configs, " + error.getMessage());
				}
			});
		}

		return routes.map(route -> {
			if (logger.isDebugEnabled()) {
				logger.debug("RouteDefinition matched: " + route.getId());
			}
			return route;
		});
	}

总的来看,fetch方法就是获取所有的Route并进行排序,也算是更新Route。

	CachingRouteLocator

	@Override
	public void onApplicationEvent(RefreshRoutesEvent event) {
		try {
			fetch().collect(Collectors.toList()).subscribe(
					list -> Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe(signals -> {
        				 //发布RefreshRoutesResultEvent路由刷新结果的事件,并加入到cache中
                          //cache就是一map集合,缓存着所有的路由
						applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
						cache.put(CACHE_KEY, signals);
					}, this::handleRefreshError), this::handleRefreshError);
		}
		catch (Throwable e) {
			handleRefreshError(e);
		}
	}

RefreshRoutesResultEvent事件发布之后并没有看到有专门监听的,可能会在后续扩展中吧。

到目前为止,我们就知道路由的更新逻辑是如何处理的,那么有一个问题

RefreshRoutesEvent事件的发布是在什么时候触发的?


RefreshRoutesEvent事件触发

Spring-Cloud-Gateway-09-动态路由与自动刷新

用IDEA搜索一下可以看到,分别有两个地方发布了RefreshRoutesEvent事件,分别点进去看

一个是在AbstractGatewayControllerEndpoint

	@PostMapping("/refresh")
	public Mono<Void> refresh() {
		this.publisher.publishEvent(new RefreshRoutesEvent(this));
		return Mono.empty();
	}

可以看到这是一个Post请求,路径/refresh,猜想这是用来调用接口手动刷新的

另外一个是在RouteRefreshListener中,通过名称可以猜到这是一个监听类,并且实现了ApplicationListener接口,监听的对象事件则是ApplicationEvent这个抽象类,也就是所有继承了ApplicationEvent类的事件发布之后,都是在RouteRefreshListener中触发

	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		if (event instanceof ContextRefreshedEvent) {
			ContextRefreshedEvent refreshedEvent = (ContextRefreshedEvent) event;
			if (!WebServerApplicationContext.hasServerNamespace(refreshedEvent.getApplicationContext(), "management")) {
				reset();
			}
		}
		else if (event instanceof RefreshScopeRefreshedEvent || event instanceof InstanceRegisteredEvent) {
			reset();
		}
		else if (event instanceof ParentHeartbeatEvent) {
			ParentHeartbeatEvent e = (ParentHeartbeatEvent) event;
			resetIfNeeded(e.getValue());
		}
		else if (event instanceof HeartbeatEvent) {
			HeartbeatEvent e = (HeartbeatEvent) event;
			resetIfNeeded(e.getValue());
		}
	}

	private void resetIfNeeded(Object value) {
		if (this.monitor.update(value)) {
			reset();
		}
	}

	private void reset() {
		this.publisher.publishEvent(new RefreshRoutesEvent(this));
	}

可以看到RefreshRoutesEvent事件的发布是在onApplicationEvent方法中触发的

重点关注HeartbeatEvent事件,通过名字可以猜到和心跳机制相关

在resetIfNeeded方法中做了检验,this.monitor就是HeartbeatMonitor对象

/**
 * Helper class for listeners to the {@link HeartbeatEvent}, providing a convenient way to
 * determine if there has been a change in state.
 *
 * @author Dave Syer
 */
public class HeartbeatMonitor {

	private AtomicReference<Object> latestHeartbeat = new AtomicReference<>();

	/**
	 * @param value The latest heartbeat.
	 * @return True if the state changed.
	 */
	public boolean update(Object value) {
		Object last = this.latestHeartbeat.get();
		if (value != null && !value.equals(last)) {
			return this.latestHeartbeat.compareAndSet(last, value);
		}
		return false;
	}

}

通过注释可以看到这个一个心跳的监控器,提供一种方式来判断当前状态是否改变

update方法中通过CAS来进行判断的

下面我们关注HeartbeatEvent这个事件是如何触发的

Spring-Cloud-Gateway-09-动态路由与自动刷新

	public class CloudEurekaClient extends DiscoveryClient {

	@Override
	protected void onCacheRefreshed() {
		super.onCacheRefreshed();

		if (this.cacheRefreshedCount != null) { // might be called during construction and
			// will be null
			long newCount = this.cacheRefreshedCount.incrementAndGet();
			log.trace("onCacheRefreshed called with count: " + newCount);
			this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
		}
	}

CloudEurekaClientDiscoveryClient的子类,并重写了onCacheRefreshed方法

DiscoveryClient应该比较熟悉了,Eureka的客户端

再往上找

    	DiscoveryClient.java

		private boolean fetchRegistry(boolean forceFullRegistryFetch) {
		......
        
        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

继续往上找

    @VisibleForTesting
    void refreshRegistry() {
        ......
            boolean success = fetchRegistry(remoteRegionsModified);
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

可以看到是由CacheRefreshThread线程来执行的

private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
	......
}

可以看到initScheduledTasks方法会初始化两个线程 CacheRefreshThread HeartbeatThread , 默认每 30 秒调用一次

  • CacheRefreshThread 会发布 HeartbeatEvent
  • HeartbeatThread 更新 lastSuccessfulHeartbeatTimestamp时间戳

initScheduledTasks方法是在初始化 DiscoveryClient的时候调用的

到这里,我们就很清楚了,HeartbeatEvent时间是如何被发布的,以及发布的频率是多少


路由自动刷新流程

spring cloud gateway 自动刷新路由,这篇文章总结的很好,大家可以看看,这里把它部分内容搬运下来。

刷新路由流程:

  • 初始化DiscoveryClient
    • 调用 initScheduledTasks() 方法,初始化两个线程 CacheRefreshThread 和 HeartbeatThread , 默认每 30 秒调用
    • CacheRefreshThread 会发布 HeartbeatEvent
    • HeartbeatThread 更新 lastSuccessfulHeartbeatTimestamp 时间戳
  • CacheRefreshThread
    • 调用refreshRegistry()方法
      • 调用fetchRegistry()方法
        • 调用 onCacheRefreshed() 方法, 在子类 CloudEurekaClient 重载后发布 HeartbeatEvent 事件
  • RouteRefreshListener : 监听HeartbeatEvent ,并且发布 RefreshRoutesEvent 事件
  • RefreshRoutesEvent 促发本地拉取最新的路由信息
  • CachingRouteLocator: 自动装配默认的 路由更新器,CachingRouteLocator监听RefreshRoutesEvent事件
    • 如果是 RefreshRoutesEvent 事件, 调用 fetch() 方法更新 routes

Routes 更新过程

  • CachingRouteLocator.fetch() 总入口 ,使用装饰器模式,代理 CompositeRouteLocator,而 CompositeRouteLocator 中 代理最终使用 RouteDefinitionRouteLocatorgetRoutes() 方法
  • 在 RouteDefinitionRouteLocator 中, 调用 RouteDefinitionLocator 的 getRouteDefinitions() 实现 route 刷新
  • RouteDefinitionLocator 也使用了装饰器和组合模式:
    • InMemoryRouteDefinitionRepository, 基于内存
    • PropertiesRouteDefinitionLocator, 基于 properties 文件,如果使用了 properties 文件
    • RedisRouteDefinitionRepository ,基于Redis
    • DiscoveryClientRouteDefinitionLocator,基于注册中心

动态路由

如果路由的自动刷新流程搞清楚了之后,动态路由就很好理解了

每次Routes更新的过程,都会从各个渠道去拿到RouteDefinition,然后再转换成Route,缓存起来

目前SpringCloudGateway提供了方式有两种


DiscoveryClientRouteDefinitionLocator

基于注册中心,每次路由更新的时候,从注册中心拉取路由信息,并转换成RouteDefinition返回


RouteDefinitionRepository

SpringCloudGateway提供的接口RouteDefinitionRepository,目前有两种实现类,分别是InMemoryRouteDefinitionRepository(基于内存)和RedisRouteDefinitionRepository(基于Redis)

默认是InMemoryRouteDefinitionRepository

我们同样可以实现RouteDefinitionRepository接口,重写save方法和delete方法,通过getRouteDefinitions方法返回RouteDefinition

两种不同的方法,有兴趣的同学可以去实现下,这里写的很粗糙,只是提供大致的方向,还有很多细节没有涉及。文章来源地址https://www.toymoban.com/news/detail-428538.html

到了这里,关于Spring-Cloud-Gateway-09-动态路由与自动刷新的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spring-cloud-gateway版本和springboot版本不匹配

    在搭建gateway服务的时候,启动出现以下问题: Description: An attempt was made to call a method that does not exist. The attempt was made from the following location:     org.springframework.cloud.gateway.config.GatewayAutoConfiguration$NettyConfiguration.buildConnectionProvider(GatewayAutoConfiguration.java:798) The following method did no

    2024年02月16日
    浏览(106)
  • 芝法酱躺平攻略(14)——Nginx与Spring-Cloud-Gateway

    上一章芝法酱躺平攻略(12)展望了微服务下常见的技术需求与常见解决方案,本期来讲解第一部分,Nginx与SpringCloud-Gateway。 本章将实践在nginx和spring-cloud-gateway的简单使用。 首先,去官网查找最新稳定版,把下载的nginx包放到DOWNLOAD文件夹下。 而后把该包解压到SOFTWARE文件夹

    2024年02月07日
    浏览(43)
  • 第八章 : Spring cloud 网关中心 Gateway (动态路由)

    第八章 : Spring cloud 网关中心 Gateway (动态路由) 前言 本章知识点:重点介绍动态网关路由的背景、动态路由与静态路由的概念,以及如何基于Nacos实现动态网关路由 的实战案例。 背景 前面章节介绍了Spring Cloud Gateway提供的配置路由规则的两种方法,但都是在Spring Cloud Ga

    2024年01月19日
    浏览(47)
  • Spring-Cloud-Gateway修改请求(json,form带文件请求)参数,返回值参数

    新项目需要在getway统一做入参、出参加解密,记录日志。记录一下form,x-www-form-urlencoded , json 这几种修改数据的方式。 gateway做拦截器是实现GlobalFilter接口,修改json方式网上有很多文章,后来又想研究研究能不能实现修改form-data参数,以及文件请求,后者文章不多大部分是怎

    2024年02月16日
    浏览(47)
  • 三分钟了解Spring Cloud Gateway路由转发之自动路由

    大家好,我是冰点,今天和大家分享一下关于Spring Cloud Gateway 利用服务注册与发现实现自动路由的原理和源码解读。希望对大家有所帮助。 今天有个新同学,问我 为什么我们的网关服务Spring Cloud Gateway,没有配置路由就可以将请求到路由服务 ,说他们之前的项目的网关是将

    2024年02月08日
    浏览(35)
  • Spring Cloud 2022.x版本使用gateway和nacos实现动态路由和负载均衡

    Spring Cloud Alibaba官方:https://sca.aliyun.com/zh-cn/ Spring Cloud官网:https://spring.io/projects/spring-cloud Spring Cloud与Spring Cloud Alibaba版本对应说明:https://sca.aliyun.com/zh-cn/docs/2022.0.0.0/overview/version-explain 下载地址:https://github.com/alibaba/nacos/releases 下载编译压缩并解压:nacos-server-2.2.3.zip。 1.1、

    2024年02月11日
    浏览(40)
  • Spring Cloud Gateway 路由配置策略

    Spring Cloud Gateway 是一个基于 Spring Boot 2.x 和 Spring WebFlux 的轻量级网关服务,用于构建微服务架构中的 API 网关。它提供了一种简单、高效、灵活和可扩展的方式来路由请求到后端的微服务。 Spring Cloud Gateway 的核心特性包括: 路由功能:可以根据请求的属性(路径、参数等)将

    2024年01月20日
    浏览(41)
  • Spring Cloud Gateway 监控、多网关实例路由共享 | Spring Cloud 18

    Actuator 是 Spring Boot 提供的用来对应用系统进行监控的功能模块,借助于 Actuator 开发者可以很方便地对应用系统某些监控指标进行查看、统计等。 Actuator 的核心是端点 Endpoint 。 Endpoint 可以让我们监视应用程序并与其交互。 Spring Boot 包含许多内置端点,并允许您添加自己的端

    2024年02月09日
    浏览(59)
  • 【Spring Cloud Alibaba】8.路由网关(Gateway)

    接下来对服务消费者添加路由网关来实现统一访问接口,本操作先要完成之前的步骤,详情请参照【Spring Cloud Alibaba】Spring Cloud Alibaba 搭建教程 Spring Cloud Gateway 是 Spring 官方基于 Spring 5.0 , Spring Boot 2.0 和 Project Reactor 等技术开发的网关,该项目提供了一个库,用于在 Spring W

    2023年04月24日
    浏览(39)
  • spring cloud gateway网关(一)之网关路由

    1、gateway相关介绍 在微服务架构中,系统往往由多个微服务组成,而这些服务可能部署在不同机房、不同地区、不同域名下。这种情况下,客户端(例如浏览器、手机、软件工具等)想要直接请求这些服务,就需要知道它们具体的地址信息,例如 IP 地址、端口号等。这种客户

    2024年02月08日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包