[RocketMQ] Producer发送消息的总体流程 (七)

这篇具有很好参考价值的文章主要介绍了[RocketMQ] Producer发送消息的总体流程 (七)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  • 单向发送: 把消息发向Broker服务器, 不管Broker是否接收, 只管发, 不管结果。
  • 同步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。
  • 异步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。异步所以发送消息后, 不用等待, 等到Broker服务器的响应调用回调。

DefaultMQProducer提供了很多send方法的重载:

[RocketMQ] Producer发送消息的总体流程 (七)
[RocketMQ] Producer发送消息的总体流程 (七)

1.send源码入口

1.1 同步消息

[RocketMQ] Producer发送消息的总体流程 (七)

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //调用defaultMQProducerImpl#send发送消息
    return this.defaultMQProducerImpl.send(msg);
}

defaultMQProducerImpl#send发送消息。
[RocketMQ] Producer发送消息的总体流程 (七)

调用另一个send(), 超时时间为3s。

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //调用另一个send方法,设置超时时间参数,默认3000ms
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * DefaultMQProducerImpl的方法
 *
 * @param msg     消息
 * @param timeout 超时时间,毫秒值
 */
public SendResult send(Message msg,
                       long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为null
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

该方法内部又调用另一个sendDefaultImpl方法, 设置消息发送方式为SYNC, 为同步, 设置回调函数为null。

1.2 单向消息

单向消息使用sendOneway发送。

[RocketMQ] Producer发送消息的总体流程 (七)

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException
 {
    //根据namespace设置topic
    msg.setTopic(withNamespace(msg.getTopic()));
    //调用defaultMQProducerImpl#sendOneway发送消息
    this.defaultMQProducerImpl.sendOneway(msg);
}

defaultMQProducerImpl#sendOneway。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
    try {
        //调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000ms
        this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
    } catch (MQBrokerException e) {
        throw new MQClientException("unknown exception", e);
    }
}

最终调用sendDefaultImpl方法, 发送模式为ONEWAY, 设置回调函数为null, 超时时间为3s。

1.3 异步消息

异步消息使用带有callback函数的send方法发送。

[RocketMQ] Producer发送消息的总体流程 (七)

public void send(Message msg, SendCallback sendCallback) throws MQClientException,
 RemotingException, InterruptedException {
    //根据namespace设置topic
    msg.setTopic(withNamespace(msg.getTopic()));
    //调用defaultMQProducerImpl#send发送消息,带有sendCallback参数
    this.defaultMQProducerImpl.send(msg, sendCallback);
}

该方法内部调用defaultMQProducerImpl#send方法发送消息, 带sendCallback参数。

