Apache 神禹(shenyu)源码阅读(三)——被网关路由的后端服务 Client 向 Admin 注册的数据传输(Client端)

这篇具有很好参考价值的文章主要介绍了Apache 神禹(shenyu)源码阅读(三)——被网关路由的后端服务 Client 向 Admin 注册的数据传输(Client端)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

在真正测试 Divide 插件时,想要知道后端服务(以下称为 Client)是如何将自己的信息注册到管理台(以下称为 Client)。这里后端服务用的是 shenyu 自带的 http 的例子,项目名字为 shenyu-examples-http。

下图描述了本文研究的内容——服务注册时 Client端向 Admin 注册的数据同步——在 shenyu 架构中处于什么位置。红色部分都是我自己加的,在官网的图中没有。
Apache 神禹(shenyu)源码阅读(三)——被网关路由的后端服务 Client 向 Admin 注册的数据传输(Client端),apache

阅读准备

Disruptor入门及应用

正文

Client事件监听器监听本地的 Context 的刷新事件

当 Client (Spring 应用)依赖注入后,Spring 框架会刷新上下文 Context,这时,shenyu 自定义的一个监听 ContextRefreshedEvent 的监听器 SpringMvcClientEventListener (AbstractContextRefreshedEventListener 的子类)会触发 onApplicationEvent 方法。

  • AbstractContextRefreshedEventListener.onApplicationEvent()
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {
	
	// ...
   	
   	@Override
    public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
        context = event.getApplicationContext();
        // 1. 拿到 beans
        Map<String, T> beans = getBeans(context);
        if (MapUtils.isEmpty(beans)) {
            return;
        }
        // 2. 原子地设置 registered 为 true
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        if (isDiscoveryLocalMode) {
            // 3. 如果是“本地发现”模式,发布用于注册 URI 的 DTO
            publisher.publishEvent(buildURIRegisterDTO(context, beans));
        }
        // 4. 处理每个 bean,具体是发布 bean 的注册信息给 Disruptor 的 QueueConsumer
        beans.forEach(this::handle);
        // 5. apiModules 的 key 是 beanName,value 是 bean 的成员变量
        Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);
        // 6. 处理每个 apiModules,具体是发布 apiModules 的注册信息给 Disruptor 的 QueueConsumer
        apiModules.forEach((k, v) -> handleApiDoc(v, beans));
    }
    
    protected void handle(final String beanName, final T bean) {
    	// ...
    }
    
    private void handleApiDoc(final Object bean, final Map<String, T> beans) {
    	// ...
    }
}

从 SpringMvcClientEventListener.getBeans() 拿到 Beans

  • SpringMvcClientEventListener.java
public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {

	// ...
	
	private final ShenyuClientRegisterEventPublisher publisher = ShenyuClientRegisterEventPublisher.getInstance();
	
    @Override
    protected Map<String, Object> getBeans(final ApplicationContext context) {
        // Filter out
        // isFull 这个 Boolean 值代表的是:是否代理整个服务,目前适用于 SpringMvc/SpringCould
        if (Boolean.TRUE.equals(isFull)) {
            // 在全代理模式下,发布一个事件,这个事件包含了服务的元数据,用于注册服务
            getPublisher().publishEvent(MetaDataRegisterDTO.builder()
                    .contextPath(getContextPath()) // 设置服务的上下文路径
                    .addPrefixed(addPrefixed) // 设置是否添加前缀
                    .appName(getAppName()) // 设置应用名称
                    .path(UriComponentsBuilder.fromUriString(PathUtils.decoratorPathWithSlash(getContextPath()) + EVERY_PATH).build().encode().toUriString())
                    // 设置服务的路径,这里使用了 UriComponentsBuilder 来构建URI,将上下文路径装饰后加上一个通配符,代表匹配所有路径
                    .rpcType(RpcTypeEnum.HTTP.getName()) // 设置远程调用类型为 HTTP
                    .enabled(true) // 设置服务为启用状态
                    .ruleName(getContextPath()) // 使用上下文路径作为规则名称
                    .build());
            LOG.info("init spring mvc client success with isFull mode");
            // 发布一个 URI 注册的事件,传入空的映射作为参数
            publisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));
            return Collections.emptyMap();
        }
        // shenyu-examples-http 用的不是全代理模式,因为 isFull 为 false,此时直接返回带 Controller 注解的 bean
        return context.getBeansWithAnnotation(Controller.class);
    }
}

publisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap())); 发布一个 URI 注册的事件,传入空的映射作为参数。

ShenyuClientRegisterEventPublisher 给 Client 端的 Disruptor 的 QueueConsumer 发布要向 Admin 注册的数据(是的,此时还没传给 Admin,还停留在 Client 端)
  • ShenyuClientRegisterEventPublisher.publishEvent() 调用 DisruptorProvider.onData() 传递数据
public class ShenyuClientRegisterEventPublisher {
    
    // ...
    
    private DisruptorProviderManage<DataTypeParent> providerManage;
    
    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        // data 传给 Disruptor provider 
        provider.onData(data);
    }
}
  • DisruptorProvider 传递给 RingBuffer.publishEvent(),最终将注册的信息发布给 Diruptor 的 QueueConsumer。

    ps: Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,能够在无锁的情况下实现网络的Queue并发操作,基于Disruptor开发的系统单线程能支撑每秒600万订单。

public class DisruptorProvider<T> {
    
    // ...
    
    private final RingBuffer<DataEvent<T>> ringBuffer;
    
    private final boolean isOrderly;
    
    private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);
    
    // ...
    
    public void onData(final T data) {
        if (isOrderly) {
            throw new IllegalArgumentException("The current provider is  of orderly type. Please use onOrderlyData() method.");
        }
        try {
            // 由  ringBuffer 发布事件
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }
}
由 QueueConsumer.onEvent() 接收 RingBuffer.publishEvent() 发布的事件,并进行处理
  • 从 DisruptorProviderManage.startup 的源码中可以看到,在创建 Disruptor 时,线程池 OrderlyExecutor 被传进了 QueueConsumer,
public class DisruptorProviderManage<T> {
	
	// ...
   
    private final Integer consumerSize;
    
    private final QueueConsumerFactory<T> consumerFactory;
 	
 	// ...
    
    public void startup(final boolean isOrderly) {
        // 创建一个定制的线程池,用于消费者
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        // 根据是否有序来调整消费者数量和选择事件工厂
        if (isOrderly) {
            // 有序模式下,消费者数量设为1,使用有序的事件工厂
            newConsumerSize = 1;
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            // 无序模式下,使用默认的事件工厂
            eventFactory = new DisruptorEventFactory<>();
        }
        // 创建Disruptor实例,配置其基本参数
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        // 创建消费者数组,根据newConsumerSize指定的大小
        @SuppressWarnings("all")
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        // 将消费者注册到Disruptor,使用工作池模式
        disruptor.handleEventsWithWorkerPool(consumers);
        // 设置默认的异常处理器,这里选择忽略异常
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        // 启动Disruptor
        disruptor.start();
        // 获取Disruptor的环形缓冲区,用于发布事件
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        // 创建并存储DisruptorProvider实例,用于向Disruptor发布事件
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }
}
  • 当接收到一个事件时,QueueConsumer 将任务交给线程池去处理事件,处理事件的 Runnable 接口由工厂 factory 产生。
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
	
	// ...
	
	private final QueueConsumerFactory<T> factory;
	
	// ...
    
    @Override
    public void onEvent(final DataEvent<T> t) {
        if (Objects.nonNull(t)) {
            ThreadPoolExecutor executor = orderly(t);
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            executor.execute(queueConsumerExecutor);
        }
    }
}
  • QueueConsumerExecutor 在 Client 端的消费者执行器 RegisterClientConsumerExecutor
/**
 * The type Consumer executor.
 */
public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {
    
    private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;
    
    private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {
        this.subscribers = new EnumMap<>(executorSubscriberMap);
    }

    @Override
    // run 接口继承自 QueueConsumerExecutor,而 QueueConsumerExecutor 继承自 Runnable
    public void run() {
        final T data = getData();
       	// subscribers 拿到 ExecutorTypeSubscriber 去处理数据 data
        subscribers.get(data.getType()).executor(Lists.newArrayList(data));
    }
    
    /**
     * The type Register client executor factory.
     */
    public static class RegisterClientExecutorFactory<T extends DataTypeParent> extends AbstractQueueConsumerFactory<T> {
        
        @Override
        public RegisterClientConsumerExecutor<T> create() {
            Map<DataType, ExecutorTypeSubscriber<T>> map = getSubscribers()
                    .stream()
                    // 将 AbstractQueueConsumerFactory.getSubscribers()
                    // 接口返回的 ExecutorSubscriber<T> 转为 ExecutorTypeSubscriber<T>,
                    // 其带有 getType 接口
                    .map(e -> (ExecutorTypeSubscriber<T>) e)
                    .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, e -> e));
            return new RegisterClientConsumerExecutor<>(map);
        }

        @Override
        public String fixName() {
            return "shenyu_register_client";
        }
    }
}

