Vert.x 源码解析(4.x)——Local EvnentBus入门使用和源码解析

这篇具有很好参考价值的文章主要介绍了Vert.x 源码解析(4.x)——Local EvnentBus入门使用和源码解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Vert.x 源码解析(4.x)——Local EvnentBus入门使用和源码解析

目录

Vert.x 源码解析(4.x)——Local EvnentBus入门使用和源码解析,Vert.x,java

1.简介

Vert.x EventBus是一个分布式事件总线,用于在Vert.x应用程序内或跨多个Vert.x应用程序之间实现异步通信。它允许不同的组件、模块或服务之间通过消息进行交互,从而实现松耦合和高度可扩展的架构。

EventBus分为两种,一种是Local模式(项目内),一种是Clustered(集群进行传输)。

本文先介绍用法和Local模式传输的源码,下一文章介绍Clustered集群传输的源码

2.基本概念

在深入源码之前,让我们先来了解一些EventBus的基本概念:

  • 地址(Address):每个事件都有一个唯一的地址,可以通过地址来发送和接收消息。地址在EventBus中起到标识作用,类似于消息主题。

  • 消息(Message):消息是在地址之间传递的数据单元,它包含了实际的信息以及一些元数据,如发送者、接收者、消息体等。

  • 生产者(Producer):负责向指定地址发送消息的组件,主要分为点对点,广播,以及需要得到回复的信息

  • 处理器(Handler):将处理器注册在某个地址上并处理接受到的信息,同一个地址可以注册许多不同的处理器,就好比观察者模式里的订阅者。

  • **发布订阅:**Event Bus支持 发布(publish)消息 功能。消息将被发布到一个地址上。 发布意味着信息会被传递给所有注册在该地址上的处理器。

    即我们熟悉的 发布/订阅 消息传递模式。

3. 入门使用

3.1 获取EventBus

每一个Vertx实例仅有一个EventBus实例

EventBus eb = vertx.eventBus();

注册处理器

EventBus eb = vertx.eventBus();

eb.consumer("news.uk.sport", message -> {
  System.out.println("I have received a message: " + message.body());
});

当消息达到您的处理器时,该消息会被放入 message 参数进行处理器的调用。

调用 consumer() 方法会返回一个 MessageConsumer 对象。

该对象后续可用于注销处理器,或者流式地处理该对象。

您也可以使用 consumer 方法直接返回一个不带处理器的 MessageConsumer, 之后再在这个返回的对象上设置处理器。如:

EventBus eb = vertx.eventBus();

MessageConsumer<String> consumer = eb.consumer("news.uk.sport");
consumer.handler(message -> {
  System.out.println("I have received a message: " + message.body());
});

在向集群模式下的 Event Bus 注册处理器时, 注册信息会花费一些时间才能传播到集群中的所有节点。

若您希望在完成注册后收到通知,您可以在 MessageConsumer 对象上注册 一个 completion handler

consumer.completionHandler(res -> {
  if (res.succeeded()) {
    System.out.println("The handler registration has reached all nodes");
  } else {
    System.out.println("Registration failed!");
  }
});

3.2 注销处理器

您可以通过 unregister 方法来注销处理器。

若您在使用集群模式的 Event Bus,注销处理器的动作会花费一些时间在节点中传播。若您想 在完成后收到通知,可以使用 unregister 方法注册回调:

consumer.unregister(res -> {
  if (res.succeeded()) {
    System.out.println("The handler un-registration has reached all nodes");
  } else {
    System.out.println("Un-registration failed!");
  }
});

3.3 发布消息

发布消息很简单,只需使用 publish 方法指定一个地址去发布即可。

eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");

这个消息将会传递给所有在地址 news.uk.sport 上注册过的处理器。

3.4 发送消息

在对应地址上注册过的所有处理器中,仅一个处理器能够接收到发送的消息。 这是一种点对点消息传递模式。Vert.x 使用不严格的轮询算法来选择绑定的处理器。

您可以使用 send 方法来发送消息:

eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");

3.5 设置消息头

在 Event Bus 上发送的消息可包含头信息。您可以在发送或发布(publish)时提供一个 DeliveryOptions 来指定头信息。例如:

DeliveryOptions options = new DeliveryOptions();
options.addHeader("some-header", "some-value");
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball", options);

3.6 消息顺序

Vert.x会按照发送者发送消息的顺序,将消息以同样的顺序传递给处理器。

3.7 消息对象

您在消息处理器中接收到的对象的类型是 Message

消息的 body 对应发送或发布(publish)的对象。

消息的头信息可以通过 headers 方法获取。

3.8 应答消息/发送回复

当使用 send 方法发送消息时, Event Bus会尝试将消息传递到注册在Event Bus上的 MessageConsumer 中。

某些情况下,发送者可以通过 请求/响应+ 模式来得知消费者已经收到并"处理"了该消息。

消费者可以通过调用 reply 方法来应答这个消息,确认该消息已被处理。

此时,它会将一个应答消息返回给发送者并调用发送者的应答处理器。

看这个例子会更清楚:

接收者:

MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
consumer.handler(message -> {
  System.out.println("I have received a message: " + message.body());
  message.reply("how interesting!");
});

发送者:

eventBus.request("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {
  if (ar.succeeded()) {
    System.out.println("Received reply: " + ar.result().body());
  }
});

在应答的消息体中可以包含一些有用的信息。

“处理中”的实际含义应当由应用程序来定义。 这完全取决于消费者如何执行,Event Bus 对此并不关心。

一些例子:

  • 一个简单地实现了返回当天时间的服务, 在应答的消息里会包含当天时间信息。
  • 一个实现了持久化队列的消息消费者,可以回复 true 来表示消息已成功持久化到存储设备中,或回复 false 表示失败。
  • 一个处理订单的消息消费者可以使用 true 确认这个订单已经成功处理, 并且可以从数据库中删除。

3.9 带超时的发送

当发送带有应答处理器的消息时,可以在 DeliveryOptions 中指定一个超时时间。

如果在这个时间之内没有收到应答,则会以“失败的结果”为参数调用应答处理器。

