RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?

这篇具有很好参考价值的文章主要介绍了RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、前言

更多RocketMQ内容,见专栏:https://blog.csdn.net/saintmm/category_11280399.html

二、消息轨迹

消息轨迹简单来说就是日志,其把消息的生产、存储、消费等所有的访问和操作日志。

1、消息轨迹的引入目的

在项目中存在发送方与消费方相互“扯皮”的情况:

  • 发送方说消息已经发送成功,而消费方说没有消费到。
  • 这时我们就希望能记录一条消息的流转轨迹,即:消息是由哪个IP发送的?什么时候发送的?是被哪个消费者消费的?

2、如何使用消息轨迹

1> 修改Broker服务端配置,设置 traceTopicEnable=true

  • 表示在Broker上创建名为RMQ_SYS_TRACE_TOPIC的topic,队

    列个数为1。所有的msgTrace信息默认都存储在这个topic中。

2> Producer中开启消息轨迹;

  •   public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
    
    • boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。
  •   public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
    
    • String类型的入参customizedTraceTopic,表示用于记录消息轨迹的topic,不设置默认为RMQ_SYS_TRACE_TOPIC

3> Consuemr中开启消息轨迹;

  •   public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
    
    • boolean类型的入参enableMsgTrace设置为true,表示启用消息轨迹追踪,默认为false。

如果启用了消息轨迹,在消息发送时尽量为消息指定Key属性,以便在RocketMQ-Console中对消息进行高性能的查询。

1)使用案例

1> broker的配置文件(broker.conf)中增加如下配置,然后重启Broker:

traceTopicEnable = true

2> Producer:

public class TraceProducer {
    public static void main(String[] args) throws Exception {
        // 第二个参数TRUE,表示开启MsgTrace
        DefaultMQProducer producer = new DefaultMQProducer("saint-test", true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setMaxMessageSize(1024 * 1024 * 10);
        producer.start();

        Message msg = new Message("test-topic-trace",null, "key-trace", "trace-2".getBytes(StandardCharsets.UTF_8));
        SendResult send = producer.send(msg);

        System.out.println("sendResult: " + send);

        // 关闭生产者
        producer.shutdown();
        System.out.println("已经停机");
    }
}

3> Consumer:

public class TraceConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer", true);
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.subscribe("test-topic-trace", "*");
        consumer.setConsumeTimeout(20L);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("Consumer start。。。。。。");

    }
}

2)消息轨迹内容

消息轨迹内容包括:消息ID(MessageID)、消息Tag、消息Key(MessageKey)、消息的存储时间、处理消息的客户端IP、存储服务器IP、发送/消费耗时、消息轨迹状态、跟踪类型。

在RocketMQ-Console中的消息轨迹内容如下:
rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

3) RocketMQ-Console中查看消息轨迹

在MessageTrace大分类下有两种方式可以查看消息轨迹,一种是根据 原消息Topic + MessageKey、另一种是根据 原消息Topic + MessageID;
rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

所以建议如果启用了消息轨迹,在消息发送时尽量为消息指定Key属性,以便在RocketMQ-Console中对消息进行高性能的查询。

1> 根据Message Key查询:
rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

2> 根据Message ID查询:
rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

3、消息轨迹实现原理

1)消息轨迹数据结构

1> 消息轨迹主体内容采用TraceContext类存储;

public class TraceContext implements Comparable<TraceContext> {

    private TraceType traceType;
    private long timeStamp = System.currentTimeMillis();
    private String regionId = "";
    private String regionName = "";
    private String groupName = "";
    private int costTime = 0;
    private boolean isSuccess = true;
    private String requestId = MessageClientIDSetter.createUniqID();
    private int contextCode = 0;
    private List<TraceBean> traceBeans;
  1. traceType:跟踪类型,可选值为Pub(消息发送)、SubBefore(消息拉取到客户端,在执行业务定义的消费逻辑之前)、SubAfter(消费后)。

  2. timeStamp:当前时间戳。

