207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

这篇具有很好参考价值的文章主要介绍了207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器

★ 发送消息

- Spring Boot可以将AmqpAdmin和AmqpTemplate注入任何其他组件,
  接下来该组件即可通过AmqpAdmin来管理Exchange、队列和绑定,还可通过AmqpTemplate来发送消息。 

- Spring Boot还会自动配置一个RabbitMessagingTemplate Bean(RabbitAutoConfiguration负责配置),
  如果想使用它来发送、接收消息,
  可使用RabbitMessagingTemplate代替上面的AmqpTemplate,两个Template的注入方式完全相同。

207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器

★ 创建队列的两种方式

 方式一(编程式):在程序中通过AmqpAdmin创建队列。

 方式二(配置式):在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,
                  RabbitMQ将会自动为该Bean创建对应的队列。

代码演示

需求1:发送消息

1、ContentUtil 先定义常量

207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器

2、RabbitMQConfig 创建队列的两种方式之一:

配置式:

在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。
就是在配置类中创建一个生成消息队列的@Bean。

问题:

用 @Configuration 注解声明为配置类,但是项目启动的时候没有自动生成这个队列。
据了解是因为RabbitMQ使用了懒加载,大概是没有消费者监听这个队列,就没有创建。
但是当我写后面的代码后,这个消息队列就生成了,但是并没有消费者去监听这个队列。
这有点想不通。
不知道后面是哪里的代码让这个配置类能成功声明这个消息队列出来。
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器
水落石出:
经过测试:
在下面的MessageService 这个类中,依赖注入了 AmqpAdmin 和 AmqpTemplate 这两个对象,当我们通过这两个对象去声明队列、Exchange 和绑定的时候,配置类中的创建消息队列的bean就能成功创建队列。
这张图结合下面的 MessageService 中的代码就可得知:
这是依赖注入 AmqpAdmin 和 AmqpTemplate 这两个对象的有参构造器中声明的。
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器

3、MessageService 编写逻辑

声明Exchange 、 消息队列 、 Exchange和消息队列的绑定、发送消息的方法等
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器

PublishController 控制器

207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器

application.properties 配置属性

207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器

测试:消息发送

成功生成队列
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器
发送消息测试
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器




★ 接收消息

@RabbitListener 注解修饰的方法将被注册为消息监听器方法。

 【备注】:该注解可通过queues属性指定它要监听的、已有的消息队列,
  它也可使用queuesToDeclare来声明队列,并监听该队列。


 - 如果没有显式配置监听器容器工厂(RabbitListenerContainerFactory),
 Spring Boot会在容器中自动配置一个SimpleRabbitListenerContainerFactory Bean作为监听器容器工厂,
 如果希望使用DirectRabbitListenerContainerFactory,可在application.properties文件中添加如下配置:
  spring.rabbitmq.listener.type=direct

 ▲ 如果在容器中配置了MessageRecoverer或MessageConverter,
   它们会被自动关联到默认的监听器容器工厂。



代码演示:

创建个消息队列的监听器就可以了。

@RabbitListener 注解修饰的方法将被注册为消息监听器方法。

该注解可通过queues属性指定它要监听的、已有的消息队列
它也可使用queuesToDeclare来声明队列,并监听该队列
还可以用 bindings 进行 Exchange和queue的绑定操作。
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器

测试: 消息接收

207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器
发送消息和监听消息
207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器),Spring Boot,RabbitMQ,java-rabbitmq,spring boot,监听器




★ 定制监听器容器工厂

▲ 如果要定义更多的监听器容器工厂或覆盖默认的监听器容器工厂,

可通过Spring Boot提供的SimpleRabbitListenerContainerFactoryConfigurer
或DirectRabbitListenerContainerFactoryConfigurer来实现,

它们可对SimpleRabbitListenerContainerFactory
或DirectRabbitListenerContainerFactory进行与自动配置相同的设置。 

▲ 有了自定义的监听器容器工厂之后,可通过@RabbitListener注解的containerFactory属性
  来指定使用自定义的监听器容器工厂,
例如如下注解代码:

@RabbitListener(queues = "myQueue1", containerFactory="myFactory")