默认超时是 30 秒

3.10 发送失败

消息发送可能会因为其他原因失败,包括:

  • 没有可用的处理器来接收消息
  • 接收者调用了 fail 方法显式声明失败

发生这些情况时,应答处理器将会以这些异常失败结果为参数进行调用。

3.11 消息编解码器

您可以在 Event Bus 中发送任何对象,只需为这个对象类型注册一个编解码器 message codec 即可。

每个消息编解码器都有一个名称,您需要在发送或发布消息时通过 DeliveryOptions 来指定:

eventBus.registerCodec(myCodec);

DeliveryOptions options = new DeliveryOptions().setCodecName(myCodec.name());

eventBus.send("orders", new MyPOJO(), options);

若您希望某个类总是使用特定的编解码器,那么您可以为这个类注册默认编解码器。 这样您就不需要在每次发送的时候指定了:

eventBus.registerDefaultCodec(MyPOJO.class, myCodec);

eventBus.send("orders", new MyPOJO());

您可以通过 unregisterCodec 方法注销某个消息编解码器。

消息编解码器的编码输入和解码输出不一定使用同一个类型。 例如您可以编写一个编解码器来发送 MyPOJO 类的对象,但是当消息发送给处理器后解码成 MyOtherPOJO 对象。

Vert.x 内置某些特定类型的编解码器:

  • 基础类型(字符串,字节数组,字节,整型,长整型,双精度浮点型,布尔值,短整型,字符),或者
  • 一些 Vert.x 的数据类型(buffers,JSON 数组,JSON 对象),或者
  • 实现 ClusterSerializable 接口的类型,或者
  • 实现 java.io.Serializable 接口的类型。
重要 在集群模式下,出于安全考虑,将会默认拒绝 ClusterSerializablejava.io.Serializable 对象。您可以通过提供检查类名称的函数来定义允许编码和解码的类:EventBus.clusterSerializableChecker(),以及EventBus.serializableChecker()

3.12 集群模式的 Event Bus

Event Bus 不仅仅只存在于单个 Vert.x 实例中。将网络上不同的 Vert.x 实例组合成集群, 就可以在这些实例间形成一个单一的、分布式的Event Bus。

通过编写代码启用集群模式

若您用编程的方式创建 Vert.x 实例(Vertx),则可以通过将 Vert.x 实例配置成集群模式来获取集群模式的Event Bus:

VertxOptions options = new VertxOptions();
Vertx.clusteredVertx(options, res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
    EventBus eventBus = vertx.eventBus();
    System.out.println("We now have a clustered event bus: " + eventBus);
  } else {
    System.out.println("Failed: " + res.cause());
  }
});

您需要确保在您的 classpath 中(或构建工具的依赖中)包含 ClusterManager 的实现类,如默认的 HazelcastClusterManager

通过命令行启用集群模式

您可以通过以下命令以集群模式运行 Vert.x 应用:

vertx run my-verticle.js -cluster

3.13 Verticle 中的自动清理

若您在 Verticle 中注册了 Event Bus 的处理器,那么这些处理器在 Verticle 被撤销(undeploy)的时候会自动被注销。

3.14 配置 Event Bus

Event Bus 是可配置的,这对于以集群模式运行的 Event Bus 来说非常有用。 Event Bus 使用 TCP 连接发送和接收消息,因此可以通过 EventBusOptions 对TCP连接进行全面的配置。 由于 Event Bus 既可以用作客户端又可以用作服务端,因此这些配置近似于 NetClientOptionsNetServerOptions

VertxOptions options = new VertxOptions()
    .setEventBusOptions(new EventBusOptions()
        .setSsl(true)
        .setKeyStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
        .setTrustStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
        .setClientAuth(ClientAuth.REQUIRED)
    );

Vertx.clusteredVertx(options, res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
    EventBus eventBus = vertx.eventBus();
    System.out.println("We now have a clustered event bus: " + eventBus);
  } else {
    System.out.println("Failed: " + res.cause());
  }
});

上边代码段描述了如何在Event Bus中使用SSL连接替换明文的TCP连接。

警告 :若要在集群模式下保证安全性,您 必须 将集群管理器配置成加密的或者加强安全规则。 参考集群管理器的文档获取更多细节。

Event Bus 的配置需要在集群的所有节点中保持一致。

EventBusOptions 还允许您指定 Event Bus 是否运行在集群模式下,以及它的端口和主机信息(译者注:host,这里指网络socket绑定的地址)。

在容器中使用时,您还可以配置公共主机和端口号:(译者注:setClusterPublicHost 和 setClusterPublicPort 的功能在原文档上描述得不清晰,但是API文档上有详细描述。 在某些容器、云环境等场景下,本节点监听的地址,和其他节点连接本节点时使用的地址,是不同的。这种情况下则可以利用上面两个配置区分监听地址和公开暴露的地址。 )

VertxOptions options = new VertxOptions()
    .setEventBusOptions(new EventBusOptions()
        .setClusterPublicHost("whatever")
        .setClusterPublicPort(1234)
    );

Vertx.clusteredVertx(options, res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
    EventBus eventBus = vertx.eventBus();
    System.out.println("We now have a clustered event bus: " + eventBus);
  } else {
    System.out.println("Failed: " + res.cause());
  }
});

4 关键类简介

4.1 主要类的作用

1.EventBus、EventBusInternal、EventBusImpl:

EventBus、EventBusInternal这两个是接口,基本所有的方法都在EventBus里进行了定义,EventBusInternal只是定义了开始和关闭的内部调用方法。EventBusImpl是实现,是Vert.x事件总线的实现类,负责管理消息的发送和接受,维护事件监听器的注册和接触,以及处理消息的派发。以及它当然是异步的

2.HandlerRegistration,MessageConsumerImpl

HandlerRegistration是超类,MessageConsumerImpl是子类,他是消息消费者的实现类(相当于包含主题以及消费者),责维护消费者的订阅关系,将接收到的消息派发给消费者的处理器(Handler),以及处理消费者的取消订阅操作

3.DeliveryContextBase,InboundDeliveryContext,OutboundDeliveryContext

