RocketMQ系列5——4.X版本延迟消息

这篇具有很好参考价值的文章主要介绍了RocketMQ系列5——4.X版本延迟消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        延迟消息是在业务场景中比较常用的功能,可以作为延迟队列。比如订单n分钟未支付自动取消,活动倒计时,定时发消息都可以使用到延迟消息。

一、延迟消息介绍

        生产者将消息发送到Broker,消费端不会立即消费,需要到达指定延迟时间才能被消费端消费。RocketMQ的延迟消息,默认支持设置18个延迟等级,每一个等级对应一个延迟时间。在Broker中会创建一个默认的SCHEDULE_TOPIC_XXXX的topic,topic下有18个队列,对应18个延迟等级。消息发送过来,会先把消息存储在topic名字为SCHEDULE_TOPIC_XXXX的队列内,等着延迟时间到了,在转发到目标队列,之后消费者在消费。

        由于broker是集群模式部署,每个节点都有18个队列。默认的延迟级别不满足业务需求也可以通过Message.setDelayTimeLevel( )方法进行设置,如下代码所示。

//默认
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//修改成20h
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 20h";
//新增20h,等级就变成了19
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 20h";

二、延迟消息举例

生产者发送消息时,设置Message的DelayTimeLevel字段,设置消息的延迟等级。比如设置DelayTimeLevel=4 延迟等级,则对应的延迟为30s。消费者30s之后可以消费到该消息。

Message msg = new Message(topic, tag, body);
//设置延迟level为4 对应30s。传输对应等级,不传输具体时间限制。
//level设置为0表示不延迟
msg.setDelayTimeLevel(level);

三、延时消息实现原理

RocketMQ延迟消息在Broker内部流程图:

RocketMQ系列5——4.X版本延迟消息

 步骤说明:

1.Broker接收到写入的消息,先将目标Topic和队列信息存储到消息的属性(Message对象内的Map properties)中,之后修改Message内的目标的Topic为SCHEDULE_TOPIC_XXXX,并根据生产者设置的延迟级别(delayTimeLevel)存入特定的queue。QueueId=delayTimeLevel-1。把消息内容写入到CommitLog中。

对应源码位置:org.apache.rocketmq.store.CommitLog#putMessage

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    ......
    // Delay Delivery 是否有延迟级别
    if (msg.getDelayTimeLevel() > 0) {
        //判断生产者传输的延迟等级是否超过了设置的最大值,超过了需要修改为设置的最大取值
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }
        //修改topic为延迟队列topic SCHEDULE_TOPIC_XXXX
        topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
        //根据延迟等级获取队列id msg.getDelayTimeLevel() - 1。确定放到SCHEDULE_TOPIC_XXXX中的那个队列
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
        // Backup real topic, queueId 记录原始的topic 和 queueId。
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        //更新消息发送的topic和queueId
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
    .....
}

2.异步的把CommitLog中的信息存储到Topic为SCHEDULE_TOPIC_XXXX的CosumeQueue中。CosumeQueue存储结构为CommitLog offset记录在commitLog中的位置、size记录存储消息的大小和MessageTag HashCode记录消息的Tag的哈希值,对于延迟消息记录的是投递时间的时间戳。投递时间 = 消息存储时间(storeTimestamp) + 延迟级别对应的时间。

对应源码位置:org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize

public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
    // Timing message processing 定时消息处理逻辑
    {
        String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        //判断topic是否是延迟队列
        if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
            int delayLevel = Integer.parseInt(t);
            //判断生产者传输的延迟等级是否超过了设置的最大值,超过了需要修改为设置的最大取值
            if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
            }
            //延迟界别大于0需要 计算 投递时间戳  当作消息Tag的哈希值存储到CosumeQueue中。
            if (delayLevel > 0) {
                tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                    storeTimestamp);
            }
        }
    }
}

3.broker内部的ScheduleMessageService类,作为延迟服务消费SCHEDULE_TOPIC_XXXX的Topic消息,启动时会创建一个定时器timer。根据延迟级别的个数,启动数量相等的TimerTask,每一个TimerTask负责一个延迟级别的消费和投递。每隔1秒钟执行一次DeliverDelayedMessageTimerTask任务,将到期的消息从延迟队列中写入正常Topic中。每个TimerTask检查消息是否到期,只会检查第一条,如果没到期,其他的不会继续检查。如果到期了,投递成功继续检查下一条消息是否过期。