完整代码:

application.properties RabbitMQ的连接等属性配置

# 配置连接 RabbitMQ 的基本信息------------------------------------------------------
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 下面属性可配置多个以逗号隔开的连接地址,一旦配置了该属性,host 和 port 属性就会被忽略
# spring.rabbitmq.addresses=
spring.rabbitmq.username=ljh
spring.rabbitmq.password=123456
# 连接虚拟主机
spring.rabbitmq.virtual-host=my-vhost01

# 配置RabbitMQ的缓存相关信息--------------------------------------------------------
# 指定缓存 connection ,还是缓存 channel
spring.rabbitmq.cache.connection.mode=channel
# 指定可以缓存多少个 Channel
spring.rabbitmq.cache.channel.size=50
# 如果选择的缓存模式是 connection , 那么就可以配置如下属性
# spring.rabbitmq.cache.connection.size=15

# 配置 和 RabbitTemplate 相关的属性--------------------------------------------------
# 指定 RabbitTemplate 发送消息失败时会重新尝试
spring.rabbitmq.template.retry.enabled=true
# RabbitTemplate 发送消息失败后每隔1秒重新尝试发送消息
spring.rabbitmq.template.retry.initial-interval=1s
# RabbitTemplate 发送消息失败时,最多尝试重新发送消息的次数
spring.rabbitmq.template.retry.max-attempts=5
# 设置每次尝试重新发送消息的时间间隔是一个等比数列:1s, 2s, 4s, 8s, 16s
# 第一次等1s后尝试,第二次等2s后尝试,第三次等4s后尝试重新发送消息......
spring.rabbitmq.template.retry.multiplier=2
# 指定发送消息时默认的Exchange名
spring.rabbitmq.template.exchange=""
# 指定发送消息时默认的路由key
spring.rabbitmq.template.routing-key="test"

# 配置和消息监听器的容器工厂相关的属性--------------------------------------------------
# 指定监听器容器工厂的类型
spring.rabbitmq.listener.type=simple
# 指定消息的确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 指定获取消息失败时,是否重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 发送消息失败时,最多尝试重新发送消息的次数
spring.rabbitmq.listener.simple.retry.max-attempts=2
# 发送消息失败后每隔1秒重新尝试发送消息
spring.rabbitmq.listener.simple.retry.initial-interval=1s

ContentUtil 常量工具类

package cn.ljh.app.rabbitmq.util;


//常量
public class ContentUtil
{
    //定义Exchange的常量-----fanout:扇形,就是广播类型
    public static final String EXCHANGE_NAME = "boot.fanout";

    //消息队列数组
    public static final String[] QUEUE_NAMES =new String[] {"queue_boot_01","queue_boot_02","queue_boot_03"};


}

RabbitMQConfig 配置式创建消息队列

package cn.ljh.app.rabbitmq.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


//配置式:在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列
//声明这个类为配置类
@Configuration
public class RabbitMQConfig
{

    //用配置式的方式在RabbitMQ中定义队列
    @Bean
    public Queue myQueue()
    {
        //在容器中配置一个 Queue Bean,Spring 就会为它在 RabbitMQ 中自动创建对应的 Queue
        return new Queue("queue_boot",   /* Queue 消息队列名 */
                true,         /* 是否是持久的消息队列 */
                false,       /* 是否是独占的消息队列,独占就是是否只允许该消息消费者消费该队列的消息 */
                false,     /* 是否在没有消息的时候自动删除消息队列 */
                null       /* 额外的一些消息队列的参数 */
        );
    }
}

MessageService 发送消息的业务代码

声明Exchange 、Queue ,Exchange 绑定Queue,发送消息代码文章来源地址https://www.toymoban.com/news/detail-735659.html

package cn.ljh.app.rabbitmq.service;

import cn.ljh.app.rabbitmq.util.ContentUtil;
import org.springframework.amqp.core.*;
import org.springframework.stereotype.Service;


//业务逻辑:声明Exchange 和 Queue 消息队列,发送消息的方法
@Service
public class MessageService
{
    //AmqpAdmin来管理Exchange、队列和绑定
    private final AmqpAdmin amqpAdmin;