DeliveryContextBase是超类,用于管理消息的传递,其中包含拦截器,以及出信息出入的处理

InboundDeliveryContext是接受信息时的处理

OutboundDeliveryContext是接受信息时的处理

4.2 EventBus系列:

EventBus、EventBusInternal这两个是接口,基本所有的方法都在EventBus里进行了定义,EventBusInternal只是定义了开始和关闭的内部调用方法。EventBusImpl是实现,是Vert.x事件总线的实现类,负责管理消息的发送和接受,维护事件监听器的注册和接触,以及处理消息的派发。以及它当然是异步的

4.2.1 EventBus

public interface EventBus extends Measured {
  /**
   * 指定的地址和要发送的数据,只有一个接收者会收到
   *	options可以设置一些参数,比如超时,或者头信息等
   */
  @Fluent
  EventBus send(String address, @Nullable Object message);
  @Fluent
  EventBus send(String address, @Nullable Object message, DeliveryOptions options);

  /**
   * 这是响应信息,需要消费者通过调用reply方法进行响应
   */
  default <T> Future<Message<T>> request(String address, @Nullable Object message) {
    return request(address, message, new DeliveryOptions());
  }
  <T> Future<Message<T>> request(String address, @Nullable Object message, DeliveryOptions options);

  /**
   * 发布一个信息到指定地址,所有订阅了该地址的人都会收到信息
   *
   */
  @Fluent
  EventBus publish(String address, @Nullable Object message);
  @Fluent
  EventBus publish(String address, @Nullable Object message, DeliveryOptions options);

  /**
   * 订阅某个地址主题
   */
  <T> MessageConsumer<T> consumer(String address);

  /**
   * handler是消费信息的回调
   */
  <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);

  /**
   * 本地订阅,不会进行集群传播
   */
  <T> MessageConsumer<T> localConsumer(String address);
  <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);

  /**
   * 创建消息发送者
   */
  <T> MessageProducer<T> sender(String address);
  <T> MessageProducer<T> sender(String address, DeliveryOptions options);

  /**
   * 创建消息发布者
   */
  <T> MessageProducer<T> publisher(String address);
  <T> MessageProducer<T> publisher(String address, DeliveryOptions options);

  /**
   * 注册消息解码器
   */
  @Fluent
  @GenIgnore(PERMITTED_TYPE)
  EventBus registerCodec(MessageCodec codec);

  /**
   * 取消编码器
   */
  @Fluent
  @GenIgnore(PERMITTED_TYPE)
  EventBus unregisterCodec(String name);

  /**
   * 指定注册默认编解码器
   */
  @Fluent
  @GenIgnore
  <T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec);

  /**
   * 解绑默认编码器
   */
  @Fluent
  @GenIgnore
  EventBus unregisterDefaultCodec(Class clazz);

  /**
   * 编码器选择器
   */
  @Fluent
  EventBus codecSelector(Function<Object, String> selector);

  /**
   * 拦截器,发送信息时调用
   */
  @Fluent
  <T> EventBus addOutboundInterceptor(Handler<DeliveryContext<T>> interceptor);

  /**
   * 移除拦截器
   */
  @Fluent
  <T> EventBus removeOutboundInterceptor(Handler<DeliveryContext<T>> interceptor);

  /**
   * 接受信息时调用
   */
  @Fluent
  <T> EventBus addInboundInterceptor(Handler<DeliveryContext<T>> interceptor);

  /**
   * 移除拦截器
   */
  @Fluent
  <T> EventBus removeInboundInterceptor(Handler<DeliveryContext<T>> interceptor);

4.2.2 EventBusInternal

只有开启和关闭

public interface EventBusInternal extends EventBus {

  /**
   * Start the event bus.
   */
  void start(Promise<Void> promise);

  /**
   * Close the event bus and release any resources held.
   */
  void close(Promise<Void> promise);
}

4.2.3 EventBusImpl

public class EventBusImpl implements EventBusInternal, MetricsProvider {

  //用与存储发送信息拦截器和接受信息拦截器
  private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> OUTBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "outboundInterceptors");
  private static final AtomicReferenceFieldUpdater<EventBusImpl, Handler[]> INBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "inboundInterceptors");

  private volatile Handler<DeliveryContext>[] outboundInterceptors = new Handler[0];
  private volatile Handler<DeliveryContext>[] inboundInterceptors = new Handler[0];
  //存储所有handler(消费者)
  protected final ConcurrentMap<String, ConcurrentCyclicSequence<HandlerHolder>> handlerMap = new ConcurrentHashMap<>();
  //解析器  
  protected final CodecManager codecManager = new CodecManager();
}

4.3 MessageConsumer系列

MessageConsumerImpl主要是继承HandlerRegistration和实现MessageConsumer

它就是消费者的实现以及管理订阅功能,针对于接收到的信息

MessageConsumer

public interface MessageConsumer<T> extends ReadStream<Message<T>> {

  //注册
  @Override
  MessageConsumer<T> handler(Handler<Message<T>> handler);

  // 暂停消息消费者,使其停止接收消息,但不注销它。
  @Override
  MessageConsumer<T> pause();

  // 恢复消息消费者,使其继续接收消息。
  @Override
  MessageConsumer<T> resume();
   
    //.....等等
}
public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements MessageConsumer<T> { 
//是否只处理本地信息
  private final boolean localOnly;
  //信息回调
  private Handler<Message<T>> handler;
  //结束回调
  private Handler<Void> endHandler;
  //数据丢弃的回调
  private Handler<Message<T>> discardHandler;
  //最大缓冲消息数
  private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
  //队列,用于缓存待处理消息
  private Queue<Message<T>> pending = new ArrayDeque<>(8);
  //现在需求信息的数量,控制发送
  private long demand = Long.MAX_VALUE;
  //结果
  private Promise<Void> result;
  //是否注册
  private boolean registered;
 MessageConsumerImpl(Vertx vertx, ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly) {
  super(context, eventBus, address, false);
  this.vertx = vertx;
  this.context = context;
  this.eventBus = eventBus;
  this.address = address;
  this.localOnly = localOnly;
  this.result = context.promise();
 }
}

