RabbitMq介绍和使用

这篇具有很好参考价值的文章主要介绍了RabbitMq介绍和使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概念

MQ,是Message Queue的缩写,遵循先进先出的原则

在项目中用于流量削峰,应用解耦,异步处理

流量削峰:处理大量订单时,将订单分散,部分用户下单后需要进行等待,但却防止了服务宕机

应用解耦:当进行服务之间的通信时,普通方式不能保证当前模块调用其他模块出错后还能正确运行,增加消息队列后,当前模块执行完后可直接返回,等待其他模块执行完毕,如果其他模块执行异常,也不会影响当前流程。

异步处理:在异步调用服务中,部分服务处理时间较长,此时可交给消息队列,当服务执行完毕后,发送一条消息告诉当前服务执行完毕了

核心1-生产者:消息发送者

核心2-交换机:接收生产者发送过来的数据,并将消息发送到一个队列或者多个队列,或者丢弃消息

核心3-队列:接收来自交换机的数据,并保存消息直到消费者消费消息

核心4-消费者:消费从队列中获取到的消息,大多数时等待接收消息

安装

这里我将RabbitMQ安装在docker中,使用版本为3.9.5

docker run -d --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.9.5-management

解释:

  • 容器名字为some-rabbit
  • 5672是rabbitmq中的标准端口,将暴露在外部供我们使用
  • 15672 是 RabbitMQ 管理插件使用的端口,暴露出来为了使用 Web 应用程序进行管理,在 http://localhost:15672/ 中,可以找到一个用于 RabbitMQ 配置的管理页面。来自 Docker 容器的端口 15672 将暴露在宿主机上以供使用
  • docker中运行的容器将在后台进行运行

这里我是将docker安装在虚拟机中,所以需要将两个端口暴露出来方便使用

ufw allow 5672 
ufw allow 15672

意思就是允许放行5672和15672两个端口

AMQP

AMQP是高级消息队列协议的缩写。它是一个开放的标准协议,允许系统之间的信息传递。不同的服务器/系统可以相互通信,不受技术限制。AMQP通过TCP/IP连接的代理服务实现消息传递。它同时定义了网络层协议和消息代理的高层架构。

模型
rabbitmq端口15672和5672,rabbitmq,rabbitmq
1.一条包括生产者指定的路由密钥的信息
2.生产者发送信息
3.绑定在交换机上的队列收到消息
4.消息一直保存在队列中,知道消费者消费消息

引入jar包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

创建连接

