高级篇-rabbitmq的高级特性

这篇具有很好参考价值的文章主要介绍了高级篇-rabbitmq的高级特性。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

setreturncallback,rabbitmq,mq,rabbitmq

 setreturncallback,rabbitmq,mq,rabbitmq

 setreturncallback,rabbitmq,mq,rabbitmq

1.消息可靠性

三种丢失的情形:

setreturncallback,rabbitmq,mq,rabbitmq

1.1  生产者确认机制 

setreturncallback,rabbitmq,mq,rabbitmq

 启动MQ

setreturncallback,rabbitmq,mq,rabbitmq

创建Queues: 

setreturncallback,rabbitmq,mq,rabbitmq两种Callback:

1.ReturnCallback:全局callback setreturncallback,rabbitmq,mq,rabbitmq

 2.ComfirmCallback: 发送信息时候设置

setreturncallback,rabbitmq,mq,rabbitmq setreturncallback,rabbitmq,mq,rabbitmq

 @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello, spring amqp!";
        // 2.准备CorrelationData
        // 2.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.准备ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判断结果
            if (result.isAck()) {
                // ACK
                log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                // 重发消息
            }
        }, ex -> {
            // 记录日志
            log.error("消息发送失败!", ex);
            // 重发消息
        });
        // 3.发送消息
        rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
    }

 执行成功:

setreturncallback,rabbitmq,mq,rabbitmq

 监控页面:

setreturncallback,rabbitmq,mq,rabbitmq

模拟失败:

 1.投递到交互机失败

setreturncallback,rabbitmq,mq,rabbitmq

2.投递到交换机了,但是没有进入队列 

setreturncallback,rabbitmq,mq,rabbitmq setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

1.2 消息持久化 

注意: 生产者确认只能保证数据放到队列当中,但是无法保证数据不丢失(比如所在的机器宕机了),
所以还需要保证数据的持久化

setreturncallback,rabbitmq,mq,rabbitmq

@Configuration
public class CommonConfig {
    @Bean
    public DirectExchange simpleDirect(){
        return new DirectExchange("simple.direct");
    }
    @Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue").build();
    }
}
@Test
public void testDurableMessage() {
   // 1.准备消息 消息持久化
   Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
   // 2.发送消息
   rabbitTemplate.convertAndSend("simple.queue", message);
}

 注意:

    //交换机不传值默认就是持久化  
    //交换机、队列、消息默认都是持久化   
    @Bean
    public DirectExchange simpleDirect(){
        return new DirectExchange("simple.direct");
    }

    public AbstractExchange(String name) {
        this(name, true, false);
    }

  演示数据是否默认持久化: setreturncallback,rabbitmq,mq,rabbitmq

     重启mq:

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq 1. 交互机、队列、消息都做持久化

  2.消费者端关闭防止被消费

  3.重启mq后看队列中数据是否还在(是否持久化)

setreturncallback,rabbitmq,mq,rabbitmq

 1.3  消费者消息确认

生产者确认:能确定消息投递到队列
消息持久化:能避免MQ宕机造成的消息丢失
生产者确认和消息持久化能保证消息能投递到消费者,但是无法保证消息被消费者消费(比如投递消费者的
同时,消费者所在机器宕机了)

setreturncallback,rabbitmq,mq,rabbitmq

1.manual:不推荐 代码侵入
try{
  //业务逻辑
  ack
} catch(ex){
  nack
}
2.auto:推荐 spring全权完成,不需要手动写代码
3.none:不推荐 投递完成立马删除消息,是否成功都不管
@Slf4j
@Component
public class SpringRabbitListener { 
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        //模拟出现异常情况
        System.out.println(1 / 0);
        log.info("消费者处理消息成功!");
    }
}

默认为none:抛出异常后消息立即被删除:setreturncallback,rabbitmq,mq,rabbitmq

 修改为auto模式:

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

队列返回nack会再去发送信息: 

setreturncallback,rabbitmq,mq,rabbitmq

1.4 失败重试机制

setreturncallback,rabbitmq,mq,rabbitmq

 演示失败重试机制:

listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true
          initial-interval: 1000
          multiplier: 3
          max-attempts: 4

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmqsetreturncallback,rabbitmq,mq,rabbitmq

 默认重试到达最大次数后消息就丢弃:

       但是对于一些比较重要不能丢弃的消息需要使用以下策略:    setreturncallback,rabbitmq,mq,rabbitmq 

推荐使用第三种方案:将失败的消息发送到失败的交换机和失败的队列中,后面可以告知管理员然后重新
人工去处理

setreturncallback,rabbitmq,mq,rabbitmq

@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

 演示:

发送消息:

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq setreturncallback,rabbitmq,mq,rabbitmq

 面试题:最后一分钟的总结

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

 2. 死信交换机

 2.1  初识死信交换机

setreturncallback,rabbitmq,mq,rabbitmq

1.发送信息到消费者默认的retry重试机制,达到最大次数就会被reject
2.队列中绑定一个死信交换机,接收被reject的信息,然后发送到dl.queue
3.这样就不担心死信会丢失

对比消息失败信息处理策略:

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

2.2  TTL 

 注意: 存活时间取消息所在队列中存货时间 、消息本身存活时间的以短的时间为准

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue", durable = "true"),
            exchange = @Exchange(name = "dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg) {
        log.info("消费者接收到了dl.queue的延迟消息");
    }
}

