【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性

这篇具有很好参考价值的文章主要介绍了【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性,云原生进阶-PaaS专栏,云原生,paas,中间件,rabbitmq,java

1 RabbitMQ原理剖析

1.1 消息队列执行过程

1.客户端连接到消息队列服务器,打开一个Channel。

2.客户端声明一个Exchange,并设置相关属性。

3.客户端声明一个Queue,并设置相关属性。

4.客户端使用Routing key,在Exchange和Queue之间建立好绑定关系。

5.客户端投递消息到Exchange。

6.Exchange接收到消息后,就根据消息的key和已经设置的Binding,进行消息路由,将消息投递到一个或多个队列里。有三种类型的Exchanges:direct,fanout,topic,每个实现了不同的路由算法(routing algorithm):

  • Direct exchange:完全根据key进行投递的叫做Direct交换机。如果Routing key匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。例如,绑定时设置了Routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
  • Fanout exchange:不需要key的叫做Fanout交换机。它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
  • Topic exchange:对key进行模式匹配后进行投递的叫做Topic交换机。比如符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。

1.2 消息队列的创建

        Consumer和Procuder都可以通过 queue.declare 创建queue。对于某个Channel来说,Consumer不能declare一个queue,却订阅其他的queue。当然也可以创建私有的queue。这样只有app本身才可以使用这个queue。queue也可以自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么如果是创建一个已经存在的queue呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

        那么谁应该负责创建这个queue呢?是Consumer,还是Producer?

        如果queue不存在,当然Consumer不会得到任何的Message。但是如果queue不存在,那么Producer Publish的Message会被丢弃。所以,还是为了数据不丢失,Consumer和Producer都try to create the queue!反正不管怎么样,这个接口都不会出问题。

        Queue对load balance的处理是完美的。对于多个Consumer来说,RabbitMQ 使用轮循的方式(round-robin)的方式均衡的发送给不同的Consumer。

1.3 消息的ack机制

        默认情况下,如果Message 已经被某个Consumer正确的接收到了,那么该Message就会被从queue中移除。当然也可以让同一个Message发送到很多的Consumer。

        如果一个queue没被任何的Consumer Subscribe(订阅),那么,如果这个queue有数据到达,那么这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer,这个数据被Consumer正确收到时,这个数据就被从queue中删除。

        那么什么是正确收到呢?通过ack。

        每个Message都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack(Consumer的basic.ack),也可以自动的ack(订阅Queue时指定auto_ack为true)。

        如果有数据没有被ack,那么RabbitMQ Server会把这个信息发送到下一个Consumer。

        如果这个app有bug,忘记了ack,那么RabbitMQ Server不会再发送数据给它,因为Server认为这个Consumer处理能力有限。

        而且ack的机制可以起到限流的作用(Benefit to throttling):在Consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance Consumer的load。

        当然对于实际的例子,比如我们可能会对某些数据进行merge,比如merge 4s内的数据,然后sleep 4s后再获取数据。特别是在监听系统的state,我们不希望所有的state实时的传递上去,而是希望有一定的延时。这样可以减少某些IO,而且终端用户也不会感觉到。

        没有正确响应呢?

        如果Consumer接收了一个消息就还没有发送ack就与RabbitMQ断开了,RabbitMQ会认为这条消息没有投递成功会重新投递到别的Consumer。

        如果Consumer本身逻辑有问题没有发送ack的处理,RabbitMQ不会再向该Consumer发送消息。RabbitMQ会认为这个Consumer还没有处理完上一条消息,没有能力继续接收新消息。

        我们可以善加利用这一机制,如果需要处理过程是相当复杂的,应用程序可以延迟发送ack直到处理完成为止。这可以有效控制应用程序这边的负载,不致于被大量消息冲击。

1.4 消息拒绝

        由于要拒绝消息,所以ack响应消息还没有发出,这里拒绝消息可以有两种选择:

        Consumer直接断开RabbitMQ,这样RabbitMQ将把这条消息重新排队,交由其它Consumer处理。这个方法在RabbitMQ各版本都支持,这样做的坏处就是连接断开增加了RabbitMQ的额外负担,特别是consumer出现异常每条消息都无法正常处理的时候。

        RabbitMQ 2.0.0可以使用 basic.reject 命令,收到该命令RabbitMQ会重新投递到其它的Consumer。如果设置requeue为false,RabbitMQ会直接将消息从queue中移除。

        其实还有一种选择就是直接忽略这条消息并发送ACK,当你明确知道这条消息是异常的不会有Consumer能处理,可以这样做抛弃异常数据。

        为什么要发送basic.reject消息而不是ACK?RabbitMQ后面的版本可能会引入”dead letter”队列,如果想利用dead letter做点文章就使用basic.reject并设置requeue为false。