4.4 DeliveryContext系列

4.4.1 DeliveryContext

public interface DeliveryContext<T> {

  /**
   * 当前信息
   * @return  The message being delivered
   */
  Message<T> message();

  /**
   * 执行所有拦截器方法,以及执行execute方法
   * Call the next interceptor
   */
  void next();

  /**
   * 判断是发送还是发布
   * @return true if the message is being sent (point to point) or False if the message is being published
   */
  boolean send();

  /**
   * 返回传输的信息体
   * @return the value delivered by the message (before or after being processed by the codec)
   */
  Object body();
}

4.4.2 DeliveryContextBase

abstract class DeliveryContextBase<T> implements DeliveryContext<T> {

  public final MessageImpl<?, T> message;
  public final ContextInternal context;

  //拦截器
  private final Handler<DeliveryContext>[] interceptors;

  //目前执行拦截器的索引
  private int interceptorIdx;
  //是否正在调用拦截
  private boolean invoking;
  //是否需要继续调用下一个拦截器
  private boolean invokeNext;

  protected DeliveryContextBase(MessageImpl<?, T> message, Handler<DeliveryContext>[] interceptors, ContextInternal context) {
    this.message = message;
    this.interceptors = interceptors;
    this.context = context;
    this.interceptorIdx = 0;
  }
   //实现了如下方法
    
  public void next() {...}
  
  //定义了如下方法
  protected abstract void execute();
  

4.4.3 InboundDeliveryContext和OutboundDeliveryContext

这两个类主要实现的就是exceute方法,一个是发送信息时bus.sendOrPub(this);,一个时接受信息时调用HandlerRegistration.this.dispatch(message, ctx, handler);

就是发送信息出入时会调用到。

4.5 Message系列

4.5.1 Message

public interface Message<T> {

  /**
   * 地址
   */
  String address();

  /**
   * 消息头
   */
  MultiMap headers();

  /**
   * 消息体
   */
  @CacheReturn
  T body();

  /**
   * 获取回复消息的地址
   */
  @Nullable
  String replyAddress();

  /**
   * 是发送还是发布
   */
  boolean isSend();

  /**
   * 用于回复信息
   */
  default void reply(@Nullable Object message) {
    reply(message, new DeliveryOptions());
  }
  void reply(@Nullable Object message, DeliveryOptions options);

  /**
   * 用于回复信息并且反馈
   */
  default <R> Future<Message<R>> replyAndRequest(@Nullable Object message) {
    return replyAndRequest(message, new DeliveryOptions());
  }

    .....

}

4.5.2 MessageImpl

具体的实现

public class MessageImpl<U, V> implements Message<V> {

  //消息解析器
  protected MessageCodec<U, V> messageCodec;
  protected final EventBusImpl bus;
  //地址
  protected String address;
  //回复地址
  protected String replyAddress;
  //消息头
  protected MultiMap headers;
  //消息体未经解码的
  protected U sentBody;
  //解码的消息体
  protected V receivedBody;
  //标志位
  protected boolean send;
  //消息追踪
  protected Object trace;

  public MessageImpl(EventBusImpl bus) {
    this.bus = bus;
  }

  public MessageImpl(String address, MultiMap headers, U sentBody,
                     MessageCodec<U, V> messageCodec,
                     boolean send, EventBusImpl bus) {
    this.messageCodec = messageCodec;
    this.address = address;
    this.headers = headers;
    this.sentBody = sentBody;
    this.send = send;
    this.bus = bus;
  }
  protected MessageImpl(MessageImpl<U, V> other) {
    this.bus = other.bus;
    this.address = other.address;
    this.replyAddress = other.replyAddress;
    this.messageCodec = other.messageCodec;
    if (other.headers != null) {
      List<Map.Entry<String, String>> entries = other.headers.entries();
      this.headers = MultiMap.caseInsensitiveMultiMap();
      for (Map.Entry<String, String> entry: entries) {
        this.headers.add(entry.getKey(), entry.getValue());
      }
    }
    if (other.sentBody != null) {
      this.sentBody = other.sentBody;
        //对数据进行解码
      this.receivedBody = messageCodec.transform(other.sentBody);
    }
    this.send = other.send;
  }

4.5.3 MessageCodec系列

这边就介绍下CodecManager,具体解码器代码很少,但是实现很多,可以自行查看

主要介绍获取解码器的lookupCodec方法

public class CodecManager {

