[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十)

这篇具有很好参考价值的文章主要介绍了[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

asyncSendMessage方法用来处理来自producer发送的消息。

[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

1.asyncSendMessage异步处理单条消息

  1. 调用preSend方法创建响应的命令对象, 包括自动创建topic的逻辑, 随后创建响应头对象。
  2. 随后创建MessageExtBrokerInner对象, 从请求中获取消息的属性并设置到对象属性中, 消息体, topic。
  3. 判断如果是重试或者死信消息, 调用handleRetryAndDLQ方法处理重试和死信队列消息, 如果已重试次数大于最大重试次数, 那么替换topic为死信队列topic, 消息会被发送至死信队列。
  4. 判断如果是事务准备消息, 并且不会拒绝处理事务消息, 调用asyncPrepareMessage方法以异步的方式处理存储事务准备消息。
  5. 如果是普通消息, 调用asyncPutMessage方法处理, 存储普通消息。asyncPutMessage以异步方式将消息存储到存储器中, 处理器可以处理下一个请求而不是等待结, 当结果完成时, 以异步方式通知客户端。
  6. 调用handlePutMessageResultFuture方法处理消息存储的处理结果。

[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

/**
 * SendMessageProcessor的方法
 * <p>
 * 处理单条消息
 */
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                            SendMessageContext mqtraceContext,
                                                            SendMessageRequestHeader requestHeader) {
    /*
     * 1 创建响应的命令对象,包括自动创建topic的逻辑
     */
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    //获取响应头
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }
    //获取消息体
    final byte[] body = request.getBody();
    //获取队列id
    int queueIdInt = requestHeader.getQueueId();
    //从broker的topicConfigTable缓存中根据topicName获取TopicConfig
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    //如果队列id小于0,则随机选择一个写队列索引作为id
    if (queueIdInt < 0) {
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }
    //构建消息对象,保存着要存入commitLog的数据
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    //设置topic
    msgInner.setTopic(requestHeader.getTopic());
    //设置队列id
    msgInner.setQueueId(queueIdInt);
    /*
     * 2 处理重试和死信队列消息,将会对死信消息替换为死信topic
     */
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return CompletableFuture.completedFuture(response);
    }
    /*
     * 设置一系列属性
     */
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    //设置到properties属性中
    MessageAccessor.setProperties(msgInner, origProps);
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    //WAIT属性表示 消息发送时是否等消息存储完成后再返回
    if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
        // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
        // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
        //不需要存储"WAIT=true"属性,从propertiesString中移除它,为每个消息节省9个字节。
        String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
        //将没有WAIT属性的origProps存入msgInner的propertiesString属性
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
        //将WAIT属性重新存入origProps集合中,因为msgInner.isWaitStoreMsgOK()稍后将被调用
        origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
    } else {
        //将没有WAIT属性的origProps存入msgInner的propertiesString属性
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    }

    CompletableFuture<PutMessageResult> putMessageResult = null;
    /*
     * 处理事务消息逻辑
     */
    //TRAN_MSG属性值为true,表示为事务消息
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    //处理事务消息
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        //判断是否需要拒绝事务消息,如果需要拒绝,则返回NO_PERMISSION异常
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        //调用asyncPrepareMessage方法以异步的方式处理、存储事务准备消息,底层仍是asyncPutMessage方法
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        //不是事务消息,那么调用asyncPutMessage方法处理,存储消息
        //以异步方式将消息存储到存储器中,处理器可以处理下一个请求而不是等待结果,当结果完成时,以异步方式通知客户端
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    //处理消息存放的结果
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

2.preSend准备响应命令对象

  • 创建响应的命令对象, 包括topic的校验和自动创建topic的逻辑。
  1. 创建RemotingCommand对象, 设置唯一请求id。
  2. 校验 当前时间是否小于broker的起始服务时间, 如果小于, 返回SYSTEM_ERROR, 表示不可用提供服务。
  3. msgCheck方法进行一系列的校验, 包括topic的自动创建逻辑。