1.5 消息持久化

        RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,大多数用户都会选择持久化。消息队列持久化包括3个部分:

1.Exchange持久化,在声明时指定durable => 1

2.Queue持久化,在声明时指定durable => 1

3.消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

        若Exchange和Queue都是持久化的,那么它们之间的Binding也是持久化的;而Exchange和Queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

        Consumer从durable queue中取回一条消息之后并发回了ack消息,RabbitMQ就会将其标记,方便后续垃圾回收。如果一条持久化的消息没有被consumer取走,RabbitMQ重启之后会自动重建exchange和queue(以及bingding关系),消息通过持久化日志重建再次进入对应的queues,exchanges。

2 进阶特性

2.1 消费者并发消费

        让消费者可以开启多个线程并发去消费消息,可以配合上方工作队列,只需要加配置:

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
    #host: 127.0.0.1
    #port: 5672
    username: 你的账号
    password: 你的密码
    virtual-host: /
    # 消费者配置
    listener:
      simple:
        concurrency: 2 # 并发数
        max-concurrency: 10  #最大并发数

生产者

正常发送消息,发送10个

// 并发消费
@GetMapping("/test13")
public void test13(){
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("concurrency","测试并发消费消息");
    }
}

消费者

消费者也正常消费。

@RabbitListener(queuesToDeclare = @Queue(value = "concurrency"))
public void concurrency(String msg) throws InterruptedException {
    long l = System.currentTimeMillis();
    // 打印线程名
    String name = Thread.currentThread().getName();
    System.out.println("name = " + name);
    Thread.sleep(1000); // 休眠一秒,好看效果
    long l2 = System.currentTimeMillis();
    System.out.println("time" + (l2-l)/1000);
}

打印日志发现,并发消费生效:

【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性,云原生进阶-PaaS专栏,云原生,paas,中间件,rabbitmq,java

配合工作队列,消费速度就非常快了。

多个消费者

@RabbitListener(queuesToDeclare = @Queue(value = "concurrency"))
public void concurrency(String msg) throws InterruptedException {
    // 打印线程名
    String name = Thread.currentThread().getName();
    System.out.println("name = "+new Date() + name);
    Thread.sleep(1000); // 休眠一秒,好看效果
}

@RabbitListener(queuesToDeclare = @Queue(value = "concurrency"))
public void concurrency1(String msg) throws InterruptedException {
    // 打印线程名
    String name = Thread.currentThread().getName();
    System.out.println("name1 = "+new Date() + name);
    Thread.sleep(1000); // 休眠一秒,好看效果
}

【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性,云原生进阶-PaaS专栏,云原生,paas,中间件,rabbitmq,java

2.2 批量发送

实现多个消息批量发送,可配置每次发送几个,不足发送数时,等待超时后也继续发送.

批量发送配置类

// 批量发送配置
@Configuration
public class RabbitBatchSendConfig {
    @Resource
    ConnectionFactory connectionFactory;
    /**
     * 注入一个批量 template
     * Spring-AMQP 通过 BatchingRabbitTemplate 提供批量发送消息的功能。如下是三个条件,满足任一即会批量发送:
     * <p>
     * 【数量】batchSize :超过收集的消息数量的最大条数。
     * 【空间】bufferLimit :超过收集的消息占用的最大内存。
     * 【时间】timeout :超过收集的时间的最大等待时长,单位:毫秒。
     * 不过要注意,这里的超时开始计时的时间,是以最后一次发送时间为起点。也就说,每调用一次发送消息,都以当前时刻开始计时,重新到达 timeout 毫秒才算超时。
     *
     * @return BatchingRabbitTemplate
    */
    @Bean
    public BatchingRabbitTemplate batchRabbitTemplate() {
        // 创建 BatchingStrategy 对象,代表批量策略
        // 超过收集的消息数量的最大条数。
        int batchSize = 10; 
        // 例:每次发送10条
        // 每次批量发送消息的最大内存 
        bint bufferLimit = 1024 * 1024;
        // 超过收集的时间的最大等待时长,单位:毫秒
        int timeout = 10 * 1000; 
        // 例:不足10条时,等待10秒继续发送
        BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(batchSize, bufferLimit, timeout);
        // 创建 TaskScheduler 对象,用于实现超时发送的定时器
        TaskScheduler taskScheduler = new ConcurrentTaskScheduler();
        // 创建 BatchingRabbitTemplate 对象
        BatchingRabbitTemplate batchTemplate = new BatchingRabbitTemplate(batchingStrategy, taskScheduler);
        batchTemplate.setConnectionFactory(connectionFactory);
        return batchTemplate;
    }
}

