RabbitMq整合springboot超详细,超适合新手

这篇具有很好参考价值的文章主要介绍了RabbitMq整合springboot超详细,超适合新手。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、引入springboot整合amqp的依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application.yml 配置
server:
  port: 8111
spring:
  rabbitmq:
    port: 5672
    host: localhost
    username: guest
    password: guest
    #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
    publisher-confirm-type: correlated
    #保证交换机能把消息推送到队列中
    publisher-returns: true
    virtual-host: /
    #这个配置是保证消费者会消费消息,手动确认
    listener:
      simple:
        acknowledge-mode: manual
    template:
      mandatory: true
3、RabbitConfig.java (自定义Rabbitmq配置类)

配置详细解释都写在注解上了

//常用的三个配置如下
//1---设置手动应答(acknowledge-mode: manual)
// 2---设置生产者消息发送的确认回调机制 (  #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
//    publisher-confirm-type: correlated
//    #保证交换机能把消息推送到队列中
//    publisher-returns: true
//     template:
//      #以下是rabbitmqTemplate配置
//      mandatory: true)
// 3---设置重试
@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    //@Bean  缓存连接池
    //public CachingConnectionFactory rabbitConnectionFactory

    @Autowired
    private RabbitProperties properties;

	//这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉
    // 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称
//    @Bean
//    public ConnectionFactory connectionFactory() throws Exception {
//        //创建工厂类
//        CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
//        //用户名
//        cachingConnectionFactory.setUsername("gust");
//        //密码
//        cachingConnectionFactory.setPassword("gust");
//        //rabbitMQ地址
//        cachingConnectionFactory.setHost("127.0.0.1");
//        //rabbitMQ端口
//        cachingConnectionFactory.setPort(Integer.parseInt("5672"));
//
//        //设置发布消息后回调
//        cachingConnectionFactory.setPublisherReturns(true);
//        //设置发布后确认类型,此处确认类型为交互
//        cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//
//        cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
//        return  cachingConnectionFactory;
//    }


    // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(rabbitConnectionFactory);

        // 并发消费者数量
        containerFactory.setConcurrentConsumers(1);
        containerFactory.setMaxConcurrentConsumers(20);
        // 预加载消息数量 -- QOS
        containerFactory.setPrefetchCount(1);
        // 应答模式(此处设置为手动)
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //消息序列化方式
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        // 设置通知调用链 (这里设置的是重试机制的调用链)
        containerFactory.setAdviceChain(
                RetryInterceptorBuilder
                        .stateless()
                        .recoverer(new RejectAndDontRequeueRecoverer())
                        .retryOperations(rabbitRetryTemplate())
                        .build()
        );
        return containerFactory;
    }

    // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory);
        //默认是用jdk序列化
        //数据转换为json存入消息队列,方便可视化界面查看消息数据
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        //此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
        rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());
        //CorrelationData correlationData, boolean b, String s
        rabbitTemplate.setConfirmCallback(
                (correlationData, b, s) -> {
                    System.out.println("ConfirmCallback     "+"相关数据:"+  correlationData);
                    System.out.println("ConfirmCallback     "+"确认情况:"+b);
                    System.out.println("ConfirmCallback     "+"原因:"+s);
                });
        //Message message, int i, String s, String s1, String s2
        rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> {
            System.out.println("ReturnCallback:     "+"消息:"+message);
            System.out.println("ReturnCallback:     "+"回应码:"+i);
            System.out.println("ReturnCallback:     "+"回应消息:"+s);
            System.out.println("ReturnCallback:     "+"交换机:"+s1);
            System.out.println("ReturnCallback:     "+"路由键:"+s2);
        });

        return rabbitTemplate;
    }

    //重试的Template
    @Bean
    public RetryTemplate rabbitRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // 设置监听  调用重试处理过程
        retryTemplate.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
                // 执行之前调用 (返回false时会终止执行)
                return true;
            }

            @Override
            public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                // 重试结束的时候调用 (最后一次重试 )
                System.out.println("---------------最后一次调用");

                return ;
            }
            @Override
            public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                //  异常 都会调用
                System.err.println("-----第{}次调用"+retryContext.getRetryCount());
            }
        });
        retryTemplate.setBackOffPolicy(backOffPolicyByProperties());
        retryTemplate.setRetryPolicy(retryPolicyByProperties());
        return retryTemplate;
    }

    @Bean
    public ExponentialBackOffPolicy backOffPolicyByProperties() {
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
        long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
        double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();
        // 重试间隔
        backOffPolicy.setInitialInterval(initialInterval * 1000);
        // 重试最大间隔
        backOffPolicy.setMaxInterval(maxInterval * 1000);
        // 重试间隔乘法策略
        backOffPolicy.setMultiplier(multiplier);
        return backOffPolicy;
    }

    @Bean
    public SimpleRetryPolicy retryPolicyByProperties() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts();
        retryPolicy.setMaxAttempts(maxAttempts);
        return retryPolicy;
    }
}
4、在程序中创建交换机,队列,并且绑定