class CreateChannel {
    public static Channel getChannel() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setPort(5672);
        factory.setUsername("");
        factory.setPassword("");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

配置文件

spring.rabbitmq.host=id address
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

队列

工作队列

工作队列(任务队列):避免大量消息集中处理,造成消息滞留无法处理,并且一个消息只能被处理一次

引入轮询的工作方式,工作线程之间竞争资源通过轮训的方式处理消息,防止消息被处理多次

在存在两个相同的队列时,生产者发送的数据是由消费者轮询消费

消息应答

自动应答

需要在高吞吐量和数据传输安全性方面进行权衡,这种模式仅试用于消费者可以高效并且在某种速率能够处理这些消息的情况下使用。不推荐使用

手动应答

手动应答分为确认应答和拒绝应答,确认应答是已经知道该消息且成功处理消息后将消息进行丢弃(basicAck)。拒绝应答分为单个消息拒绝和批量消息拒绝,即消费者获取通道中的首条消息或队列中存在的所有消息并拒绝(basicNack/basicReject)

手动应答可进行批量应答且减少网络拥堵

消息重新入队

消费者在消费信息过程中,可能出现丢失连接,导致消息在确认应答发送失败,此时该消息就处于未完全处理,可将消息重新入队,如果有其他消费者可进行处理,会将消息发送给可处理的消费者。保证消息不丢失。

持久化

为了保证消息不丢失,需要同时确保消息队列和交换机不丢失

队列持久化

在创建队列是修改durable参数为true

消息持久化

在生产者发送消息时声明消息是持久化的,在其他参数中添加MessageProperties.PERSISTENT_TEXT_PLAIN,这里声明的消息持久化并不能完全保证消息不丢失,可能在准备存储到磁盘但是没有完全存储完过程中出现问题

消息不公平分发

如果采用轮询方法,当某个消费者处理消息很慢时,会导致消息的堆积

在消费者消费消息过程中,设置不公平消费消息channel.basicQos,默认值为0,公平/轮询分发,1为不公平分发

使用不公平分发,绑定的消费者将不再一次消费消息,能者多劳

预取值

指定每个消费者消费多少条消息channel.basicQos(),当该方法中参数大于1后,表示当前消费者能消费多少条消息,该方法使用的前提是队列非自动应答,并且最好写在DeliverCallback中,我写在里面才生效

DeliverCallback deliverCallback = (consumeTag, message) -> {
    // 接收4条消息
    channel.basicQos(4);
    try {
        Thread.sleep(10000);
        System.out.println(new String(message.getBody()));
        // 增加消息是否批量处理
        channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

};

发布确认

能够解决消息丢失的问题

在生产者发送消息时,即使保证队列持久化、队列中的消息也保证了持久化,但是在传送到队列保存磁盘的过程中会导致信息丢失

发布确认即为生产者将消息发送到队列中,队列将消息保存到磁盘后,再给生产者发送一个消息

单个确认

是一种同步确认方法,能够确保每条消息持久化成功,但是发布速度慢,需要等待每条消息完成确认后才能对后续消息进行确认,造成消息堆积

开启确认 channel.confirmSelect()

消息发布后,通过channel.waitForConfirms()获取当前消息是否确认发布了

批量确认

相比单个确认来说,批量确认极大提高了吞吐量,但是当发生故障时,无法确认是那条消息出现问题,会造成信息丢失。需要将消息预先保存在内存中,为出现故障时数据可重新进行操作。

通过发送计数的方式,当达到某一阈值后再发送请求确认的消息

异步确认

通过异步方式,在不占用主要请求资源的同时,完成发送确认。相对于单个确认和批量确认,处理速度进一步的提升,可以明确的知道处理成功和处理失败的消息

解决发送失败的消息

处理未完成发送的消息时,最好将这些消息放到一个基于内存能够被发布线程访问的队列中,如ConcurrentLinkedQueue

在发送消息时,将所有消息以id为key,消息体为value放入ConcurrentSkipMap中,在确认回调时,如果是批量确认,通过headMap(Long deliveryTag)方法获取到所有发送成功的消息,然后clear(),单个删除就通过id进行删除。成功回调处理完后,剩下的就为确认失败的消息。

交换机

消息传递模型的核心思想是,生产者生产的消息从来不会直接发送到队列。并且生产者不知道这些消息传递到那些队列中。

生产者只能将消息发送到交换机中。交换机的主要工作是接收生产者发送的消息,再将它们推入队列中。交换机必须准确地知道如何处理这些消息,如放入特定队列还是许多队列或者是丢弃他们

类型

默认交换机
默认交换是一个预先声明的直接交换,其名称被设置为空字符串""。当一个消息被发送到默认交换所时,它将被路由到队列,队列名称等于消息路由键。每个创建的队列都会自动绑定到默认交换所,其路由键与队列名称相同。

Direct-直接
直接交换使用消息的路由密钥,将消息路由到一个队列中。路由键是由生产者设置的,作为消息头。消息中的路由密钥必须与绑定中指定的路由密钥完全匹配–它就像一个地址,告诉交换所消息应该去哪里。

Fanout-广播(发布订阅)
广播/扇出交换复制并将收到的消息发送到与之绑定的所有队列。提供的路由密钥将被简单地忽略。

Topic-主题
订阅交换机根据路由键通配符对消息进行路由。消息被传递到一个或多个队列。尽管支持带有通配符的路由键,但它不是强制性的–你仍然可以隐含地提供路由键,不需要任何通配符。

订阅模式中路由键是一个用.字符分隔的单词列表,例如,taxi.type.small和taxi.type.large。

并且可以使用 * 符号来匹配特定位置的任何单词,如taxi.*.large。# 表示匹配0个或更多的词,如taxi.type.# ,它将匹配所有以taxi.type开始的所有路由键。

绑定

让交换机与队列之间通过routingkey产生联系,根据绑定之将消息发送到对应的队列中

交换机使用

Direct Exchange
当你想进行一次交换时,直接交换很有用,并根据路由密钥过滤,指定信息的目标 “地址”。

一个例子是一个报告系统,它以指定的格式异步生成报告。用户点击 "生成报告 "按钮并选择目标格式–PDF或DOCX文件。系统通过向x.generate-report交换发送消息来安排报告的生成,并指定目标目的地为路由键–pdf或docx。在这种情况下,生产者为信息提供一个地址,它应该被传送到哪里。

Fanout Exchange
当你想把消息发送到一个或多个队列时,Fanout模式很有用,它是广播的理想选择。它类似于发布/订阅模型–作为消费者,你订阅了一些东西,而发布者将消息发送给所有订阅者。消息可以被不同的消费者以不同的方式处理。路由键将被忽略。

// 声明交换机,队列,RouteKey
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE_A = "fanout.queue.a";
public static final String FANOUT_QUEUE_B = "fanout.queue.b";
public static final String FANOUT_ROUTE_PARENT = "fanout.route.*";

// 创建一个fanout类型的交换机
// 参数为:交换机名,交换机类型,是否持久化
channel.exchangeDeclare(FANOUT_EXCHANGE, “fanout”, true);
// 创建两个队列
// 参数为:队列名,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare(FANOUT_QUEUE_A, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_B, true, false, false, null);

// 绑定交换机和队列
channel.queueBind(FANOUT_QUEUE_A, FANOUT_EXCHANGE, FANOUT_ROUTE_PARENT);
channel.queueBind(FANOUT_QUEUE_B, FANOUT_EXCHANGE, FANOUT_ROUTE_PARENT);

// 发送消息到交换机
channel.basicPublish(FANOUT_EXCHANGE,"",null,message,getBytes(StandardCharsets.UTF_8));
// 获取消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到的消息"+new String(message.getBody(),"UTF-8"));
        };

channel.basicConsume(FANOUT_QUEUE_A, true, deliverCallback, consumerTag -> {}); 

运行后,虽然使用的空串路由键,但是当前交换机绑定的两个队列都能收到消息

Topic-主题

相对于路由交换机和广播交换机,主题交换机在使用时更加的灵活

主题交换将多个消息消费者与过滤结合在一起,基于消息路由键和通配符匹配。当你想把一个消息发送到一个以上的有限队列,但不是所有的队列时。

该交换机使用的路由键必须为列表,以点号进行隔开,可以为任意单词进行拼接。其中还可以使用替换符,*可以代替一个单词,#可以替代零个或多个单词

如果路由键为**#**,那么这个队列能接收到所有消息;如果路由键中没有出现 ***** 和 # 时,这个队列就相当于直连模式

public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE_A = "topic.queue.a";
public static final String TOPIC_QUEUE_B = "topic.queue.b";
public static final String TOPIC_QUEUE_C = "topic.queue.c";
public static final String TOPIC_ROUTE_PARENT_A = "topic.route.*";
public static final String TOPIC_ROUTE_PARENT_B = "*.topic.route";
public static final String TOPIC_ROUTE_PARENT_C = "topic.#.route";
public static final String TOPIC_ROUTE_A = "topic.route.a";
public static final String TOPIC_ROUTE_B = "b.topic.route";
public static final String TOPIC_ROUTE_C = "topic.dd.aa.c";

channel.exchangeDeclare(TOPIC_EXCHANGE, TOPIC, true);

channel.queueDeclare(TOPIC_QUEUE_A, true, false, false, null);
channel.queueDeclare(TOPIC_QUEUE_B, true, false, false, null);
channel.queueDeclare(TOPIC_QUEUE_C, true, false, false, null);

channel.queueBind(TOPIC_QUEUE_A, TOPIC_EXCHANGE, TOPIC_ROUTE_PARENT_A);
channel.queueBind(TOPIC_QUEUE_B, TOPIC_EXCHANGE, TOPIC_ROUTE_PARENT_B);
channel.queueBind(TOPIC_QUEUE_C, TOPIC_EXCHANGE, TOPIC_ROUTE_PARENT_C);

String sendMsg = "队列C能收到的消息";
channel.basicPublish(TOPIC_EXCHANGE,
                     TOPIC_ROUTE_C,
                     new AMQP.BasicProperties.Builder()
                     .headers(headerC)
                     .build(),
                  sendMsg.getBytes(StandardCharsets.UTF_8));

DeliverCallback deliverCallback = (consumerTag, message)->{
	System.out.println("收到的消息为:"+new 			String(message.getBody(),"UTF-8"));
};
channel.basicConsume(TOPIC_QUEUE_C,true,deliverCallback,consumerTag -> {});

该模式发送了消息后,能够在管理页面中看到,只有队列c接收到了信息,使用的TOPIC_ROUTE_PARENT_C路由键,其中的 # 替代了dd.aa,所以只有队列c能收到消息。

死信队列

死信队列表示消费者在某些特定的情况下无法消费队列中的消息,并且这些消息没有后续处理,这些消息就变为死信。当消息成为死信后,可以将它们发送到一个新的队列中,这个队列就称为死信队列。即死信队列信息来源于普通队列

在实际使用过程中,死信队列用来保证业务不丢失。或者处理设置了时间过期的消息(延迟队列)

当对消息设置了过期时间、队列满了无法再添加数据到队列中、消息被拒绝,都是出现死信的来源

一个简单的死信队列结构包括,一个生产者,两个消费者,两个队列。当普通队列中的消息发生过期等情况,将该条消息所在的队列通过绑定的队列发送到新的队列中

设置TTL

class ConsumerA {
    static final String NORMALEXCHANGE = "normal_exchange";
    static final String DEADEXCHANGE = "dead_exchange";

