RabbitMQ整合Springboot

这篇具有很好参考价值的文章主要介绍了RabbitMQ整合Springboot。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、配置

二、使用

(1)创建普通交换机

(2) 创建普通队列

(3)绑定 交换机--队列

(4)创建带有死信交换机的队列

(5)生产者

(6)消费者

(7)Message对象

(8)延时队列优化(死信实现延时,有缺陷)

三、Rabbitmq插件实现延迟队列(重点)

四、发布确认

(1)确认回调接口(监控消息是否被收到)

(2)回退消息

五、备份交换机

六、其他知识

(1)幂等性(消息重复消费)

(2)队列优先级

(3)惰性队列


一、配置

<!--RabbitMQ 依赖-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

yaml

    #RabbitMQ 配置
    #地址
    spring.rabbitmq.host=192.168.1.5
    #端口
    spring.rabbitmq.port=5672
    #用户
    spring.rabbitmq.username=usermq
    #密码
    spring.rabbitmq.password=111111
    #开启发布确认模式 生产者--交换机
    spring.rabbitmq.publisher-confirm-type=correlated
    #允许消息回退  交换机--队列
    spring.rabbitmq.publisher-returns=true
     #(5672为端口,15672为web端口)

二、使用

配置类通过bean注入
创建队列,创建路由,创建队列绑定 (都是通过xxxBuider工具类创建)

创建交换机,创建队列(同时声名对应的死信交换机),绑定路由,绑定死信交换机和队列

交换机类型
DirectExchange (直接交换机,路由模式)
TopicExchange (主题模式 路由的路由)
FanoutExchange (发布/订阅,和路由id没关系,广播方式)
HeadersExchange
CustomExchange (自定义交换机 ,使用延迟消息插件的时候配置)

全部配置在@Configuration类中

(1)创建普通交换机

    @Bean("yExchange") //起别名
    public DirectExchange yExchange(){
        //new DirectExchange(X_EXCHANGE)
        //除了创建对象方式创建路由或者队列外 还可以通过工具类创建
        return  ExchangeBuilder.directExchange(Y_DEAD_LETTER_EXCHANGE).build();
    }

(2) 创建普通队列

    @Bean("queueYC")
    public Queue queueYC(){
        //队列
        return QueueBuilder.durable(DELAY_QUEUE_NAME).build();
    }

(3)绑定 交换机--队列

    //@Qualifier 自动注入bean
    //交换机--队列 通过路由id绑定(直接交换机类型) with
    //绑定
    @Bean
    public Binding binding(@Qualifier("queueYC") Queue queue,@Qualifier("yExchange")DirectExchange  exchange){
        return BindingBuilder.bind(queue).to(exchange).with("YCXX");
    }

(4)创建带有死信交换机的队列

        死信交换机也是普通的交换机,创建方式和普通交换机一致

        死信交换机绑定的队列也是普通的队列

        带有死信交换机的为死信队列

        死信队列中的消息成为死信后会被转发到死信交换机中

        死信消息:比如ttl过期(存活时间) 

示例

  //声名普通队列列B  QB--Y
    @Bean("queueB")
    public Queue queueB(){
        Map<String,Object> map =new HashMap<>();
        //设置死信息交换机
        map.put("x-dead-letter-exchange","Y_DEAD_LETTER_EXCHANGE");
        //设置RotingKey
        map.put("x-dead-letter-routing-key","YD"); //(需要是死信交换机和 死信队列之间的路由id,不是当前队列和死刑死信路由的id,当前队列和死信交换机不需要路由id)
        //设置ttl 消息存活时间 单位是ms,设置当前队列消息的过期时间,也可以在生产者发送消息时候设置
        map.put("x-message-ttl",4000);
        //配置队列 可以通过工具类创建
                                //队列名字                  //其他配置
        return QueueBuilder.durable("QUEUE_B").withArguments(map).build();
    }

(5)生产者

创建RabbitTemplate对象(自动注入)
调用发送消息
rabbitTemplate.convertAndSend("交换机","路由和队列绑定的路由id","消息");