public void send(Message msg, SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException {
    //该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
    send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException {
    //调用起始时间
    final long beginStartTime = System.currentTimeMillis();
    //获取异步发送执行器线程池
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        /*
         * 使用线程池异步的执行sendDefaultImpl方法,即异步发送消息
         */
        executor.submit(new Runnable() {
            @Override
            public void run() {
                /*
                 * 发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法
                 */
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        //调用sendDefaultImpl方法执行发送操作
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                    } catch (Exception e) {
                        //抛出异常,执行回调函数onException方法
                        sendCallback.onException(e);
                    }
                } else {
                    //超时,执行回调函数onException方法
                    sendCallback.onException(
                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }

        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }

}

方法内部会获取获取异步发送执行器线程池, 使用线程池异步的执行sendDefaultImpl方法, 即异步发送。

发送之前计算超时时间, 如果超时则执行回调函数onException()。

2.sendDefaultImpl发送消息实现

该方法位于DefaultMQProducerImpl中, 无论是同步, 异步, 还是单向, 最后调用的都是sendDefaultImpl方法。

  1. makeSureStateOK方法, 确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。
  2. 检查消息的合法性。
  3. 调用tryToFindTopicPublishInfo方法, 尝试查找消息的一个topic路由, 进行发送消息。
  4. 计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。
  5. 调用selectOneMessageQueue方法, 选择一个队列MessageQueue, 支持失败故障转移。
  6. 调用sendKernelImpl方法发送消息, 同步、异步、单向发送消息都是这个方法实现的。
  7. 调用updateFaultItem方法, 更新本地错误表缓存数据, 用于延迟时间的故障转移的功能。
  8. 根据发送模式的不同, 如果是异步或者单向发送则直接返回, 如果是同步的话, 如果开启了retryAnotherBrokerWhenNotStoreOK, 那么如果返回值不返回SEND_OK状态, 则重新执行。
  9. 此过程中, 如果抛出RemotingException、MQClientException、以及MQBrokerException异常, 那么会重试, 如果抛出InterruptedException, 或者超时则不会重试。

[RocketMQ] Producer发送消息的总体流程 (七)
[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * DefaultMQProducerImpl的方法
 *
 * @param msg               方法
 * @param communicationMode 通信模式
 * @param sendCallback      回调方法
 * @param timeout           超时时间
 */
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    /*
     * 1 确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常
     */
    this.makeSureStateOK();
    /*
     * 2 校验消息的合法性
     */
    Validators.checkMessage(msg, this.defaultMQProducer);
    //生成本次调用id
    final long invokeID = random.nextLong();
    //开始时间戳
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    //结束时间戳
    long endTimestamp = beginTimestampFirst;
    /*
     * 3 尝试查找消息的一个topic路由,用以发送消息
     */
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    //找到有效的topic信息
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        /*
         * 4 计算发送消息的总次数
         * 同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改
         */
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        //记录每一次重试时候发送消息目标Broker名字的数组
        String[] brokersSent = new String[timesTotal];
        /*
         * 在循环中,发送消息,包含消息重试的逻辑,总次数默认不超过3
         */
        for (; times < timesTotal; times++) {
            //上次使用过的broker,可以为空,表示第一次选择
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            /*
             * 5 选择一个消息队列MessageQueue
             */
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                //设置brokerName
                brokersSent[times] = mq.getBrokerName();
                try {
                    //调用的开始时间
                    beginTimestampPrev = System.currentTimeMillis();
                    //如果还有可调用次数,那么
                    if (times > 0) {
                        //在重新发送期间用名称空间重置topic
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    //现在调用的开始时间 减去 开始时间,判断时候在调用发起之前就超时了
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    //如果已经超时了,那么直接结束循环,不再发送
                    //即超时的时候,即使还剩下重试次数,也不会再继续重试
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    /*
                     * 6 异步、同步、单向发送消息
                     */
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    //方法调用结束时间戳
                    endTimestamp = System.currentTimeMillis();
                    /*
                     * 7 更新本地错误表缓存数据,用于延迟时间的故障转移的功能
                     */
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    /*
                     * 8 根据发送模式执行不同的处理
                     */
                    switch (communicationMode) {
                        //异步和单向模式直接返回null
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            //同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果不是返回SEND_OK状态,则仍然会执行重试发送
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            //如果发送成功,则返回
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    //RemotingException异常,会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    //MQClientException异常,会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    //MQBrokerException异常
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    //如果返回的状态码属于一下几种,则支持重试:
                    //ResponseCode.TOPIC_NOT_EXIST,
                    //ResponseCode.SERVICE_NOT_AVAILABLE,
                    //ResponseCode.SYSTEM_ERROR,
                    //ResponseCode.NO_PERMISSION,
                    //ResponseCode.NO_BUYER_ID,
                    //ResponseCode.NOT_IN_CURRENT_UNIT

                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                        continue;
                    } else {
                        //其他状态码不支持重试,如果有结果则返回,否则直接抛出异常
                        if (sendResult != null) {
                            return sendResult;
                        }

                        throw e;
                    }
                } catch (InterruptedException e) {
                    //InterruptedException异常,不会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        /*
         * 抛出异常的操作
         */
        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
2.1 makeSureStateOK确定生产者服务状态

确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * DefaultMQProducerImpl的方法
 */
private void makeSureStateOK() throws MQClientException {
    //服务状态不是RUNNING,那么抛出MQClientException异常。
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
    }
}
2.2 checkMessage校验消息的合法性
  1. 如果msg消息为null, 抛出异常。
  2. 校验topic, 如果topic为空, 或者长度大于127字符, 或者topic中有非法字符则抛出异常, 如果当前topic是不为允许使用的系统topic, 抛出异常。
  3. 校验消息体, 如果消息体为null, 或者为空数组, 或者消息字节数组长度大于4M, 抛出异常。
    [RocketMQ] Producer发送消息的总体流程 (七)
/**
 * Validators的方法
 */
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
    //如果消息为null,抛出异常
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    /*
     * 校验topic
     */
    //如果topic为空,或者长度大于127个字符,或者topic的字符串不符合 "^[%|a-zA-Z0-9_-]+$"模式,即包含非法字符,那么抛出异常
    Validators.checkTopic(msg.getTopic());
    //如果当前topic是不为允许使用的系统topic SCHEDULE_TOPIC_XXXX,那么抛出异常
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // body
    //如果消息体为null,那么抛出异常
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    //如果消息体为空数组,那么抛出异常
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    //如果消息 字节数组长度大于4,194,304,即消息的大小大于4M,那么抛出异常
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

public static void checkTopic(String topic) throws MQClientException {
    //如果topic为空,那么抛出异常
    if (UtilAll.isBlank(topic)) {
        throw new MQClientException("The specified topic is blank", null);
    }
    //如果topic长度大于127个字符,那么抛出异常
    if (topic.length() > TOPIC_MAX_LENGTH) {
        throw new MQClientException(
                String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
    }
    //如果topic字符串包含非法字符,那么抛出异常
    if (isTopicOrGroupIllegal(topic)) {
        throw new MQClientException(String.format(
                "The specified topic[%s] contains illegal characters, allowing only %s", topic,
                "^[%|a-zA-Z0-9_-]+$"), null);
    }
}
2.3 tryToFindTopicPublishInfo查找topic的发布信息

该方法用于查找指定topic的发布信息TopicPublishInfo。

  1. 首先在本地缓存topicPublishInfoTable获取。
  2. updateTopicRouteInfoFromNameServer()获取, 从nameServer同步此topic的路由配置信息。
  3. 从nameServer同步topic数据。

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * DefaultMQProducerImpl的方法
 * <p>
 * 查找指定topic的推送信息
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //尝试直接从producer的topicPublishInfoTable中获取topic信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    //如果没有获取到有效信息,
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        //那么立即创建一个TopicPublishInfo
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //立即从nameServer同步此topic的路由配置信息,并且更新本地缓存
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        //再次获取topicPublishInfo
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    //如果找到的路由信息是可用的,直接返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //再次从nameServer同步topic的数据,不过这次使用默认的topic “TBW102”去找路由配置信息作为本topic参数信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
  • 首先在本地缓存topicPublishInfoTable获取, 如果没有获取到, 调用updateTopicRouteInfoFromNameServer方法从nameServer同步此topic的路由配置信息, 并更新缓存。如果还是没有获取到有效数据, 再次从nameServer同步topic数据, 不过这次默认的topic是 “TBW102”去找路由配置信息作为本topic参数信息。

TopicPublishInfo: topic的发布信息

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * 是否是顺序消息
 */
private boolean orderTopic = false;
/**
 * 是否包含路由信息
 */
private boolean haveTopicRouterInfo = false;
/**
 * topic的消息队列集合
 */
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
/**
 * 当前线程线程的消息队列的下标,循环选择消息队列使用+1
 */
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
/**
 * topic路由信息,包括topic的队列信息queueDatas,topic的broker信息brokerDatas,顺序topic配置orderTopicConf,消费过滤信息filterServerTable等属性
 */
private TopicRouteData topicRouteData;
2.4 计算发送次数timesTotal
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + 
this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。是MQClientAPIImpl#sendMessage方法中。

2.5 selectOneMessageQueue选择消息队列

方法内部调用mqFaultStrategy#selectOneMessageQueue方法:

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * DefaultMQProducerImpl的方法
 *
 * 选择一个消息队列
 * @param tpInfo topic信息
 * @param lastBrokerName 上次使用过的broker
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //调用mqFaultStrategy#selectOneMessageQueue方法
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

mqFaultStrategy#selectOneMessageQueue方法支持故障转移机制:

  1. 首先判断是否开启了发送延迟故障转移机制, 即判断sendLatencyFaultEnable属性是否为true, 默认为false不开启。
    1. 如果开启的话, 首先仍然是遍历消息队列, 按照轮询的方式选取一个消息队列, 当消息队列可用时, 选择消息队列的工作就结束, 否则循环选择其他队列。如果该mq的broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中, 或者当前时间戳大于该broker下一次开始可用的时间戳, 则表示无障碍。
    2. 如果没有选出无故障mq, 那么从LatencyFaultTolerance维护的不是最好的broker集合faultItemTable中随机选择一个broker, 随后判断如果写队列数大于0, 那么选择该Broker。然后遍历消息队列, 采用取模的方式获取一个队列, 重置其brokerName和queueId, 进行消息发送。
    3. 如果上面的步骤抛出了异常, 那么遍历消息队列, 采用取模的方式获取一个队列。
  2. 如果没有开启延迟故障转移机制, 那么遍历消息队列, 采用取模轮询的方式获取一个brokerName与lastBrokerName不相等的队列, 即不会再次选择上次发送失败的broker。如果没有找到一个不同broker的mq, 那么退回到轮询的方式。

selectOneMessageQueue方法选择mq的时候的故障转移机制, 目的是为了保证每次消息尽快的发送, 是一种高可用手段。

  1. 延迟时间的故障转移, 消息队列选择时候, 可用过滤mq认为不可用的broker, 以免不断为宕机的broker发送消息, 选取一个延迟比较短的broker, 实现消息发送高可用。
  2. 没有开启延迟时间的故障转移的时候, 在轮询选择mq的时候, 不会选择上一次发送失败的broker, 实现消息发送高可用。
/**
 * MQFaultStrategy的方法
 * <p>
 * 选择一个消息队列,支持故障延迟转移
 *
 * @param tpInfo         topic信息
 * @param lastBrokerName 上次使用过的broker
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    /*
     * 判断是否开启了发送延迟故障转移机制,默认false不打开
     * 如果开启了该机制,那么每次选取topic下对应的queue时,会基于之前执行的耗时,在有存在符合条件的broker的前提下,优选选取一个延迟较短的broker,否则再考虑随机选取。
     */
    if (this.sendLatencyFaultEnable) {
        try {
            //当前线程线程的消息队列的下标,循环选择消息队列使用+1
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            //遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                //取模
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                //获取该消息队列
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //如果当前消息队列是可用的,即无故障,那么直接返回该mq
                //如果该broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中,或者当前时间已经大于该broker下一次开始可用的时间点,表示无故障
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            //没有选出无故障的mq,那么一个不是最好的broker集合中随机选择一个
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            //如果写队列数大于0,那么选择该broker
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                //遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    //重置其brokerName,queueId,进行消息发送
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                //如果写队列数小于0,那么移除该broker
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        //如果上面的步骤抛出了异常,那么遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
        return tpInfo.selectOneMessageQueue();
    }
    //如果没有发送延迟故障转移机制,那么那么遍历消息队列,即采用取模轮询的方式
    //获取一个brokerName与lastBrokerName不相等的队列,即不会再次选择上次发送失败的broker
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

selectOneMessageQueue方法有两个重载方法, 一个无参的, 一个有参的。

无参的表示选择mq无限制:

/**
 * TopicPublishInfo的方法
 * <p>
 * 轮询的选择一个mq
 */
public MessageQueue selectOneMessageQueue() {
    //获取下一个index
    int index = this.sendWhichQueue.incrementAndGet();
    //取模计算索引
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    //获取该索引的mq
    return this.messageQueueList.get(pos);
}

有参数, 其参数是上一次发送失败的brokerName, 表示不会选择上一次失败的brokerName的mq。如果最后没有选择出来, 那么走轮询的逻辑。

/**
 * TopicPublishInfo的方法
 *
 * @param lastBrokerName 上一次发送失败的brokerName
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //如果lastBrokerName为null,即第一次发送,那么轮询选择一个
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            //轮询选择一个mq
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //如果mq的brokerName不等于lastBrokerName,就返回,否则选择下一个
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        //没有选出来,那么轮询选择一个
        return selectOneMessageQueue();
    }
}
2.6 sendKernelImpl发送消息

选择了队列后, 调用sendKernelImpl发送消息

  1. 首先调用findBrokerAddressInPublish方法从brokerAddrTable中查找Master broker地址。如果找不到, tryToFindTopicPublishInfo从nameServer远程拉取配置, 并更新本地缓存, 再次尝试获取Master broker地址。
  2. 调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。
  3. 如果不是批量消息, 那么设置唯一的uniqId。
  4. 如果不是批量消息, 且消息体大于4K, 那么压缩消息。
  5. 如果存在CheckForbiddenHook, 则执行checkForbidden钩子方法。如果存在SendMessageHook, 则执行sendMessageBefore钩子方法。
  6. 设置请求头信息SendMessageRequestHeader, 请求头包含各种基本属性, producerGroup, topic, queueId, 并且将消息重试次数和最大重试次数存入请求头中。
  7. 根据不同的发送模式发送消息, 如果是异步, 需要先克隆并还原消息, 最终异步, 同步, 单向都是调用MQClientAPIImpl#sendMessage方法发送消息的。
  8. 如果MQClientAPIImpl#sendMessage方法正常发送或者抛出RemotingException、MQBrokerException、InterruptedException异常, 判断如果存在SendMessageHook, 执行sendMessageAfter钩子方法。
  9. finally中对消息进行恢复。

[RocketMQ] Producer发送消息的总体流程 (七)

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * DefaultMQProducerImpl的方法
 * 发送消息
 *
 * @param msg               消息
 * @param mq                mq
 * @param communicationMode 发送模式
 * @param sendCallback      发送回调
 * @param topicPublishInfo  topic信息
 * @param timeout           超时时间
 * @return 发送结果
 */
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //开始时间
    long beginStartTime = System.currentTimeMillis();
    /*
     * 1 根据brokerName从brokerAddrTable中查找broker地址
     */
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    //如果本地找不到 broker 的地址
    if (null == brokerAddr) {
        /*
         * 2 从nameServer远程拉取配置,并更新本地缓存
         * 该方法此前就学习过了
         */
        tryToFindTopicPublishInfo(mq.getTopic());
        //再次获取地址
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        /*
         * 3 vip通道判断
         */
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            /*
             * 4 如果不是批量消息,那么尝试生成唯一uniqId,即UNIQ_KEY属性。MessageBatch批量消息在生成时就已经设置uniqId
             * uniqId也被称为客户端生成的msgId,从逻辑上代表唯一一条消息
             */
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }
            /*
             * 设置nameSpace为实例Id
             */
            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }
            //消息标识符
            int sysFlag = 0;
            //消息压缩标识
            boolean msgBodyCompressed = false;
            /*
             * 5 尝试压缩消息
             */
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }
            //事务消息标志,prepare消息
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            /*
             * 6 如果存在CheckForbiddenHook,则执行checkForbidden方法
             * 为什么叫禁止钩子呢,可能是想要使用者将不可发送消息的检查放在这个钩子函数里面吧(猜测)
             */
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            /*
             * 7 如果存在SendMessageHook,则执行sendMessageBefore方法
             */
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            /*
             * 8 设置请求头信息
             */
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            //针对重试消息的处理
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                //获取消息重新消费次数属性值
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    //将重新消费次数设置到请求头中,并且清除该属性
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                //获取消息的最大重试次数属性值
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    //将最大重新消费次数设置到请求头中,并且清除该属性
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            /*
             * 9 根据不同的发送模式,发送消息
             */
            SendResult sendResult = null;
            switch (communicationMode) {
                /*
                 * 异步发送模式
                 */
                case ASYNC:
                    /*
                     * 首先克隆并还原消息
                     *
                     * 该方法的finally中已经有还原消息的代码了,为什么在异步发送消息之前,还要先还原消息呢?
                     *
                     * 因为异步发送时 finally 重新赋值的时机并不确定,有很大概率是在第一次发送结束前就完成了 finally 中的赋值,
                     * 因此在内部重试前 msg.body 大概率已经被重新赋值过,而 onExceptionImpl 中的重试逻辑 MQClientAPIImpl.sendMessageAsync 不会再对数据进行压缩,
                     * 简言之,在异步发送的情况下,如果调用 onExceptionImpl 内部的重试,有很大概率发送的是无压缩的数据
                     */
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    //如果开启了消息压缩
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        //克隆一个message
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        //恢复原来的消息体
                        msg.setBody(prevBody);
                    }
                    //如果topic整合了namespace
                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        //还原topic
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }
                    /*
                     * 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常
                     */
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    /*
                     * 10 发送异步消息
                     */
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                    break;
                /*
                 * 单向、同步发送模式
                 */
                case ONEWAY:
                case SYNC:
                    /*
                     * 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常
                     */
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    /*
                     * 10 发送单向、同步消息
                     */
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                    break;
                default:
                    assert false;
                    break;
            }
            /*
             * 9 如果存在SendMessageHook,则执行sendMessageAfter方法
             */
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            //返回执行结果
            return sendResult;

            //如果抛出了异常,如果存在SendMessageHook,则执行sendMessageAfter方法
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            /*
             * 对消息进行恢复
             * 1、因为客户端可能还需要查看原始的消息内容,如果是压缩消息,则无法查看
             * 2、另外如果第一次压缩后消息还是大于4k,如果不恢复消息,那么客户端使用该message重新发送的时候,还会进行一次消息压缩
             */
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
2.6.1 findBrokerAddressInPublish查找broker地址

根据brokerName向brokerAddrTable查找数据, 因为生产者只会向Master发送数据, 所以返回的是Master地址。

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * MQClientInstance的方法
 */
public String findBrokerAddressInPublish(final String brokerName) {
    //查询brokerAddrTable缓存的数据
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    //返回Mater节点的地址
    if (map != null && !map.isEmpty()) {
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}
2.6.2 brokerVIPChannel判断vip通道

调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * MixAll的方法
 */
public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
    //如果开启了vip通道
    if (isChange) {
        int split = brokerAddr.lastIndexOf(":");
        String ip = brokerAddr.substring(0, split);
        String port = brokerAddr.substring(split + 1);
        //重新拼接brokerAddr,其中port - 2
        String brokerAddrNew = ip + ":" + (Integer.parseInt(port) - 2);
        return brokerAddrNew;
    } else {
        //如果没有开启vip通道,那么返回原地址
        return brokerAddr;
    }
}

消费者拉取消息只能请求普通通道, 但是生产者可以选择vip和普通通道。

2.6.3 setUniqID生成uniqId

设置到UNIQ_KEY属性中, 批量消息在生成时就已经设置uniqId。

uniqId表示客户端生成的唯一一条消息。

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * MessageClientIDSetter的方法
 */
public static void setUniqID(final Message msg) {
    //如果这条消息不存在"UNIQ_KEY"属性,那么创建uniqId并且存入"UNIQ_KEY"属性中
    if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
        msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
    }
}
2.6.4 tryToCompressMessage压缩消息

消息压缩, 压缩比为5, 压缩之后, 设置压缩标志, 批量消息不支持压缩, 消息压缩有利于网络传输数据。

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * DefaultMQProducerImpl的方法
 */
private boolean tryToCompressMessage(final Message msg) {
    //如果是批量消息,那么不进行压缩
    if (msg instanceof MessageBatch) {
        //batch dose not support compressing right now
        return false;
    }
    byte[] body = msg.getBody();
    if (body != null) {
        //如果消息长度大于4K
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
            try {
                //进行压缩,使用的JDK自带的压缩类
                byte[] data = UtilAll.compress(body, zipCompressLevel);
                if (data != null) {
                    //重新设置到body中
                    msg.setBody(data);
                    return true;
                }
            } catch (IOException e) {
                log.error("tryToCompressMessage exception", e);
                log.warn(msg.toString());
            }
        }
    }

    return false;
}
2.7 updateFaultItem更新故障表

发送消息后, 无论正常与否, 调用updateFaultItem更新故障表, 更新本地错误表缓存数据, 用于延迟时间的故障转移功能。

故障转移功能在此前的selectOneMessageQueue方法中被使用到, 用于查找一个可用的消息队列, updateFaultItem方法判断是否开启了故障转移功能, 会更新LatencyFaultTolerance维护的faultItemTable集合属性中的异常broker数据。

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * DefaultMQProducerImpl的方法
 * @param brokerName brokerName
 * @param currentLatency 当前延迟
 * @param isolation 是否使用默认隔离
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    //调用MQFaultStrategy#updateFaultItem方法
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

MQFaultStrategy#updateFaultItem方法。其根据本次发送消息的延迟时间currentLatency, 计算出broker的隔离时间duration, 即是broker的下一个可用时间点。用于更新故障记录表。

/**
 * MQFaultStrategy的方法
 *
 * @param brokerName     brokerName
 * @param currentLatency 当前延迟
 * @param isolation      是否使用默认隔离时间
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    //如果开启了故障转移,即sendLatencyFaultEnable为true,默认false
    if (this.sendLatencyFaultEnable) {
        //根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration
        //如果isolation为true,则使用默认隔离时间30000,即30s
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        //更新故障记录表
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
2.7.1 computeNotAvailableDuration计算隔离时间

根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration, 据此即可以计算出该broker的下一个可用时间点。

[RocketMQ] Producer发送消息的总体流程 (七)

//延迟等级
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//不可用时间等级
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

/**
 * MQFaultStrategy的方法
 *
 * @param currentLatency 当前延迟
 * @return 故障延迟的时间
 */
private long computeNotAvailableDuration(final long currentLatency) {
    //倒叙遍历latencyMax
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        //选择broker延迟时间对应的broker不可用时间,默认30000对应的故障延迟的时间为600000,即10分钟
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}

latencyMax为延迟等级, notAvailableDuration为隔离时间。之间的关系:

[RocketMQ] Producer发送消息的总体流程 (七)

2.7.2 updateFaultItem更新故障表

该方法更新LatencyFaultToleranceImpl维护的faultItemTable集合属性中的异常broker的故障信息, 将会设置发送消息的延迟时间currentLatency属性, 以及下一个可用时间点LatencyFaultToleranceImpl属性。

下次可用时间LatencyFaultToleranceImpl属性= 现在的时间 + 隔离的时间, 在selectOneMessageQueue方法选取消息队列的时候, 如果开启了故障转移, 那么会查找下一个可用时间点小于当前时间点的broker的队列来发送消息。

[RocketMQ] Producer发送消息的总体流程 (七)

/**
 * LatencyFaultToleranceImpl的方法
 *
 * @param name                 brokerName
 * @param currentLatency       当前延迟
 * @param notAvailableDuration 隔离时间(不可用时间)
 */
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    //获取该broker此前的故障记录数据
    FaultItem old = this.faultItemTable.get(name);
    //如果此前没有数据,那么设置一个新对象肌凝乳
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        //设置当前延迟
        faultItem.setCurrentLatency(currentLatency);
        //设置下一次可用时间点
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        //已有故障记录,更新
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        //已有故障记录,更新
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

3.总结

  1. 生产者消息重试: RocketMQ的消费者消息重试和生产者消息重投。

  2. 生产者故障转移: 通过sendLatencyFaultEnable属性配置是否开启, 目的是为了保证每次发送消息成功, 是一种高可用手段。

    1. 延迟时间的故障转移, 需要sendLatencyFaultEnable为true, 消息队列选择时候, 可用过滤mq认为不可用的broker, 以免不断为宕机的broker发送消息, 选取一个延迟比较短的broker, 实现消息发送高可用。
    2. 没有开启延迟时间的故障转移的时候, 在轮询选择mq的时候, 不会选择上一次发送失败的broker, 实现消息发送高可用。
  3. Vip通道: VIP通道用于隔离读写操作, 消费者拉取消息只能请求普通通道, 生产者可用选择vip或者普通通道。

  4. 故障转移表: RocketMQ的Producer生产者故障转移依赖于故障转移表实现, 是一个HasmMap。消息发送结束之后, 会根据本次发送消息的延迟时间currentLatency, 计算该broker的隔离时间duration, 即为broker的下一次可用时间点。然后更新故障记录表。故障转移表的key为brokerName, value为未来该broker可用时间。文章来源地址https://www.toymoban.com/news/detail-510140.html

到了这里,关于[RocketMQ] Producer发送消息的总体流程 (七)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ发送消息失败排查

    错误信息: 错误截图: 查看结果: 说明:发现对应的订阅组已经离线(查看对应的项目MQ地址和配置都是正确的),然后从服务日志中也看不出更多的问题 说明:调整服务日志级别到info,通过详细的日志信息定位发送失败的原因 日志截图: 说明:日志不断打印 closeChanne

    2024年02月04日
    浏览(36)
  • RocketMQ发送消息

    目录 一.消费模式​编辑 二.发送消息 1.普通消息 同步消息(***)  异步消息(***) 单向消息(*) 日志服务的编写思路 2.延迟消息(***) 延迟等级  3.批量消息 4.顺序消息(*) 三.Tag过滤 订阅关系的一致性 ①订阅一个Topic且订阅一个Tag ②订阅一个Topic且订阅多个Tag ③订阅多个Topic且订阅多

    2024年02月11日
    浏览(37)
  • RocketMQ如何安全的批量发送消息❓

    优点: 批量发送消息可以提高rocketmq的生产者性能和吞吐量。 使用场景: 发送大量小型消息时; 需要降低消息发送延迟时; 需要提高生产者性能时; 注意事项: 消息列表的大小不能超过broker设置的最大消息大小; 消息列表的大小不能超过生产证设置的maxMessageSize 参数,此参

    2024年02月03日
    浏览(41)
  • rocketMQ-console 发送消息

    rocketMQ-console是一款非常使用的rocketMQ扩展工具 工具代码仓 mirrors / apache / rocketmq-externals · GitCode 安装详细教程 ​​​​​​rocketMQ学习笔记二:RocketMQ-Console安装、使用详解_麦田里的码农-CSDN博客_rocketmq-consoled 直接来到工具页面 ,右上角可以切换语言 发送消息流程 1.点击 最

    2024年02月14日
    浏览(35)
  • 13.RocketMQ之消息的存储与发送

    分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。 消息生成者发送消息 Broker收到消息,将消息进行持久化,在存储中新增一条记录 返回ACK给生产者 Broker消息给对应的消费者,然后等待消费者返回ACK 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消

    2024年02月11日
    浏览(44)
  • SpringBoot集成RocketMQ实现三种消息发送方式

    目录 一、pom文件引入依赖 二、application.yml文件添加内容 三、创建producer生产者 四、创建Consumer消费者(创建两个消费者,所属一个Topic) 五、启动项目测试 RocketMQ 支持3 种消息发送方式: 同步 (sync)、异步(async)、单向(oneway)。 同步 :发送者向 MQ 执行发送消息API 时

    2024年02月13日
    浏览(46)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 来配置发送和消费 RocketMQ 消息

           本文解析将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。 添加maven依赖: 修改application.properties 注意: 请将上述示例配置中的 127.0.0.1:9876 替换

    2024年03月22日
    浏览(42)
  • RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?

    更多RocketMQ内容,见专栏:https://blog.csdn.net/saintmm/category_11280399.html 消息轨迹简单来说就是日志,其把消息的生产、存储、消费等所有的访问和操作日志。 在项目中存在发送方与消费方相互“扯皮”的情况: 发送方说消息已经发送成功,而消费方说没有消费到。 这时我们就希

    2024年01月17日
    浏览(49)
  • RocketMQ教程-(5)-功能特性-消息发送重试和流控机制

    本文为您介绍 Apache RocketMQ 的消息发送重试机制和消息流控机制。 消息发送重试 Apache RocketM Q的消息发送重试机制主要为您解答如下问题: 部分节点异常是否影响消息发送? 请求重试是否会阻塞业务调用? 请求重试会带来什么不足? 消息流控 Apache RocketMQ 的流控机制主要为

    2024年02月15日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包