    static final String NORMALQUEUE = "normal_queue";
    static final String DEADQUEUE = "dead_queue";
    
    public static void main(String[] arg) throws Exception {
        Channel channel = CreateChannel.getCahnnel();
        channel.exchangeDeclare(NORMALEXCHANGE,BuiltinExchangeType.Direct,false);
        channel.exchangeDeclare(DEADEXCHANGE,BuiltinExchangeType.Direct,false);
        
        Map<String,Objcet> properties = new HashMap<>();
        // 普通队列设置死信交换机
        properties.put("x-dead-letter-exchange",DEADEXCHANGE);
        // 死信路由
        properties.put("x-dead-letter-routing-key","dead.key");
        // 过期时间
        properties.put("x-message-ttl",10000);
        channel.queueDeclare(NORMALQUEUE,false,false,false,properties);
        channel.queueDeclare(DEADQUEUE,false,false,false,null);
        
        DeliverCallback deliverCallback = (consumerTag,message) ->{
            System.out.println("接受到的消息为:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMALQUEUE,true,deliverCallback,consumerTag -> {});
    }
}
class Producter {
    static final String NORMALEXCHANGE = "normal_exchange";
    
    public static void main(String[] args) throws Exception {
        Channel channel = CreateChannel.getChannel();
        AMQP.BasicProperties properties = 
            new AMQP.BasicProperties().builder().expiration("4000").build();
        // 发送消息
        for(int i = 0; i < 10; i++) {
            String message = "发送的消息为:"+i;
            channel.basicPublish(NORMALEXCHANGE,
                                 "normal.key",
                                 properties,
                                 message.getBytes(StandardCharsets.UTF_8))
        }
    }
}

先启动ConsumerA,用来创建两个交换机和两个队列,然后启动Producter发送消息,此时ConsumerA能接收到消息

此时ConsumerA断开连接,Producter再次发送消息,此时消息堆积在队列中没有消费者消费消息,当过了设置的消息过期时间后,普通队列中的消息将会转移到死信队列中,这时可以在创建一个消费者来处理这些未完成消费的消息。

队列达到最大长度

class ConsumerA {
    static final String DEADEXCHANGE = "dead_exchange";

    static final String NORMALQUEUE = "normal_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = CreateChannel.getChannel();

        Map<String, Object> properties = new HashMap<>();
        properties.put("x-dead-letter-exchange",DEADEXCHANGE);
        properties.put("x-dead-letter-routing-key","dead.key");
        // 设置队列最大消息数
        properties.put("x-max-length",4);
        channel.queueDeclare(NORMALQUEUE, false, false, false, properties);
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            System.out.println("ConsumerA收到的消息:"+new String(message.getBody(),"UTF-8"));
        });
        channel.basicConsume(NORMALQUEUE,true,deliverCallback,consumerTag -> {});
    }
}
class Product {
    static final String NORMALEXCHANGE = "normal_exchange";
    static final String DEADEXCHANGE = "dead_exchange";
    static final String NORMALQUEUE = "normal_queue";
    static final String DEADQUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = CreateChannel.getChannel();
        channel.exchangeDeclare(NORMALEXCHANGE, BuiltinExchangeType.DIRECT, false);
        channel.exchangeDeclare(DEADEXCHANGE, BuiltinExchangeType.DIRECT, false);