  // 各种解码器
  public static final MessageCodec<String, String> PING_MESSAGE_CODEC = new PingMessageCodec();
  public static final MessageCodec<String, String> NULL_MESSAGE_CODEC = new NullMessageCodec();
  public static final MessageCodec<String, String> STRING_MESSAGE_CODEC = new StringMessageCodec();
  public static final MessageCodec<Buffer, Buffer> BUFFER_MESSAGE_CODEC = new BufferMessageCodec();
  public static final MessageCodec<JsonObject, JsonObject> JSON_OBJECT_MESSAGE_CODEC = new JsonObjectMessageCodec();
  public static final MessageCodec<JsonArray, JsonArray> JSON_ARRAY_MESSAGE_CODEC = new JsonArrayMessageCodec();
  public static final MessageCodec<byte[], byte[]> BYTE_ARRAY_MESSAGE_CODEC = new ByteArrayMessageCodec();
  public static final MessageCodec<Integer, Integer> INT_MESSAGE_CODEC = new IntMessageCodec();
  public static final MessageCodec<Long, Long> LONG_MESSAGE_CODEC = new LongMessageCodec();
  public static final MessageCodec<Float, Float> FLOAT_MESSAGE_CODEC = new FloatMessageCodec();
  public static final MessageCodec<Double, Double> DOUBLE_MESSAGE_CODEC = new DoubleMessageCodec();
  public static final MessageCodec<Boolean, Boolean> BOOLEAN_MESSAGE_CODEC = new BooleanMessageCodec();
  public static final MessageCodec<Short, Short> SHORT_MESSAGE_CODEC = new ShortMessageCodec();
  public static final MessageCodec<Character, Character> CHAR_MESSAGE_CODEC = new CharMessageCodec();
  public static final MessageCodec<Byte, Byte> BYTE_MESSAGE_CODEC = new ByteMessageCodec();
  public static final MessageCodec<ReplyException, ReplyException> REPLY_EXCEPTION_MESSAGE_CODEC = new ReplyExceptionMessageCodec();
  private volatile Function<Object, String> codecSelector = o -> null;
  public void codecSelector(Function<Object, String> selector) {
    this.codecSelector = Objects.requireNonNull(selector);
  }
  public MessageCodec lookupCodec(Object body, String codecName, boolean local) {
    MessageCodec codec;
      //解码器名不为空的话首先是根据你传入进来的解码器名先进行获取
      //如果为空则根据body是否为空,如果不为空则根据body的类型进行获取相应的解码器
    
    if (codecName != null) {
      codec = getCodec(codecName);
    } else if (body == null) {
      codec = NULL_MESSAGE_CODEC;
    } else if (body instanceof String) {
      codec = STRING_MESSAGE_CODEC;
    } else if (body instanceof Buffer) {
      codec = BUFFER_MESSAGE_CODEC;
    } else if (body instanceof JsonObject) {
      codec = JSON_OBJECT_MESSAGE_CODEC;
    } else if (body instanceof JsonArray) {
      codec = JSON_ARRAY_MESSAGE_CODEC;
    } else if (body instanceof byte[]) {
      codec = BYTE_ARRAY_MESSAGE_CODEC;
    } else if (body instanceof Integer) {
      codec = INT_MESSAGE_CODEC;
    } else if (body instanceof Long) {
      codec = LONG_MESSAGE_CODEC;
    } else if (body instanceof Float) {
      codec = FLOAT_MESSAGE_CODEC;
    } else if (body instanceof Double) {
      codec = DOUBLE_MESSAGE_CODEC;
    } else if (body instanceof Boolean) {
      codec = BOOLEAN_MESSAGE_CODEC;
    } else if (body instanceof Short) {
      codec = SHORT_MESSAGE_CODEC;
    } else if (body instanceof Character) {
      codec = CHAR_MESSAGE_CODEC;
    } else if (body instanceof Byte) {
      codec = BYTE_MESSAGE_CODEC;
    } else if (body instanceof ReplyException) {
      codec = defaultCodecMap.get(body.getClass());
      if (codec == null) {
        codec = REPLY_EXCEPTION_MESSAGE_CODEC;
      }
    } else {
      //如果都不是则使用默认的解码器进行解码
      codec = defaultCodecMap.get(body.getClass());
     //如果没有则根据codecSelector传入进去body获取相应的name进行获取解码器,这个codecSelector默认返回的时null,但是有set方法,由你自己实现并传入。
      if (codec == null) {
        if ((codecName = codecSelector.apply(body)) != null) {
          codec = getCodec(codecName);
 			//是否是ClusterSerializable或其子类等等获取
        } else if (body instanceof ClusterSerializable && (local || acceptClusterSerializable(body.getClass().getName()))) {
          codec = clusterSerializableCodec;
        } else if (body instanceof Serializable && (local || acceptSerializable(body.getClass().getName()))) {
          codec = serializableCodec;
        }
      }
    }
    if (codec == null) {
      throw new IllegalArgumentException("No message codec for type: " + body.getClass());
    }
    return codec;
  }
}

5 Local模式EventBus源码解析

5.0 按照如下示例进行源码解析

    EventBus eb=vertx.eventBus();
    eb.consumer("foo").handler(msg -> {
      System.out.println(msg);
    });
    eb.send("foo", "Test");

5.1 consumer方法分析

绑定的时候会调用该方法,传入地址,接着创建MessageConsumerImpl

  @Override
  public <T> MessageConsumer<T> consumer(String address) {
    checkStarted();
    Objects.requireNonNull(address, "address");
    return new MessageConsumerImpl<>(vertx, vertx.getOrCreateContext(), this, address,  false);
  }

5.2 handler

handler

主要作用是注册handler

  public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
    if (h != null) {
      //保证只有一个handler注册
      synchronized (this) {
        handler = h;
        if (!registered) {
          registered = true;
          Promise<Void> p = result;
          Promise<Void> registration = context.promise();
          //调用父类注册方法 <1>
          register(null, localOnly, registration);
          //
          registration.future().onComplete(ar -> {
            if (ar.succeeded()) {
              p.tryComplete();
            } else {
              p.tryFail(ar.cause());
            }
          });
        }
      }
    } else {
      unregister();
    }
    return this;
  }

<1> register

HandlerRegistration类(MessageConsumerImpl的 父类)

/**
 * 注册加锁
 * @param repliedAddress 地址
 * @param localOnly 是否本地处理
 * @param promise 注册回调
 */
synchronized void register(String repliedAddress, boolean localOnly, Promise<Void> promise) {
  if (registered != null) {
    throw new IllegalStateException();
  }
  //将该MessageConsumer用户添加到bus <2>
  registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise);
  if (bus.metrics != null) {
    metric = bus.metrics.handlerRegistered(address, repliedAddress);
  }
}

<2> addRegistration

EventBusImpl

  protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, Promise<Void> promise) {
     // <3>
    HandlerHolder<T> holder = addLocalRegistration(address, registration, replyHandler, localOnly);
    //完成调用,执行promise.complete
    onLocalRegistration(holder, promise);
    return holder;
  }

<3> addLocalRegistration

EventBusImpl

//线程安全,根据地址来存储所有的订阅者
protected final ConcurrentMap<String, ConcurrentCyclicSequence<HandlerHolder>> handlerMap = new ConcurrentHashMap<>();

private <T> HandlerHolder<T> addLocalRegistration(String address, HandlerRegistration<T> registration,
                                                  boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(address, "address");

  ContextInternal context = registration.context;

  // <4>
  //创建HandlerHolder,主要作用就是汇总参数到一个类里,比如信息接受的回调handler以及是否回复处理器等参数
  HandlerHolder<T> holder = createHandlerHolder(registration, replyHandler, localOnly, context);

  // <5>
  //将新的holder添加到到的该地址的ConcurrentCyclicSequence里
  ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
  //handlerMap是根据地址来存储所有订阅者
  ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(
    address,
    handlers,
    (old, prev) -> old.add(prev.first()));

  //如果是部署的话增加关闭回调
  if (context.isDeployment()) {
    context.addCloseHook(registration);
  }

  return holder;
}