[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

/**
 * SendMessageProcessor的方法
 * <p>
 * 准备响应数据
 */
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
                                SendMessageRequestHeader requestHeader) {
    //创建响应命令对象
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    //设置唯一id为请求id
    response.setOpaque(request.getOpaque());
    //添加扩展字段属性"MSG_REGION"、"TRACE_ON"
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("Receive SendMessage request command {}", request);
    //获取配置的broker的处理请求的起始服务时间,默认为0
    final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    //如果当前时间小于起始时间,那么broker会返回一个SYSTEM_ERROR,表示现在broker还不能提供服务
    if (this.brokerController.getMessageStore().now() < startTimestamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
        return response;
    }
    //设置code为-1
    response.setCode(-1);
    /*
     * 消息校验,包括自动创建topic的逻辑
     */
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    return response;
}
2.1 msgCheck检查并自动创建topic
  1. 校验如果当前broker有没有写的权限, 如果没有, 则broker会返回一个NO_PERMISSION异常。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  2. 校验topic不能为空, 必须属于合法字符, 且长度不超过127字符。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  3. 校验如果当前topic是不为允许使用的系统topic, 抛出异常。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  4. 从broker的topicConfigTable缓存中根据topicName获取TopicConfig。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

    1. 如果不存在该topic信息, 调用createTopicInSendMessageMethod创建topic, 失败了, 判断是否重试, 如果是重试创建topic, 如果创建还是失败的话, 返回TOPIC_NOT_EXIST异常信息。
  5. 如果找到或者创建了topic, 校验queutId 不能大于等于该broker的读或写的最大queueId。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

/**
 * AbstractSendMessageProcessor的方法
 * <p>
 * 消息校验,包括自动创建topic的逻辑
 */
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
                                   final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    //如果当前broker没有写的权限,那么broker会返回一个NO_PERMISSION异常,sending message is forbidden,禁止向该broker发送消息
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
            && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending message is forbidden");
        return response;
    }
    //校验topic不能为空,必须属于合法字符regex: ^[%|a-zA-Z0-9_-]+$,且长度不超过127个字符
    if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
        return response;
    }
    //校验如果当前topic是不为允许使用的系统topic,那么抛出异常,默认不能为SCHEDULE_TOPIC_XXXX
    if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
        return response;
    }
    //从broker的topicConfigTable缓存中根据topicName获取TopicConfig
    TopicConfig topicConfig =
            this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    //如果不存在该topic信息
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
            }
        }

        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
        /*
         * 尝试创建普通topic
         */
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                requestHeader.getTopic(),
                requestHeader.getDefaultTopic(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
        /*
         * 尝试创建重试topic
         */
        if (null == topicConfig) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicConfig =
                        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                                requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
                                topicSysFlag);
            }
        }

        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }
    //校验queutId 不能大于等于该broker的读或写的最大数量
    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) {
        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
                queueIdInt,
                topicConfig.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);

        return response;
    }
    return response;
}
2.1.1 createTopicInSendMessageMethod创建普通topic

[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  • 创建一个新的topic
  1. 获取锁防止并发创建相同的topic, 获取锁之后, 再次尝试从topicConfigTable获取topic信息, 如果获取到了, 直接返回, 如果没有, 创建topic。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  2. 获取默认topic的信息, 作为创建新topic的模板, 默认topic实际上就是TBW102, 其有8个读写队列, 权限为读写并且可继承,即7。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  3. 如果默认topic就是TBW102, 并且如果broker配置不支持自动创建topic, 即autoCreateTopicEnable为false, 设置权限为可读写, 不可继承。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  4. 如果默认topic配置的权限包括可继承, 从默认topic继承属性创建新topic。创建一个TopicConfig, 选择默认队列数量与默认topic写队列中数量小的值作为新topic的读写队列数量, 默认为4, 设置权限, 去除可继承权限。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  5. 如果topic不是null, 则表示创建了topic, 将新的topic信息存入topicConfigTable缓存中, 生成下一个数据版本。标识位置为true, 调用persist方法将topic配置持久化到配置文件 {user.home}/store/config/topics.json中。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  6. 解锁, 判断如果创建了新topic, 调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。
    [RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  /**
   * TopicConfigManager的方法
   * <p>
   * 创建普通topic,并持久化至配置文件 {user.home}/store/config/topics.json中
   *
   * @param topic                       待创建topic
   * @param defaultTopic                默认topic,用于作为模板创建新topic
   * @param remoteAddress               远程地址
   * @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4
*                                    可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置
   * @param topicSysFlag                topic标识
   * @return topic配置
   */
  public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
                                                    final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
      TopicConfig topicConfig = null;
      boolean createNew = false;

      try {
          //需要加锁防止并发创建相同的topic
          if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
              try {
                  //再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回
                  topicConfig = this.topicConfigTable.get(topic);
                  if (topicConfig != null)
                      return topicConfig;
                  //获取默认topic的信息,用于作为模板创建新topic,默认的默认topic实际上就是TBW102,其有8个读写队列,权限为读写并且可继承,即7
                  TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
                  if (defaultTopicConfig != null) {
                   //如果默认topic就是TBW102
                      if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                       //如果broker配置不支持自动创建topic,那么设置权限为可读写,不可继承,即6
                          if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                              defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
                          }
                      }
                      //如果默认topic配置的权限包括可继承,那么从默认topic继承属性
                      if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                       //创建topic配置
                          topicConfig = new TopicConfig(topic);
                          //选择默认队列数量与默认topic写队列数中最小的值作为新topic的读写队列数量,默认为4
                          int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());

                          if (queueNums < 0) {
                              queueNums = 0;
                          }

                          topicConfig.setReadQueueNums(queueNums);
                          topicConfig.setWriteQueueNums(queueNums);
                          //权限
                          int perm = defaultTopicConfig.getPerm();
                          //去掉可继承权限
                          perm &= ~PermName.PERM_INHERIT;
                          topicConfig.setPerm(perm);
                          topicConfig.setTopicSysFlag(topicSysFlag);
                          topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                      } else {
                          log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
                                  defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
                      }
                  } else {
                      log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
                              defaultTopic, remoteAddress);
                  }
                  //如果topic不为null,说明创建了新topic
                  if (topicConfig != null) {
                      log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
                              defaultTopic, topicConfig, remoteAddress);
                      //将新的topic信息存入topicConfigTable缓存中
                      this.topicConfigTable.put(topic, topicConfig);
                      //生成下一个数据版本
                      this.dataVersion.nextVersion();
                      //标识位置为true
                      createNew = true;
                      /*
                       * 将topic配置持久化到配置文件 {user.home}/store/config/topics.json中
                       */
                      this.persist();
                  }
              } finally {
               //解锁
                  this.topicConfigTableLock.unlock();
              }
          }
      } catch (InterruptedException e) {
          log.error("createTopicInSendMessageMethod exception", e);
      }
      //如果创建了新topic,那么马上向nameServer注册当前broker的新配置路由信息
      if (createNew) {
          this.brokerController.registerBrokerAll(false, true, true);
      }

      return topicConfig;
  }
