RabbitMQ延时队列的实现原理和应用实例

这篇具有很好参考价值的文章主要介绍了RabbitMQ延时队列的实现原理和应用实例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、延时队列应用场景

二、RabbitMQ实现原理

1、RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。
目前有两种方法可以设置消息的 TTL:
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间;
第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。
如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。

2、死信(Dead Letter)队列

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

1)消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
2)消息在队列的存活时间超过设置的TTL时间。
3)消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

3、应用RabbiMQ死信,实现延时队列功能

1)给消息m设置TTL,通过一个普通交换机X1路由到一个普通队列Q1;
2)这个普通队列Q1并没有配置消费者,但Q1事先配置了一个死信交换机X2,当TTL时间到了之后、消息m变成死信,就会被丢到死信交互机X2中;
3)X2会将消息m路由到对应的队列、也叫死信队列Q2中,Q2事先配置了消费者C,这样消息m就会被投递到消费者那里。

总结一下:消息m,发送到rabbitmq后,经过了设置的ttl时间,被投递到消费者C那里。

这时,也许有小伙伴会说,这有什么,不就是一个简单的定时吗?
非也、非也,我们说一个需求场景,帮助大家理解:假设商城要做拼团活动功能,你的任务是在活动开始时间,给发起人发一个消息,很简单明了的一个功能。
发短信功能简单,这个需求的关键,是在活动开始时要触发发短信接口,也就是活动开始时你要能感知到;注意啊、是活动开始时间,不是活动创建时间,我完全可以创建一个2天后开始的拼团活动,所以肯定不能在创建活动时调用发短信接口。
这时你会说,这还不简单,创建活动时根据开始时间与当前时间的差值,设置一个定时器不就可以了?
你仔细想一下,活动有很多个,活动的开始时间千变万化,难道你要为每一个活动创建一个定时器吗?这显然不可能。
当然,定时器配合数据库表,可以实现动态配置;这样做是可以,但一来这样做费老鼻子劲儿了,二来这么多同时运行的定时器,对系统性能的耗费是巨大的,为了这么一个小功能显然得不偿失。
这时你再看rabbimq的延时队列,是不是怎么看怎么顺眼?可靠,又因为是中间件耦合性小,写成延时功能后可以像工具方法一样到处使用,高可用、但对系统性能牺牲却很小,完美。
在多说一点,RabbitMQ除了支持给消息设置TTL,还支持给队列设置TTL;但是给队列设置TTL和普通的定时器一样,不能重复使用,所以也就不多说了。

4、延时消息插件rabbitmq_delayed_message_exchange

1)rabbitmq_delayed_message_exchange插件的介绍和使用方法

3中讲述的延时队列模型有一个问题,比如我们给队列Q1中发送了两个消息,第一个是5天后执行(即TTL等于5天的毫秒值)的消息m1,第二个是2天后执行的消息m2;结果你执行后发现,消息m1和m2都是5天后收到的、这就是说消息m2并没有被按时发送。
这是由于,rabbitmqRabbitMQ 只会检查第一个消息是否过期,如果前边的消息TTL比后边的大,那么后面的消息即使已经超时也不能变成死信、也就不能按时投递到消费者了。
为了更方便的实现延时队列功能,RabbitMQ提供了rabbitmq_delayed_message_exchange插件;需要开发者自己进行安装。
这个插件,相当于提供了一个新类型的交换机,所以插件安装成功后,在rabbitmq控制台页面,查看exchange类型,如果出现x-delayed-message类型的选项
rabbitmq延时队列实现原理,中间件,后端,rabbitmq,分布式

a)实现机制

x-delayed-message类型的交换机支持延迟投递机制,消息传到交换机后、并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中

b)使用方法