setreturncallback,rabbitmq,mq,rabbitmq

@Configuration
public class TTLMessageConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl.direct");
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }

    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }
}

 setreturncallback,rabbitmq,mq,rabbitmq

    @Test
    public void testTTLMessage() {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setExpiration("5000")
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
        // 3.记录日志
        log.info("消息已经成功发送!");
    }

  演示延时队列:

  1.启动消费者 

  2.发送消息:testTTLMessage() 

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

2.3 延迟队列 

setreturncallback,rabbitmq,mq,rabbitmqp159 27:18 

setreturncallback,rabbitmq,mq,rabbitmq

 setreturncallback,rabbitmq,mq,rabbitmq

@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了delay.queue的延迟消息");
    }
}

 setreturncallback,rabbitmq,mq,rabbitmq

 setreturncallback,rabbitmq,mq,rabbitmq

 文章来源地址https://www.toymoban.com/news/detail-701346.html

    @Test
    public void testSendDelayMessage() throws InterruptedException {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000)
                .build();
        // 2.准备CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3.发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

        log.info("发送消息成功");
    }

演示延时队列:

1.启动消费者

setreturncallback,rabbitmq,mq,rabbitmq

 2.运行testSendDelayMessage

报错原因:消息没有做路由

setreturncallback,rabbitmq,mq,rabbitmq

 如何不报错:添加延迟的判断: 

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}

setreturncallback,rabbitmq,mq,rabbitmq

 

到了这里,关于高级篇-rabbitmq的高级特性的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ消息队列高级特性

    在线上生产环境中,RabbitMQ可能会产生消息丢失或者是投递失败的一个场景,RabbitMQ为了避免这种场景的发生,提供了两种方式来控制消息传递的可靠性。 Confirm确认模式 消息从生产者到MQ的Exchange过程中,如果消息成功到达,则会返回一个ConfirmCallback的确认函数。 Return退回模

    2024年02月12日
    浏览(35)
  • RabbitMQ之高级特性

    提示:以下是本篇文章正文内容,RabbitMQ 系列学习将会持续更新 官网 :https://www.rabbitmq.com RabbitMQ 消息确定主要分为两部分: 第一种是 消息发送确认 。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。 确认发送的第一步是确认是

    2023年04月10日
    浏览(33)
  • RabbitMQ的高级特性及其特点

    1、应用解耦 提高系统容错性和可维护性 在订单系统中,可以通过远程调用直接调用库存系统,支付系统,物流系统。 但是这三个系统耦合度太高了,因为订单系统下完订单首先去库存系统将库存-1,然后将返回值返回给订单系统,然后通过订单系统的返回结果来在支付系统

    2024年02月08日
    浏览(31)
  • RabbitMQ——高级特性(SpringBoot实现)

    本篇文章的内容与我之前如下这篇文章一样,只是使用技术不同,本篇文章使用SpringBoot实现RabbitMQ的高级特性! RabbitMQ——高级特性_小曹爱编程!的博客-CSDN博客 RabbitMQ——高级特性:1、RabbitMQ高级特性;2、RabbitMQ应用问题;3、RabbitMQ集群搭建 https://blog.csdn.net/weixin_62993347/

    2023年04月21日
    浏览(28)
  • rabbitmq笔记-rabbitmq进阶-数据可靠性,rabbitmq高级特性

    消息何去何从 mandatory和immediate是channel.basicPublish方法的两个参数,都有消息传递过程中不可达目的地时将消息返回给生产者的功能。 mandatory参数 true:交换器无法根据自身的类型 和路由键找到符合条件的队列,rabbitmq调用Basic.Return命令将消息返回给生产者 生产者调用channel.

    2024年02月10日
    浏览(41)
  • 4.RabbitMQ高级特性 幂等 可靠消息 等等

    保障消息的成功发出 保障MQ节点的成功接收 发送端收到MQ节点(Broker)确认应答 完善的消息进行补偿机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。 生产者进行接收应答,用来确定这条消息是否正常的发送到了Broker,这种方式也是

    2024年02月11日
    浏览(42)
  • RabbitMQ养成记 (10.高级特性:死信队列,延迟队列)

    这个概念 在其他MQ产品里面也是有的,只不过在Rabbitmq中稍微特殊一点 什么叫私信队列呢? 就是当消息成为 dead message之后,可以重新发到另外一台交换机,这个交换机就是DLX。 注意这里的有翻译歧义, 这里的DLX 指的是 交换机 ,而不是一个队列。 队列的消息长度 到达限制

    2024年02月05日
    浏览(38)
  • RabbitMQ高级特性2 、TTL、死信队列和延迟队列

    设置 消费者 测试 添加多条消息 拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费 Time To Live(存活时间/过期时间)。 当消息到达存活时间后,还没有被消费,会被自动清除。 RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。 可

    2024年01月16日
    浏览(41)
  • RabbitMQ高级特性解析:消息投递的可靠性保证与消费者ACK机制探究

    学习RabbitMQ高级特性,涵盖消息的持久化、确认模式、退回模式以及消费者ACK机制等方面,助您构建高可靠性的消息队列系统。

    2024年01月16日
    浏览(64)
  • (四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

    Lison dreamlison@163.com , v1.0.0 , 2023.06.23 之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。 1、 生产者批量发送消息 2、消费端配置限流机制 3、消费者监听队列 在RabbitMQ中,多个消费者监听同一条队列,则队列

    2024年02月15日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包