SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践

这篇具有很好参考价值的文章主要介绍了SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

这篇是关于我使用Spring Cloud Steam操作RabbitMQ采用ttl+死信队列的方式实现的延迟队列。

前言

在公司项目中遇到了需要延迟队列的需求,为了以后可维护性和扩展性要求必须要用Springcloud Stream组件来操作mq,而且公司的rabbit也不允许安装延迟插件,只能用最原始的ttl+死信来实现,在搭建过程中遇到很多问题,最终成功实现,下面是代码,采用的是spring cloud steam3.1后的函数式编程实现。

先科普下原理:生产者发送消息到普通交换机绑定了个设置ttl时间的队列,这个队列绑定了个死信交换机且没人消费,如果消息过期就会发送到死信队列里,消费者就监听这个死信队列实现延迟队列的效果。

yaml配置文件

function中definition就是定义你的函数名称(后面发送和接收方法的名字)

bindings中的xxx-in/out-0,分为三个部分。第一个部分就是方法名,第二部分是输入或输出代表生产者out还是消费者in的意思,第三部分的0在rabbitmq里面是固定的,他是为了兼容kafkaf提供的,我们死写0就行。destination里面配置交换机的名字,后面的变量是环境的意思,到时候会变成dev、test、uat这些。group分组要设置一下我这里就叫模块的名字了。

重点:下面的producer:required-groups一定要配置,就是那个分组名字。

然后就是配置消费者,绑定的交换机要是最下面为生产者配置的死信交换机,分组名字也要记得填上不然不会消费。

后面的content-type是类型,可以不指定,默认就是这个

最下面是rabbit的配置,bingdings里面我配置了为生产者邦迪一个死信交换机,然后设置生产者多久没消费就会到死信的ttl。dead-letter-exchange是指定交换机名字,你就设置为原本交换机名称_DLX就好了为了规范,然后配置死信队列的名字,就是在交换机后面加上那个分组,一定要和上面的分组一致。

因为这里死信队列出来的消息是direct的,消费者是监听死信队列已经过期的消息的,所以交换机类型也要设置为direct,不然会报:direct消息转化topic异常,但是这个异常只会第一个消息报,后面的消息都不会了,可能是springcloud stream底层帮我们处理了,但为了避免第一次报错还是设置一下交换机类型。

  cloud:
    function:
      definition: memberAccountUpdateTaskDelayed;handleTaskDelayed;
    stream:
      bindings:
        memberAccountUpdateTaskDelayed-out-0:
          destination: MEMBER_TOPIC_${spring.profiles.active} #延迟exchange,交换模式是topic
          content-type: application/json #设置消息的类型为json
          group: ${spring.application.name}
          producer:
            required-groups: ${spring.application.name} #必须要制定生产者分组,不然发不过去
        handleTaskDelayed-in-0:
          destination: MEMBER_TOPIC_DLX_${spring.profiles.active}
          content-type: application/json
          group: ${spring.application.name}
      rabbit:
        bindings:
          memberAccountUpdateTaskDelayed-out-0:
            producer:
              ttl: 10000 #延时队列的延时时间,单位毫秒
              auto-bind-dlq: true #开启死信队列
              dead-letter-exchange: MEMBER_TOPIC_DLX_${spring.profiles.active} #死信交换机
              dead-letter-queueName: MEMBER_TOPIC_DLX_${spring.profiles.active}.${spring.application.name} #死信队列名称
          handleTaskDelayed-in-0:
            consumer:
              exchange-type: direct #死信交换机必须是direct类型的才能接受

生产者发送消息 

这里我发送的是对象,在TaskDelayMessage就是发送的对象,实体类里面的属性上记得加上JSON反序列化的注解,不然消费者监听的时候会报反序列化错误。

方法名要和配置文件你配置的那个方法名一样

@Repository
@Slf4j
public class MemberAccountUpdateTaskHandleTimeoutTaskRepositoryImpl implements MemberAccountUpdateTaskHandleTimeoutTaskRepository {

    private final Sinks.Many<Message<TaskDelayMessage>> sinks =
            Sinks.many().multicast().onBackpressureBuffer();

    @Bean
    public Supplier<Flux<Message<TaskDelayMessage>>> memberAccountUpdateTaskDelayed(){
        return sinks::asFlux;
    }

    @Override
    public void sendDelayMessage(TaskDelayMessage message) {
        log.info("生产者准备发送消息:{}", message+" -"+System.currentTimeMillis());
        Message<TaskDelayMessage> msg = MessageBuilder.withPayload(message).build();
        while (sinks.tryEmitNext(msg).isFailure()) {
            LockSupport.parkNanos(10);
        }
        log.info("生产者成功发送消息:{}", message+" -"+System.currentTimeMillis());
    }
}

消费者监听消息

方法名要和配置文件你配置的那个方法名一样

    @Bean
	Consumer<TaskDelayMessage> handleTaskDelayed() {
		return message-> {
			log.info("消费者监听到了消息:{}", message);
			memberAccountUpdateTaskService.handleTimeOutTasks(message);
		};
	}

总结 

总结:其实就是配置麻烦,国内springcloud steam的教程和博客是真的很少,搭建过程中遇到了很多问题csdn和b站也都没找到解决办法。最后看了油管和springcloud官方文档还有追进去看源码才解决。如果想学习的stream这个组件推荐去油管看教程,或者直接看springcloud官网文档。文章来源地址https://www.toymoban.com/news/detail-659089.html

到了这里,关于SpringCloudStream整合RabbitMQ用ttl+死信实现延迟队列的实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(79)
  • SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

    SpringBoot整合SpringCloudStream3.1+版本Kafka 添加死信队列配置文件,添加对应channel 通道绑定配置对应的channel位置添加重试配置 Kafka基本配置(application-mq.yml) 创建死信队列配置文件(application-dql.yml) 注意:这里的valueSerde使用了对象类型,需要搭配 application/json 使用,consumer接收

    2024年02月16日
    浏览(38)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(60)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • RabbitMQ延迟队列,死信队列配置

    延迟和死信队列的配置 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列 异常队列配置类

    2024年02月14日
    浏览(53)
  • 【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

    在电商平台下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 该如何实现? 定期轮询(数据库等) 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定

    2024年01月17日
    浏览(59)
  • RabbitMQ详解(五):过期时间TTL、死信队列、磁盘监控

    过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。 目前有两种方法可以设置 第一种方法是通过队列属性设置,队列中所有消息都有相同的TTL。 第二种方法是对消息进行单独设置,每条消息TTL可以不同。 当同时

    2024年02月04日
    浏览(43)
  • 【RabbitMQ学习日记】——死信队列与延迟队列

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说, producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息进行消费,但某些时候 由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就变成了死

    2024年02月06日
    浏览(56)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(54)
  • RabbitMQ养成记 (10.高级特性:死信队列,延迟队列)

    这个概念 在其他MQ产品里面也是有的,只不过在Rabbitmq中稍微特殊一点 什么叫私信队列呢? 就是当消息成为 dead message之后,可以重新发到另外一台交换机,这个交换机就是DLX。 注意这里的有翻译歧义, 这里的DLX 指的是 交换机 ,而不是一个队列。 队列的消息长度 到达限制

    2024年02月05日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包