因为x-delayed-message类型的交换机支持延迟投递机制,所以rabbitmq延时队列模型得到简化
我们只需要新建一个死信队列与x-delayed-message类型的交换机进行绑定,即可实现上面3中的延时队列功能。
需要注意:用户必须使用名为x-delay的特殊header,向交换机发布延迟消息,该header需要一个整数,表示RabbitMQ应延迟消息的毫秒数。

rabbitTemplate.convertAndSend(TimeTriggerConfig.DELAYED_EXCHANGE_XDELAY, TimeTriggerConfig.DELAY_ROUTING_KEY_XDELAY, timeTriggerMsg, message -> {
            Long current = DateUtil.getDateline();
            //设置延时时间,目标时间多5秒
            Long time = (triggerTime - current) * 1000 + 5000 ;
                message.getMessageProperties().setHeader("x-delay", time);
            return message;
        });
c)优缺点

插件优点:
不需要为延迟消息单独创建路由、交换器、队列;

插件缺点:
1、不支持对已发送消息进行管理,只能在Web管理页面查看发送的数量DM;
2、集群中只有一个副本(保存在当前节点下的Mnesia表中),如果节点不可用或关闭插件会丢失消息;
3、目前该插件只支持disk节点,不支持ram节点;
4、性能比原生的差一点(普通的Exchange收到消息后直接路由到队列,而延迟队列需要判断消息是否过期,未过期的需要保存在表中,时间到了再捞出来路由);

2)rabbitmq_delayed_message_exchange插件的安装

三、应用实例——生产者

1、延时任务接口(消息生产者使用的接口)

这里生产者模块化为一个接口,我觉得直接写一个类也可以,因为这个接口只有一个实现。

注意下面延时队列触发器的add方法,第一个参数executerName。因为之后消息消费者,调用执行接口是高可用的设计,即执行接口有多个实现,所以这里将本次使用延时队列需要调用的实现类的BeanId、也就是首字母小写的类名(默认是这样,有配置另说)放入消息体传过去;这样执行器执行前,先通过ApplicationContext类的getBean(beanId);方法得到一个指定执行器实现的对象。这个地方,正是本延时队列、代码高可用设计的关键所在。

/**
 * 延时执行接口
 */
public interface TimeTrigger {

    /**
     * 添加延时任务
     *
     * @param executerName 执行器beanid
     * @param param        执行参数
     * @param triggerTime  执行时间 时间戳 秒为单位
     * @param uniqueKey    如果是一个 需要有 修改/取消 延时任务功能的延时任务,<br/>
     *                     请填写此参数,作为后续删除,修改做为唯一凭证 <br/>
     *                     建议参数为:PINTUAZN_{ACTIVITY_ID} 例如 pintuan_123<br/>
     *                     业务内全局唯一
     */
    void add(String executerName, Object param, Long triggerTime, String uniqueKey);

    /**
     * 修改延时任务
     *
     * @param executerName   执行器beanid
     * @param param          执行参数
     * @param triggerTime    执行时间 时间戳 秒为单位
     * @param oldTriggerTime 旧的任务执行时间
     * @param uniqueKey      添加任务时的唯一凭证
     */
    void edit(String executerName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey);

    /**
     * 删除延时任务
     *
     * @param executerName 执行器
     * @param triggerTime  执行时间
     * @param uniqueKey    添加任务时的唯一凭证
     */
    void delete(String executerName, Long triggerTime, String uniqueKey);
}

2、延时任务接口实现(消息生产者使用的接口实现)

/**
 * 延时任务生产 rabbitmq实现
 *
 * @Description: 原理:利用amqp的死信队列的超时属性,将超时的任务转到普通队列交给消费者执行。
 * 添加任务,将任务执行标识、beanid、执行时间,hash值存入redis,标识任务需要执行
 * 任务编辑,将之前的标识删除,重新添加任务
 * 添加删除,删除redis中的任务标识,消费者执行时获取不到 redis中的标识,则不会执行延时任务
 */
@Component
public class RabbitmqTimeTrigger implements TimeTrigger {