生产者

@Resourceprivate BatchingRabbitTemplate batchingRabbitTemplate;
// 批量发送
@GetMapping("/test14")
public void test14(){
    for (int i = 0; i < 15; i++) {
        batchingRabbitTemplate.convertAndSend("batchSend","批量发送");
    }
}

消费者

@RabbitListener(queuesToDeclare = @Queue(value = "batchSend"))
public void batchSend(String msg){
    System.out.println("msg = " +new Date() + msg);
}

        可以看到消息批量发送已实现,不足10条的按配置等待10秒后发送。

【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性,云原生进阶-PaaS专栏,云原生,paas,中间件,rabbitmq,java

2.3 批量消费

        实现多个消息批量消费,可配置每次消费几个,不足消费数时,等待超时后也继续消费

批量消费配置类

/**
 *批量消费    
*/
@Configuration
public class RabbitBatchConsumerConfig {
    @Resource
    ConnectionFactory connectionFactory;
    
    @Resource
    SimpleRabbitListenerContainerFactoryConfigurer configurer;
    
    /**
     *配置一个批量消费的 SimpleRabbitListenerContainerFactory
    */
    @Bean(name = "consumer10BatchContainerFactory")

    public SimpleRabbitListenerContainerFactory consumer10BatchContainerFactory() {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();  
        configurer.configure(factory, connectionFactory);

        // 这里是重点 配置消费者的监听器是批量消费消息的类型
        factory.setBatchListener(true);

        // 一批十个

        factory.setBatchSize(10);

        // 等待时间 毫秒 , 这里其实是单个消息的等待时间 指的是单个消息的等待时间

        // 也就是说极端情况下,你会等待 BatchSize * ReceiveTimeout 的时间才会收到消息

        factory.setReceiveTimeout(10 * 1000L);

        factory.setConsumerBatchEnabled(true);
        
        return factory;

    }
}

生产者

@GetMapping("/test13")
public void test13(){
    for (int i = 0; i < 16; i++) {
        rabbitTemplate.convertAndSend("batchConsume","测试批量消费消息");
    }
}

消费者

// 指定containerFactory
@RabbitListener(queuesToDeclare = @Queue(value = "batchConsume"),containerFactory = "consumer10BatchContainerFactory")
public void batchSend(List<Message> msg){ 
    //接收换成List
    for (int i = 0; i < msg.size(); i++) {
    System.out.println("msg = " +new Date() + msg.get(i).getBody());

    }

}

        可以看到消息批量消费已实现,不足10条的按配置等待10秒后消费:

【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性,云原生进阶-PaaS专栏,云原生,paas,中间件,rabbitmq,java

2.4 基于插件延迟队列

        延迟队列非常常用且好用,可以将消息发送后使消费者延迟接收。

【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性,云原生进阶-PaaS专栏,云原生,paas,中间件,rabbitmq,java

        RabbitAdmin配置 RabbitAdmin是用于对交换机和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息的组件。

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitAdminConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtualhost}")
    private String virtualhost;

    
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();

        connectionFactory.setAddresses(host);

        connectionFactory.setUsername(username);

        connectionFactory.setPassword(password);

        connectionFactory.setVirtualHost(virtualhost);

        
        return connectionFactory;
    }
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