<4> HandlerHolder

具体如下

主要作用就是汇总参数到一个类里,比如信息接受的回调handler以及是否回复处理器等参数

public class HandlerHolder<T> {

  public final ContextInternal context;
  //信息接受的回调就是前面handler(msg->{})传入进来的
  public final HandlerRegistration<T> handler;
  //是否回复处理器
  public final boolean replyHandler;
  //是否只在本地执行
  public final boolean localOnly;
  //移除标志
  private boolean removed;

  public HandlerHolder(HandlerRegistration<T> handler, boolean replyHandler, boolean localOnly, ContextInternal context) {
    this.context = context;
    this.handler = handler;
    this.replyHandler = replyHandler;
    this.localOnly = localOnly;
  }
}

<5> ConcurrentCyclicSequence

它主要作用是用来存储某个地址的所有订阅者。

public class ConcurrentCyclicSequence<T> implements Iterable<T>, Iterator<T> {

  private static final Object[] EMPTY_ARRAY = new Object[0];

  private final AtomicInteger pos;
  //存储某个地址的所有订阅者
  private final Object[] elements;
  public ConcurrentCyclicSequence<T> add(T element) {
    int len = elements.length;
    Object[] copy = Arrays.copyOf(elements, len + 1);
    copy[len] = element;
    return new ConcurrentCyclicSequence<>(pos.get(), copy);
  }
}

5.3 send

用于发送信息

EventBusImpl类

@Override
public EventBus send(String address, Object message) {
  return send(address, message, new DeliveryOptions());
}

/**
 *
 * @param address  地址
 * @param message  发送信息
 * @param options  消息传递参数,比如超时时间,消息头等
 * @return
 */
@Override
public EventBus send(String address, Object message, DeliveryOptions options) {
  //<6>
  MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
  //<7>
  sendOrPubInternal(msg, options, null, null);
  return this;
}

<6> createMessage

这里就是创建信息类,其中lookupCodec方法可以查看上面相关类的解析里

public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
  Objects.requireNonNull(address, "no null address accepted");
  //获取相应的信息解码器
  MessageCodec codec = codecManager.lookupCodec(body, codecName, true);
  // 创建相应的信息类
  @SuppressWarnings("unchecked")
  MessageImpl msg = new MessageImpl(address, headers, body, codec, send, this);
  return msg;
}

<-7>sendOrPubInternal

EventBusImpl

public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
                                  ReplyHandler<T> handler, Promise<Void> writePromise) {
  //判断是否开始
  checkStarted();
  //<8 newSendContext>  
  //<9 sendOrPubInternal>              
  sendOrPubInternal(newSendContext(message, options, handler, writePromise));
}

<8> newSendContext

创建了一个OutboundDeliveryContext(拦截器,具体的方法可以查看类详细解析里)

newSendContext,这里是发送所以创建的时发送Context

EventBusImpl类

  public <T> OutboundDeliveryContext<T> newSendContext(MessageImpl message, DeliveryOptions options,
                                               ReplyHandler<T> handler, Promise<Void> writePromise) {
    return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler, writePromise);
  }

<9>sendOrPubInternal

EventBusImpl

  public <T> void sendOrPubInternal(OutboundDeliveryContext<T> senderCtx) {
    //判断是否开始
    checkStarted();
    senderCtx.bus = this;
    senderCtx.metrics = metrics;
    //就是调用刚刚的context执行next方法,主要作用就是执行拦截器以及发送方法
    //<10>
    senderCtx.next();
  }

**<10> senderCtx.next(); **

DeliveryContextBase

主要作用就是先执行拦截器,再执行实际的方法

@Override
public void next() {
  //判断现在是否正在执行
  if (invoking) {
    invokeNext = true;
  } else {
    //当前拦截器id是否小于拦截器数量
    while (interceptorIdx < interceptors.length) {
      Handler<DeliveryContext> interceptor = interceptors[interceptorIdx];
      invoking = true;
      interceptorIdx++;
      //判断当前执行线程是否与总线线程相同
      if (context.inThread()) {
        //是的话则直接执行
        context.dispatch(this, interceptor);
      } else {
        try {
          //如果线程不同,则直接执行拦截器方法
          interceptor.handle(this);
        } catch (Throwable t) {
          context.reportException(t);
        }
      }
      //设置false并检查是否继续调用下一个拦截器
      invoking = false;
      if (!invokeNext) {
        return;
      }
      invokeNext = false;
    }
    //所有拦截器执行完后将id设置为0
    interceptorIdx = 0;
    //调用execute方法,该方法是在子类里进行实现的
    //<11>
    execute();
  }
}

<11> execute()

OutboundDeliveryContext

  @Override
  protected void execute() {
    VertxTracer tracer = ctx.tracer();
    //确认是否有追踪器
    if (tracer != null) {
      //如果信息为为被追踪,则将标为信息起点
      if (message.trace == null) {
        src = true;
        BiConsumer<String, String> biConsumer = (String key, String val) -> message.headers().set(key, val);
        TracingPolicy tracingPolicy = options.getTracingPolicy();
        //没有指定则使用默认追踪策略
        if (tracingPolicy == null) {
          tracingPolicy = TracingPolicy.PROPAGATE;
        }
        //创建发送追踪请求
        message.trace = tracer.sendRequest(ctx, SpanKind.RPC, tracingPolicy, message, message.send ? "send" : "publish", biConsumer, MessageTagExtractor.INSTANCE);
      } else {
        //如果消息存在就直接发送
        // Handle failure here
        tracer.sendResponse(ctx, null, message.trace, null, TagExtractor.empty());
      }
    }
    // 实际执行消息的发送
    //<12>
    bus.sendOrPub(this);
  }

<12> bus.sendOrPub(this)

