微服务RabbitMQ高级篇

这篇具有很好参考价值的文章主要介绍了微服务RabbitMQ高级篇。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一.消息可靠性传递概述

二.生产者消息确认机制

三.publisher-comfirm

四.publisher-return

五.消息持久化

六.消费者消息确认机制

七.如何确保RabbitMQ消息的可靠性?

八.死信交换机

九.延迟队列

十.惰性队列

十一.MQ集群


一.消息可靠性传递概述

生产者发送消息到交换机,交换机将消息路由到队列,消费者从队列获取消息。哪些环节会导致消息丢失。

1.生产者发送消息丢失

  • 生产者没有将消息发送到交换机
  • 交换机没有成功将消息路由到队列

2.MQ宕机导致消息丢失

3.消费者处理消息丢失

消费者获取到消息后,未来得及处理,宕机

消费者获取到消息后,处理消息抛异常。

二.生产者消息确认机制

生产者消息确认机制一共有2种方式

  • publisher-comfirm
  • publisher-return

在publisher这个微服务的application.yml中添加配置

spring:  
  rabbitmq:
    publisher-confirm-type: correlated 
    publisher-returns: true 
      template:
        mandatory: true

1.publish-confirm-type:

开启publisher-confirm,这里支持两种类型:

  • simple:同步等待confirm结果,直到超时
  • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

2.publish-returns:

开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

3.template.mandatory:

定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

三.publisher-comfirm

作用:开启后,生产者发送消息到RabbitMQ交换机,RabbitMQ会进行结果返回。

  • ack:生产者成功将消息发送到队列
  • nack:生产者发送到交换机失败

如何使用

1.在生产者配置文件中开启

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 异步回调

2.如何接受RabbitMQ结果返回

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                
            }
        });

correlationData:消息的唯一标识

ack

  • true:生产者成功将消息发送到交换机
  • false:生产者发送到交换机失败

cause:失败原因

注意点:rabbitTemplate.setConfirmCallback方法的调用,只能调用一次。

微服务RabbitMQ高级篇,微服务,入门到大神,微服务,架构,spring cloud,rabbitmq,java

3.如何保证rabbitTemplate.setConfirmCallback方法只会被调用一次

方案1:初始化方法

@PostConstruct

public void init() {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
         @Override
         public void confirm(CorrelationData correlationData, boolean ack, String cause) {

            }
        });
    }

方案2:CommandLineRunner(推荐)

@Component
public class MyComandLineRunner implements CommandLineRunner {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void run(String... args) throws Exception {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                
            }
        });
    }
}

方案3:实现ApplicationContextAware实现类

@Component
public class MyApplicationContext implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                
            }
        });
    }
}

4.发送消息的时候,需要给消息指定一个唯一标识
 

rabbitTemplate.convertAndSend(exchangeName,routingKey,message,new CorrelationData(id));

5.逻辑问题:如果生产者发送消息到交换机失败了?怎么重发

  • 在发送消息之前,将消息先存储到数据库(MySQL,Redis)
  • 如果消息发送交换机失败,读取Redis中信息,重新发送

6.测试

1.成功向交换机发送消息:观察

  • correlationData
  • ack
  • cause

2.发送消息到交换机失败:删除交换机

四.publisher-return

作用:开启后,交换机将消息路由到消息队列失败,RabbitMQ会进行结果返回。

如何使用

1.在生产者配置文件中开启

spring:
  rabbitmq:
    publisher-returns: true
      template:
        mandatory: true

template:mandatory: true定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

2.如何接受RabbitMQ结果返回

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                
            }
        });
  • message:交换机路由队列失败的那个消息
  • replyCode:错误码
  • replyText:错误信息
  • exchange:交换机
  • routingKey:路由key

3.注意点

rabbitTemplate.setReturnCallback方法的调用,只能调用一次。

方案1:初始化方法

@PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                
            }
        });
    }

方案2:

@Component
public class MyComandLineRunner implements CommandLineRunner {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void run(String... args) throws Exception {
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                
            }
        });
    }
}