例如

  @RequestMapping("/send/{msg}/{ttl}")
    String TtlMsg(@PathVariable("msg") String msg,@PathVariable("ttl") String ttl){
        log.info("发送消息给QC:{},ttl:{},当前时间:{}",msg,ttl,new Date());

                                        //发送消息的交换机   路由id      消息   其他设置
        rabbitTemplate.convertAndSend("X", "XC", msg,new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置过期时间
                message.getMessageProperties().setExpiration(ttl);
                return message;
            }
        });
        return "发送成功";

    }

(6)消费者

需要消息监听器监听器
给方法加上 @RabbitListener(queues = "QD") 注解监听QD队列,即接收QD队列的消息

@Slf4j
@Component
public class Consumer {
    @RabbitListener(queues = "confirm_queue")
    public void receiveConfigMEssage(Message message){
        log.info("接收到的队列confirm_queue消息:{}",new String(message.getBody()));

    }


}

(7)Message对象

message.getBody():
消息内容bytes类型 使用new String(message.getBody())装箱成String类型

message.getMessageProperties().setExpiration(ttl)
发送消息时候设置过期时间

(8)延时队列优化(死信实现延时,有缺陷)

死信消息做延迟有巨大缺陷
RabbitMQ只会检测第一个消息是否过期
后面的ttl会受到前一条消息ttl影响
如果前一条消息ttl为10s 后一条消息ttl为2s ,那么后一条消息ttl也会变成ttl(后一条消息需要等待前一天消息发去出去)
总结:如果第一个的延时消息时长很长,而且第二个消息的延时时长很短,第二个消息并不会有序执行

三、Rabbitmq插件实现延迟队列(重点)

  下载插件
      插件放在这个目录下
          cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.13/plugins
    安装插件
      rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    重启RabbitMQ
     systemctl restart rabbitmq-server

消息延迟的是交换机
安装插件后新增交换机类型
x-delayed-message

  声名延迟交换机
         声名交换机
          交换机类型
            DirectExchange
            TopicExchange
            FanoutExchange
            HeadersExchange
            CustomExchange (自定义类型)

 

 延迟交换机需要使用 CustomExchange (自定义类型) 需要设置为延迟类型(直接设置) ,也需要设置类型(直接还是路由模式,map设置)
         
     CustomExchange  不能使用ExchangeBuilder 创建,因为没有对应方法
     只能 new CustomExchange("交换机名字","x-delayed-message","是否持久化","是否自动删除",map)
      Map<String,Object> map=new HashMap<>();
     
     //设置延迟 交换机的类型(设置为路由模式)
     map.put("x-delayed-type","direct"); //设置了 (x-delayed-message)延迟交换机后,还要设置它的模式 (设置路由模式)

设置延迟交换机

    //声名交换机
    //DirectExchange
    //TopicExchange
    //FanoutExchange
    //HeadersExchange
    //CustomExchange
    @Bean("ycExchange")
    public CustomExchange customExchange(){

        Map<String,Object> map=new HashMap<>();
        //设置延迟 交换机的类型(设置为路由模式)
        map.put("x-delayed-type","direct");

        //不能使用ExchangeBuilder工具类创建
        // ExchangeBuilder.
        //1、交换机名称
        //2、交换机类型
        //3、是否需要持久化
        //4、是否需要自动删除
        //5、其他配置参数
        return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,map);
    }

绑定

不是用Buider创建的需要使用noargs()适配

 //绑定
 @Bean
 public Binding binding(@Qualifier("queueYC") Queue queue,@Qualifier("ycExchange")CustomExchange exchange){
     return BindingBuilder.bind(queue).to(exchange).with("YCXX").noargs();
 }

延迟消息总结

 使用RabbitMQ来实现延迟消息很好的利用RabbitMQ的特性
     消息可靠发送,消息可靠投递,死信消息至少被消费一次以及未被真确处理的消息不会被丢弃。另外,通过RabbltMq集群的特性可以很好
     的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失

四、发布确认