EventBusImpl

  protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
    sendLocally(sendContext);
  }
  private <T> void sendLocally(OutboundDeliveryContext<T> sendContext) {
    //<13> 将信息进行投递
    ReplyException failure = deliverMessageLocally(sendContext.message);
    if (failure != null) {
      // 如果发送失败,将失败异常传递给发送上下文,表示发送过程中出现错误
      sendContext.written(failure);
    } else {
      // 如果发送成功,将没有异常传递给发送上下文
      sendContext.written(null);
    }
  }

<13> deliverMessageLocally

EventBusImpl

这里有一个重点:

具体是发布还是发送主要就是这边的区别,是从重选择一个消费者还是循环所有订阅该地址的所有消费者

protected ReplyException deliverMessageLocally(MessageImpl msg) {
    //根据地址获取到订阅该地址的消费者集合
    ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address());
    //判断是否本地信息
    boolean messageLocal = isMessageLocal(msg);
    if (handlers != null) {
      //判断是发送信息还是发布信息 send还是publish
      if (msg.isSend()) {
        //选择其中一个具体的消费者
        HandlerHolder holder = nextHandler(handlers, messageLocal);
        //指标
        if (metrics != null) {
          metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0);
        }
        //调用消费者的recive方法
        if (holder != null) {
          //<14>   msg.copyBeforeReceive会生成一个新的messageImpl,并且在messageImpl的构造方法里进行解码
          holder.handler.receive(msg.copyBeforeReceive());
        } else {
          // RACY issue !!!!!
        }
      } else {
        //如果是发布信息则遍历所有的消费者
        // Publish
        if (metrics != null) {
          metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, handlers.size());
        }
        for (HandlerHolder holder: handlers) {
          if (messageLocal || !holder.isLocalOnly()) {
            holder.handler.receive(msg.copyBeforeReceive());
          }
        }
      }
      return null;
    } else {
      if (metrics != null) {
        metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0);
      }
      return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
    }
  }

<14> holder.handler.receive

HandlerRegistration(MessageConsumerImpl的父类)

void receive(MessageImpl msg) {
  if (bus.metrics != null) {
    bus.metrics.scheduleMessage(metric, msg.isLocal());
  }
   //这里直接再context里面执行doRevceive方法
  context.executor().execute(() -> {
    // Need to check handler is still there - the handler might have been removed after the message were sent but
    // before it was received
    if (!doReceive(msg)) {
      discard(msg);
    }
  });
}
protected boolean doReceive(Message<T> message) {
  Handler<Message<T>> theHandler;
  //防止并发同时处理
  synchronized (this) {
    if (handler == null) {
      return false;
    }
    if (demand == 0L) {
      //如果需求量为0且队列消息量小于设置的缓存量,则数据进行添加进队列
      if (pending.size() < maxBufferedMessages) {
        pending.add(message);
        return true;
      } else {
        //否则则丢弃数据
        discard(message);
        if (discardHandler != null) {
          //并且执行丢弃逻辑
          discardHandler.handle(message);
        } else {
          log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer. address: " + address);
        }
      }
      return true;
    } else {
      //如果需求量还需要,队列的数量如果大于0则添加,并且获取队列里的数据
      if (pending.size() > 0) {
        pending.add(message);
        message = pending.poll();
      }
      //如果需求书不是最大值,则进行相减
      if (demand != Long.MAX_VALUE) {
        demand--;
      }
      theHandler = handler;
    }
  }
   //<15>
  deliver(theHandler, message);
  return true;
}

<15>deliver

private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
  // Handle the message outside the sync block
  // https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
  //<16> 具体执行(DuplicatedContext 其实相当于一个代理类,复制了一下context,实际执行的都是context)
  dispatch(theHandler, message, context.duplicate());
  //检查队列里是否还有任务,有任务就进行执行
  checkNextTick();
}

<16>dispatch

DeliveryContextBase

因为要接受信息了,所以要创建InboundDeliveryContext

void dispatch(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
  InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, theHandler, context);
  deliveryCtx.dispatch();
}
void dispatch() {
  this.interceptorIdx = 0;
  if (invoking) {
    this.invokeNext = true;
  } else {
   //其中next是
    next();
  }
}

dispatchnext()因为都是调用的父类,所以和前面说的Next方法一样的就是执行拦截器,唯一不一样的就是执行execute的时候这次是执行的InboundDeliveryContextexecute方法

void dispatch(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
  InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, theHandler, context);
  deliveryCtx.dispatch();
}

InboundDeliveryContext

   protected void execute() {
      ContextInternal ctx = InboundDeliveryContext.super.context;
      Object m = metric;
      //这边都是和监听相关
      VertxTracer tracer = ctx.tracer();
      if (bus.metrics != null) {
        bus.metrics.messageDelivered(m, message.isLocal());
      }
      if (tracer != null && !src) {
        message.trace = tracer.receiveRequest(ctx, SpanKind.RPC, TracingPolicy.PROPAGATE, message, message.isSend() ? "send" : "publish", message.headers(), MessageTagExtractor.INSTANCE);
       //这个是实际消费信息执行,message就是信息,handler就是订阅的消费者,其实就是我们前面传入的handler类(eb.consumer("foo").handler),接着这里会调用它
       //<17>
        HandlerRegistration.this.dispatch(message, ctx, handler);
        Object trace = message.trace;
        if (message.replyAddress == null && trace != null) {
          tracer.sendResponse(this.context, null, trace, null, TagExtractor.empty());
        }
      } else {
        HandlerRegistration.this.dispatch(message, ctx, handler);
      }
    }

<17>HandlerRegistration.this.dispatch(message, ctx, handler)

调用的是HandlerRegistration的,但是实际实现的是MessageConsumerImpl

MessageConsumerImpl

protected void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler) {
  if (handler == null) {
    throw new NullPointerException();
  }
  context.dispatch(msg, handler);
}
default <E> void dispatch(E event, Handler<E> handler) {
  ContextInternal prev = beginDispatch();
  try {
     //到此为止就是直接调用的eb.consumer("foo").handler传入的handler进行执行
    handler.handle(event);
  } catch (Throwable t) {
    reportException(t);
  } finally {
    endDispatch(prev);
  }
}