封装发送延迟队列工具类

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Component
public class DelayedQueue {
    // routingKey
    private static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    // 延迟队列交换机
    private static final String DELAYED_EXCHANGE = "delayed.exchange";
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Resource
    RabbitAdmin rabbitAdmin;
    /**
     *发送延迟队列
     *@param queueName 队列名称
     *@param params 消息内容
     *@param expiration 延迟时间 毫秒
    */
    public void sendDelayedQueue(String queueName, Object params, Integer expiration) {
        // 先创建一个队列
        Queue queue = new Queue(queueName);
        rabbitAdmin.declareQueue(queue);
        // 创建延迟队列交换机
        CustomExchange customExchange = createCustomExchange();
        rabbitAdmin.declareExchange(customExchange);
        // 将队列和交换机绑定
        Binding binding = BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
        rabbitAdmin.declareBinding(binding);
        // 发送延迟消息
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE, DELAYED_ROUTING_KEY, params, msg -> {

            // 发送消息的时候 延迟时长
            msg.getMessageProperties().setDelay(expiration);
            return msg;

        });
    }
    
     public CustomExchange createCustomExchange() {
         Map<String, Object> arguments = new HashMap<>();
         
         /*
          *参数说明:
          *1.交换机的名称
          *2.交换机的类型
          *3.是否需要持久化
          *4.是否自动删除
          *5.其它参数
        */
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", true, false, arguments);
    }
}

生产者

@Autowired
private DelayedQueue delayedQueue;
/**
*发送延迟队列
*@param queueName 队列名称
*@param params 消息内容
*@param expiration 延迟时间 毫秒
*/

@GetMapping("/test9")
public void topicTest8() {

    delayedQueue.sendDelayedQueue("delayTest2","这是消息",5000);
}

消费者

@RabbitListener(queuesToDeclare = @Queue(value = "delayTest2",durable = "true"))
public void declareExchange2(String message){
    System.out.println("delayTest2 = " + message);
}

2.5 TTL队列

        TTL是time to live的缩写,生存时间,RabbitMQ支持消息的过期时间,消息发送时可以指定,从消息入队列开始计算,只要超过队列的超时时间配置,消息没被接收,消息就会自动清除。 封装发送TTL队列工具类

import org.springframework.amqp.core.;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Component
public class TtlQueue {
    // routingKey
    private static final String TTL_KEY = "ttl.routingkey";
    private static final String TTL_EXCHANGE = "ttl.exchange";
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Resource
    RabbitAdmin rabbitAdmin;
    /**
     *发送TTL队列
     *@param queueName 队列名称
     *@param params 消息内容
     *@param expiration 过期时间 毫秒
    */

    public void sendTtlQueue(String queueName, Object params, Integer expiration) {
        /**
         *----------------------------------先创建一个ttl队列--------------------------------------------
        */
        
       Map<String, Object> map = new HashMap<>();
       // 队列设置存活时间,单位ms,必须是整形数据。
       map.put("x-message-ttl",expiration);
       /*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/
       Queue queue = new Queue(queueName,true,false,false,map);
       rabbitAdmin.declareQueue(queue);
   
       /**
        *---------------------------------创建交换机---------------------------------------------
       */
    
        DirectExchange directExchange = new DirectExchange(TTL_EXCHANGE, true, false);
        rabbitAdmin.declareExchange(directExchange);
    
        /**
         *---------------------------------队列绑定交换机---------------------------------------------
         */
     
         // 将队列和交换机绑定
         Binding binding = BindingBuilder.bind(queue).to(directExchange).with(TTL_KEY);
         rabbitAdmin.declareBinding(binding);
         // 发送消息
         rabbitTemplate.convertAndSend(TTL_EXCHANGE,TTL_KEY,params);
     }
 }

生产者

@Autowired
private TtlQueue ttlQueue;
/**
 *发送TTL队列
 *@param queueName 队列名称
 *@param params 消息内容
 *@param expiration 过期时间 毫秒
*/

@GetMapping("/test10")
public void topicTest10() {
    ttlQueue.sendTtlQueue("ttlQueue","这是消息内容",5000);
}

消费者

@RabbitListener(queues = "ttlQueue" )
public void ttlQueue(String message){
    System.out.println("message = " + message);
}

2.6 死信队列

        DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器。队列消息变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

消息变成死信的几种情况:

  1. 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
  2. 消息TTL过期
  3. 队列达到最大长度

流程:发送消息,消息过期后进入到另一个队列(这个队列设置持久化,不过期)的过程。