生产者发送消息给交换机--队列--消费者
如果消息发送识失败,把消息放入缓存中
队列或者交换机出问题就算交换机问题

(1)确认回调接口(监控消息是否被收到)

针对全部交换机
监控: 生产者--交换机 过程

  在配置文件当中开启发布确认模式()
      spring.rabbitmq.publisher-confirm-type=correlated
          none 默认禁止发布确认模式
          correlated 开启发布确认(只监控生产者--交换机  监控不到 交换机--队列)
          simple(相当于单个确认) 两种效果 其一效果和correlated值一样会触发回调方法
                      其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForsOrDie方法等待broker节点
                          返回发送结果,根据返回结果来判定下一步的逻辑,要注意的是waitForConfirmsOrDie方法如果
                          返回false则会关闭channel,则接下来无法发送消息到broker

发送消息时候

        每次发送消息到交换机中都会被监控到(需要设置消息id)    

      //消息的其他属性(id) 需要开启发布确认模式
    CorrelationData correlationData=new CorrelationData("1");
    //发送消息                                     交换机          路由id        消息      消息的其他属性
    rabbitTemplate.convertAndSend("confirm_exchange","GJK",message,correlationData);

生产者

        需要传递配置CorrelationData  对象来传递消息id等信息

        

    //发消息
    @RequestMapping("/pron/{message}")
    public String sendMessage(@PathVariable("message") String message){

        //消息的其他属性(id) 需要开启发布确认模式
        CorrelationData correlationData=new CorrelationData("1");


        //发送消息                                     交换机          路由id        消息      消息的其他属性
       rabbitTemplate.convertAndSend("confirm_exchange","GJK",message,correlationData);
}

配置类

继承设置RabbitTemplate.ConfirmCallback 接口
重写

//发布确认回调接口
//消息退回
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct //后置处理器 类似生命周期钩子,创建类时候最后执行
    private void init() {
        //把当前配置注入注入
        rabbitTemplate.setConfirmCallback(this);

   
    }

    //消息确认 生产者--交换机
    @Override                    //消息信息          交换机是收到消息    失败原因
    public void confirm(CorrelationData correlationData, boolean b, String s) {


      String id=correlationData.getId()!=null?correlationData.getId():"";

        if(b){
            log.info("交换机已经收到Id为:{}的消息",id);
        }else {
            log.info("交换机还未收到ID为:{}的消息,原因为:{}",id,s);
        }

    }


}

(2)回退消息

针对全部队列,在发布确认的前提下,消息会回退到回调函数中,可以在回调函数中重新创建生产者转发

监控:交换机--队列 过程

 开启回退消息
     spring.rabbitmq.publisher-returns=true
     或者
     //设置 交换机--队列 路由失败时候 自动退回消费者(returnedMessage进行处理) 默认是false直接丢弃消息
     rabbitTemplate.setMandatory(true);
     交换机--队列消息路由失败时候对消息进行处理:通过mandatory参数可以在当消息传递过程装中
         不可到达目的地时候将消息返回给生产者

回调函数

配置类需要实现

RabbitTemplate.ReturnsCallback接口

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct //后置处理器 类似生命周期钩子,创建类时候最后执行
    private void init() {
        //把当前配置注入注入
        rabbitTemplate.setConfirmCallback(this);

        //设置 交换机--队列 路由失败时候 自动退回消费者(returnedMessage进行处理) 默认是false直接丢弃消息
      //  rabbitTemplate.setMandatory(true);

        rabbitTemplate.setReturnsCallback(this);
    }

    //消息确认 生产者--交换机
    @Override                   //消息信息          交换机是收到消息    失败原因
    public void confirm(CorrelationData correlationData, boolean b, String s) {


      String id=correlationData.getId()!=null?correlationData.getId():"";

        if(b){
            log.info("交换机已经收到Id为:{}的消息",id);
        }else {
            log.info("交换机还未收到ID为:{}的消息,原因为:{}",id,s);
        }

    }

    //回退消息,回调函数  交换机--队列
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {

        //可以在这创建生产者再次发送当前消息returnedMessage
        log.error("消息:{},被交换机:{}退回,退回原因:{},路由key:{}",
                new String(returnedMessage.getMessage().getBody()), //消息内容
                returnedMessage.getExchange(),//交换机
                returnedMessage.getReplyText(), //失败原因
                returnedMessage.getRoutingKey() //路由id
        );
    }
}

