前言
在真正测试 Divide 插件时,想要知道后端服务(以下称为 Client)是如何将自己的信息注册到管理台(以下称为 Client)。这里后端服务用的是 shenyu 自带的 http 的例子,项目名字为 shenyu-examples-http。
下图描述了本文研究的内容——服务注册时 Client端向 Admin 注册的数据同步——在 shenyu 架构中处于什么位置。红色部分都是我自己加的,在官网的图中没有。
阅读准备
Disruptor入门及应用
正文
Client事件监听器监听本地的 Context 的刷新事件
当 Client (Spring 应用)依赖注入后,Spring 框架会刷新上下文 Context,这时,shenyu 自定义的一个监听 ContextRefreshedEvent 的监听器 SpringMvcClientEventListener (AbstractContextRefreshedEventListener 的子类)会触发 onApplicationEvent 方法。
- AbstractContextRefreshedEventListener.onApplicationEvent()
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {
// ...
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {
context = event.getApplicationContext();
// 1. 拿到 beans
Map<String, T> beans = getBeans(context);
if (MapUtils.isEmpty(beans)) {
return;
}
// 2. 原子地设置 registered 为 true
if (!registered.compareAndSet(false, true)) {
return;
}
if (isDiscoveryLocalMode) {
// 3. 如果是“本地发现”模式,发布用于注册 URI 的 DTO
publisher.publishEvent(buildURIRegisterDTO(context, beans));
}
// 4. 处理每个 bean,具体是发布 bean 的注册信息给 Disruptor 的 QueueConsumer
beans.forEach(this::handle);
// 5. apiModules 的 key 是 beanName,value 是 bean 的成员变量
Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);
// 6. 处理每个 apiModules,具体是发布 apiModules 的注册信息给 Disruptor 的 QueueConsumer
apiModules.forEach((k, v) -> handleApiDoc(v, beans));
}
protected void handle(final String beanName, final T bean) {
// ...
}
private void handleApiDoc(final Object bean, final Map<String, T> beans) {
// ...
}
}
从 SpringMvcClientEventListener.getBeans() 拿到 Beans
- SpringMvcClientEventListener.java
public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {
// ...
private final ShenyuClientRegisterEventPublisher publisher = ShenyuClientRegisterEventPublisher.getInstance();
@Override
protected Map<String, Object> getBeans(final ApplicationContext context) {
// Filter out
// isFull 这个 Boolean 值代表的是:是否代理整个服务,目前适用于 SpringMvc/SpringCould
if (Boolean.TRUE.equals(isFull)) {
// 在全代理模式下,发布一个事件,这个事件包含了服务的元数据,用于注册服务
getPublisher().publishEvent(MetaDataRegisterDTO.builder()
.contextPath(getContextPath()) // 设置服务的上下文路径
.addPrefixed(addPrefixed) // 设置是否添加前缀
.appName(getAppName()) // 设置应用名称
.path(UriComponentsBuilder.fromUriString(PathUtils.decoratorPathWithSlash(getContextPath()) + EVERY_PATH).build().encode().toUriString())
// 设置服务的路径,这里使用了 UriComponentsBuilder 来构建URI,将上下文路径装饰后加上一个通配符,代表匹配所有路径
.rpcType(RpcTypeEnum.HTTP.getName()) // 设置远程调用类型为 HTTP
.enabled(true) // 设置服务为启用状态
.ruleName(getContextPath()) // 使用上下文路径作为规则名称
.build());
LOG.info("init spring mvc client success with isFull mode");
// 发布一个 URI 注册的事件,传入空的映射作为参数
publisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));
return Collections.emptyMap();
}
// shenyu-examples-http 用的不是全代理模式,因为 isFull 为 false,此时直接返回带 Controller 注解的 bean
return context.getBeansWithAnnotation(Controller.class);
}
}
publisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));
发布一个 URI 注册的事件,传入空的映射作为参数。
ShenyuClientRegisterEventPublisher 给 Client 端的 Disruptor 的 QueueConsumer 发布要向 Admin 注册的数据(是的,此时还没传给 Admin,还停留在 Client 端)
- ShenyuClientRegisterEventPublisher.publishEvent() 调用 DisruptorProvider.onData() 传递数据
public class ShenyuClientRegisterEventPublisher {
// ...
private DisruptorProviderManage<DataTypeParent> providerManage;
public void publishEvent(final DataTypeParent data) {
DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
// data 传给 Disruptor provider
provider.onData(data);
}
}
-
DisruptorProvider 传递给 RingBuffer.publishEvent(),最终将注册的信息发布给 Diruptor 的 QueueConsumer。
ps: Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,能够在无锁的情况下实现网络的Queue并发操作,基于Disruptor开发的系统单线程能支撑每秒600万订单。
public class DisruptorProvider<T> {
// ...
private final RingBuffer<DataEvent<T>> ringBuffer;
private final boolean isOrderly;
private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);
// ...
public void onData(final T data) {
if (isOrderly) {
throw new IllegalArgumentException("The current provider is of orderly type. Please use onOrderlyData() method.");
}
try {
// 由 ringBuffer 发布事件
ringBuffer.publishEvent(translatorOneArg, data);
} catch (Exception ex) {
logger.error("ex", ex);
}
}
}
由 QueueConsumer.onEvent() 接收 RingBuffer.publishEvent() 发布的事件,并进行处理
- 从 DisruptorProviderManage.startup 的源码中可以看到,在创建 Disruptor 时,线程池 OrderlyExecutor 被传进了 QueueConsumer,
public class DisruptorProviderManage<T> {
// ...
private final Integer consumerSize;
private final QueueConsumerFactory<T> consumerFactory;
// ...
public void startup(final boolean isOrderly) {
// 创建一个定制的线程池,用于消费者
OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
int newConsumerSize = this.consumerSize;
EventFactory<DataEvent<T>> eventFactory;
// 根据是否有序来调整消费者数量和选择事件工厂
if (isOrderly) {
// 有序模式下,消费者数量设为1,使用有序的事件工厂
newConsumerSize = 1;
eventFactory = new OrderlyDisruptorEventFactory<>();
} else {
// 无序模式下,使用默认的事件工厂
eventFactory = new DisruptorEventFactory<>();
}
// 创建Disruptor实例,配置其基本参数
Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
size,
DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
ProducerType.MULTI,
new BlockingWaitStrategy());
// 创建消费者数组,根据newConsumerSize指定的大小
@SuppressWarnings("all")
QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
for (int i = 0; i < newConsumerSize; i++) {
consumers[i] = new QueueConsumer<>(executor, consumerFactory);
}
// 将消费者注册到Disruptor,使用工作池模式
disruptor.handleEventsWithWorkerPool(consumers);
// 设置默认的异常处理器,这里选择忽略异常
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
// 启动Disruptor
disruptor.start();
// 获取Disruptor的环形缓冲区,用于发布事件
RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
// 创建并存储DisruptorProvider实例,用于向Disruptor发布事件
provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
}
}
- 当接收到一个事件时,QueueConsumer 将任务交给线程池去处理事件,处理事件的 Runnable 接口由工厂 factory 产生。
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
// ...
private final QueueConsumerFactory<T> factory;
// ...
@Override
public void onEvent(final DataEvent<T> t) {
if (Objects.nonNull(t)) {
ThreadPoolExecutor executor = orderly(t);
QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
queueConsumerExecutor.setData(t.getData());
// help gc
t.setData(null);
executor.execute(queueConsumerExecutor);
}
}
}
- QueueConsumerExecutor 在 Client 端的消费者执行器 RegisterClientConsumerExecutor
/**
* The type Consumer executor.
*/
public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {
private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;
private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {
this.subscribers = new EnumMap<>(executorSubscriberMap);
}
@Override
// run 接口继承自 QueueConsumerExecutor,而 QueueConsumerExecutor 继承自 Runnable
public void run() {
final T data = getData();
// subscribers 拿到 ExecutorTypeSubscriber 去处理数据 data
subscribers.get(data.getType()).executor(Lists.newArrayList(data));
}
/**
* The type Register client executor factory.
*/
public static class RegisterClientExecutorFactory<T extends DataTypeParent> extends AbstractQueueConsumerFactory<T> {
@Override
public RegisterClientConsumerExecutor<T> create() {
Map<DataType, ExecutorTypeSubscriber<T>> map = getSubscribers()
.stream()
// 将 AbstractQueueConsumerFactory.getSubscribers()
// 接口返回的 ExecutorSubscriber<T> 转为 ExecutorTypeSubscriber<T>,
// 其带有 getType 接口
.map(e -> (ExecutorTypeSubscriber<T>) e)
.collect(Collectors.toMap(ExecutorTypeSubscriber::getType, e -> e));
return new RegisterClientConsumerExecutor<>(map);
}
@Override
public String fixName() {
return "shenyu_register_client";
}
}
}
ExecutorTypeSubscriber 继承自 ExecutorSubscriber :
public interface ExecutorTypeSubscriber<T extends DataTypeParent> extends ExecutorSubscriber<T> {`
从下图的 ExecutorTypeSubscriber 接口的实现类可以看到,在 Client 端有 3 个 Subscriber
我们这个例子看的是URI,所以就以 ShenyuClientURIExecutorSubscriber 举例。
数据交由 ShenyuClientURIExecutorSubscriber 执行处理
- ShenyuClientURIExecutorSubscriber.execute()
public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {
// ...
private final ShenyuClientRegisterRepository shenyuClientRegisterRepository;
@Override
public void executor(final Collection<URIRegisterDTO> dataList) {
for (URIRegisterDTO uriRegisterDTO : dataList) {
Stopwatch stopwatch = Stopwatch.createStarted();
while (true) {
// 连得上就跳出死循环
try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {
break;
} catch (IOException e) {
long sleepTime = 1000;
// maybe the port is delay exposed
if (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {
LOG.error("host:{}, port:{} connection failed, will retry",
uriRegisterDTO.getHost(), uriRegisterDTO.getPort());
// If the connection fails for a long time, Increase sleep time
if (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {
sleepTime = 10000;
}
}
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException ex) {
LOG.error("interrupted when sleep", ex);
}
}
}
// 1. 延迟应用关闭时的其他钩子
ShenyuClientShutdownHook.delayOtherHooks();
// 2. 给 Admin 端发送 DTO 注册信息
shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
// 3. 向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
ShutdownHookManager.get().addShutdownHook(new Thread(() -> {
final URIRegisterDTO offlineDTO = new URIRegisterDTO();
BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);
offlineDTO.setEventType(EventType.OFFLINE);
// 给 Admin 端发送下线 DTO
shenyuClientRegisterRepository.offline(offlineDTO);
}), 2);
}
}
}
有三个方法需要说明:
-
ShenyuClientShutdownHook.delayOtherHooks()
延迟应用关闭时的其他钩子 -
ShenyuClientRegisterRepository.persistURI()
给 Admin 端发送 DTO 注册信息 -
ShutdownHookManager.get().addShutdownHook()
向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
延迟应用关闭时的其他钩子
-
ShenyuClientShutdownHook.delayOtherHooks()
- 利用 CAS 不加锁地确保并发时
TakeoverOtherHooksThread
线程只被运行一次 - 一个接管其他钩子的线程
- 利用 CAS 不加锁地确保并发时
public class ShenyuClientShutdownHook {
// ...
private static final AtomicBoolean DELAY = new AtomicBoolean(false);
private static String hookNamePrefix = "ShenyuClientShutdownHook";
private static AtomicInteger hookId = new AtomicInteger(0);
private static Properties props;
private static IdentityHashMap<Thread, Thread> delayHooks = new IdentityHashMap<>();
private static IdentityHashMap<Thread, Thread> delayedHooks = new IdentityHashMap<>();
// ....
/**
* Delay other shutdown hooks.
*/
public static void delayOtherHooks() {
// 1. 利用 CAS 不加锁地确保并发时 TakeoverOtherHooksThread 线程只被运行一次
if (!DELAY.compareAndSet(false, true)) {
return;
}
// 2. 一个接管其他钩子的线程
TakeoverOtherHooksThread thread = new TakeoverOtherHooksThread();
thread.start();
}
/**
* Delay other shutdown hooks thread.
*/
private static class TakeoverOtherHooksThread extends Thread {
@Override
// 1. 该线程用于生成钩子,这些钩子用来延迟执行已经添加的钩子,为的是处理一些资源的关闭,和注册信息的注销
public void run() {
int shutdownWaitTime = Integer.parseInt(props.getProperty("shutdownWaitTime", "3000"));
int delayOtherHooksExecTime = Integer.parseInt(props.getProperty("delayOtherHooksExecTime", "2000"));
IdentityHashMap<Thread, Thread> hooks = null;
try {
// 2. 通过反射拿到应用关闭时的所有钩子
Class<?> clazz = Class.forName(props.getProperty("applicationShutdownHooksClassName", "java.lang.ApplicationShutdownHooks"));
Field field = clazz.getDeclaredField(props.getProperty("applicationShutdownHooksFieldName", "hooks"));
field.setAccessible(true);
hooks = (IdentityHashMap<Thread, Thread>) field.get(clazz);
} catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException ex) {
LOG.error(ex.getMessage(), ex);
}
long s = System.currentTimeMillis();
// 3. 限制处理钩子的时间在 delayOtherHooksExecTime 之内,为什么要控制时间,难道不会遗漏一些钩子无法延迟吗?
// GPT:
// 答:1. 避免死锁或长时间阻塞
// 2. 可以确保这个延迟逻辑不会过度拖延应用的关闭过程
// 3. 实用性考虑: 在大多数情况下,如果在给定的时间内无法连接到或修改某些钩子,可能是因为存在一些异常或特殊情况。
// 在这种情况下,继续等待可能不会带来太多好处,而是增加了关闭过程的复杂性和不确定性。
// 确实,这种方法可能会遗漏一些在延迟期间新注册的钩子,但这通常是一个权衡的结果,设计者可能认为这种情况很少发生,或者遗漏的风险相对较小。
while (System.currentTimeMillis() - s < delayOtherHooksExecTime) {
for (Iterator<Thread> iterator = Objects.requireNonNull(hooks).keySet().iterator(); iterator.hasNext();) {
Thread hook = iterator.next();
// 4. 用于延迟执行原本钩子的钩子不必再延迟,所以跳过
if (hook.getName().startsWith(hookNamePrefix)) {
continue;
}
// 5. 正在处理的延迟的钩子和处理过的延迟的钩子不必再延迟,所以跳过
if (delayHooks.containsKey(hook) || delayedHooks.containsKey(hook)) {
continue;
}
Thread delayHook = new Thread(() -> {
LOG.info("sleep {}ms", shutdownWaitTime);
try {
// 6. 先睡眠 shutdownWaitTime,然后再执行原本的在应用关闭时的钩子
TimeUnit.MILLISECONDS.sleep(shutdownWaitTime);
} catch (InterruptedException ex) {
LOG.error(ex.getMessage(), ex);
}
hook.run();
}, hook.getName());
delayHooks.put(delayHook, delayHook);
// 7. 从原本的钩子 map 中移除这个原本要执行的钩子,即 delayHook
iterator.remove();
}
for (Iterator<Thread> iterator = delayHooks.keySet().iterator(); iterator.hasNext();) {
Thread delayHook = iterator.next();
// 8. 向运行时加入用来延迟执行原本钩子的钩子,即 delayedHooks
Runtime.getRuntime().addShutdownHook(delayHook);
// 9. 加入已处理过的钩子 map,
delayedHooks.put(delayHook, delayHook);
iterator.remove();
LOG.info("hook {} will sleep {}ms when it start", delayHook.getName(), shutdownWaitTime);
}
try {
// 10. 睡眠 100ms,目的是?
// GPT:
// 答:1. 减少CPU使用率
// 2. 给其他操作留出处理时间,通过在每次循环后短暂休眠,可以给其他线程运行的机会
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException ex) {
LOG.error(ex.getMessage(), ex);
}
}
// 帮助 GC
hookNamePrefix = null;
hookId = new AtomicInteger(0);
props = null;
delayHooks = null;
delayedHooks = null;
}
}
}
-
TakeoverOtherHooksThread.run()
代码如上面给出的:
- 该线程用于生成钩子,这些钩子用来延迟执行已经添加的钩子,为的是处理一些资源的关闭,和注册信息的注销
- 通过反射拿到应用关闭时的所有钩子
- 限制处理钩子的时间在
delayOtherHooksExecTime
之内,为什么要控制时间,难道不会遗漏一些钩子无法延迟吗?
GPT:
答:
1. 避免死锁或长时间阻塞
2. 可以确保这个延迟逻辑不会过度拖延应用的关闭过程
3. 实用性考虑: 在大多数情况下,如果在给定的时间内无法连接到或修改某些钩子,可能是因为存在一些异常或特殊情况。 在这种情况下,继续等待可能不会带来太多好处,而是增加了关闭过程的复杂性和不确定性。确实,这种方法可能会遗漏一些在延迟期间新注册的钩子,但这通常是一个权衡的结果,设计者可能认为这种情况很少发生,或者遗漏的风险相对较小。 - 用于延迟执行原本钩子的钩子不必再延迟,所以跳过
- 正在处理的延迟的钩子和处理过的延迟的钩子不必再延迟,所以跳过
- 先睡眠 shutdownWaitTime,然后再执行原本的在应用关闭时的钩子
- 从原本的钩子 map 中移除这个原本要执行的钩子,即
delayHook
- 向运行时加入用来延迟执行原本钩子的钩子,即
delayedHooks
- 加入已处理过的钩子 map
- 睡眠 100ms,目的是?
GPT:
答:
1. 减少CPU使用率
2. 给其他操作留出处理时间,通过在每次循环后短暂休眠,可以给其他线程运行的机会
给 Admin 端发送 DTO 注册信息
-
ShenyuClientRegisterRepository.persistURI()
ShenyuClientRegisterRepository
、FailbackRegistryRepository
、HttpClientRegisterRepository
继承关系如下图 -
ShenyuClientRegisterRepository.persistURI()
/**
* Shenyu client register repository.
*/
@SPI
public interface ShenyuClientRegisterRepository {
/**
* Init.
*
* @param config the config
*/
default void init(ShenyuRegisterCenterConfig config) {
}
/**
* Persist metadata.
*
* @param metadata metadata
*/
void persistInterface(MetaDataRegisterDTO metadata);
/**
* Persist uri.
*
* @param registerDTO the register dto
*/
default void persistURI(URIRegisterDTO registerDTO) {
}
/**
* Node active offline when shutdown.
*
* @param offlineDTO the offline dto
*/
default void offline(URIRegisterDTO offlineDTO) {
}
/**
* persistApiDoc.
* @param apiDocRegisterDTO apiDocRegisterDTO
*/
default void persistApiDoc(ApiDocRegisterDTO apiDocRegisterDTO) {
}
/**
* closeRepository.
* If the close method is used, Spring will call it by default when the bean is destroyed,
* So its method name is closeRepository to avoid being called by default when the bean is destroyed.
*/
default void closeRepository() {
}
}
-
FailbackRegistryRepository.persistURI()
这里同样用到了模板方法,
doPersistURI
交由子类HttpClientRegisterRepository
实现
public abstract class FailbackRegistryRepository implements ShenyuClientRegisterRepository {
// ...
@Override
public void persistURI(final URIRegisterDTO registerDTO) {
try {
// 1. 同样是模板方法,交由子类 HttpClientRegisterRepository 实现
this.doPersistURI(registerDTO);
} catch (Exception ex) {
//If a failure occurs, it needs to be added to the retry list.
logger.warn("Failed to persistURI {}, cause:{}", registerDTO, ex.getMessage());
this.addFailureUriDataRegister(registerDTO);
}
}
}
-
HttpClientRegisterRepository.doPersistURI()
- 如果端口已被其他进程监听,则直接返回,不需要再注册
- 否则注册
public class HttpClientRegisterRepository extends FailbackRegistryRepository {
// ...
private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientRegisterRepository.class);
private static URIRegisterDTO uriRegisterDTO;
private static ApiDocRegisterDTO apiDocRegisterDTO;
private String username;
private String password;
private List<String> serverList;
/**
* server -> accessToken.
*/
private LoadingCache<String, String> accessToken;
// ...
@Override
public void doPersistURI(final URIRegisterDTO registerDTO) {
if (RuntimeUtils.listenByOther(registerDTO.getPort())) {
// 1. 如果端口已被其他进程监听,则直接返回,不需要再注册
return;
}
// 2. 否则注册
doRegister(registerDTO, Constants.URI_PATH, Constants.URI);
uriRegisterDTO = registerDTO;
}
private <T> void doRegister(final T t, final String path, final String type) {
int i = 0;
for (String server : serverList) {
i++;
String concat = server.concat(path);
try {
String accessToken = this.accessToken.get(server);
if (StringUtils.isBlank(accessToken)) {
throw new NullPointerException("accessToken is null");
}
// 1. 调用注册工具类进行注册
RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);
// considering the situation of multiple clusters, we should continue to execute here
} catch (Exception e) {
LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());
if (i == serverList.size()) {
throw new RuntimeException(e);
}
}
}
}
}
-
HttpClientRegisterRepository.doRegister()
- 调用注册工具类进行注册(代码如上)
-
RegisterUtils.doRegister()
- 构建 http 的 heade
- 在此通过 http 调用 Admin 的服务进行注册,
url 为 Admin 端的注册用的接口,有 localhost:9095/shenyu-client/register-metadata 等url;
json 为要传输的注册信息 - OkHttpTools 是 shenyu 对 okhttp 外部组件的封装
public final class RegisterUtils {
// ...
public static void doRegister(final String json, final String url, final String type, final String accessToken) throws IOException {
if (StringUtils.isBlank(accessToken)) {
LOGGER.error("{} client register error accessToken is null, please check the config : {} ", type, json);
return;
}
// 1. 构建 http 的 header
Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN, accessToken).build();
// 2. 在此通过 http 调用 Admin 的服务进行注册,
// url 为 Admin 端的注册用的接口,有 localhost:9095/shenyu-client/register-metadata 等url;
// json 为要传输的注册信息
// 3. OkHttpTools 是 shenyu 对 okhttp 外部组件的封装
String result = OkHttpTools.getInstance().post(url, json, headers);
if (Objects.equals(SUCCESS, result)) {
LOGGER.info("{} client register success: {} ", type, json);
} else {
LOGGER.error("{} client register error: {} ", type, json);
}
}
}
向应用添加一个钩子,使得在应用关闭时,应用自动开启一个新线程去注销注册信息
-
ShutdownHookManager.addShutdownHook()文章来源:https://www.toymoban.com/news/detail-825051.html
- 向运行时添加一个关机钩子,这个钩子是一个新线程,新线程去执行 ShutdownHookManager 管理的要在关机时执行的钩子
- 添加关闭应用时要执行的注销注册的钩子
public final class ShutdownHookManager {
// ...
private static final ShutdownHookManager MGR = new ShutdownHookManager();
private final Set<HookEntry> hooks =
Collections.synchronizedSet(new HashSet<HookEntry>());
static {
// 1. 向运行时添加一个关机钩子,这个钩子是一个新线程,
// 新线程去执行 ShutdownHookManager 管理的要在关机的钩子
Runtime.getRuntime().addShutdownHook(
new Thread(() -> {
MGR.shutdownInProgress.set(true);
for (Runnable hook : MGR.getShutdownHooksInOrder()) {
try {
hook.run();
} catch (Throwable ex) {
LOG.error(ex.getMessage(), ex);
}
}
})
);
}
// ...
public void addShutdownHook(final Runnable shutdownHook, final int priority) {
if (shutdownHook == null) {
throw new IllegalArgumentException("shutdownHook cannot be NULL");
}
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
}
// 2. 添加关闭应用时要执行的注销注册的钩子
hooks.add(new HookEntry(shutdownHook, priority));
}
}
一张图总结
文章来源地址https://www.toymoban.com/news/detail-825051.html
到了这里,关于Apache 神禹(shenyu)源码阅读(三)——被网关路由的后端服务 Client 向 Admin 注册的数据传输(Client端)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!