【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性,云原生进阶-PaaS专栏,云原生,paas,中间件,rabbitmq,java

封装发送死信队列工具类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Component
public class DLXQueue {
    // routingKey
    private static final String DEAD_ROUTING_KEY = "dead.routingkey";
    private static final String ROUTING_KEY = "routingkey";
    private static final String DEAD_EXCHANGE = "dead.exchange";
    private static final String EXCHANGE = "common.exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Resource
    RabbitAdmin rabbitAdmin;

    /**
     * 发送死信队列,过期后进入死信交换机,进入死信队列
     * @param queueName 队列名称
     * @param deadQueueName 死信队列名称
     * @param params 消息内容
     * @param expiration 过期时间 毫秒
    */
    public void sendDLXQueue(String queueName, String deadQueueName,Object params, Integer expiration){
        /**
         * ----------------------------------先创建一个ttl队列和死信队列--------------------------------------------
        */
        Map<String, Object> map = new HashMap<>();
        // 队列设置存活时间,单位ms,必须是整形数据。
        map.put("x-message-ttl",expiration);
        // 设置死信交换机
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        // 设置死信交换器路由键
        map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        /*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/
        Queue queue = new Queue(queueName,true,false,false,map);rabbitAdmin.declareQueue(queue);

        /**
         * ---------------------------------创建交换机---------------------------------------------
        */
        DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false);
        rabbitAdmin.declareExchange(directExchange);

        /**
         * ---------------------------------队列绑定交换机---------------------------------------------
        */
        Binding binding = BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
        rabbitAdmin.declareBinding(binding);

        /**
         * ---------------------------------在创建一个死信交换机和队列,接收死信队列---------------------------------------------
        */
        DirectExchange deadExchange = new DirectExchange(DEAD_EXCHANGE, true, false);
        rabbitAdmin.declareExchange(deadExchange);
        Queue deadQueue = new Queue(deadQueueName,true,false,false);
        rabbitAdmin.declareQueue(deadQueue);

        /**
         * ---------------------------------队列绑定死信交换机---------------------------------------------
        */
        // 将队列和交换机绑定
        Binding deadbinding = BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
        rabbitAdmin.declareBinding(deadbinding);
        // 发送消息
        rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,params);
    }
}

生产者

@Autowiredprivate DLXQueue dlxQueue;
/**
 * 发送死信队列,过期后进入死信交换机,进入死信队列
 * @param queueName 队列名称
 * @param deadQueueName 死信队列名称
 * @param params 消息内容
 * @param expiration 过期时间 毫秒
*/
@GetMapping("/test11")
public void topicTest11() {
    dlxQueue.sendDLXQueue("queue","deadQueue","这是消息内容",5000);
}

消费者

// 接收转移后的队列消息
@RabbitListener(queuesToDeclare = @Queue(value = "deadQueue",durable = "true"))
public void ttlQueue(String message){
    System.out.println("message = " + message);
}

2.7 消息确认

2.7.1 发送消息确认机制

为确保消息发送有真的发送出去,设置发布时确认,确认消息是否到达 Broker 服务器。

配置

spring:
    rabbitmq:
        host: 47.99.110.29
        port: 5672
        username: guest
        password: guest
        virtual-host: /
        listener:
            simple:
                prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
        publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
        publisher-returns: true  #确认消息已发送到队列(Queue)

        如果有使用rabbitAdmin配置的话,那里也需要加配置。

修改RabbitAdmin配置

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitAdminConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtualhost}")
    private String virtualhost;
    
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.setAutoStartup(true);
     return rabbitAdmin;
    }
}

实现发送消息确认接口

        消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调。

/**
 *消息发送确认配置
*/

@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct 
    // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执
行
    public void init(){
        rabbitTemplate.setConfirmCallback(this);

    }
    
    /**
     *交换机不管是否收到消息的一个回调方法
     *@param correlationData 消息相关数据
     *@param ack 交换机是否收到消息
     *@param cause 失败原因
    */
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){ 
            // 消息投递到broker 的状态,true表示成功
            System.out.println("消息发送成功!");
        }else { 
            // 发送异常
            System.out.println("发送异常原因 = " + cause);
        }
    }
}

