RabbitMQ学习笔记

这篇具有很好参考价值的文章主要介绍了RabbitMQ学习笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

介绍

名词解释

Broker:接受和分发消息的应用,例如RabbitMQ Server

Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等

Connection:生产者/消费者与broker之间的TCP连接

Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销

Exchange:交换机,按一定的规则将消息路由转发到某个队列。

Queue:消息队列

RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

安装

参考:https://www.rabbitmq.com/install-debian.html#apt-cloudsmith

最后安装server时不要带-y --fix-missing

用户权限配置参考:https://blog.csdn.net/theRengar/article/details/118933418

RabbitMQ命令参考:https://computingforgeeks.com/how-to-install-latest-rabbitmq-server-on-ubuntu-linux

工作模式

simple

一个消息的发布者和一个消费者

RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

<!--        AMQP依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
<!--        测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

可以将基本的RabbitMQ配置打包模块使用

字段配置类

@Configuration
public class RabbitMQConfig {
    public static final String RABBITMQ_DEMO_TOPIC = "rabbitmq_demo_topic";
}

统一MQ配置

spring:
  rabbitmq:
    host: 
    port: 
    username: 
    password: 

新模块使用前面的模块做依赖,统一配置

RabbitMQ对象配置

@Configuration
public class RabbitMQConfiguration {
    /**
     * rabbitmq演示直接队列
     * @return {@code Queue}
     * 1、name:    队列名称
     * 2、durable: 是否持久化
     * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
     * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
     *
     */
    @Bean
    public Queue rabbitmqDemoDirectQueue() {

        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, true);
    }
}

接收者

@Component
@RabbitListener(queues = RabbitMQConfig.RABBITMQ_DEMO_TOPIC)
public class Receiver {

    @RabbitHandler
    public void receive(String msg) {
        System.out.println("接收到消息:" + msg);
    }
}

使用@RabbitListener注解指定某方法作为接收器;或加在类上,并在相应方法上添加@RabbitHandler进行标记,可以根据接受的参数类型进入具体的方法中。

参考:https://blog.csdn.net/sliver1836/article/details/119734239

eg:

@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
 
    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }
 
    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
    
}

发送者

也可以直接用spring的aqmpTemplate接口,但是少了一些方法

@Component
public class Sender {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, msg);
    }
}

Work Queues

多个消费端共同消费同一个队列的消息,消费者之间是竞争关系

RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

对绑定一个队列的消费者创建多个对象即可

Publish/Subscribe

消息发布者发布到交换机,交换机根据相应规则转发到对应队列

RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

交换机类型:direct, topic, headers,fanout

  1. Direct Exchange:直连交换机,根据Routing Key进行投递到不同队列。

    • 单个绑定,一个路由键对应一个队列。

      RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

    • 多个绑定,一个路由键对应多个队列,则消息会分别投递到两个队列中

      RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

  2. Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。此模式下Routing Key会被忽略

    RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

  3. Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示零个或多个词,*表示一个词。

    eg:“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”

    RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

  4. Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。

    匹配规则x-match有下列两种类型:

    x-match = all :表示所有的键值对都匹配才能接受到消息

    x-match = any :表示只要有键值对匹配就能接受到消息

    RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

声明交换机和队列的Bean,之后进行绑定。

@Configuration
public class RabbitMQConfiguration {
    /**
     * rabbitmq演示直接队列
     * @return {@code Queue}
     * 1、name:    队列名称
     * 2、durable: 是否持久化
     * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
     * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
     *
     */
    @Bean
    public Queue rabbitmqDemoQueue() {

        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
    }

    @Bean
    public Queue rabbitmqDemoQueue1() {

        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC+"1", true, false, false);
    }

    @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, true);
    }

    @Bean
    public FanoutExchange rabbitmqDemoFanoutExchange() {
        //Fanout交换机
        return new FanoutExchange(RabbitMQConfig.RABBITMQ_DEMO_FANOUT_EXCHANGE, true, true);
    }

    @Bean
    public Binding bindDirect() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoQueue())
                //到交换机
                .to(rabbitmqDemoFanoutExchange());
                //并设置匹配键