        channel.queueBind(NORMALQUEUE, NORMALEXCHANGE, "normal.key");
        channel.queueBind(DEADQUEUE, DEADEXCHANGE, "dead.key");

        for (int i = 0; i < 9; i++) {
            String message = "消息"+i;
            channel.basicPublish(NORMALEXCHANGE,
                                 "normal.key",
                                 null,
                                 message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

当ConsumerA出现宕机等情况导致无法接收消息时,Product发送的消息将堆积在normal_queue中,但是normal_queue中设置了队列的最大容量,所以当队列堆满后,其他的消息将被发送到死信队列中。

消息被拒

class ConsumerA {
    static final String DEADEXCHANGE = "dead_exchange";

    static final String NORMALQUEUE = "normal_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = CreateChannel.getChannel();

        Map<String, Object> properties = new HashMap<>();
        properties.put("x-dead-letter-exchange", DEADEXCHANGE);
        properties.put("x-dead-letter-routing-key", "dead.key");
        channel.queueDeclare(NORMALQUEUE, false, false, false, properties);
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            if ("消息5".equals(msg)) {
                System.out.println("拒绝的消息为:" + msg);
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("ConsumerA收到的消息:" + msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        });
        // 开启拒绝应答时,这里不能再自动应答,否则不生效
        channel.basicConsume(NORMALQUEUE, false, deliverCallback, consumerTag -> {
        });
    }
}

开启拒绝后,其他消息需要开启手动应答,而channel.basicConsume中的自动应答因为false,否则所有消息将会被当前消费者消费,不存在消息拒绝了。

延迟队列

延迟队列是死信队列中的一种,死信队列中的TTL就是延迟队列。

延迟队列内部是有序的

延迟队列使用场景:订单未在指定时间内支付就取消;发起申请未审批进行提醒;…

延迟队列在处理这些指定时间的消息时,相当于是定时任务,但是如果在这期间服务宕机,之前的定时任务将不再生效,且定时任务在处理大量任务时是通过轮询的方式进行处理,性能较低

Springboot中TTL延迟队列

模型:交换机X通过XA、XB路由键分别绑定QA、QB队列;QA、QB设置死信交换机Y,和死信路由键QABCY;死信队列Y通过路由键QABCY绑定队列QC

当QA、QB设置了不同的消息过期时间,当收到消息后,没有消费者消费,到达过期时间后,将消息发送给死信队列

声明配置交换机和队列的配置类

@Configuration
class TTLQueueConfig {
    // 交换机
    private static final String X_EXCHANGE = "X";
    private static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    // 队列
    private static final String A_QUEUE = "QA";
    private static final String B_QUEUE = "QB";
    private static final String C_DEAD_LETTER_QUEUE = "QC";

    private static final String QAEX = "XA";
    private static final String QBEX = "XB";

    private static final String QABCEY = "YC";
    
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /**
     * 队列A并绑定死信交换机
     *
     * @return {@link Queue}
     */
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> properties = new HashMap<>();
        // 设置死信交换机和routingKey,过期时间(10s)
        properties.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        properties.put("x-dead-letter-routing-key", QABCEY);
        properties.put("x-message-ttl", 10000);
        return QueueBuilder.durable(A_QUEUE).withArguments(properties).build();
    }

    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> properties = new HashMap<>();
        // 设置死信交换机和routingKey,过期时间(40s)
        properties.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        properties.put("x-dead-letter-routing-key", QABCEY);
        properties.put("x-message-ttl", 40000);
        return QueueBuilder.durable(B_QUEUE).withArguments(properties).build();
    }