对应源码位置:org.apache.rocketmq.store.schedule.ScheduleMessageService#start

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

public void start() {
    //CAS操作 保证了同一时间只会有一个DeliverDelayedMessageTimerTask执行。executeOnTimeup
    if (started.compareAndSet(false, true)) {
        //创建定时器timer
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        //遍历延迟队列针对每个级别创建一个TimerTask
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            if (timeDelay != null) {
                //每隔1秒钟执行一次延迟队列任务,将到期的消息从延迟队列中写入正常Topic中
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

4.消息到期后需要投递到目标的topic中。把第一步记录的目标Topic和队列信息获取到,重新设置Topic存储到CommitLog中,由于之前tagsCode存储的是消息投递时间,需要重新计算tag的哈希值之后在存储。

对应源码位置org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#messageTimeup

private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
    //重新计算tagcode
    long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
    msgInner.setTagsCode(tagsCodeValue);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
    msgInner.setWaitStoreMsgOK(false);
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
    String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
    int queueId = Integer.parseInt(queueIdStr);
    msgInner.setQueueId(queueId);
    return msgInner;
}

5.将消息直接投递到目标Topic的CosumeQueue中。

6.消费者消费目标topic中的数据。文章来源地址https://www.toymoban.com/news/detail-485056.html

到了这里,关于RocketMQ系列5——4.X版本延迟消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析

    RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。 分析的总体技术范围发送到存储,本文的主要目的是

    2024年02月10日
    浏览(32)
  • 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析

    使用系统控制读取操作的DefaultMQPushConsumer可以自动调用传入的处理方法来处理收到的消息。通过设置各种参数和传入处理消息的函数,使用DefaultMQPushConsumer的主要目的是方便配置和处理消息。在收到消息后,系统会自动保存Offset,并且如果加入了新的DefaultMQPushConsumer,系统会

    2024年02月11日
    浏览(26)
  • 【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析

    首先, DefaultMQPushConsumerImpl 是一个实现了 RocketMQ 的消费者客户端接口的类。该类的主要作用是从 RocketMQ 的 Broker 获取消息并进行消费。 主要可以通过pullMessage方法进行获取对应的操作,如下图所示。 在消费消息时, DefaultMQPushConsumerImpl 会将获取到的消息放入一个 processQueue

    2024年02月11日
    浏览(29)
  • RocketMQ 延迟队列

    指消息发送到某个队列后,在指定多长时间之后才能被消费。 定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。 可以配置自定

    2024年02月05日
    浏览(18)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(49)
  • RabbitMQ+springboot用延迟插件实现延迟消息的发送

    延迟队列:其实就是死信队列中消息过期的特殊情况 延迟队列应用场景: 可以用死信队列来实现,不过死信队列要等上一个消息消费成功,才会进行下一个消息的消费,这时候就需要用到延迟插件了,不过要线在docker上装一个插件 前置条件是在Docker中部署过RabbitMq。 1、打开

    2024年02月10日
    浏览(36)
  • 【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

    消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。 本文将介绍

    2024年02月05日
    浏览(56)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(38)
  • Rabbitmq延迟消息

    延迟消息有两种实现方案: 1,基于死信队列 2,集成延迟插件 使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念: 消息的TTL(存活时间)和死信交换机Exchange,通过这两者的组合来实现延迟队列 1.1 消息的TTL(Time To Live) 消息的TTL就是消息的存活时间。RabbitMQ可以对队列和

    2024年02月13日
    浏览(35)
  • kafka实现延迟消息

    我们知道消息中间件mq是支持延迟消息的发送功能的,但是kafka不支持这种直接的用法,所以我们需要独立实现这个功能,以下是在kafka中实现消息延时投递功能的一种方案 主要的思路是增加一个检测服务,这个检测服务会每分钟定时从延时队列中获取消息,然后判断这些延迟

    2024年04月26日
    浏览(21)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包