//                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }

    @Bean
    public Binding bindDirect1() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoQueue1())
                //到交换机
                .to(rabbitmqDemoFanoutExchange());
        //并设置匹配键
//                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }

    @Bean
    public Receiver receiver1() {
        return new Receiver(1);
    }

    @Bean
    public Receiver receiver2() {
        return new Receiver(2);
    }

    @Bean
    public Receiver receiver3() {
        return new Receiver(3);
    }

    @Bean
    public Receiver receiver4() {
        return new Receiver(4);
    }
}

统一配置中也可添加部分键值等

@Configuration
public class RabbitMQConfig {
    public static final String RABBITMQ_DEMO_TOPIC = "rabbitmq_demo_topic";

    public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE = "rabbitmq_demo_direct_exchange";

    public static final String RABBITMQ_DEMO_DIRECT_ROUTING = "rabbitmq_demo_direct_routing";

    public static final String RABBITMQ_DEMO_FANOUT_EXCHANGE = "rabbitmq_demo_fanout_exchange";
}

Routing

使用direct交换机,用RoutingKey进行绑定配置

Topics

使用topic类型交换机,在RoutingKey中使用特殊字符

RPC

Publisher Confirms

配置详解

基础信息

spring.rabbitmq.host: 默认localhost
spring.rabbitmq.port: 默认5672
spring.rabbitmq.username: 用户名
spring.rabbitmq.password: 密码
spring.rabbitmq.virtual-host: 连接到代理时用的虚拟主机
spring.rabbitmq.addresses: 连接到server的地址列表(以逗号分隔),先addresses后host 
spring.rabbitmq.requested-heartbeat: 请求心跳超时时间,0为不指定,如果不指定时间单位默认为妙
spring.rabbitmq.publisher-confirms: 是否启用【发布确认】,默认false
spring.rabbitmq.publisher-returns: 是否启用【发布返回】,默认false
spring.rabbitmq.connection-timeout: 连接超时时间,单位毫秒,0表示永不超时 

SSL

spring.rabbitmq.ssl.enabled: 是否支持ssl,默认false
spring.rabbitmq.ssl.key-store: 持有SSL certificate的key store的路径
spring.rabbitmq.ssl.key-store-password: 访问key store的密码
spring.rabbitmq.ssl.trust-store: 持有SSL certificates的Trust store
spring.rabbitmq.ssl.trust-store-password: 访问trust store的密码
spring.rabbitmq.ssl.trust-store-type=JKS:Trust store 类型.
spring.rabbitmq.ssl.algorithm: ssl使用的算法,默认由rabiitClient配置
spring.rabbitmq.ssl.validate-server-certificate=true:是否启用服务端证书验证
spring.rabbitmq.ssl.verify-hostname=true 是否启用主机验证

Cache

spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
spring.rabbitmq.cache.connection.size: 缓存的channel数,只有是CONNECTION模式时生效
spring.rabbitmq.cache.connection.mode=channel: 连接工厂缓存模式:channel 和 connection

Listener

simple为前两种工作模式

direct为后四种

spring.rabbitmq.listener.type=simple: 容器类型.simple或direct
 
spring.rabbitmq.listener.simple.auto-startup=true: 是否启动时自动启动容器
spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
spring.rabbitmq.listener.simple.prefetch: 一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量.
spring.rabbitmq.listener.simple.transaction-size: 当ack模式为auto时,一个事务(ack间)处理的消息数量,最好是小于等于prefetch的数量.若大于prefetch, 则prefetch将增加到这个值
spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.missing-queues-fatal=true 若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器
spring.rabbitmq.listener.simple.idle-event-interval: 发布空闲容器的时间间隔,单位毫秒
spring.rabbitmq.listener.simple.retry.enabled=false: 监听重试是否可用
spring.rabbitmq.listener.simple.retry.max-attempts=3: 最大重试次数
spring.rabbitmq.listener.simple.retry.max-interval=10000ms: 最大重试时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms:第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.simple.retry.multiplier=1: 应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.stateless=true: 重试时是否无状态
 