实现发送消息回调接口

        如果消息未能投递到目标queue里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Component
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct 
    // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执行
    public void init(){
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("消息"+returnedMessage.getMessage().toString()+"被交换机"+returnedMessage.getExchange()+"回退!"+"退回原因为:"+returnedMessage.getReplyText());
        // 回退了所有的信息,可做补偿机制
    }
}

2.7.2 消费者消息确认机制

        为确保消息消费成功,需设置消费者消息确认机制,如果消费失败或异常了,可做补偿机制。

配置

spring:
  rabbitmq:
    host: 47.99.110.29
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #消费者配置
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
        acknowledge-mode: manual # 设置消费端手动ack确认
        retry:
          enabled: true # 是否支持重试
    #生产者配置
    publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
    publisher-returns: true  #确认消息已发送到队列(Queue)

channel.basicAck消息确认

        消费者修改,利用消费者参数Channel 进行消息确认操作。

@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) 
// queuesToDeclare 自动声明队列
public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
    // 消息
    System.out.println("msg = " + msg);

    /**
     * 确认
     * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
     * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
    */
    
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

channel.basicNack消息回退

        将消息重返队列

@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) 
 // queuesToDeclare 自动声明队列
 public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
     try {
         // 消息
         System.out.println("msg = " + msg);
         throw new RuntimeException("来个异常");
     } catch (Exception e) {
         e.printStackTrace();
        System.out.println("消息消费异常,重回队列");
        
         /**
          *deliveryTag:表示消息投递序号。
          *multiple:是否批量确认。
          *requeue:值为 true 消息将重新入队列。
        */
        
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
    }
        
    // 确认
    /**
     * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
     * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
    */
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

channel.basicReject消息拒绝

        拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