    /**
     * 引入rabbit的操作模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Cache cache;


    private final Logger logger = LoggerFactory.getLogger(getClass());


    /**
     * 添加延时任务
     *
     * @param executerName 执行器
     * @param param        执行参数
     * @param triggerTime  执行时间
     * @param uniqueKey    如果是一个 需要有 修改/取消 延时任务功能的延时任务,<br/>
     *                     请填写此参数,作为后续删除,修改做为唯一凭证 <br/>
     *                     建议参数为:PINTUAZN_{ACTIVITY_ID} 例如 pintuan_123<br/>
     *                     业务内全局唯一
     */
    @Override
    public void add(String executerName, Object param, Long triggerTime, String uniqueKey) {

        if (StringUtil.isEmpty(uniqueKey)) {
            uniqueKey = StringUtil.getRandStr(10);
        }
        //标识任务需要执行
        cache.put(RabbitmqTriggerUtil.generate(executerName, triggerTime, uniqueKey), 1);

        TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executerName, param, triggerTime, uniqueKey);
        logger.debug("定时执行在【" + DateUtil.toString(triggerTime, "yyyy-MM-dd HH:mm:ss") + "】,消费【" + param.toString() + "】");
        rabbitTemplate.convertAndSend(TimeTriggerConfig.DELAYED_EXCHANGE_XDELAY, TimeTriggerConfig.DELAY_ROUTING_KEY_XDELAY, timeTriggerMsg, message -> {

            Long current = DateUtil.getDateline();
            //如果执行的延时任务应该是在现在日期之前执行的,那么补救一下,要求系统一秒钟后执行
            if (triggerTime < current) {
                message.getMessageProperties().setDelay(1000);
            } else {
                Long time = (triggerTime - current) * 1000 + 5000 ;
                message.getMessageProperties().setHeader("x-delay", time);
            }
            logger.debug("还有【" + message.getMessageProperties().getExpiration() + "】执行任务");

            return message;
        });
    }

    /**
     * 修改延时任务
     *
     * @param executerName 执行器
     * @param param        执行参数
     * @param triggerTime  执行时间
     * @param uniqueKey    添加任务时的唯一凭证
     */
    @Override
    public void edit(String executerName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey) {

        //标识任务放弃
        cache.remove(RabbitmqTriggerUtil.generate(executerName, oldTriggerTime, uniqueKey));
        //重新添加任务
        this.add(executerName, param, triggerTime, uniqueKey);
    }

    /**
     * 删除延时任务
     *
     * @param executerName 执行器
     * @param triggerTime  执行时间
     * @param uniqueKey    添加任务时的唯一凭证
     */
    @Override
    public void delete(String executerName, Long triggerTime, String uniqueKey) {
        cache.remove(RabbitmqTriggerUtil.generate(executerName, triggerTime, uniqueKey));
    }
}

3、其中一个生产者的具体实例

/**
 * 拼团业务类
 */
@Service
public class PintuanManagerImpl implements PintuanManager {

    @Autowired
    private PintuanMapper pintuanMapper;
    ...
    
//只记录消息生产部分代码
	@Override
    @Transactional(value = "tradeTransactionManager", propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public Pintuan add(Pintuan pintuan) {
        //检测开始时间和结束时间
        PromotionValid.paramValid(pintuan.getStartTime(), pintuan.getEndTime(), 1, null);
        this.verifyParam(pintuan.getStartTime(), pintuan.getEndTime());
        pintuan.setStatus(PromotionStatusEnum.WAIT.name());
        pintuan.setSellerName(UserContext.getSeller().getSellerName());
        pintuan.setCreateTime(DateUtil.getDateline());
        pintuan.setSellerId(UserContext.getSeller().getSellerId());
        //可操作状态为nothing,代表活动不可以执行任何操作
        pintuan.setOptionStatus(PintuanOptionEnum.NOTHING.name());
        this.pintuanMapper.insert(pintuan);

        //创建活动 启用延时任务,活动开始时启动拼团
        PintuanChangeMsg pintuanChangeMsg = new PintuanChangeMsg();
        pintuanChangeMsg.setPintuanId(pintuan.getPromotionId());
        pintuanChangeMsg.setOptionType(1);
        timeTrigger.add(TimeExecute.PINTUAN_EXECUTER, pintuanChangeMsg, pintuan.getStartTime(), TRIGGER_PREFIX + pintuan.getPromotionId());

        return pintuan;
    }

}

四、应用实例——消费者

1、消费者

/**
 * 延时任务 消息消费者
 */
@Component
public class TimeTriggerConsumer {