    //AmqpTemplate来发送消息
    private final AmqpTemplate amqpTemplate;

    //通过有参构造器进行依赖注入
    public MessageService(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate)
    {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;

        //由于声明 Exchange 、 队列 、 绑定(Exchange绑定队列),都只需要做一次即可,因此放在此处构造器中完成即可
        //创建 fanout 类型的 Exchange ,使用FanoutExchange实现类
        FanoutExchange exchange = new FanoutExchange(
                ContentUtil.EXCHANGE_NAME,
                true,    /* Exchange是否持久化 */
                false, /* 是否自动删除 */
                null   /* 额外的参数属性 */
        );
        //声明 Exchange
        this.amqpAdmin.declareExchange(exchange);


        //此处循环声明 Queue ,也相当于代码式创建 Queue
        for (String queueName : ContentUtil.QUEUE_NAMES)
        {
            Queue queue = new Queue(queueName,   /* Queue 消息队列名 */
                    true,         /* 是否是持久的消息队列 */
                    false,       /* 是否是独占的消息队列,独占就是是否只允许该消息消费者消费该队列的消息 */
                    false,     /* 是否在没有消息的时候自动删除消息队列 */
                    null       /* 额外的一些消息队列的参数 */
            );
            //此处声明 Queue ,也相当于【代码式】创建 Queue
            this.amqpAdmin.declareQueue(queue);

            //声明 Queue 的绑定
            Binding binding = new Binding(
                    queueName,  /* 指定要分发消息目的地的名称--这里是要发送到这个消息队列里面去 */
                    Binding.DestinationType.QUEUE, /* 分发消息目的的类型,指定要绑定 queue 还是 Exchange */
                    ContentUtil.EXCHANGE_NAME, /* 要绑定的Exchange */
                    "x", /* 因为绑定的Exchange类型是 fanout 扇形(广播)模式,所以路由key随便写,没啥作用 */
                    null
                    );
            //声明 Queue 的绑定
            amqpAdmin.declareBinding(binding);
        }
    }

    //发送消息的方法
    public void publish(String content)
    {
        //发送消息
        amqpTemplate.convertAndSend(
                ContentUtil.EXCHANGE_NAME, /* 指定将消息发送到这个Exchange */
                "",  /* 因为Exchange是fanout 类型的(广播类型),所以写什么路由key都行,都没意义 */
                content /* 发送的消息体 */
        );
    }
}

PublishController.java 发送消息的控制层

package cn.ljh.app.rabbitmq.controller;

import cn.ljh.app.rabbitmq.service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

//发送消息
@RestController
public class PublishController
{
    private final MessageService messageService;
    //有参构造器进行依赖注入
    public PublishController(MessageService messageService)
    {
        this.messageService = messageService;
    }

    @GetMapping("/publish/{message}")
    //因为{message}是一个路径参数,所以方法接收的时候需要加上注解 @PathVariable
    public String publish(@PathVariable String message)
    {
        //发布消息
        messageService.publish(message);
        return "消息发布成功";
    }

}

MyRabbitMQListener 监听器,监听消息队列

package cn.ljh.app.rabbitmq.listener;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//监听器:监听消息队列并进行消费
@Component
public class MyRabbitMQListener
{
    //queues 指定监听已有的哪个消费队列
    @RabbitListener(queues = "queue_boot_01")
    public void onQ1Message(String message)
    {
        System.err.println("从 queue_boot_01 消息队列接收到的消息:" + message);
    }

    //queues 指定监听已有的哪个消费队列
    @RabbitListener(queues = "queue_boot_02")
    public void onQ2Message(String message)
    {
        System.err.println("从 queue_boot_02 消息队列接收到的消息:" + message);
    }