spring.rabbitmq.listener.direct.acknowledge-mode= ack模式
spring.rabbitmq.listener.direct.auto-startup=true 是否在启动时自动启动容器
spring.rabbitmq.listener.direct.consumers-per-queue= 每个队列消费者数量.
spring.rabbitmq.listener.direct.default-requeue-rejected= 默认是否将拒绝传送的消息重新入队.
spring.rabbitmq.listener.direct.idle-event-interval= 空闲容器事件发布时间间隔.
spring.rabbitmq.listener.direct.missing-queues-fatal=false若容器声明的队列在代理上不可用,是否失败.
spring.rabbitmq.listener.direct.prefetch= 每个消费者可最大处理的nack消息数量.
spring.rabbitmq.listener.direct.retry.enabled=false  是否启用发布重试机制.
spring.rabbitmq.listener.direct.retry.initial-interval=1000ms # 第一次和第二次尝试传递消息的时间间隔
spring.rabbitmq.listener.direct.retry.max-attempts=3 # 发送消息的最大尝试次数
spring.rabbitmq.listener.direct.retry.max-interval=10000ms # 最大重试时间间隔
spring.rabbitmq.listener.direct.retry.multiplier=1 # 应用于上一重试间隔的乘数
spring.rabbitmq.listener.direct.retry.stateless=true # 重试是否无状态

Template

spring.rabbitmq.template.mandatory: 启用强制信息;默认false
spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
spring.rabbitmq.template.retry.enabled=false: 发送重试是否可用 
spring.rabbitmq.template.retry.max-attempts=3: 最大重试次数
spring.rabbitmq.template.retry.initial-interva=1000msl: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.template.retry.multiplier=1: 应用于上一重试间隔的乘数
spring.rabbitmq.template.retry.max-interval=10000: 最大重试时间间隔

消息转换器

RabbitMQ默认使用SimpleMessageConverter,基于]DK的ObjectOutputStream序列化转换消息,有速度和安全性的缺陷。

Json序列化

建议在common包做统一配置

导包

		<dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>

注入bean

	@Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

注解式用法

不需要配置队列,交换机和绑定的Bean,改在消费者的@RabbitListener上进行配置。如果交换机和队列已存在,并和此处声明的配置不同会报错。

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(name = RabbitMQConfig.RABBITMQ_DEMO_TOPIC, durable = "true",autoDelete = "false",exclusive = "false"),
                exchange = @Exchange(name = RabbitMQConfig.RABBITMQ_DEMO_FANOUT_EXCHANGE, type = ExchangeTypes.FANOUT),
                key = "s"),
        @QueueBinding(
                value = @Queue(name = RabbitMQConfig.RABBITMQ_DEMO_TOPIC + "1", durable = "true",autoDelete = "false",exclusive = "false"),
                exchange = @Exchange(name = RabbitMQConfig.RABBITMQ_DEMO_FANOUT_EXCHANGE, type = ExchangeTypes.FANOUT),
                key = "s")

})

由于未注入Bean,发送方此时需要显式使用交换机名/队列名等进行发送。

@Component
public class Sender {

    @Resource
    private RabbitTemplate amqpTemplate;


    public void send(String msg) {
        amqpTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_FANOUT_EXCHANGE,"", msg);
    }

}

QueueBuilder继承了AbstractBuilder,包括一个封装的map,存储了@Queue构建时的参数。包括ttl,死信交换机的配置等。在做绑定时可以用参数arguments和注解@Arguments做参数配置。具体可用参数可从QueueBuilder中查看。设定ttl要注意指定参数类型,不然会报错。

同样的,@QueueBinding也可以使用参数arguments和注解@Arguments做参数配置,但似乎没有可用参数。