    private final Logger logger = LoggerFactory.getLogger(getClass());


    @Autowired
    private Cache cache;

    /**
     * 接收消息,监听 CONSUMPTION_QUEUE 队列
     */
    @RabbitListener(queues = TimeTriggerConfig.IMMEDIATE_QUEUE_XDELAY)
    public void consume(TimeTriggerMsg timeTriggerMsg) {

        try {
            String key = RabbitmqTriggerUtil.generate(timeTriggerMsg.getTriggerExecuter(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey());

            //如果这个任务被标识不执行
            if (cache.get(key) == null) {

                logger.debug("执行器执行被取消:" + timeTriggerMsg.getTriggerExecuter() + "|任务标识:" + timeTriggerMsg.getUniqueKey());
                return;
            }
            logger.debug("执行器执行:" + timeTriggerMsg.getTriggerExecuter());
            logger.debug("执行器参数:" + JsonUtil.objectToJson(timeTriggerMsg.getParam()));

            //执行任务前 清除标识
            cache.remove(key);

            TimeTriggerExecuter timeTriggerExecuter = (TimeTriggerExecuter) ApplicationContextHolder.getBean(timeTriggerMsg.getTriggerExecuter());
            timeTriggerExecuter.execute(timeTriggerMsg.getParam());

        } catch (Exception e) {
            logger.error("延时任务异常:", e);
        }
    }

}

2、消费者调用接口

/**
 * 延时任务执行器接口
 */
public interface TimeTriggerExecuter {


    /**
     * 执行任务
     * @param object 任务参数
     */
    void execute(Object object);

}

接口高可用、解耦合,有多个实现,用以支持不同的模块使用延时队列功能
rabbitmq延时队列实现原理,中间件,后端,rabbitmq,分布式文章来源地址https://www.toymoban.com/news/detail-742872.html

3、其中一个实现的具体操作,第一步都是将泛型类转成一个具体的、特定的实体类对象

/**
 * 拼团定时开启关闭活动 延时任务执行器
 */
@Component("pintuanTimeTriggerExecute")
public class PintuanTimeTriggerExecuter implements TimeTriggerExecuter {

    @Autowired
    private TimeTrigger timeTrigger;

    @Autowired
    private PintuanClient pintuanClient;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 执行任务
     */
    @Override
    public void execute(Object object) {
        PintuanChangeMsg pintuanChangeMsg = (PintuanChangeMsg) object;

        //如果是要开启活动
        if (pintuanChangeMsg.getOptionType() == 1) {
            Pintuan pintuan = pintuanClient.getModel(pintuanChangeMsg.getPintuanId());
            if (PromotionStatusEnum.WAIT.name().equals(pintuan.getStatus()) ||
                    (PromotionStatusEnum.END.name().equals(pintuan.getStatus()) && PintuanOptionEnum.CAN_OPEN.name().equals(pintuan.getOptionStatus()))) {
                pintuanClient.openPromotion(pintuanChangeMsg.getPintuanId());
                //开启活动后,立马设置一个关闭的流程
                pintuanChangeMsg.setOptionType(0);
                timeTrigger.add(TimeExecute.PINTUAN_EXECUTER, pintuanChangeMsg, pintuan.getEndTime(), "{TIME_TRIGGER}_" + pintuan.getPromotionId());
                this.logger.debug("活动[" + pintuan.getPromotionName() + "]开始,id=[" + pintuan.getPromotionId() + "]");
            }
        } else {
            //拼团活动结束
            Pintuan pintuan = pintuanClient.getModel(pintuanChangeMsg.getPintuanId());
            if (pintuan.getStatus().equals(PromotionStatusEnum.UNDERWAY.name())) {
                pintuanClient.closePromotion(pintuanChangeMsg.getPintuanId());
            }
            this.logger.debug("活动[" + pintuan.getPromotionName() + "]结束,id=[" + pintuan.getPromotionId() + "]");
        }
    }
}

到了这里,关于RabbitMQ延时队列的实现原理和应用实例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 一种多策略下RabbitMQ的延时队列实现