    @Bean("queueC")
    public Queue queueC() {
        return QueueBuilder.durable(C_DEAD_LETTER_QUEUE).build();
    }

    @Bean
    public Binding queueABindX(@Qualifier("queueA") Queue queueA,
                               @Qualifier("xExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queueA).to(exchange).with(QAEX);
    }

    @Bean
    public Binding queueBBindX(@Qualifier("queueB") Queue queueB,
                               @Qualifier("xExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queueB).to(exchange).with(QBEX);
    }

    @Bean
    public Binding queueCBindY(@Qualifier("queueC") Queue queueC,
                               @Qualifier("yExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queueC).to(exchange).with(QABCEY);
    }
}

配置类用来监听死信队列QC

@Component
@Slf4j
class QueueCListenner {
    @RabbitListener(queues = "QC")
    public void consumerC(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), "UTF-8");
        log.info("当前时间{},死信队列收到消息{}", new Date().toString(), msg);
    }
}

通过接口发送消息

@RestController
@RequestMapping("send")
class SendController {
    @Autowired
    RabbitTemplate template;
    
    @GetMapping("info/{message}")
    public void sendMessage(@PathVariable String message) {
        template.convertAndSend("X", "XA", "10s队列:" + message);
        template.convertAndSend("X", "XB", "40s队列:" + message);
    }
}

现象

消耗时间Tue Jan 10 08:55:33 CST 2023,发送的消息为:卡上的纠纷
当前时间Tue Jan 10 08:55:52 CST 2023,死信队列收到消息10s队列:卡上的纠纷
当前时间Tue Jan 10 08:56:13 CST 2023,死信队列收到消息40s队列:卡上的纠纷

注意:这里QA、QB、QC使用的路由键需要一致,我刚开始操作的过程中可能没保持一致,导致发消息无法转发到死信队列中;QA、QB中配置的死信信息注意检查,配置死信交换机为x-dead-letter-exchange,死信路由x-dead-letter-routing-key

TTL的优化

以上的方式是通过在队列中设置延迟时间完成消息过期,但是在实际生产中,如果再产生一个新的队列,可能需要再添加一个队列的配置,所以接下来通过交换机发送消息时为队列设置过期时间。声明队列时,同样设置死信队列和死信路由,但是不设置过期时间。

template.convertAndSend("交换机","路由键","消息",pro->{pro.getMessageProperties().setExpiration("过期时间")})

TTL中存在的问题

RabbitMQ指挥检查第一个消息是否过期,当第二个消息时间比第一个消息时间短时,也不会优先执行第二个消息

解决-基于插件化实现延迟队列

发布确认

正常情况下,消息生产者发送的消息到交换机后,只要有消费者,消息就会被消费

在未知情况下,可能出现rabbitmq出现宕机,在此期间消息投递失败,进而消息丢失,使用消息确认机制,将消息放入缓存中

交换机确认

交换机通过回调接口,感知消息

回调接口处于RabbitTemplate中的内部接口ConfirmCallback,自定义回调函数时需要实现这个内部接口,重写conrim方法,如下

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * 交换机消息确认回调函数
     *
     * @param correlationData 保存回调消息的ID即相关信息,需要在发送消息时设置其中的数据,否则为null
     * @param ack             是否接收成功,成功为true,失败为false
     * @param cause           原因,成功时为null,失败时为失败原因,想要失败时,可以在发送消息时用为申明的交换机
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String dataId = correlationData.getId() == null ? "" : correlationData.getId();
        if (ack) {
            log.info("交换机成功接收信息,消息ID为:{}", dataId);
        } else {
            log.info("交换机接收消息失败,失败原因为:{}",cause);
        }
    }
}