  3. regionId:Broker所在的区域ID,取自BrokerConfig#regionId()。

  4. groupName:组名称,traceType为Pub时表示生产者组的名称,traceType为subBefore或subAfter时表示消费组名称。

  5. requestId:在traceType为subBefore、subAfter时使用,消

    费端的请求ID。

  6. contextCode:消费状态码,可选值为SUCCESS、TIME_OUT、EXCEPTION、RETURNNULL、FAILED。

2> 针对消息信息采用TraceBean类维护;

public class TraceBean {
    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
    private String topic = "";
    private String msgId = "";
    private String offsetMsgId = "";
    private String tags = "";
    private String keys = "";
    private String storeHost = LOCAL_ADDRESS;
    private String clientHost = LOCAL_ADDRESS;
    private long storeTime;
    private int retryTimes;
    private int bodyLength;
    private MessageType msgType;
}
  1. topic:消息主题
  2. msgId:消息唯一ID
  3. offsetMsgId:消息偏移量ID,该ID中包含了Broker的IP以及偏移量
  4. tags:消息标志
  5. keys:消息索引key,根据该key可快速检索消息
  6. storeHost:跟踪类型为Pub时存储该消息的Broker服务器IP,跟踪类型为subBefore、subAfter时存储消费者IP
  7. bodyLength:消息体的长度
  8. msgType:消息的类型,可选值为Normal_Msg(普通消息)、Trans_Msg_Half(预提交消息)、Trans_msg_Commit(提交消息)、Delay_Msg(延迟消息)

2)轨迹消息存储

RocketMQ选择将消息轨迹数据当作一条消息,存储在Broker服务器中。

RocketMQ提供了两种方式来定义消息轨迹存储的topic:

  1. 系统默认topic:如果Broker的traceTopicEnable配置项设为true,表示在该Broker上创建名为RMQ_SYS_TRACE_TOPIC的topic,队列个数为1,默认该值为false。
  2. 自定义topic:在创建消息生产者或消息消费者时,可以通过参数自定义用于记录消息轨迹的topic名称;
    • 注意:RokcetMQ控制台(rocketmq-console)中只支持配置一个消息轨迹topic,建议使用系统默认的topic。

另外:为了避免消息轨迹的数据与正常的业务数据混在一起,官方建议单独使用一个Broker用于开启消息轨迹跟踪。消息轨迹数据只会发送到这一台Broker服务器上,不影响原业务Broker的负载压力。

4、如何采集消息轨迹数据

MQ的核心操作是消息发送和消息存储,数据载体为消息。消息轨迹主要是记录消息何时发送到哪台Broker?发送耗时是多少?在什么时候被哪个消费者消费?消费耗时是多少?…

1)消息发送

在消息发送前后RocketMQ会将本次调用的信息进行采集。RocketMQ通过提供消息发送钩子函数(SendMessageHook)实现,并且为了不明显增加消息发送的时间延迟,使用异步的方式记录消息轨迹。

1> 实例化Producer

在案例中我们知道在实例化DefaultMQProducer时,需要将入参enableMsgTrace设置为true,才能开启消息轨迹。当enableMsgTrace为true时,看DefaultMQProducer的构造函数:

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
                         boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQProducerImpl().registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

其中:

  • SendMessageTraceHookImpl:消息发送的钩子函数,用于跟踪消息轨迹;
    • 对应的消息消费钩子函数为:ConsumeMessageTraceHookImpl
  • AsyncTraceDispatcher:消息轨迹异步转发器。用于在消息生产时,异步将消息轨迹保存到特定topic(默认为RMQ_SYS_TRACE_TOPIC
2> Producer发送消息

在DefaultMqProducerImple#sendKernelImpl方法中,会首先判断是否有发送消息钩子函数(SendMessageHook);如果有:

  • 在发送消息之前调用钩子函数SendMessageHook#sendMessageBefore()方法,将消息轨迹数据先存储在调用上下文中。
  • 在发送消息之后调用钩子函数SendMessageHook#sendMessageAfter()方法,使用AsyncTraceDispatcher异步将消息轨迹数据发送到消息服务器(Broker)上。

rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

无论是DefaultMQProducerImpl中的executeSendMessageHookBefore()方法还是executeSendMessageHookAfter()方法,内部都是调用所有SendMessageHook的相应before()、after()方法。

public void executeSendMessageHookBefore(final SendMessageContext context) {
    if (!this.sendMessageHookList.isEmpty()) {
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                hook.sendMessageBefore(context);
            } catch (Throwable e) {
                log.warn("failed to executeSendMessageHookBefore", e);
            }
        }
    }
}