    场景: 最近在开发一款系统中遇到这样一个场景,A系统开通套餐需要把套餐信息以邮件的形式发送给相关工作人员,经过人工审核通过后,在B系统里面开通,A系统会调B系统套餐列表接口查询套餐是否开通成功,开通成功则从A系统去完成订单,假如超过设定时间未开通成功,则关闭订

    2024年02月12日
    浏览(42)
  • RabbitMQ延时队列的详细介绍以及Java代码实现

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的延时队列以及其详细代码实现。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小威要向诸佬学习呀 🧑个人简介:大

    2024年02月01日
    浏览(40)
  • RabbitMQ - 死信队列,延时队列

    死信队列: DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在 x-dead-letter-exchange 参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列

    2024年02月09日
    浏览(48)
  • 【技术分享】四、RabbitMQ “延时队列”

    延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。 ex: 定时任务:十分钟后执行某种操作。 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。

    2024年02月08日
    浏览(53)
  • .NET中使用RabbitMQ延时队列和死信队列

    延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。 延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果

    2024年02月15日
    浏览(44)
  • 深入浅出RabbitMQ:顺序消费、死信队列和延时队列

    大家好,我是小❤,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。 上篇文章(应对流量高峰的利器——消息中间件)中,我们已经介绍了消息中间件的用途,主要用作:解耦、削峰、异步通信、应用解耦,并介绍了业界常

    2024年02月03日
    浏览(37)
  • 创建延时队列、springboot配置多个rabbitmq

    type选择fanout (图中已经绑定,红框为绑定过程) (图中已经绑定,红框为绑定过程) 延时队列时间到之后,将消息发送给queue.file_destroy,执行删除文件操作 RabbitConfig配置类 mq1 mq2 application-prod.yaml mq1消费端,发消息给mq2 mq2消费端用于递归删除文件 FileHelper工具类递归删除文件或文

    2024年02月11日
    浏览(41)
  • rabbitmq延时队列自动解锁库存

    一、库存服务自动解锁库存 使用了最终一致性来解决分布式事务 当order服务出现异常回滚,此时ware服务无法回滚,怎么办? 使用seata全局事务虽然能在order服务出现异常导致 回滚 时使其他服务的也能同时回滚,但在流量大的情况下是使用加锁的方式,效率 低不适合并发量大

    2024年02月16日
    浏览(56)
  • SpringBoot + RabbitMQ从延时队列中删除指定的值【RabbitMQ中的basicAck和basicNack的区别以及basicReject又是什么?】

    业务需求是,就是我本来是有一个order-queue队列绑定到了死信队列交换机order-dead-direct-exchange上,然后我的业务是,现在有一个用户下单但是没有付款,order-queue队列写入该条信息并计时24小时后如果用户还是未付款状态则移除到死信队列order-dead-queue中。问题来了,如果在这个

    2024年02月16日
    浏览(42)
  • RabbitMq应用延时消息

    一.建立绑定关系 二.建立生产者 1.消息实体 三.建立消费者 四.测试类测试 五.效果如图所示

    2024年02月12日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包