虽然实现了内部接口且重写了这个方法,但是这个方法实际上并没有生效,此时启动,RabbitTemplate依然会找自己内部的这个接口方法,所以我们需要将自定义的回调方法类注入到RabbitTemplage中

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    RabbitTemplate template;

    /**
     * 将当前实现的类注入到RabbitTemplate中的内部实现类
     * 如果不注入,将继续走原有的内部接口
     */
    @PostConstruct
    public void init() {
        template.setConfirmCallback(this);
    }
    ...
}

此时注入了,但依然没有收到这个回调函数发送的信息,需要添加配置,开启交换机确认,实现回调功能

spring.rabbitmq.publisher-confirm-type=correlated,当前使用

spring.rabbitmq.publisher-confirm-type=none,禁用发布模式,默认值

spring.rabbitmq.publisher-confirm-type=simple,第一种效果于correlated一致,第二种效果是消息发布成功后使用RabbitTemplate中的waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果判断下一步逻辑,如果waitForConfirmsOrDie返回了false,也就将channel关闭,后续不能进行操作。且该模式为同步确认消息(发送一条确认一条),消耗时间较长。

队列确认

生产者发送消息,如果交换机确认收到了,会直接发送给队列,在这过程中出现路由不存在,队列断开等情况,消息将被丢弃,而生产者不知道丢弃这个事件。