public void executeSendMessageHookAfter(final SendMessageContext context) {
    if (!this.sendMessageHookList.isEmpty()) {
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                hook.sendMessageAfter(context);
            } catch (Throwable e) {
                log.warn("failed to executeSendMessageHookAfter", e);
            }
        }
    }
}
<1> sendMessageBefore()
@Override
public void sendMessageBefore(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
        return;
    }
    //build the context content of TuxeTraceContext
    TraceContext tuxeContext = new TraceContext();
    tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(tuxeContext);
    tuxeContext.setTraceType(TraceType.Pub);
    tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
    //build the data bean object of message trace
    TraceBean traceBean = new TraceBean();
    traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
    traceBean.setTags(context.getMessage().getTags());
    traceBean.setKeys(context.getMessage().getKeys());
    traceBean.setStoreHost(context.getBrokerAddr());
    traceBean.setBodyLength(context.getMessage().getBody().length);
    traceBean.setMsgType(context.getMsgType());
    traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
    tuxeContext.getTraceBeans().add(traceBean);
}

方法逻辑很简单:只是在消息发送之前先收集消息的topic、tag、key、存储Broker的IP地址、消息体的长度等基础信息,并将消息轨迹数据先存储在调用上下文中。

<2> sendMessageAfter()
@Override
public void sendMessageAfter(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
        || context.getMqTraceContext() == null) {
        return;
    }
    if (context.getSendResult() == null) {
        return;
    }

    if (context.getSendResult().getRegionId() == null
        || !context.getSendResult().isTraceOn()) {
        // if switch is false,skip it
        return;
    }

    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
    tuxeContext.setCostTime(costTime);
    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
        tuxeContext.setSuccess(true);
    } else {
        tuxeContext.setSuccess(false);
    }
    tuxeContext.setRegionId(context.getSendResult().getRegionId());
    traceBean.setMsgId(context.getSendResult().getMsgId());
    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
    localDispatcher.append(tuxeContext);
}

消息发送到Broker返回之后会调用到sendMessageAfter()方法。

  1. 进行一些校验:

    • 如果消息发送上下为空 或 发送消息结果为空,则直接返回,不记录消息轨迹数据。
  2. 进一步维护消息轨迹数据;

    • 从MqTraceContext中获取跟踪的TraceBean,虽然TraceContext中将TraceBean设计成List结构,但在消息发送时,这里的数据永远只有一条,即使是批量发送。

    • 设置costTime(消息发送耗时)、success(是否发送成功)、regionId(发送到Broker所在的分区)、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,则是最后一条消息的物理偏移量)、storeTime。

    • 注意:storeTime并不是真实的消息存储时间,而是一个估算值,取自:客户端发送消息耗时的一半。

      traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
      
  3. 关键点:消息轨迹的异步发送;

    • 将消息轨迹添加到AsyncTraceDispatcher中的阻塞队列traceContextQueue中,以供后续异步发送消息轨迹使用;

      rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

    • 在实例化Producer时,如果开启了消息轨迹,会实例化AsyncTraceDispatcher;并且在启动Producer时也会启动AsyncTraceDispatcher

      • 最终目的是从阻塞队列traceContextQueue中找到待异步发送的轨迹消息,然后发送到相应Broker中。

      rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

<3> 消息轨迹异步发送