DirectRabbitConfig.java(创建direct类型的交换机)

@Configuration
public class DirectRabbitConfig {
    //创建一个名为TestDirectQueue的队列
    @Bean
    public Queue TestDirectQueue(){
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。
        // arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。
        return new Queue("TestDirectQueue",true);
    }
    //创建一个名为TestDirectExchange的Direct类型的交换机
    @Bean
    DirectExchange TestDirectExchange(){
        // durable:是否持久化,默认是false,持久化交换机。
        // autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。
        // arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机
        return new DirectExchange("TestDirectExchange",true,false);
    }
    //绑定交换机和队列
    @Bean
    Binding bindingDirect(){
        //bind队列to交换机中with路由key(routing key)
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("123");
    }
}

PS:一定要在该类上加@Configuration该注解,使得程序启动的时候运行配置类。

5、创建生产者测试
@RestController
public class TestController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/sendMessage")
    public String sendDirectMessage(){
        String messageId = UUID.randomUUID().toString();
        String messageData = "test message,hello!";
        String current = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        Map<String,Object> map = new HashMap<>();
        map.put("messageId",messageId);
        map.put("data",messageData);
        map.put("current",current);
        rabbitTemplate.convertAndSend("TestDirectExchange", "123", map, new CorrelationData(UUID.randomUUID().toString()));
        return "ok";
    }
}
6、访问localhost:8111/sendMessage推送消息到消息队列中。

由于设置了消息发送确认,所以控制台会输出回调函数调用的内容。
rabbitmq springboot配置,java,前端,spring

登录RabbitMq后台查看消息情况。

rabbitmq springboot配置,java,前端,spring

7、创建一个消费者,来消费队列中的消息。
@RabbitListener(queues = "TestDirectQueue")
@Component
public class DirectConsumer {