方案3:实现ApplicationContextAware实现类

@Component
public class MyApplicationContext implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
         rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                
            }
        });
    }
}

4.逻辑问题:交换机路由消息到队列失败了,如何重新发送

直接调用rabbitTemplate.convertAndSet发送

5.测试

将路由key故意改错

  • message:交换机路由队列失败的那个消息
  • replyCode:错误码
  • replyText:错误信息
  • exchange:交换机
  • routingKey:路由key

五.消息持久化

交换机、队列、消息持久化(都默认持久化)

交换机

ExchangeBuilder.directExchange(ITCAST_DIRECT ).durable(true).build();
new DirectExchange(ITCAST_DIRECT,true,false); 

队列

QueueBuilder.durable(DIRECT_QUEUE1).build()
new Queue(DIRECT_QUEUE2,true);

消息持久化

1.如果发送普通字符串,默认持久化

2.如果期望消息不持久化。

Message msg = MessageBuilder.withBody(message.getBytes("utf-8"))
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                            .build();

六.消费者消息确认机制

解决问题:

  • 消费者处理消息丢失
  • 消费者获取到消息后,未来得及处理,宕机
  • 消费者获取到消息后,处理消息抛异常。

1.开启消费者消息确认机制

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto
  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

作用:消费者获取到消息后,如果处理消息出现异常,会给MQ返回nack. MQ将消息放入队列头部再给消费者。
消费者获取到消息后,如果处理消息正常,会给MQ返回ack. MQ将消息从消息队列中删除。

2.消费者消息确认机制-问题

如果消费者代码写的有问题

无限重试,导致MQ压力过大

3.开启消费者消息重试机制

优势

1.重试在消费者本地重试。

2.重试可以有延迟时间。

3.重试有次数限制

如何使用

rabbitmq:
    listener:
      simple:
        retry:
          enabled: true  #开启失败重试
          initial-interval: 100 # 初次失败,间隔时间
          multiplier: 2 # 间隔时间倍数
          max-attempts: 3 #最大重试次数
          stateless: true #是否是无状态,true无状态,和事务相关,有事务写false

重试耗尽

触发重试耗尽策略

MessageRecover

RejectAndDonotMessageRecover(默认)重试耗尽后,直接reject,丢弃消息。默认就是这种方式

ImmediaRequeueMessageRecover重试耗尽后,返回nack,消息重新入队

RepublishMessageRecover

1.创建错误交换机

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");}

2.错误队列

@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true); }

3.绑定

@Bean
public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}

4.RepublishMessageRecover交由spring管理,进行重发。

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }

5.重试耗尽后,将失败消息投递到指定的交换机

七.如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

八.死信交换机

死信 

死信满足之一

  • 消息被消费者拒绝,不让重新入队
  • 消息队列满了,溢出的消息。
  • 消息在消息队列中超时过期

去哪里?

  • 被丢弃
  • 如果队列指定了死信交换机。

死信交换机

普通交换机

怎么给队列指定死信交换机

  • 给队列设置dead-letter-exchange属性,指定一个交换机
  • 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey

死信交换机 + 消息ttl实现延迟消息队列

延迟消息队列

生产者----->消息,消费者不能立即消费,需要等待一定时间才能消费。

如何实现
微服务RabbitMQ高级篇,微服务,入门到大神,微服务,架构,spring cloud,rabbitmq,java

给消息设置ttl有2种方式

1.创建队列设置消息过期时间         ttl()           x-message-ttl

2.创建消息的时候可以指定过期时间

Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
                .setContentType("text/plain")
                .setExpiration("5000").build();

实现方式

我们声明消费者一组死信交换机和队列,基于注解方式:

@RabbitListener(bindings = @QueueBinding(       
                value = @Queue(name = "dl.queue", durable = "true"),
                exchange = @Exchange(name = "dl.direct"),      
                key = "dl"))
public void listenDlQueue(String msg){ log.info("接收到 dl.queue的延迟消息:{}", msg);}

消费者config中要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