同样的,@Exchange也可以使用参数arguments和注解@Arguments做参数配置。可用参数只有alternate-exchange,如果发送消息的时候根据routingkey并没有把消息路由到队列中去,这就会将此消息路由到Alternate Exchange属性指定的Exchange上。参考文章

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(name = RabbitMQConfig.RABBITMQ_DEMO_TOPIC, autoDelete = "true",exclusive = "false"
                        ,arguments = {@Argument(name = "x-message-ttl", value = "10000",type = "java.lang.Integer"),
                                @Argument(name = "x-dead-letter-exchange", value = "dead.exchange"),
                                @Argument(name = "x-dead-letter-routing-key", value = "dead")
                                }),
                exchange = @Exchange(name = RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, type = ExchangeTypes.DIRECT,autoDelete = "true"
                        ,arguments = {@Argument(name = "alternate-exchange", value = "alternate.exchange")}),
                key = "s")
})

常见场景

消息可靠性

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认

    • 消息成功投递到交换机,返回ack

    • 消息未投递到交换机,返回nack

  • publisher-return,发送者回执

    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated  #验证消息有没有顺利到达MQ simple:同步等待confirm结果直到超时 correlated:异步回调,定义回调类,返回结果时会回调这个类
    publisher-returns: true #验证消息有没有正确路由到相应的队列的功能
    template:
      mandatory: true  # 定义消息路由失败时的策略 true:调用ReturnCallBack false:丢弃消息

配置confirm

  • 给单条发送的消息配置
public void send(String msg) {
        //设置消息唯一id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //设置消息投递到交换机的回调
        correlationData.getFuture().addCallback(result -> {
            //success callback
            if (result != null && result.isAck()) {
                log.debug("消息投递到交换机成功:{}" , correlationData.getId());
            }else{
                log.error("消息投递到交换机失败:{}" , correlationData.getId());
            }
            //failure callback
        }, ex -> {
            log.error("消息发送失败" + ex.getMessage());
        });
        amqpTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE,"es", msg,correlationData);
    }
  • 全局配置

    在bean配置中给RabbitTemplate做配置

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack){
                    log.debug("消息投递到交换机成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }else {
                    log.error("消息投递到交换机失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
                }
            });
        }
    }
    
    

全局bean配置ReturnCallsBack 发送者回执

//setReturnCallback()从springboot2.3已弃用,将多个参数封装到ReturnedMessage中
        rabbitTemplate.setReturnsCallback(returned -> {
            // 日志
            log.error("消息发送失败,消息内容为:{},错误原因为:{},错误编码为:{},交换机为:{},路由键为:{}",
                    returned.getMessage().toString(),
                    returned.getReplyText(),
                    returned.getReplyCode(),
                    returned.getExchange(),
                    returned.getRoutingKey());
        });

消息持久化

默认交换机,队列,消息都是持久化

消费者消息确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。

而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每个消费者未确认的消息的最大数量。默认值为3。建议设置为1,以便在消费者处理消息时不会将其分配给其他消费者。
        acknowledge-mode: auto #none:关闭ack; manual:手动ack; auto:自动ack

消费失败重试

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 是否开启重试
          initial-interval: 1000 #第一次重试间隔时间
          multiplier: 3 # 重试间隔时间递增倍数
          max-attempts: 3 # 最大重试次数
          stateless: true # 是否是无状态的重试 如果业务包含事务则改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

    RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

eg:

@Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }

延迟消息

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信:

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机。

RabbitMQ学习笔记,消息队列,rabbitmq,学习,笔记,spring boot,spring,java,分布式

延迟消息

给消息或队列设置ttl,通过过期转给(死信)交换机,实现延迟消息。若两方法共存则取短

Message message = MessageBuilder
                .withBody(msg.getBytes(StandardCharsets.UTF_8))
                .setExpiration("10000")
                .build();

也可以给MQ安装官方DelayExchange插件实现

https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

消息堆积

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃。

解决方案:

  • 队列上绑定多个消费者,提高消费速度
  • 给消费者开启线程池,提高消费速度
  • 使用惰性队列,可以保存更多消息

惰性队列

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

注解声明:

在声明队列时加

@Argument(name = "x-queue-mode" ,value = "lazy")

Bean声明:

用QueueBuilder构造队列,添加QueueBuilder.lazy()

高可用

搭设集群文章来源地址https://www.toymoban.com/news/detail-797940.html