ExecutorTypeSubscriber 继承自 ExecutorSubscriber :

public interface ExecutorTypeSubscriber<T extends DataTypeParent> extends ExecutorSubscriber<T> {`

从下图的 ExecutorTypeSubscriber 接口的实现类可以看到,在 Client 端有 3 个 Subscriber
Apache 神禹(shenyu)源码阅读(三)——被网关路由的后端服务 Client 向 Admin 注册的数据传输(Client端),apache

我们这个例子看的是URI,所以就以 ShenyuClientURIExecutorSubscriber 举例。

数据交由 ShenyuClientURIExecutorSubscriber 执行处理

  • ShenyuClientURIExecutorSubscriber.execute()
public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
	
	// ...
    
    private final ShenyuClientRegisterRepository shenyuClientRegisterRepository;
    @Override
    public void executor(final Collection<URIRegisterDTO> dataList) {
        for (URIRegisterDTO uriRegisterDTO : dataList) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            while (true) {
                // 连得上就跳出死循环
                try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
                    break;
                } catch (IOException e) {
                    long sleepTime = 1000;
                    // maybe the port is delay exposed
                    if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
                        LOG.error("host:{}, port:{} connection failed, will retry",
                                uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
                        // If the connection fails for a long time, Increase sleep time
                        if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
                            sleepTime = 10000;
                        }
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepTime);
                    } catch (InterruptedException ex) {
                        LOG.error("interrupted when sleep", ex);
                    }
                }
            }
            // 1. 延迟应用关闭时的其他钩子
            ShenyuClientShutdownHook.delayOtherHooks();
            // 2. 给 Admin 端发送 DTO 注册信息
            shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
            // 3. 向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
            ShutdownHookManager.get().addShutdownHook(new Thread(() -> {
                final URIRegisterDTO offlineDTO = new URIRegisterDTO();
                BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);
                offlineDTO.setEventType(EventType.OFFLINE);
                // 给 Admin 端发送下线 DTO
                shenyuClientRegisterRepository.offline(offlineDTO);
            }), 2);
        }
    }
}

有三个方法需要说明:

  1. ShenyuClientShutdownHook.delayOtherHooks() 延迟应用关闭时的其他钩子
  2. ShenyuClientRegisterRepository.persistURI() 给 Admin 端发送 DTO 注册信息
  3. ShutdownHookManager.get().addShutdownHook() 向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
延迟应用关闭时的其他钩子
  • ShenyuClientShutdownHook.delayOtherHooks()

    1. 利用 CAS 不加锁地确保并发时 TakeoverOtherHooksThread 线程只被运行一次
    2. 一个接管其他钩子的线程
public class ShenyuClientShutdownHook {
	
	// ...
	
	private static final AtomicBoolean DELAY = new AtomicBoolean(false);

    private static String hookNamePrefix = "ShenyuClientShutdownHook";

    private static AtomicInteger hookId = new AtomicInteger(0);

    private static Properties props;

    private static IdentityHashMap<Thread, Thread> delayHooks = new IdentityHashMap<>();

    private static IdentityHashMap<Thread, Thread> delayedHooks = new IdentityHashMap<>();
	
	// ....
	
    /**
     * Delay other shutdown hooks.
     */
    public static void delayOtherHooks() {
    	// 1. 利用 CAS 不加锁地确保并发时 TakeoverOtherHooksThread 线程只被运行一次
        if (!DELAY.compareAndSet(false, true)) {
            return;
        }
        // 2. 一个接管其他钩子的线程
        TakeoverOtherHooksThread thread = new TakeoverOtherHooksThread();
        thread.start();
    }

    /**
     * Delay other shutdown hooks thread.
     */
    private static class TakeoverOtherHooksThread extends Thread {
        @Override
        // 1. 该线程用于生成钩子,这些钩子用来延迟执行已经添加的钩子,为的是处理一些资源的关闭,和注册信息的注销
        public void run() {
            int shutdownWaitTime = Integer.parseInt(props.getProperty("shutdownWaitTime", "3000"));
            int delayOtherHooksExecTime = Integer.parseInt(props.getProperty("delayOtherHooksExecTime", "2000"));
            IdentityHashMap<Thread, Thread> hooks = null;
            try {
                // 2. 通过反射拿到应用关闭时的所有钩子
                Class<?> clazz = Class.forName(props.getProperty("applicationShutdownHooksClassName", "java.lang.ApplicationShutdownHooks"));
                Field field = clazz.getDeclaredField(props.getProperty("applicationShutdownHooksFieldName", "hooks"));
                field.setAccessible(true);
                hooks = (IdentityHashMap<Thread, Thread>) field.get(clazz);
            } catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException ex) {
                LOG.error(ex.getMessage(), ex);
            }
            long s = System.currentTimeMillis();
            // 3. 限制处理钩子的时间在 delayOtherHooksExecTime 之内,为什么要控制时间,难道不会遗漏一些钩子无法延迟吗?
            // GPT:
            // 答:1. 避免死锁或长时间阻塞
            //    2. 可以确保这个延迟逻辑不会过度拖延应用的关闭过程
            //    3. 实用性考虑: 在大多数情况下,如果在给定的时间内无法连接到或修改某些钩子,可能是因为存在一些异常或特殊情况。
            //       在这种情况下,继续等待可能不会带来太多好处,而是增加了关闭过程的复杂性和不确定性。
            //    确实,这种方法可能会遗漏一些在延迟期间新注册的钩子,但这通常是一个权衡的结果,设计者可能认为这种情况很少发生,或者遗漏的风险相对较小。
            while (System.currentTimeMillis() - s < delayOtherHooksExecTime) {
                for (Iterator<Thread> iterator = Objects.requireNonNull(hooks).keySet().iterator(); iterator.hasNext();) {
                    Thread hook = iterator.next();
                    // 4. 用于延迟执行原本钩子的钩子不必再延迟,所以跳过
                    if (hook.getName().startsWith(hookNamePrefix)) {
                        continue;
                    }
                    // 5. 正在处理的延迟的钩子和处理过的延迟的钩子不必再延迟,所以跳过
                    if (delayHooks.containsKey(hook) || delayedHooks.containsKey(hook)) {
                        continue;
                    }
                    Thread delayHook = new Thread(() -> {
                        LOG.info("sleep {}ms", shutdownWaitTime);
                        try {
                            // 6. 先睡眠 shutdownWaitTime,然后再执行原本的在应用关闭时的钩子
                            TimeUnit.MILLISECONDS.sleep(shutdownWaitTime);
                        } catch (InterruptedException ex) {
                            LOG.error(ex.getMessage(), ex);
                        }
                        hook.run();
                    }, hook.getName());
                    delayHooks.put(delayHook, delayHook);
                    // 7. 从原本的钩子 map 中移除这个原本要执行的钩子,即 delayHook
                    iterator.remove();
                }

                for (Iterator<Thread> iterator = delayHooks.keySet().iterator(); iterator.hasNext();) {
                    Thread delayHook = iterator.next();
                    // 8. 向运行时加入用来延迟执行原本钩子的钩子,即 delayedHooks
                    Runtime.getRuntime().addShutdownHook(delayHook);
                    // 9. 加入已处理过的钩子 map,
                    delayedHooks.put(delayHook, delayHook);
                    iterator.remove();
                    LOG.info("hook {} will sleep {}ms when it start", delayHook.getName(), shutdownWaitTime);
                }
                try {
                    // 10. 睡眠 100ms,目的是?
                    // GPT:
                    // 答:1. 减少CPU使用率
                    //    2. 给其他操作留出处理时间,通过在每次循环后短暂休眠,可以给其他线程运行的机会
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException ex) {
                    LOG.error(ex.getMessage(), ex);
                }
            }
            // 帮助 GC
            hookNamePrefix = null;
            hookId = new AtomicInteger(0);
            props = null;
            delayHooks = null;
            delayedHooks = null;
        }
    }	
}
  • TakeoverOtherHooksThread.run()

    代码如上面给出的:

    1. 该线程用于生成钩子,这些钩子用来延迟执行已经添加的钩子,为的是处理一些资源的关闭,和注册信息的注销
    2. 通过反射拿到应用关闭时的所有钩子
    3. 限制处理钩子的时间在 delayOtherHooksExecTime 之内,为什么要控制时间,难道不会遗漏一些钩子无法延迟吗?
      GPT:
      答:
      1. 避免死锁或长时间阻塞
      2. 可以确保这个延迟逻辑不会过度拖延应用的关闭过程
      3. 实用性考虑: 在大多数情况下,如果在给定的时间内无法连接到或修改某些钩子,可能是因为存在一些异常或特殊情况。 在这种情况下,继续等待可能不会带来太多好处,而是增加了关闭过程的复杂性和不确定性。确实,这种方法可能会遗漏一些在延迟期间新注册的钩子,但这通常是一个权衡的结果,设计者可能认为这种情况很少发生,或者遗漏的风险相对较小。
    4. 用于延迟执行原本钩子的钩子不必再延迟,所以跳过
    5. 正在处理的延迟的钩子和处理过的延迟的钩子不必再延迟,所以跳过
    6. 先睡眠 shutdownWaitTime,然后再执行原本的在应用关闭时的钩子
    7. 从原本的钩子 map 中移除这个原本要执行的钩子,即 delayHook
    8. 向运行时加入用来延迟执行原本钩子的钩子,即 delayedHooks
    9. 加入已处理过的钩子 map
    10. 睡眠 100ms,目的是?
      GPT:
      答:
      1. 减少CPU使用率
      2. 给其他操作留出处理时间,通过在每次循环后短暂休眠,可以给其他线程运行的机会
给 Admin 端发送 DTO 注册信息
  • ShenyuClientRegisterRepository.persistURI()

    ShenyuClientRegisterRepositoryFailbackRegistryRepositoryHttpClientRegisterRepository继承关系如下图 Apache 神禹(shenyu)源码阅读(三)——被网关路由的后端服务 Client 向 Admin 注册的数据传输(Client端),apache

  • ShenyuClientRegisterRepository.persistURI()

/**
 * Shenyu client register repository.
 */
@SPI
public interface ShenyuClientRegisterRepository {

    /**
     * Init.
     *
     * @param config the config
     */
    default void init(ShenyuRegisterCenterConfig config) {
    }
    
    /**
     * Persist metadata.
     *
     * @param metadata metadata
     */
    void persistInterface(MetaDataRegisterDTO metadata);
    
    /**
     * Persist uri.
     *
     * @param registerDTO the register dto
     */
    default void persistURI(URIRegisterDTO registerDTO) {
    }

    /**
     * Node active offline when shutdown.
     *
     * @param offlineDTO the offline dto
     */
    default void offline(URIRegisterDTO offlineDTO) {
    }

    /**
     * persistApiDoc.
     * @param apiDocRegisterDTO apiDocRegisterDTO
     */
    default void persistApiDoc(ApiDocRegisterDTO apiDocRegisterDTO) {
    }
    
    /**
     * closeRepository.
     * If the close method is used, Spring will call it by default when the bean is destroyed,
     * So its method name is closeRepository to avoid being called by default when the bean is destroyed.
     */
    default void closeRepository() {
    }
}
  • FailbackRegistryRepository.persistURI()

    这里同样用到了模板方法,doPersistURI 交由子类 HttpClientRegisterRepository 实现

public abstract class FailbackRegistryRepository implements ShenyuClientRegisterRepository {
	
	// ... 
	
    @Override
    public void persistURI(final URIRegisterDTO registerDTO) {
        try {
        	// 1. 同样是模板方法,交由子类 HttpClientRegisterRepository 实现
            this.doPersistURI(registerDTO);
        } catch (Exception ex) {
            //If a failure occurs, it needs to be added to the retry list.
            logger.warn("Failed to persistURI {}, cause:{}", registerDTO, ex.getMessage());
            this.addFailureUriDataRegister(registerDTO);
        }
    }
}
  • HttpClientRegisterRepository.doPersistURI()

    1. 如果端口已被其他进程监听,则直接返回,不需要再注册
    2. 否则注册
public class HttpClientRegisterRepository extends FailbackRegistryRepository {
    
    // ...
    
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);

    private static URIRegisterDTO uriRegisterDTO;

    private static ApiDocRegisterDTO apiDocRegisterDTO;

    private String username;
    
    private String password;
    
    private List<String> serverList;
    
    /**
     * server -> accessToken.
     */
    private LoadingCache<String, String> accessToken;
    
    // ...
    
    @Override
    public void doPersistURI(final URIRegisterDTO registerDTO) {
        if (RuntimeUtils.listenByOther(registerDTO.getPort())) {
        	// 1. 如果端口已被其他进程监听,则直接返回,不需要再注册
            return;
        }
        // 2. 否则注册
        doRegister(registerDTO, Constants.URI_PATH, Constants.URI);
        uriRegisterDTO = registerDTO;
    }
    
    private <T> void doRegister(final T t, final String path, final String type) {
        int i = 0;
        for (String server : serverList) {
            i++;
            String concat = server.concat(path);
            try {
                String accessToken = this.accessToken.get(server);
                if (StringUtils.isBlank(accessToken)) {
                    throw new NullPointerException("accessToken is null");
                }
                // 1. 调用注册工具类进行注册
                RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);
                // considering the situation of multiple clusters, we should continue to execute here
            } catch (Exception e) {
                LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());
                if (i == serverList.size()) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
  • HttpClientRegisterRepository.doRegister()

    1. 调用注册工具类进行注册(代码如上)
  • RegisterUtils.doRegister()

    1. 构建 http 的 heade
    2. 在此通过 http 调用 Admin 的服务进行注册,
      url 为 Admin 端的注册用的接口,有 localhost:9095/shenyu-client/register-metadata 等url;
      json 为要传输的注册信息
    3. OkHttpTools 是 shenyu 对 okhttp 外部组件的封装
public final class RegisterUtils {

	// ...
	
    public static void doRegister(final String json, final String url, final String type, final String accessToken) throws IOException {
        if (StringUtils.isBlank(accessToken)) {
            LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);
            return;
        }
        // 1. 构建 http 的 header
        Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();
        // 2. 在此通过 http 调用 Admin 的服务进行注册,
        //    url 为 Admin 端的注册用的接口,有 localhost:9095/shenyu-client/register-metadata 等url;
        //    json 为要传输的注册信息
        // 3. OkHttpTools 是 shenyu 对 okhttp 外部组件的封装
        String result = OkHttpTools.getInstance().post(url, json, headers);
        if (Objects.equals(SUCCESS, result)) {
            LOGGER.info("{} client register success: {} ", type, json);
        } else {
            LOGGER.error("{} client register error: {} ", type, json);
        }
    }
}
向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
  • ShutdownHookManager.addShutdownHook()

    1. 向运行时添加一个关机钩子,这个钩子是一个新线程,新线程去执行 ShutdownHookManager 管理的要在关机时执行的钩子
    2. 添加关闭应用时要执行的注销注册的钩子
public final class ShutdownHookManager {
	
	// ...
	
	private static final ShutdownHookManager MGR = new ShutdownHookManager();
   
    private final Set<HookEntry> hooks =
            Collections.synchronizedSet(new HashSet<HookEntry>());	
    
    static {
    	// 1. 向运行时添加一个关机钩子,这个钩子是一个新线程,
    	// 新线程去执行 ShutdownHookManager  管理的要在关机的钩子
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> {
                    MGR.shutdownInProgress.set(true);
                    for (Runnable hook : MGR.getShutdownHooksInOrder()) {
                        try {
                            hook.run();
                        } catch (Throwable ex) {
                            LOG.error(ex.getMessage(), ex);
                        }
                    }
                })
        );
    }

	// ...
	
    public void addShutdownHook(final Runnable shutdownHook, final int priority) {
        if (shutdownHook == null) {
            throw new IllegalArgumentException("shutdownHook cannot be NULL");
        }
        if (shutdownInProgress.get()) {
            throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
        }
        // 2. 添加关闭应用时要执行的注销注册的钩子
        hooks.add(new HookEntry(shutdownHook, priority));
    }
}

一张图总结

Apache 神禹(shenyu)源码阅读(三)——被网关路由的后端服务 Client 向 Admin 注册的数据传输(Client端),apache文章来源地址https://www.toymoban.com/news/detail-825051.html

到了这里,关于Apache 神禹(shenyu)源码阅读(三)——被网关路由的后端服务 Client 向 Admin 注册的数据传输(Client端)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache ShenYu 学习笔记一

    这是一个异步的,高性能的,跨语言的,响应式的 API 网关。 官网文档:https://shenyu.apache.org/zh/docs/index 仓库地址:https://github.com/apache/shenyu 本次体验基本参照官方快速开始文档步骤 开发工具:IDEA JDK:1.8 IDEA打开上一步下载好的项目 找到 shenyu-admin 子项目,运行 ShenyuAdminBoo

    2024年02月01日
    浏览(50)
  • Apache Doris 聚合函数源码阅读与解析|源码解读系列

    笔者最近由于工作需要开始调研 Apache Doris,通过阅读聚合函数代码切入 Apache Doris 内核,同时也秉承着开源的精神,开发了 array_agg 函数并贡献给社区。笔者通过这篇文章记录下对源码的一些理解,同时也方便后面的新人更快速地上手源码开发。 聚合函数,顾名思义,即对一

    2024年01月25日
    浏览(29)
  • 怎样成为优秀的后端工程师

    本文翻译自国外论坛 medium,原文地址:https://medium.com/@pradeesh-kumar/how-to-become-a-good-backend-engineer-9da75202a104 让我们一起看看国外开发者认为优秀后端工程师需要掌握哪些技能。 本质上,软件开发有两个要素:前端和后端。当访问者登陆网站时,他们会看到 UI 与之交互,即前端

    2024年02月03日
    浏览(34)
  • 构建WebRTC技术需要的后端服务

    📢欢迎点赞 :👍 收藏 ⭐留言 📝 如有错误敬请指正,赐人玫瑰,手留余香! 📢本文作者:由webmote 原创 📢作者格言:新的征程,我们面对的不是技术而是人心,人心不可测,海水不可量,唯有技术,才是深沉黑夜中的一座闪烁的灯塔 ! 当下直播界最炙手可热的技术,

    2024年02月15日
    浏览(33)
  • 新的后端渲染:服务器驱动UI

    通过API发送UI是一种彻底的新方法,将改变传统的UI开发。 一项正在改变我们对用户界面 (UI) 的看法的技术是通过 API 发送 UI,也称为 服务器驱动UI 。这种方法提供了新水平的活力和灵活性,正在改变 UI 开发的传统范例。 服务器驱动 UI 不仅仅是一个理论概念;它也是一个概

    2024年02月11日
    浏览(33)
  • 汇编代码生成和编译器的后端

    基于SLR(1)分析的语义分析及中间代码生成程序-CSDN博客 https://blog.csdn.net/lijj0304/article/details/135097554?spm=1001.2014.3001.5501 在前面编译器前端实现的基础上,将所生成的中间代码翻译成某种目标机的汇编代码,实现编译器后端实现的任务。然后进一步实现程序的输入是源程序,输出

    2024年01月21日
    浏览(36)
  • 博客系统的后端设计(八) - 实现发布博客功能

    在原来的编辑页面点击发布文章按钮,是不会有什么效果的。 这是因为此时还不能实现前后端的交互。 请求使用 POST ,路径是 /blog title=这是标题content=这是正文 请求中要有 body,按照 form 表单的方式添加进去。 响应使用 HTTP/1.1 302 跳转到列表页:Location: blog.list.html 在一篇博

    2024年02月07日
    浏览(34)
  • 基于模块自定义扩展字段的后端逻辑实现(一)

    目录 一:背景介绍 二:实现过程 三:字段标准化 四:数据存储 五:数据扩展 六:表的设计 一:背景介绍   最近要做一个系统,里面涉及一个模块是使用拖拉拽的形式配置模块使用的字段表单,主要包括新建/编辑模块,模块详情等。这里涉及的重点是新建模块的表单是手

    2024年02月02日
    浏览(25)
  • Node.js与TypeScript:优雅的后端开发方式

    随着前端 JavaScript语言的不断发展, Node.js 开发环境也越来越受到开发者们的欢迎。 Node.js 让我们可以轻松地使用 JavaScript 来编写服务器端应用,从而实现前后端一致的开发体验。在 Node.js 的发展历程中, TypeScript 也逐渐成为了一种备受欢迎的编程语言,它的出现大大提高了

    2024年02月10日
    浏览(54)
  • Django form组件 - 神奇的后端直接渲染HTML

    之前在HTML页面中利用form表单向后端提交数据时会写一些获取用户输入的标签并且使用form标签将其包裹起来。并且很多场景下都需要对用户的输入做校验,比如用户输入的长度和格式等,如果用户输入的有误就需要在页面上相应的位置显示相应的错误信息。而django form组件实

    2024年02月02日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包