生产者发送消息和发布确认中的生产者一致即可,不用发送ReturnedMessage 对象(会自动生成)

但消息确认中CorrelationData 对象需要发送(不会自动生成)

五、备份交换机

作用:自动处理 交换机--队列 中被退回的消息 转发到备份交换机中。

类型死信队列绑定-死信交换机,交换机--备份交换机

将确认交换机的消息(设置了发布确认模式,开启了消息退回)

配置

 构建确认交换机时候需要配置备份交换机  withArgument("alternate-exchange","备份交换机名字")

例如

    //交换机声名--绑定备份交换机
    @Bean("confirmExchange")
    public DirectExchange directExchange(){
                                            //交换机名字                 是否持久化               //设置备份交换机                //备份交换机名字
        return ExchangeBuilder.directExchange(CONFIRM_CONFIG_NAME).durable(true).withArgument("alternate-exchange",BACKUP_CONFIG_NAME1).build();
    }

备份交换机可以设置成广播类型交换机

   //创建备份交换机
    @Bean("bfExchange")
    public FanoutExchange baexchange(){
        return ExchangeBuilder.fanoutExchange(BACKUP_CONFIG_NAME1).build();
    }

绑定备份交换机的队列称为报警队列(普通队列)

例如

  //报警队列
    @Bean("bjQueue")
    public Queue bjqueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME2).build();
    }

    //绑定报警队列和备份交换机
    @Bean
    public Binding bfMyinding(@Qualifier("bfQueue")Queue queue,@Qualifier("bfExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

六、其他知识

(1)幂等性(消息重复消费)

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
举个最简单是例子,那就是多次支付
消费者再消费MQ中消息时候,消费者在返回确认时候ack网络中断,所有MQ未收到确认信息,所以这条消息会被发送给其他消费者进行消费
造成消息重复消费
 解决问题:
     消息全局ID:每次消费时候先判断该id是否被消费过
     指纹码机制:一些规则或者时间搓加别的服务给到唯一信息码(劣势:高并发时候频繁对比数据库信息会造成瓶颈)
     利用redis原子性:执行setnx命令 ,天然具有幂等性,从而实现不重复消费 

(2)队列优先级

 订单优先级
     优先权重 0-255 越大越优先
     数据优先处理
 优先级排序
     队列需要设置优先级,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序
     队列设置最大优先级(0-255)
       QueueBuilder.durable("priorityQueue").maxPriority(10).build();
     消息设置优先级 maxPriority(10)

(3)惰性队列

惰性队列: 消息保存在内存中还是在磁盘上
    正常情况:消息是保存在内存中
    惰性队列:消息保存在磁盘中
使用在:消费者下线或者宕机
优点:减少内存小号
缺点:速度慢
两种模式:default(默认) lazy(惰性模式)
withArguments( “x-queue-mode”,"lazy") 设置惰性队列
    QueueBuilder.durable(QUEUE_B).withArguments(map)build()

RabbitMQ集群

镜像队列

高可用负载均衡

        使用第三方服务器

Federation Exchange

        联邦交换机

Shovel

        可以把源端的数据转发到目的端文章来源地址https://www.toymoban.com/news/detail-618445.html

到了这里,关于RabbitMQ整合Springboot的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot 整合 RabbitMQ

    由于有的 Idea 不选择插线无法创建 Spring Boot 项目,这里我们先随便选一个插件,大家也可以根据需求选择~~ 把版本改为 2.7.14 引入这两个依赖: 配置 application.yml文件 Config 类 : RabbitMQConfig 测试类: RabbitMQConfigTests 结果 当我们启动 测试类 之后就可以发现我们的 rabbitmq 界面里的

    2024年02月10日
    浏览(38)
  • SpringBoot整合RabbitMQ(基础)

    一.环境准备 1、在pom文件中引入对应的依赖: 2、在application.yml配置文件中配置RabbitMQ: 二、整合 点对点,简单模式 ①配置文件中声明队列 ②创建生产者 消息发送成功后,在web管理页面查看: 可以看到对应队列中产生了消息 ③创建消费者 启动项目,可以看到消息成功消费:

    2024年02月11日
    浏览(36)
  • 单个springboot整合rabbitmq

    rabbitmq是一种消息中间件,是基于erlang语言开发的AMQP(高级消息队列协议)的开源实现。 本质是个队列,FIFO先入先出。 1.1.1 rabbitmq特性: 开源,性能优秀,稳定保障 提供可靠的消息投递模式,返回模式 于Spring AMQP完美整合,API丰富 集群模式丰富 高可用 1.1.2 rabbitmq主要结构 生产

    2024年02月10日
    浏览(65)
  • SpringBoot整合实现RabbitMQ

    本文大纲 一.RabbitMQ介绍 二.RabbitMQ的工作原理 2.1 RabbitMQ的基本结构 2.2 组成部分说明 2.3 生产者发送消息流程 2.4 消费者接收消息流程 三.SpringBoot 整合实现RabbitMQ 3.1创建mq-rabbitmq-producer(生产者)发送消息 3.1.1pom.xml中添加相关的依赖 3.1.2 配置application.yml 3.1.3 配置RabbitMQ常量类

    2024年02月17日
    浏览(43)
  • SpringBoot 整合RabbitMQ

    2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出

    2024年02月15日
    浏览(51)
  • RabbitMQ整合Springboot

    目录 一、配置 二、使用 (1)创建普通交换机 (2) 创建普通队列 (3)绑定 交换机--队列 (4)创建带有死信交换机的队列 (5)生产者 (6)消费者 (7)Message对象 (8)延时队列优化(死信实现延时,有缺陷) 三、Rabbitmq插件实现延迟队列(重点) 四、发布确认 (1)确认回调

    2024年02月15日
    浏览(40)
  • springboot整合rabbitmq死信队列

    什么是死信 需要测试死信队列,则需要先梳理整体的思路,如可以采取如下方式进行配置: 从上面的逻辑图中,可以发现大致的思路: .1. 消息队列分为正常交换机、正常消息队列;以及死信交换机和死信队列。 2. 正常队列针对死信信息,需要将数据 重新 发送至死信交换机

    2024年02月11日
    浏览(47)
  • RabbitMQ与SpringBoot整合实践

    作者:禅与计算机程序设计艺术 2020年是一个转折点,现代化的信息社会已经开启了数字化进程,越来越多的人开始接受信息技术作为工作的一部分。相较于传统的技术岗位,人工智能、大数据、云计算领域的软件工程师更加需要具备实际项目应用能力、高超的计算机和通信

    2024年02月09日
    浏览(37)
  • SpringBoot 整合 RabbitMQ demo

    Rabbit Windows安装教程 本文只做Demo案例的分享,具体知识需自行百度 1.application.properties 配置Rabbit的基本信息 2.pom文件 导入两个maven依赖 第一个是SpringBoot集成的rabbit 第二个是web依赖,用来方便发消息 3.Rabbit配置类 这里用的是主题模式,也就是模糊匹配的模式 这里设置的是只

    2024年02月17日
    浏览(43)
  • SpringBoot整合RabbitMQ(最新笔记)

    1.2 创建工程并导入依赖 我们使用的springboot版本为2.5.6,其他都是根据 spring-boot-starter-parent 自动选择版本 引入以下工程即可 spring-boot-starter-test 用于测试 junit 用于单元测试 spring-boot-starter-amqp SpringBoot和RabbitMQ的整合方案 1.2 创建配置文件并配置 SpringBoot配置文件名称为 applica

    2024年02月05日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包