    @RabbitHandler
    public void process(Map map , Channel channel, Message message) throws IOException {
        System.out.println("消费者接受到的消息是:"+map.toString());
        //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

PS :@RabbitListener不仅可以加在类上,还可以加载方法体上。上述消费者,在程序启动后,如果该队列不存在,那么会报org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s) 错误。
        所以在消费者端,健壮的写法就是也创建队列和交换机,如果队列和交换机存在,那么就拿来使用,不存在则创建,这样就不会报该错误。
因此@RabbitListener有另一种用法,如下:
 

@Component
public class DirectConsumer2 {
    @RabbitHandler
    @RabbitListener(bindings = {@QueueBinding(
    		value = @Queue(value = "q5",durable = "true"),//如果不括号中不指定队列名称,那么这时候创建的就是临时队列,当消费者连接断开的时候,该队列就会消失
            exchange = @Exchange(value = "myexchange",durable = "true",type = "direct"),
            key = "123")})
    public void process(Map map , Channel channel, Message message) throws IOException {
        System.out.println("消费者接收到的消息是"+map.toString());
        //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

上述列子在@RabbitListener中声明了队列和交换机,并且指定了routing key,当这一对关系存在时,那么会直接使用,不存在就会创建。

上述消费者和生产者都只是使用了Exchange的Direct模式。下面再介绍Fanout和Topic模式。

Fanout模式
fanout会忽略routingkey(路由键)的规则,只要绑定到该exchange上的队列都会收到该消息。所以fanout也相当于广播,队列只要订阅绑定了这个Exchange,那么消息都会被转发到这些队列中。
创建FanoutConfig.java,配置创建Fanout类型的Exchange,再创建三个队列FanoutA、FanoutB、FanoutC。将这三个队列绑定到创建的FanoutExchange中。

@Configuration
public class FanoutConfig {
    //创建FanoutExchange
    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("FanoutExchange",true,false);
    }

    //创建队列A
    @Bean
    Queue queueA(){
        return new Queue("FanoutA",true,false,false);
    }

    //创建队列B
    @Bean
    Queue queueB(){
        return new Queue("FanoutB",true,false,false);
    }

    //创建队列C
    @Bean
    Queue queueC(){
        return new Queue("FanoutC",true,false,false);
    }
    
    //将创建的队列绑定到创建的交换机上
    @Bean
    Binding bindingA(){
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }
    @Bean
    Binding bindingB(){
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }
    @Bean
    Binding bindingC(){
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

在TestController.java中新加一个请求地址用来发送消息到Fanout交换机中

@RestController
public class TestController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage2")
    public String sendFanoutMessage(){
        String messageId = UUID.randomUUID().toString();
        String messageData = "test message,hello!";
        String current = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        Map<String,Object> map = new HashMap<>();
        map.put("messageId",messageId);
        map.put("data",messageData);
        map.put("current",current);
        rabbitTemplate.convertAndSend("FanoutExchange", "", map, new CorrelationData(UUID.randomUUID().toString()));
        return "ok";
    }
}

访问localhost:8111/sendMessage2,查看消息推送情况。
rabbitmq springboot配置,java,前端,spring
看到一条消息被转发到了这三个队列中。

之后创建一个消费者,消费消息,消费者的创建和上述差不多,只不过换了一些监听的队列而已。
FanoutConsumer.java

@Component
public class FanoutConsumer {
    //这里把两种@RabbitListener的注解都写出来了,这两种写法都要认得,第二种写法比较健壮
    //@RabbitListener(queues = "FanoutA")
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue(value = "FanoutA",durable = "true"),
            exchange = @Exchange(value = "FanoutExchange",durable = "true",type = "fanout"),
            key = ""
    )})
    @RabbitHandler
    public void processA(Map map, Channel channel, Message message) throws IOException {
        System.out.println("收到的FanoutA队列的消息是:"+map.toString());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }


    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue(value = "FanoutB",durable = "true"),
            exchange = @Exchange(value = "FanoutExchange",durable = "true",type = "fanout"),
            key = ""
    )})
    @RabbitHandler
    public void processB(Map map, Channel channel, Message message) throws IOException {
        System.out.println("收到的FanoutB队列的消息是:"+map.toString());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
    
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue(value = "FanoutC",durable = "true"),
            exchange = @Exchange(value = "FanoutExchange",durable = "true",type = "fanout"),
            key = ""
    )})
    @RabbitHandler
    public void processC(Map map, Channel channel, Message message) throws IOException {
        System.out.println("收到的FanoutC队列的消息是:"+map.toString());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

 

运行程序后结果如下:


Topic模式
exchange会转发到符合routingkey的消息队列中。也就是说发送消息的routingkey符合队列和exchange绑定的routingkey规则,那么这个消息就会被转发到这些队列中。一个消息有可能被多个队列消费。

创建TopicConfig.java,配置创建Topic类型的Exchange,再创建三个队列TopicA、TopicB、TopicC。将这三个队列绑定到创建的TopicExchange中。TopicA绑定的routing key为test.#,TopicB绑定的routing key为test.*,TopicC绑定的routing key为test.topic。
 

@Configuration
public class TopicConifg {

    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange("TopicExchange",true,false);
    }

    @Bean
    Queue TopicqueueA(){
        return new Queue("TopicA",true,false,false);
    }

    @Bean
    Queue TopicqueueB(){
        return new Queue("TopicB",true,false,false);
    }

    @Bean
    Queue TopicqueueC(){
        return new Queue("TopicC",true,false,false);
    }

    @Bean
    Binding TopicbindingA(){
        return BindingBuilder.bind(TopicqueueA()).to(topicExchange()).with("test.#");
    }
    @Bean
    Binding TopicbindingB(){
        return BindingBuilder.bind(TopicqueueB()).to(topicExchange()).with("test.*");
    }

    @Bean
    Binding TopicbindingC(){
        return BindingBuilder.bind(TopicqueueC()).to(topicExchange()).with("test.topic");
    }
}

在TestController.java中新加一个请求地址用来发送消息到Topic交换机中。

 @GetMapping("/sendMessage3")
    public String sendTopicMessage(){
        String messageId = UUID.randomUUID().toString();
        String messageData = "test message,hello!";
        String current = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        Map<String,Object> map = new HashMap<>();
        map.put("messageId",messageId);
        map.put("data",messageData);
        map.put("current",current);
        rabbitTemplate.convertAndSend("TopicExchange", "test.topic.a", map, new CorrelationData(UUID.randomUUID().toString()));
        return "ok";
    }

访问localhost:8111/sendMessage3,推送消息到TopicExchange中,并且routing key 为test.topic.a。查看消息转发情况。

可以看到只有队列TopicA接收到消息了。因为在Topic模式下的Exchange,转发消息的routing key规则是:

#:匹配一个或者多个词
*:匹配一个或者0个词
比如test.topic.a 只会匹配test.#,test.a会匹配test. * 和test.#,test只会匹配test. *

1、修改发送消息的routing key 为test,会发现只有TopicA接收到消息。
2、修改发送消息的routing key 为test.topic时,会发现三个队列都接收到消息。

总结:

在Exchange中,有三种模式:Direct,Fanout,Topic。
Direct模式只会将消息转发到符合绑定routing key的队列中,如果没有符合routing key的队列,那么消息会丢失。而且Direct发送的消息是唯一的,也就是说再Direct中的一个消息,最后只会发送到一个队列中被消费。
Fanout模式会无视routing key,会把消息转发到所有绑定到该交换机上的队列中。所以Fanout中的一个消息,会转发到所有的队列中,也就是如果绑定了多个队列,那么一个相同的消息会在多个队列中。
Topic模式有一套转发的routing key规则,只会把消息转发到符合routing key 的队列中。所以在Topic中的一个消息有可能也会被转发到多个队列中进行消费。
 文章来源地址https://www.toymoban.com/news/detail-697012.html