5.4 reply

这里再讲下reply

这个方法是做回应的

Message接口

  default void reply(@Nullable Object message) {
    reply(message, new DeliveryOptions());
  }
  void reply(@Nullable Object message, DeliveryOptions options);

实际实现是他的实现类

@Override
public void reply(Object message, DeliveryOptions options) {
  if (replyAddress != null) {
     // <18>
    MessageImpl reply = createReply(message, options);
     //<19>
    bus.sendReply(reply, options, null);
  }
}

<18> createReply根据回复地址创建信息

protected MessageImpl createReply(Object message, DeliveryOptions options) {
  MessageImpl reply = bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName());
  reply.trace = trace;
  return reply;
}

<19> sendReply这里调用的也是sendOrPubInternal方法,跟前面的send是一样的

protected <T> void sendReply(MessageImpl replyMessage, DeliveryOptions options, ReplyHandler<T> replyHandler) {
  if (replyMessage.address() == null) {
    throw new IllegalStateException("address not specified");
  } else {
    sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler, null));
  }
}

5.5 总结

步骤是比较简单的

  1. 先创建MessageConsumer

  2. 接着注册实际消费逻辑的handler,存储实际是再EventBusImpl

  3. 接着通过send进行发送信息

问题点:

sendpulish的区别就是前者选其中一个进行发送,后者是遍历该地址的所有订阅者进行发送

reply,就是在收到信息后进行调用进行信息回复,代码和send的是一样的文章来源地址https://www.toymoban.com/news/detail-698452.html

到了这里,关于Vert.x 源码解析(4.x)——Local EvnentBus入门使用和源码解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Kettle Local引擎使用记录(一)(基于Kettle web版数据集成开源工具data-integration源码)

    Kettle Local引擎使用记录(一)(基于Kettle web版数据集成开源工具data-integration源码)

    在前面对 data-integration 做了一些简单了解,从部署到应用,今天尝试把后端运行作业代码拎出来,去真正运行一下,只有实操之后才会有更深刻的认识,有些看着简单的功能,实操过程中会遇到很多问题,这个时候你的想法也会发生改变,所以很多时候为什么开发人员痛恨做

    2024年02月02日
    浏览(11)
  • python爬虫原理及源码解析(入门)

    python爬虫原理及源码解析(入门)

    ​ 如果将互联网比作一张大的蜘蛛网,数据便是存放在蜘蛛网的各个节点,而爬虫就是一只小蜘蛛,沿着网络抓取自己的猎物(数据)爬虫指的是:向网站发起请求,获取资源后分析并提取有用数据的程序。 爬虫能通过网址获得 网络 中的数据、然后根据目标 解析数据 、存储

    2023年04月24日
    浏览(11)
  • Java源码-servlet源码解析

    Servlet是运行在Web服务器上的Java组件,用于处理客户端请求并生成响应。下面将介绍Servlet的源码解析。 Servlet接口源码解析 Servlet接口是所有Servlet类必须实现的接口。该接口定义了Servlet生命周期方法和服务方法。 init方法初始化Servlet,service方法处理请求并生成响应。destroy方

    2024年02月13日
    浏览(11)
  • java源码-List源码解析

    Java中的List是一个接口,它定义了一组操作列表的方法。List接口的常见子类包括ArrayList、LinkedList和Vector等。 以下是Java中List接口及其常见方法的源码解析: 1. List接口定义 ``` public interface ListE extends CollectionE {     // 返回列表中元素的数量     int size();          // 返回列表

    2024年02月15日
    浏览(5)
  • 2023年的深度学习入门指南(19) - LLaMA 2源码解析

    2023年的深度学习入门指南(19) - LLaMA 2源码解析

    上一节我们学习了LLaMA 2的补全和聊天两种API的使用方法。本节我们来看看LLaMA 2的源码。 上一节我们讲了LLaMA 2的编程方法。我们来复习一下: 我们先来看看text_completion函数的参数是什么意思,该函数的原型为: 我们来看下这些参数的含义: prompts:这是一个字符串列表,每

    2024年02月15日
    浏览(9)
  • import-local执行流程解析

    import-local执行流程解析

    当本地和全局同时存在两个脚手架命令时,使用 import-local 可以优先加载本地脚手架命令 以上述代码为例:执行 jinhui 命令时实际执行的应该是 node C:Program Filesnodejsjinhui-clicli.js 所以将调试程序定位到全局下的 cli 文件,进入 import-local 源码 Nodejs 模块路径解析流程 Nqde.js 项

    2024年02月15日
    浏览(6)
  • 【Java 】从源码全面解析Java 线程池

    【Java 】从源码全面解析Java 线程池

    线程池技术在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在线程池技术的使用和原理方面对小伙伴们进行 360° 的刁难。 作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我

    2024年02月03日
    浏览(10)
  • Vert.x学习笔记-Vert.x的基本处理单元Verticle

    Vert.x学习笔记-Vert.x的基本处理单元Verticle

    Verticle是Vert.x的基本处理单元,Vert.x应用程序中存在着处理各种事件的处理单元,比如 负责HTTP API响应请求的处理单元 、 负责数据库存取的处理单元 、 负责向第三方发送请求的处理单元 。Verticle就是对这些功能单元的封装,Verticle可被部署,有自己的生命周期,Verticle是Ve

    2024年02月05日
    浏览(11)
  • 【JAVA】CyclicBarrier源码解析以及示例

    【JAVA】CyclicBarrier源码解析以及示例

    前言 在多线程编程中,同步工具是确保线程之间协同工作的重要组成部分。 CyclicBarrier (循环屏障)是Java中的一个强大的同步工具,它允许一组线程在达到某个共同点之前互相等待。 在本文中,我们将深入探讨 CyclicBarrier 的源码实现以及提供一些示例,以帮助您更好地理解

    2024年02月04日
    浏览(9)
  • 从源码全面解析 Java SPI 的来龙去脉

    从源码全面解析 Java SPI 的来龙去脉

    👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主 📕系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦

    2024年02月12日
    浏览(8)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包