/**
 *消息拒绝
 *deliveryTag:表示消息投递序号。
 *requeue:值为 true 消息将重新入队列。
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

封装消息确认处理类

链接: 封装rabbitmq消息确认、异常回退类_@rabbitlistener 异常处理方式-CSDN博客

参考链接

rabbitmq专栏

史上最透彻的 RabbitMQ 可靠消息传输实战 - 掘金

RabbitMQ系列(四)RabbitMQ事务和Confirm发送方消息确认——深入解读 - 掘金

RabbitMQ原理及实现_rabbitmq实现-CSDN博客

RabbitMQ的工作模式及原理

RabbitMQ如何保证消息的可靠性投递与消费?

Linux安装Erlang和RabbitMQ详细步骤

rabbitmq详解-CSDN博客

RabbitMQ(一)——常见消息中间件

深入理解:RabbitMQ的前世今生

RabbitMQ技术详解-架构

透彻rabbitmq - 知乎

消息队列的使用场景是怎样的? - 知乎

RabbitMQ原理 - 知乎

RabbitMQ 原理解析

RabbitMQ原理详解_rabbitmq工作原理-CSDN博客

RabbitMQ 基本概念介绍_rabbitmq基本概念-CSDN博客

RabbitMQ系列二(构建消息队列机制)_rabbitmq系列二(构建消息队列)-CSDN博客

RabbitMQ消息队列(二)-RabbitMQ消息队列架构与基本概念

RabbitMQ基础概念详解文章来源地址https://www.toymoban.com/news/detail-833206.html

到了这里,关于【云原生进阶之PaaS中间件】第四章RabbitMQ-4.1-原理机制与进阶特性的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【云原生进阶之PaaS中间件】第一章Redis-1.4过期策略

    【云原生进阶之PaaS中间件】第一章Redis-1.4过期策略

            除了string独有设置过期时间的方法,其他类型都需依靠expire方法设置时间,若: 未设置时间,则缓存永不过期 设置过期时间,但之后又想让缓存永不过期,使用persist         设置key的过期时间。超时后,将会自动删除该key。在Redis的术语中一个key的相关超时

    2024年02月09日
    浏览(29)
  • 【云原生进阶之PaaS中间件】第一章Redis-2.3.2哨兵模式

    【云原生进阶之PaaS中间件】第一章Redis-2.3.2哨兵模式

            由于无法进行主动恢复,因此主从模式衍生出了哨兵模式。哨兵模式基于主从复制模式,只是引入了哨兵来监控与自动处理故障。Redis Sentinel是社区版本推出的原生高可用解决方案,Redis Sentinel部署架构主要包括两部分:Redis Sentinel集群和Redis数据集群,其中Redis

    2024年02月06日
    浏览(29)
  • 【云原生进阶之PaaS中间件】第一章Redis-1.3Redis配置

    【云原生进阶之PaaS中间件】第一章Redis-1.3Redis配置

            Redis支持采用其内置默认配置的方式来进行启动,而不需要提前配置任何文件,但是这种启动方式只推荐在测试和开发环境中使用,但更好的方式是通过提供一个Redis的配置文件来对Redis进行配置, 这个配置文件一般命名为’redis.conf’。         Redis的配置文件

    2024年02月09日
    浏览(34)
  • 【云原生进阶之PaaS中间件】第二章Zookeeper-3.2架构详解

    【云原生进阶之PaaS中间件】第二章Zookeeper-3.2架构详解

    » 领导者(leader),负责进行投票的发起和决议,更新系统状态 » 学习者(learner),包括跟随者(follower)和观察者(observer),follower用于接受客户端请求并想客户端返回结果,在选主过程中参与投票 » Observer可以接受客户端连接,将写请求转发给leader,但observer不参加投票

    2024年02月08日
    浏览(32)
  • 【云原生进阶之PaaS中间件】第一章Redis-2.4缓存更新机制

    【云原生进阶之PaaS中间件】第一章Redis-2.4缓存更新机制

            无论先操作db还是cache,都会有各自的问题,根本原因是cache和db的更新不是一个原子操作,因此总会有不一致的问题。想要彻底解决这种问题必须将cache和db的更新操作归在一个事务之下(例如使用一些分布式事务,或者强一致性的分布式协议)。或者采用串行化,

    2024年02月10日
    浏览(28)
  • 【云原生进阶之PaaS中间件】第一章Redis-2.3.3集群模式

    【云原生进阶之PaaS中间件】第一章Redis-2.3.3集群模式

            Redis集群是一个提供在多个Redis节点之间共享数据的程序集。它并不像Redis主从复制模式那样只提供一个master节点提供写服务,而是会提供多个master节点提供写服务,每个master节点中存储的数据都不一样,这些数据通过数据分片的方式被自动分割到不同的master节点上

    2024年02月10日
    浏览(31)
  • 【云原生进阶之PaaS中间件】第一章Redis-1.5.1安装配置

    【云原生进阶之PaaS中间件】第一章Redis-1.5.1安装配置

            在本节中,您将了解和学习Redis的环境安装设置。         要在Ubuntu上安装Redis,打开终端并键入以下命令 -         这将在Ubuntu机器上安装Redis。 1.2.1 安装步骤 1、首先使用 sudo brew install redis 命令一键安装Redis, 默认会安装在 /usr/local/bin 路径中。 2、cd 进

    2024年02月09日
    浏览(7)
  • 【云原生进阶之PaaS中间件】第一章Redis-1.6.1Java项目使用Redis

    【云原生进阶之PaaS中间件】第一章Redis-1.6.1Java项目使用Redis

            redis的java客户端很多,官方推荐的有三种: Jedis Lettuce Redisson Spring 对Redis 客户端进行了整合,提供了Spring Date Redis ,在Spring Boot项目中还提供了对应的Starter,即spring-boot-starter-data-redis。         使用Jedis操作Redis的步骤: 1.获取链接; 2.执行操作; 3.关闭连接

    2024年02月09日
    浏览(9)
  • 【云原生进阶之PaaS中间件】第三章Kafka-4.4-消费者工作流程

    【云原生进阶之PaaS中间件】第三章Kafka-4.4-消费者工作流程

    1.1.1 消费者群组         Kafka 里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的消息。         如上图,主题 T 有 4 个分区,群组中只有一个消费者,则该消费者将收到主题 T1 全部 4 个分区的消息。      

    2024年02月22日
    浏览(10)
  • 云原生中间件开源现状分析与华为中间件案例解读

    云原生中间件开源现状分析与华为中间件案例解读

    开源中间件在企业分布式架构搭建和服务治理中扮演着重要的角色,尤其是在解决我国网络高并发和业务复杂性问题方面。然而,尽管中间件市场由商业闭源厂商主导,提供了一系列基础中间件和数据类中间件以支持稳定的应用程序运行环境,开源中间件生态却相对分散和薄

    2024年02月02日
    浏览(7)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包