解决

开启回退功能spring.rabbitmq.publisher-returns = true

实现RabbitTemplate中的ReturnCallBack内部接口,重写returnedMessage方法,并将当前类注入到RabbitTemplate中,让其调用自定义的returnedMessage方法

class MyCallBack implements RabbitTemplate.ReturnsCallback{
    
     @Autowired
    RabbitTemplate template;

    /**
     * 将当前实现的类注入到RabbitTemplate中的内部实现类
     * 如果不注入,将继续走原有的内部接口
     */
    @PostConstruct
    public void init() {
        template.setReturnsCallback(this);
    }
    
    @Override
	public void returnedMessage(ReturnedMessage returned) {
        Message message = returned.getMessage();
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);

        String exchange = returned.getExchange();
        int replyCode = returned.getReplyCode();
        String replyText = returned.getReplyText();
        String routingKey = returned.getRoutingKey();


        log.info("从交换机{}退回的消息为{},失败状态为{},失败原因是{},使用的路由键为{}", exchange, msg, replyCode, replyText, routingKey);
    }
}

验证上述功能可在发送消息时修改一个不存在的路由键,以下为测试结果

从交换机confirm_exchange退回的消息为第二条消息,失败状态为312,失败原因是NO_ROUTE,使用的路由键为confirm.key12

备份交换机

备份交换机本质是当前交换机失效,就转给其他交换机继续处理消息,可实现备份和报警功能

模型:直连交换机无法将消息路由到队列时,将消息转发给其他交换机,通过其他交换机进行消息的备份或者预警

此时直连交换机将重新声明,指定备份交换机

public DirectExchange declareExchange() {
    return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE)
        .durable(true)
        .withArgument("alternate-exchange",BACKUP_EXCHANGE)
        .build();
}

其他

幂等性

同一操作发起的一次或多次请求的结果是一致的,不会产生副作用

想要达成可通过设置全局ID,写入唯一标识。

消费端的幂等性:唯一ID+指纹码机制、Redis的原子性文章来源地址https://www.toymoban.com/news/detail-741123.html