@Bean
public DirectExchange ttlExchange(){   
    return new DirectExchange("ttl.direct");
}

@Bean
public Queue ttlQueue(){   
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
          .ttl(10000) // 设置队列的超时时间,10秒       
          .deadLetterExchange("dl.direct") // 指定死信交换机
          .deadLetterRoutingKey("dl") // 指定死信RoutingKey 
          .build();}

@Bean  public Binding simpleBinding(){

return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}

发送消息时,给消息本身设置超时时间

@Test  public void testTTLMsg() {   // 创建消息

Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
                                .setExpiration("5000") 
                                .build();  // 消息ID,需要封装到CorrelationData中

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 发送消息

rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);}

如何实现发送一个消息20秒后消费者才收到消息?

  1. 给消息的目标队列指定死信交换机
  2. 消费者监听与死信交换机绑定的队列
  3. 发送消息时给消息设置ttl为20秒

九.延迟队列

使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

延迟队列插件的使用步骤包括哪些?

  1. 声明一个交换机,添加delayed属性为true
  2. 发送消息时,添加x-delay头,值为超时时间

安装DelayExchange插件

DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。

1.基于注解方式:
微服务RabbitMQ高级篇,微服务,入门到大神,微服务,架构,spring cloud,rabbitmq,java

2.基于java代码的方式:
微服务RabbitMQ高级篇,微服务,入门到大神,微服务,架构,spring cloud,rabbitmq,java

然后我们向这个delay为true的交换机中发送消息,一定要给消息添加一个header:x-delay,值为延迟的时间,单位为毫秒:
                

十.惰性队列

消息堆积问题

  • 生产者>消费者消费速度
  • 如果消息堆积超过队列容量上限,溢出的消息就会称为死信。死信会被丢弃。

怎么解决

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积(使用惰性队列),提高堆积上限

惰性队列特点

  • 将消息直接存入磁盘,不存储内存
  • 支持海量消息存储
  • 消费者要获取消息,MQ将消息加载到内容。

创建

微服务RabbitMQ高级篇,微服务,入门到大神,微服务,架构,spring cloud,rabbitmq,java

  • lazy()
  • 注解
  • 管理控制台

优点

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定

缺点

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

十一.MQ集群

普通集群(分布式)

  1. 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  2. 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  3. 队列所在节点宕机,队列中的消息就会丢失

镜像集群(主从)数据存在延迟

1.主从架构集群,队列可以在多个节点上有

2.主节点:在那个节点上创建队列,那个节点就是主节点

3.镜像节点:备份主节点上队列的节点

4.创建备份策略:
                exactly
                all
                nodes

5.创建队列,根据队列名称,指定那些节点作为镜像节点。

仲裁队列代替镜像集群

1.生产者----->主节点队列------->镜像节点队列

2.与镜像队列一样,都是主从模式,支持主从数据同步

3.使用非常简单,没有复杂的配置

4.主从同步基于Raft协议,强一致

5.创建队列

        指定类型quorum

        java代码 quorum();

        默认5个镜像节点

java代码怎么操作集群

和单机区别;

spring:
  rabbitmq:
    addresses: 192.168.200.128:8071,192.168.200.128:8072,192.168.200.128:8073
    username: itcast
    password: 123
    virtual-host: /

微服务RabbitMQ高级篇,微服务,入门到大神,微服务,架构,spring cloud,rabbitmq,java
    文章来源地址https://www.toymoban.com/news/detail-828470.html