    //queues 指定监听已有的哪个消费队列
    //还可以用 queuesToDeclare 直接声明并监听该队列,还可以用 bindings 进行Exchange和queue的绑定
    @RabbitListener(queuesToDeclare = @Queue(name = "queue_boot_03"
            ,durable = "true"
            ,exclusive = "false"
            ,autoDelete = "false"),

            admin = "amqpAdmin" /*指定声明Queue,绑定Queue所用的 amqpAdmin,不指定的话就用容器中默认的那一个 */
    )
    public void onQ3Message(String message)
    {
        System.err.println("从 queue_boot_03 消息队列接收到的消息:" + message);
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
    </parent>
    <groupId>cn.ljh</groupId>
    <artifactId>rabbitmq_boot</artifactId>
    <version>1.0.0</version>
    <name>rabbitmq_boot</name>
    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- RabbitMQ 的依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- web 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 开发者工具的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <!-- lombok 依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

到了这里,关于207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 90、RabbitMQ如何确保消息发送?消息接收?

    信道需要设置为 confirm 模式,则所有在信道上发布的消息都会分配一个唯一 ID。 一旦消息被投递到queue(可持久化的消息需要写入磁盘),信道会发送一个确认给生产者(包含消息唯一 ID) 如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack (未确认) 消息给生产者。

    2024年02月16日
    浏览(44)
  • 搭建RabbitMQ消息服务,整合SpringBoot实现收发消息

    作者主页 :Designer 小郑 作者简介 :3年JAVA全栈开发经验,专注JAVA技术、系统定制、远程指导,致力于企业数字化转型,CSDN博客专家,蓝桥云课认证讲师。 消息队列是一种在应用程序之间传递数据的通信机制 ,它基于 发布-订阅 模式,将消息发送者(发布者)和消息接收者

    2024年02月09日
    浏览(53)
  • 如何使用RabbitMQ发送和接收消息

    本文介绍了如何使用RabbitMQ的Python客户端库pika来发送和接收消息,并提供了示例代码。读者可以根据自己的需求修改代码,例如修改队列名称、发送不同的消息等。 RabbitMQ 是一个开源的消息队列软件,可以用于在应用程序之间传递消息。下面是一个使用 RabbitMQ 的流程和代码

    2024年02月15日
    浏览(44)
  • RabbitMQ如何保证消息的发送和接收

    一、RabbitMQ如何保证消息的发送和接收 1.ConfirmCallback方法 ConfirmCallback是一个回调接口,消息发送到broker后触发回调,确认消息是否到达broker服务器,也就是只确认消息是否正确到达Exchange交换机中。 2.ReturnCallback方法 通过实现ReturnCallback接口,启动消息失败返回,此接口是在交

    2024年02月15日
    浏览(42)
  • 使用C#和RabbitMQ发送和接收消息

    通过NuGet安装 RabbitMQ.Client 以下是一个简单的示例代码,演示如何使用 C# 和 RabbitMQ 客户端库来发送和接收消息: durable持久化 durable 参数用于指定队列是否是持久化的。 当 durable 参数设置为 true 时,表示队列是持久化的。持久化的队列会在RabbitMQ服务器重启后仍然存在,确保

    2024年02月11日
    浏览(41)
  • 如何使用 RabbitMQ 进行消息的发送和接收

    1、创建连接工厂: 2、创建交换器和队列: 3、发送消息: 4、接收消息: 在上述示例中,我们创建了一个连接工厂,并通过它建立与 RabbitMQ 服务器的连接和通道。然后,我们声明了一个直连型交换器和一个队列,并将它们绑定在一起。接下来,我们使用basicPublish方法发送消

    2024年04月22日
    浏览(48)
  • RabbitMQ+springboot用延迟插件实现延迟消息的发送

    延迟队列:其实就是死信队列中消息过期的特殊情况 延迟队列应用场景: 可以用死信队列来实现,不过死信队列要等上一个消息消费成功,才会进行下一个消息的消费,这时候就需要用到延迟插件了,不过要线在docker上装一个插件 前置条件是在Docker中部署过RabbitMq。 1、打开

    2024年02月10日
    浏览(48)
  • SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

    RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一

    2024年02月16日
    浏览(48)
  • C#使用RabbitMQ发送和接收消息工具类

    下面是一个简单的 C# RabbitMQ 发送和接收消息的封装工具类的示例代码: 通过NuGet安装 RabbitMQ.Client

    2024年02月11日
    浏览(52)
  • rabbitMQ:绑定Exchange发送和接收消息(direct)

    AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息 发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,

    2024年02月15日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包