到了这里,关于RabbitMq介绍和使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ安装、端口修改、简单的角色介绍

    本文介绍RabbitMQ安装的环境是CentOS7版本的Linux云服务器。 官网:https://www.rabbitmq.com/ 由于RabbitMQ是使用Erlang语言开发的,所以我们在安装RabbitMQ之前需要在服务器中安装Erlang语言的环境。在Linux中执行下面命令: 在执行完上面语句后,我们就可以安装RabbitMQ了,也是比较简单的

    2024年02月17日
    浏览(39)
  • rabbitMQ和Erlang安装后无法访问localhost:15672解决方法

    这个是我rabbitMQ安装在电脑上的位置,具体的要看你最近安装的位置,总之找到 sbin. 就在这里输入cmd即可打开 正常启动的服务应该是如下所示,E 和e 分别表示显性和隐性启动,如果没有E和e,这也就是你打不开localhost:15672的原因所在,可能性很大。那么,就再输入 rabbitmq-plu

    2024年01月16日
    浏览(45)
  • windows环境下RabbitMQ无法访问http://localhost:15672的问题

    1.问题 成功安装RabbitMQ后,RabbitMQ管理模块的插件也启动了但是还是无法访问http://localhost:15672。 网上很多回答都是,如下: 其实在打开节点的时候就提示了问题: 在dos下使用rabbitmqctl status查看原因 查看原因发现是Erlang新版本的cookie位置换了 异常解释来源于:https://www.cnblogs.com/h

    2024年02月11日
    浏览(51)
  • SpringBoot+RabbitMQ实现超时未支付订单自动取消,localhost:15672没有登录页面。

    简介 安装RabbitMQ需要安装Erlang/OTP,并保持版本匹配。 RabbitMQ官网:RabbitMQ: One broker to queue them all | RabbitMQ RabbitMQ与Erlang/OTP版本对照表:Erlang Version Requirements | RabbitMQ Erlang官网下载:Downloads - Erlang/OTP 1.Windows上安装RabbitMQ前需要安装Erlang。(下载安装不做叙述,除了需要自定义安

    2024年04月15日
    浏览(55)
  • RabbitMQ使用哪些端口?

    对于节点群集,RabbitMQ服务器使用或需要在防火墙上打开哪些端口? 我的 /usr/lib/rabbitmq/bin/rabbitmq-env 设置在下面,我假设是需要的(35197)。 复制 我没有接触 rabbitmq.config 来设置自定义的 tcp_listener ,所以它应该在默认的5672上侦听。 以下是相关的netstat行: 端口4369: Erlang使用P

    2024年02月09日
    浏览(32)
  • rabbitmq的介绍、使用、案例

    rabbitmq简单来说就是个消息中间件,可以让不同的应用程序之间进行异步的通信,通过消息传递来实现解耦和分布式处理。 消息队列:允许将消息发到队列,然后进行取出、处理等操作,使得生产者和消费者之间能够解耦,异步地进行通信。 持久性,可靠性的消息传递机制。

    2024年01月20日
    浏览(54)
  • rabbitMq介绍及使用

    点击跳转https://blog.csdn.net/qq_43410878/article/details/123656765

    2024年02月10日
    浏览(31)
  • RabbitMq介绍和使用

    MQ,是Message Queue的缩写,遵循先进先出的原则 在项目中用于流量削峰,应用解耦,异步处理 流量削峰:处理大量订单时,将订单分散,部分用户下单后需要进行等待,但却防止了服务宕机 应用解耦:当进行服务之间的通信时,普通方式不能保证当前模块调用其他模块出错后

    2024年02月06日
    浏览(34)
  • RabbitMQ特性介绍和使用案例

    ❤ 作者主页:李奕赫揍小邰的博客 ❀ 个人介绍:大家好,我是李奕赫!( ̄▽ ̄)~* 🍊 记得点赞、收藏、评论⭐️⭐️⭐️ 📣 认真学习!!!🎉🎉   RabbitMQ特性 AMQP (高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路层协议,而不是API(例如JMS),

    2024年02月11日
    浏览(32)
  • 初识RabbitMQ(RMQ的五种消息模型介绍以及使用演示)

    RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型: 简单队列 Work模式 广播模式 路由模式 主题模式 其实我比较喜欢将他们分成三类: 1.简单队列:一个生产者,一个消费者。(名师指导,1V1) 2.Work模式:一个生产者,多个消费者。但是一条消息只能够给到一个人

    2023年04月08日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包