进一步看AsyncTraceDispatcher#start()方法:

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

其中启动一个后台线程,线程的具体逻辑体现在AsyncRunnable中。

class AsyncRunnable implements Runnable {
        private boolean stopped;

        @Override
        public void run() {
            while (!stopped) {
                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        //get trace data element from blocking Queue — traceContextQueue
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        break;
                    }
                }
                if (contexts.size() > 0) {
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    traceExecutor.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }

        }
    }
  • 从待发送队列traceContextQueue中不断获取消息轨迹的数据,并将其异步发送到消息服务器。
  • 为了提高消息的发送效率引入批量机制,即:一次从队列中获取一批消息
    • 然后封装成AsyncAppenderRequest任务并提交到线程池中异步执行;
    • 真正的发送消息轨迹数据的逻辑被封装在AsyncTraceDispatcher的内部类AsyncAppenderRequest#run()方法中。
class AsyncAppenderRequest implements Runnable {
    List<TraceContext> contextList;

    public AsyncAppenderRequest(final List<TraceContext> contextList) {
        if (contextList != null) {
            this.contextList = contextList;
        } else {
            this.contextList = new ArrayList<TraceContext>(1);
        }
    }

    @Override
    public void run() {
        sendTraceData(contextList);
    }

    public void sendTraceData(List<TraceContext> contextList) {
        Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
        for (TraceContext context : contextList) {
            if (context.getTraceBeans().isEmpty()) {
                continue;
            }
            // Topic value corresponding to original message entity content
            String topic = context.getTraceBeans().get(0).getTopic();
            String regionId = context.getRegionId();
            // Use  original message entity's topic as key
            String key = topic;
            if (!StringUtils.isBlank(regionId)) {
                key = key + TraceConstants.CONTENT_SPLITOR + regionId;
            }
            List<TraceTransferBean> transBeanList = transBeanMap.get(key);
            if (transBeanList == null) {
                transBeanList = new ArrayList<TraceTransferBean>();
                transBeanMap.put(key, transBeanList);
            }
            TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
            transBeanList.add(traceData);
        }
        for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
            String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
            String dataTopic = entry.getKey();
            String regionId = null;
            if (key.length > 1) {
                dataTopic = key[0];
                regionId = key[1];
            }
            flushData(entry.getValue(), dataTopic, regionId);
        }
    }
  • 按照消息轨迹的存储协议对消息轨迹内容进行组装、编码;
  • 按照topic分批调用flushData()方法将消息发送到Broker中,完成消息轨迹数据的存储。

2)消息消费

同样,在消息消费前后RocketMQ会将本次调用的信息进行采集。RocketMQ通过提供消息消费钩子函数(ConsumeMessageHook)实现,并且为了不明显增加消息消费的时间延迟,使用异步的方式记录消息轨迹。

消息消费和消息发送是一样的机制,这里就不冗余介绍了,贴几个代码截图也就一目了然了。

  • 实例化消费者

    rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

  • 启动消费者

    rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

  • 以并行消费消息为例:

    rocketmq 显示message花费的时间,精通消息队列MQ,rocketmq,spring cloud

    public void executeHookBefore(final ConsumeMessageContext context) {
            if (!this.consumeMessageHookList.isEmpty()) {
                for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                    try {
                        hook.consumeMessageBefore(context);
                    } catch (Throwable e) {
                    }
                }
            }
        }
    
        public void executeHookAfter(final ConsumeMessageContext context) {
            if (!this.consumeMessageHookList.isEmpty()) {
                for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                    try {
                        hook.consumeMessageAfter(context);
                    } catch (Throwable e) {
                    }
                }
            }
        }
    

三、总结

消息轨迹其实就是记录消息从发送 到 存储 再到 消费,整个消息生命周期中的一些关键信息,比如:谁发送的、发送耗时多久、消息保存在哪了、谁消费了、消费耗时多久。

在RocketMQ中的实现方式也很简单,在消息发送/消费前后基于钩子函数,做before()、after()逻辑,进而记录消息轨迹信息。