2.1.2 createTopicInSendMessageBackMethod创建重试topc
  • 自动创建重试topic, 源码和创建普通topic类似,. 不同的是重试topic不需要模板topic, 默认读写队列都是1, 权限为读写。
/**
 * TopicConfigManager的方法
 * <p>
 * 创建重试topic,并持久化至配置文件 {user.home}/store/config/topics.json中
 *
 * @param topic                       待创建topic
 * @param perm                        权限
 * @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4
 *                                    可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置
 * @param topicSysFlag                topic标识
 * @return topic配置
 */
public TopicConfig createTopicInSendMessageBackMethod(
        final String topic,
        final int clientDefaultTopicQueueNums,
        final int perm,
        final int topicSysFlag) {
    //尝试获取topic
    TopicConfig topicConfig = this.topicConfigTable.get(topic);
    //如果存在则直接返回
    if (topicConfig != null)
        return topicConfig;

    boolean createNew = false;

    try {
        //需要加锁防止并发创建相同的topic
        if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                //再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回
                topicConfig = this.topicConfigTable.get(topic);
                if (topicConfig != null)
                    return topicConfig;
                //创建topic
                topicConfig = new TopicConfig(topic);
                //重试topic的默认读写队列数量为1
                topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
                topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
                //重试topic的默认权限为读写
                topicConfig.setPerm(perm);
                topicConfig.setTopicSysFlag(topicSysFlag);

                log.info("create new topic {}", topicConfig);
                this.topicConfigTable.put(topic, topicConfig);
                createNew = true;
                //获取下一个版本
                this.dataVersion.nextVersion();
                //持久化broker信息
                this.persist();
            } finally {
                //解锁
                this.topicConfigTableLock.unlock();
            }
        }
    } catch (InterruptedException e) {
        log.error("createTopicInSendMessageBackMethod exception", e);
    }

    if (createNew) {
        //注册broker信息
        this.brokerController.registerBrokerAll(false, true, true);
    }

    return topicConfig;
}
2.1.3 autoCreateTopicEnable自动创建topic的问题