到了这里,关于RabbitMQ学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot项目使用RabbitMQ队列

    一、Rabbitmq的安装 RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,**即需要先安装部署Erlang环境再安装RabbitMQ环境。 erlang的安装在windows中直接点击安装即可。 安装完erlang后设置erlang的环境变量ERLANG_HOME。 然后安装rabbitmq。 安装成功后。 可以在浏览器中输入http:

    2024年02月02日
    浏览(40)
  • Spring Boot 中的 RabbitMQ 消息发送配置

    RabbitMQ 是一个开源的消息代理系统,它实现了 AMQP(高级消息队列协议)标准,并支持多种消息传输协议。它具有高可用性、可扩展性和可靠性,广泛应用于分布式系统、微服务架构、异步任务处理、日志收集等场景。 RabbitMQ 的核心概念包括: Producer:消息生产者,负责将消

    2024年02月07日
    浏览(43)
  • Spring Boot 整合 RabbitMQ 实现延迟消息

    消息队列(Message Queuing,简写为 MQ)最初是为了解决金融行业的特定业务需求而产生的。慢慢的,MQ 被应用到了更多的领域,然而商业 MQ 高昂的价格让很多初创公司望而却步,于是 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)应运而生。 随着 AMQP 草案的发布,两个月

    2024年04月08日
    浏览(50)
  • 【SpringBoot笔记29】SpringBoot集成RabbitMQ消息队列

    这篇文章,主要介绍SpringBoot如何集成RabbitMQ消息队列。 目录 一、集成RabbitMQ 1.1、引入amqp依赖 1.2、添加连接信息 1.3、添加RabbitMQ配置类

    2023年04月08日
    浏览(57)
  • Java RabbitMQ消息队列简单使用

    消息队列,即MQ,Message Queue。 消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

    2024年02月12日
    浏览(61)
  • RabbitMQ消息队列__学习报告

    一、Linux部署RabbitMQ (一)环境 1.rabbitmq1:192.168.163.128 [root@rabbitmq1 ~]# hostnamectl set-hostname client 2.rabbitmq2:192.168.163.132 [root@rabbitmq2 ~]# hostnamectl set-hostname haproxy 3.rabbitmq3:192.168.163.135 [root@rabbitmq3~]#  hostnamectl set-hostname rabbitmq3 4.域名解析 [root@localhost ~]# vim /etc/hosts 192.168.163.128

    2023年04月25日
    浏览(31)
  • 【RabbitMQ笔记08】消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)

    这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。 目录 一、防止消息丢失 1.1、消息确认机制(生产者) (1)生产者丢失消息 (2)生产者消息确认机制 1.2、消息确认机制(消费者) (1)消费者丢失消息

    2024年02月02日
    浏览(51)
  • Java开发 - 消息队列之RabbitMQ初体验

    目录 前言 RabbitMQ 什么是RabbitMQ RabbitMQ特点 安装启动 RabbitMQ和Kafka的消息收发区别 RabbitMQ使用案例 添加依赖 添加配置 创建RabbitMQ配置类 RabbitMQ消息的发送 RabbitMQ消息的接收 测试 结语 前一篇,我们学习了Kafka的基本使用,这一篇,我们来学习RabbitMQ。他们作为消息队列本身都具

    2024年02月03日
    浏览(44)
  • GoLong的学习之路,进阶,RabbitMQ (消息队列)

    快有一周没有写博客了。前面几天正在做项目。正好,项目中需要MQ(消息队列),这里我就补充一下我对mq的理解。其实在学习java中的时候,自己也仿照RabbitMQ自己实现了一个单机的mq,但是mq其中一个特点也就是,分布式我在项目中没有涉及。这里我用go语言将RabbitMQ的操作

    2024年02月03日
    浏览(47)
  • Spring Boot 项目应用消息服务器RabbitMQ(简单介绍)

    本章讲述的是在用户下单环节,消息服务器RabbitMQ 的应用 在写一个电商项目的小demo,在电商项目中,消息服务器的应用: 1、订单状态通知:当用户下单、支付成功、订单发货、订单完成等关键节点时,可以通过消息服务器向用户发送相应的订单状态通知。 2、消息推送:通

    2024年02月13日
    浏览(87)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包