特别注意:storeTime并不是真实的消息存储时间,而是一个估算值,取自:客户端发送消息耗时的一半。

消息轨迹功能涉及到的关键类:文章来源地址https://www.toymoban.com/news/detail-795973.html

  • AsyncTraceDispatcher:负责异步发送消息轨迹到Broker。
  • ConsumeMessageHook:消费消息钩子函数
  • SendMessageHook:生产消息钩子函数
  • TraceContext、TraceBean:两者一起用于表述消息轨迹

到了这里,关于RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ 发送批量消息、过滤消息和事务消息

    前面我们知道RocketMQ 发送延时消息与顺序消息,现在我们看下怎么发送批量消息、过滤消息和事务消息。 限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。 消息的生产者 消息的消费者 消息分割 如果

    2023年04月21日
    浏览(52)
  • RocketMQ发送消息失败排查

    错误信息: 错误截图: 查看结果: 说明:发现对应的订阅组已经离线(查看对应的项目MQ地址和配置都是正确的),然后从服务日志中也看不出更多的问题 说明:调整服务日志级别到info,通过详细的日志信息定位发送失败的原因 日志截图: 说明:日志不断打印 closeChanne

    2024年02月04日
    浏览(38)
  • RocketMQ发送消息超时异常

    说明:在使用RocketMQ发送消息时,出现下面这个异常(org.springframework.messging.MessgingException:sendDefaultImpl call timeout……); 解决:修改RocketMQ中broke.conf配置,添加下面这两行配置,重启服务后再试就可以了; 启动时,注意使用下面的命令,带上配置文件

    2024年02月13日
    浏览(73)
  • RocketMQ发送消息

    目录 一.消费模式​编辑 二.发送消息 1.普通消息 同步消息(***)  异步消息(***) 单向消息(*) 日志服务的编写思路 2.延迟消息(***) 延迟等级  3.批量消息 4.顺序消息(*) 三.Tag过滤 订阅关系的一致性 ①订阅一个Topic且订阅一个Tag ②订阅一个Topic且订阅多个Tag ③订阅多个Topic且订阅多

    2024年02月11日
    浏览(38)
  • rocketMQ-console 发送消息

    rocketMQ-console是一款非常使用的rocketMQ扩展工具 工具代码仓 mirrors / apache / rocketmq-externals · GitCode 安装详细教程 ​​​​​​rocketMQ学习笔记二:RocketMQ-Console安装、使用详解_麦田里的码农-CSDN博客_rocketmq-consoled 直接来到工具页面 ,右上角可以切换语言 发送消息流程 1.点击 最

    2024年02月14日
    浏览(37)
  • 13.RocketMQ之消息的存储与发送

    分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。 消息生成者发送消息 Broker收到消息,将消息进行持久化,在存储中新增一条记录 返回ACK给生产者 Broker消息给对应的消费者,然后等待消费者返回ACK 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消

    2024年02月11日
    浏览(46)
  • [RocketMQ] Producer发送消息的总体流程 (七)

    单向发送: 把消息发向Broker服务器, 不管Broker是否接收, 只管发, 不管结果。 同步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。 异步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。异步所以发送消息后, 不用等待, 等到Broker服

    2024年02月11日
    浏览(46)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 来配置发送和消费 RocketMQ 消息

           本文解析将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。 添加maven依赖: 修改application.properties 注意: 请将上述示例配置中的 127.0.0.1:9876 替换

    2024年03月22日
    浏览(45)
  • RocketMQ教程-(5)-功能特性-消息发送重试和流控机制

    本文为您介绍 Apache RocketMQ 的消息发送重试机制和消息流控机制。 消息发送重试 Apache RocketM Q的消息发送重试机制主要为您解答如下问题: 部分节点异常是否影响消息发送? 请求重试是否会阻塞业务调用? 请求重试会带来什么不足? 消息流控 Apache RocketMQ 的流控机制主要为

    2024年02月15日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包