[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java
[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java
[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

  • Producer发送消息源码, 客户端发送信息前, 会选择一个topic所在的broker地址, 如果topic不存在, 那么会默认topic的路由信息中的一个broker发送。

  • 信息发送到broker后, 会发送没有指定的topic并且如果broker的autoCreateTopicEnable为true的话, 会走createTopicInSendMessageMethod方法, 会自动创建topic方法的最后马上调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。

  • 生产者客户端会定时每30s从nameServer更新路由数据, 如果此时有其他的producer的存在, 并且刚好从nameServer获取到了这个新的topic的路由信息, 假设其他producer也需要向该topic发送信息, 由于发现topic路由信息已存在, 并且只存在于刚才那一个broker中, 此时这些producer都会将该topic的消息发送到这一个broker中来。

  • 接下来所有的Producer都只会向这一个Broker发送消息, 其他broker不再有机会创建新topic。本想topic在所有broker创建, 但是只有一个broker有topic信息, 违背了RocketMQ集群实现高可用的本事。

  • 所以RocketMQ官方建议生产环境下将broker的autoCreateTopicEnable设置为false, 闭自动创建topic, 改为手动在每个broker上创建。

3.handlePutMessageResultFuture处理消息存放结果

存放消息后, 调用handlePutMessageResult方法。

[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十),RocketMq 基础,RocketMq 源码分析,java-rocketmq,rocketmq,java

/**
 * SendMessageProcessor的方法
 * <p>
 * 处理消息存放结果
 *
 * @param putMessageResult   存放结果
 * @param response           响应对象
 * @param request            请求对象
 * @param msgInner           内部消息对象
 * @param responseHeader     响应头
 * @param sendMessageContext 发送消息上下文
 * @param ctx                连接上下文
 * @param queueIdInt         queueId
 * @return
 */
private CompletableFuture<RemotingCommand> handlePutMessageResultFuture(CompletableFuture<PutMessageResult> putMessageResult,
                                                                        RemotingCommand response,
                                                                        RemotingCommand request,
                                                                        MessageExt msgInner,
                                                                        SendMessageResponseHeader responseHeader,
                                                                        SendMessageContext sendMessageContext,
                                                                        ChannelHandlerContext ctx,
                                                                        int queueIdInt) {
    //阻塞,当从存放消息完毕时,执行后续的操作,即执行handlePutMessageResult方法
    return putMessageResult.thenApply((r) ->
            handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt)
    );
}

handlePutMessageResult处理存放消息的结果, 响应写回给客户端。文章来源地址https://www.toymoban.com/news/detail-517504.html

/**
 * SendMessageProcessor的方法
 * <p>
 * 处理存放消息的结果
 *
 * @param putMessageResult   存放结果
 * @param response           响应对象
 * @param request            请求对象
 * @param msg                内部消息对象
 * @param responseHeader     响应头
 * @param sendMessageContext 发送消息上下文
 * @param ctx                连接上下文
 * @param queueIdInt         queueId
 * @return
 */
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
                                               RemotingCommand request, MessageExt msg,
                                               SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
                                               int queueIdInt) {
    //结果为null,那么直接返回系统异常
    if (putMessageResult == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
        return response;
    }
    boolean sendOK = false;
    //解析存放消息状态码,转换为对应的响应码
    switch (putMessageResult.getPutMessageStatus()) {
        // Success
        case PUT_OK:
            sendOK = true;
            response.setCode(ResponseCode.SUCCESS);
            break;
        case FLUSH_DISK_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
            sendOK = true;
            break;
        case FLUSH_SLAVE_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
            sendOK = true;
            break;
        case SLAVE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            sendOK = true;
            break;

        // Failed
        case CREATE_MAPEDFILE_FAILED:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("create mapped file failed, server is busy or broken.");
            break;
        case MESSAGE_ILLEGAL:
        case PROPERTIES_SIZE_EXCEEDED:
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            response.setRemark(
                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
            break;
        case SERVICE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
            response.setRemark(
                    "service not available now. It may be caused by one of the following reasons: " +
                            "the broker's disk is full [" + diskUtil() + "], messages are put to the slave, message store has been shut down, etc.");
            break;
        case OS_PAGECACHE_BUSY:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
            break;
        case LMQ_CONSUME_QUEUE_NUM_EXCEEDED:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.");
            break;
        case UNKNOWN_ERROR:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR");
            break;
        default:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR DEFAULT");
            break;
    }

    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    //如果发送成功
    if (sendOK) {
        //如果topic是SCHEDULE_TOPIC_XXXX,即延迟消息的topic
        if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msg.getTopic())) {
            //增加统计计数
            this.brokerController.getBrokerStatsManager().incQueuePutNums(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
            this.brokerController.getBrokerStatsManager().incQueuePutSize(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
        }
        //增加统计计数
        this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                putMessageResult.getAppendMessageResult().getWroteBytes());
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());

        response.setRemark(null);
        //设置响应头中的migId,实际上就是broker生成的offsetMsgId属性
        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        //消息队列Id
        responseHeader.setQueueId(queueIdInt);
        //消息逻辑偏移量
        responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
        //如果不是单向请求,那么将响应写会客户端
        doResponse(ctx, request, response);
        //如果有发送消息的钩子,那么执行
        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
        return null;
    } else {
        //如果有发送消息的钩子,那么执行
        if (hasSendMessageHook()) {
            int wroteSize = request.getBody().length;
            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
    }
    return response;
}

到了这里,关于[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月12日
    浏览(34)
  • RocketMQ(三) broker启动

    RocketMQ源码版本V5.0.0,可兼容之前的版本,因为整理资料的时候,之前的版本,和V5版本有所出入,核心流程基本还是大同小异的。 此前已经总结了NameServer的启动流程源码:现在来了解Broker的启动流程。在RocketMQ启动的时候,首先要启动NameServer,然后再启动Broker。 Broker模块主

    2024年02月08日
    浏览(36)
  • RocketMQ broker启动失败

    版本:4.9.3 现象:NameServer启动没问题,Broker无法启动。 查看日志,没有broker方面的报错,应该是整个服务都没起来。 于是开始网上搜索解决方案: 方案1: 删除store文件夹。 删除之后问题依旧 方案2: 更改broker.conf,加上IP等配置。 发现这些配置已经有了,于是更改无效。

    2024年02月09日
    浏览(33)
  • RocketMQ和Kafka的区别,以及如何保证消息不丢失和重复消费

    性能(单台) 语言 多语言支持客户端 优缺点 RocketMQ 十万级 java java 模型简单、接口易用,在阿里有大规模应用 文档少,支持的语言少 Kafka 百万级 服务端scala,客户端java 主流语言均支持 天生分布式、性能最好,常用于大数据领域 运维难度大,对zookeeper强依赖,多副本机制

    2024年01月16日
    浏览(32)
  • RocketMq 源码解读-Broker(1)

    这边源码分支为release-4.2.0 处理拉取信息请求 通过broker模块下面的processor目录下, 我们找到 org.apache.rocketmq.broker.processor.PullMessageProcessor 这个类的 processRequest 方法.这边就不省略代码了, 只是在重要的部分加上注释, 供参阅 其中, 有一行代码为 找到getMessage的实现类 package org.

    2024年02月09日
    浏览(30)
  • 【深入浅出RocketMQ原理及实战】「消息队列架构分析」帮你梳理RocketMQ或Kafka的选择理由以及二者PK

    前提背景 大家都知道,市面上有许多开源的MQ,例如,RocketMQ、Kafka、RabbitMQ等等,现在Pulsar也开始发光,今天我们谈谈笔者最常用的RocketMQ和Kafka,想必大家早就知道二者之间的特点以及区别,但是在实际场景中,二者的选取有可能会范迷惑,那么今天笔者就带领大家分析一下

    2024年02月19日
    浏览(39)
  • 38.RocketMQ之Broker的主从架构

    本文摘自:Broker的主从架构是怎么实现的 Broker是RocketMQ的核心模块,负责接收并存储消息,为了保证整个MQ的高可用,一般情况都会将Broker部署成集群,集群中的每一部分都由Master和Slave组成,那么Master与Slave之间的数据是如何保证同步一致的呢? 是Master主动把数据推送给Slav

    2024年02月13日
    浏览(38)
  • [RocketMQ] Broker启动流程源码解析 (二)

    1.Brocker介绍 Broker主要负责消息的存储、投递和查询以及服务高可用保证, Broker包含了以下几个重要子模块。 Remoting Module: 整个Broker的实体, 负责处理来自Client端的请求 Client Manager: 负责管理客户端 Producer和Consumer, 维护Consumer的Topic订阅信息 Store Service: 提供方便简单的API接口处理

    2024年02月09日
    浏览(40)
  • RocketMQ的学习历程(5)----broker内部设计

    在首个学习历程中,我们已经了解了,RokctMQ简单的工作流程。 如果想要更深的理解RokcetMQ消息处理的流程,broker内部流程的理解是必要的, 这里我们就了解一下Broker内部的工作流程。 当Consumer消费消息时,需要先读取ConsumeQueue得到offset,再通过offset找到CommitLog对应的消息内

    2024年02月07日
    浏览(27)
  • 消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别

    目录 一、什么是消息中间件 二、消息中间件的组成 1、Broker 2、Producer 3、Consumer 4、Topic 5、Queue 6、Message 三、消息中间件通信模式 1、点对点(kafka不支持这种模式)  2、发布/订阅  四、消息中间件的作用 1、系统解耦 2、提高系统响应时间 3、为大数据处理架构提供服务 五、

    2024年01月25日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包