到了这里,关于RabbitMq整合springboot超详细,超适合新手的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】4 Spring/SpringBoot整合RabbitMQ

    spring-amqp 是对AMQP的一些概念的一些抽象, spring-rabbit 是对RabbitMQ操作的封装实现。 主要有几个核心类 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等。 RabbitAdmin 类完成对Exchange,Queue,Binding的操作,在容器中管理了 RabbitAdmin 类的时候,可以对Exchange,Queue,Binding进行自

    2024年01月22日
    浏览(43)
  • SpringBoot项目整合RabbitMQ

    消息队列(Message Queue)是分布式系统中常用的组件,它允许不同的应用程序之间通过发送和接收消息进行通信。Spring Boot提供了简单且强大的方式来整合消息队列,其中包括RabbitMQ、ActiveMQ、Kafka等多种消息队列实现。 本文将以RabbitMQ为例,详细介绍如何使用Spring Boot来整合消

    2024年02月09日
    浏览(55)
  • SpringBoot整合RabbitMQ

    🙈作者简介:练习时长两年半的Java up主 🙉个人主页:程序员老茶 🙊 ps:点赞👍是免费的,却可以让写博客的作者开心好久好久😎 📚系列专栏:Java全栈,计算机系列(火速更新中) 💭 格言:种一棵树最好的时间是十年前,其次是现在 🏡动动小手,点个关注不迷路,感

    2024年02月21日
    浏览(44)
  • SpringBoot 整合 RabbitMQ

    由于有的 Idea 不选择插线无法创建 Spring Boot 项目,这里我们先随便选一个插件,大家也可以根据需求选择~~ 把版本改为 2.7.14 引入这两个依赖: 配置 application.yml文件 Config 类 : RabbitMQConfig 测试类: RabbitMQConfigTests 结果 当我们启动 测试类 之后就可以发现我们的 rabbitmq 界面里的

    2024年02月10日
    浏览(38)
  • SpringBoot整合RabbitMQ(基础)

    一.环境准备 1、在pom文件中引入对应的依赖: 2、在application.yml配置文件中配置RabbitMQ: 二、整合 点对点,简单模式 ①配置文件中声明队列 ②创建生产者 消息发送成功后,在web管理页面查看: 可以看到对应队列中产生了消息 ③创建消费者 启动项目,可以看到消息成功消费:

    2024年02月11日
    浏览(36)
  • 单个springboot整合rabbitmq

    rabbitmq是一种消息中间件,是基于erlang语言开发的AMQP(高级消息队列协议)的开源实现。 本质是个队列,FIFO先入先出。 1.1.1 rabbitmq特性: 开源,性能优秀,稳定保障 提供可靠的消息投递模式,返回模式 于Spring AMQP完美整合,API丰富 集群模式丰富 高可用 1.1.2 rabbitmq主要结构 生产

    2024年02月10日
    浏览(65)
  • SpringBoot整合实现RabbitMQ

    本文大纲 一.RabbitMQ介绍 二.RabbitMQ的工作原理 2.1 RabbitMQ的基本结构 2.2 组成部分说明 2.3 生产者发送消息流程 2.4 消费者接收消息流程 三.SpringBoot 整合实现RabbitMQ 3.1创建mq-rabbitmq-producer(生产者)发送消息 3.1.1pom.xml中添加相关的依赖 3.1.2 配置application.yml 3.1.3 配置RabbitMQ常量类

    2024年02月17日
    浏览(43)
  • SpringBoot 整合RabbitMQ

    2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出

    2024年02月15日
    浏览(51)
  • RabbitMQ整合Springboot

    目录 一、配置 二、使用 (1)创建普通交换机 (2) 创建普通队列 (3)绑定 交换机--队列 (4)创建带有死信交换机的队列 (5)生产者 (6)消费者 (7)Message对象 (8)延时队列优化(死信实现延时,有缺陷) 三、Rabbitmq插件实现延迟队列(重点) 四、发布确认 (1)确认回调

    2024年02月15日
    浏览(40)
  • springboot整合rabbitmq死信队列

    什么是死信 需要测试死信队列,则需要先梳理整体的思路,如可以采取如下方式进行配置: 从上面的逻辑图中,可以发现大致的思路: .1. 消息队列分为正常交换机、正常消息队列;以及死信交换机和死信队列。 2. 正常队列针对死信信息,需要将数据 重新 发送至死信交换机

    2024年02月11日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包