到了这里,关于微服务RabbitMQ高级篇的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微服务Spring Cloud Config配置中心与RabbitMQ安装指南

    本文档详细描述了如何在Spring Cloud微服务架构中设置Config配置中心,将项目配置文件存储在Git服务器上(如GitHub或Gitee),并在微服务启动时从Config配置中心获取配置文件。同时,提供了RabbitMQ消息队列的安装指南,为微服务之间的通信提供可靠的消息传递机制。

    2024年02月11日
    浏览(35)
  • Spring Cloud微服务架构组件【Java培训】

    SpringCloud是一系列框架的有序集合,为开发人员构建微服务架构提供了完整的解决方案。Spring Cloud根据分布式服务协调治理的需求成立了许多子项目,每个项目通过特定的组件去实现,下面我们讲解一下Spring Cloud 包含的常用组件以及模块。 (1)Spring Cloud Config:分布式配置中心

    2023年04月25日
    浏览(28)
  • Spring Cloud 与dubbo微服务架构选型

    Spring Cloud:适用于中小型项目,轻量级,易于入门,快速开发。 Dubbo:适用于大规模、复杂性高的项目,具有强大的性能和扩展性。 Spring Cloud:适合已经熟悉Spring框架的团队,可以利用Spring Boot、Spring Cloud Netflix等组件。 Dubbo:适合Java生态系统,特别是阿里巴巴旗下的技术栈

    2024年02月09日
    浏览(30)
  • 在Spring Cloud中使用RabbitMQ完成一个消息驱动的微服务

    Spring Cloud系列目前已经有了Spring Cloud五大核心组件:分别是,Eureka注册中心,Zuul网关,Hystrix熔断降级,openFeign声明式远程调用,ribbon负载均衡。这五个模块,对了,有没有发现,其实我这五个模块中ribbon好像还没有案例例举,目前只有一个Ribbon模块的搭建,后边我会完善的

    2024年02月04日
    浏览(35)
  • 微服务·架构组件之网关- Spring Cloud Gateway

    微服务架构已成为构建现代化应用程序的关键范式之一,它将应用程序拆分成多个小型、可独立部署的服务。Spring Cloud Gateway是Spring Cloud生态系统中的一个关键组件,用于构建和管理微服务架构中的网关。本报告旨在调查和介绍Spring Cloud Gateway的核心概念、架构、功能以及其在

    2024年02月09日
    浏览(40)
  • Spring Cloud微服务架构:实现分布式系统的无缝协作

    🎉欢迎来到架构设计专栏~Spring Cloud微服务架构:实现分布式系统的无缝协作 ☆* o(≧▽≦)o *☆嗨~我是IT·陈寒🍹 ✨博客主页:IT·陈寒的博客 🎈该系列文章专栏:架构设计 📜其他专栏:Java学习路线 Java面试技巧 Java实战项目 AIGC人工智能 数据结构学习 🍹文章作者技术和水

    2024年02月08日
    浏览(33)
  • (一)Spring Cloud 直击微服务作用、架构应用、hystrix降级

    直击微服务作用     遇到了什么问题?         将单体架构拆分成微服务架构后,如果保证多个服务(项目)正常运行?     哪个技术可以解决这个问题?         微服务技术         服务治理: 服务管理,维护服务与服务之间的关系     这个技术如何使用?         netflix/网飞:

    2024年02月03日
    浏览(32)
  • Spring Cloud微服务入门(二)

    微服务的技术栈 服务治理: 服务注册、发现、调用。 负载均衡: 高可用、集群部署。 容错: 避免雪崩、削峰、服务降级。 消息总线: 消息队列、异步通信,数据一致性。 网关: 校验路径、请求转发、服务集成。 配置管理: 统一配置管理,数据源配置、端口配置等。

    2024年04月15日
    浏览(30)
  • Spring Cloud微服务基础入门

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【宝藏入口】。 微服务架构是当前软件开发领域的一种流行趋势,它将一个大型应用程序分解成多个小型、独立、可互相调用的服务。Spring Cloud是一个基于Spring Boot的开源微服务框架

    2024年01月17日
    浏览(27)
  • 从零开始的Spring Cloud Gateway指南:构建强大微服务架构

    微服务架构的兴起已经改变了软件开发的面貌,使得开发者能够更灵活地构建、部署和维护应用程序。而在这个微服务的时代,强大而灵活的网关是确保微服务之间通信顺畅的关键之一。在本文中,我们将深入研究Spring Cloud Gateway,一款开源的、基于Spring Framework的微服务网关

    2024年02月02日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包