SpringBoot教程(十五) | SpringBoot集成RabbitMq

这篇具有很好参考价值的文章主要介绍了SpringBoot教程(十五) | SpringBoot集成RabbitMq。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMq是我们在开发过程中经常会使用的一种消息队列。今天我们来研究研究rabbitMq的使用。

rabbitMq的官网: rabbitmq.com/

rabbitMq的安装这里先略过,因为我尝试了几次都失败了,后面等我安装成功了会把详细的文章发出来。目前是使用公司的环境进行的调试。

1. 一些概念

RabbitMQ是一个开源的消息代理和队列服务器,用来实现各个应用服务间的数据共享(跨平台 ,跨语言)。RabbitMQ是使用erlang语言编写的,并且基于AMQP协议实现。

所有的消息队列产品模型抽象上来说,都是类似的过程。生产者创建消息,然后发布到消息队列中,由消费者进行消费。

而rabbitMQ也是类似的,有生产者,消费者角色。其内部结构如下图所示。

SpringBoot教程(十五) | SpringBoot集成RabbitMq

那么接下来我们就来介绍一下RabbitMQ中的这些概念。

1. Message:

消息,就是我们需要传递和共享的信息,消息由一些列的可选属性组成,包括路由键,优先级,是否持久化等信息

2. Publisher

消息的生产者,也是一个向交换机发布消息的客户端应用程序。

3. Exchange:

交换机,这是RabbitMQ中的一个非常重要的概念,在rabbitMq中,生产者产生的消息都不是直接发送到队列中去的,而是发送到了交换机中,交换机会通过一定的规则绑定队列,交换机会根据相应的路由规则发送给对服务器中的队列。

4. Binding:

绑定, 用于交换机和消息列队之间的关联。一个绑定就是基于路由键(routing-key)将交换机和消息队列连接起来的路由规则。所以可以将交换机理解成一个有绑定有成的路由表。

5. Queue:

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投入一个或多个队列中。消息一直在对队列里边,等待消费者连接到这个队列将其消费。

6. Connection:

网络连接,比如一个TCP连接。

7. Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是简历在真实的TCP连接内的虚拟连接。AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成的。因为对于操作系统过来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

8. Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用。

9. Virtual Host

虚拟主机,标识一批交换机、消息队列和相关对象。 虚拟主机是相同的身份认证和加密环境的独立服务器域。 每个vhost本质就是一个mini版的rabbitMQ服务器,拥有自己的队列,交换机,绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ的默认vhost是/.

10. Broker

标识消息队列服务器实体。

2. Exchange类型

Exchange分发消息的时候根据类型的不同分发策略有所区别,目前常见的有四种类型: direct、fanout、topic、headers。 headers匹配AMQP消息的header而不是路由键,此外headers交换机和direct交换机完成一直但是性能差很多,几乎用不到了,所以直接看另外三种类型。

2.1 direct交换机

SpringBoot教程(十五) | SpringBoot集成RabbitMq

消息中的路由键(routing key)如果和Binding中的bing key一致,交换机就将消息发送到队列的队列中。路由键要完全匹配,单个传播。

2.2 fanout

SpringBoot教程(十五) | SpringBoot集成RabbitMq

每个发到fanout类型交换机的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,只是简单的将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。

2.3 topic

SpringBoot教程(十五) | SpringBoot集成RabbitMq

topic交换机通过模式匹配分配路由的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用.隔开。它同样会识别两个通配符: # 和* 。 #匹配0个或多个单词, * 匹配一个单词

3. springBoot集成RabbitMQ

SpringBoot集成rabbitMQ还是比较简单的,因为springBoot使用RabbitTemplate对常用操作进行了封装。

接下来我们来看一下集成过程。首先导入依赖。

<!-- 无需在parent的配置文件中添加 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后在springBoot的配置文件 application.yml中配置rabbitMQ连接信息

server:
  port: 7890

spring:
  rabbitmq:
    host: 172.15.33.52
    port: 5672
    username: root
    password: 123456

接下来我们我们分三种交换机进行演示。

3.1 direct

首先是配置类,在配置类中我们需要声明交换机,队列和绑定关系。

@Configuration
public class DirectExchangeConfig {

    public static final String DIRECT_QUEUE = "directQueue";
    public static final String DIRECT_QUEUE2 = "directQueue2";
    public static final String DIRECT_EXCHANGE = "directExchange";
    public static final String DIRECT_ROUTING_KEY = "direct";

    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE, true);
    }

    @Bean
    public Queue directQueue2() {
        return new Queue(DIRECT_QUEUE2, true);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
    }

    @Bean
    public Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY);
    }

}

这里我们创建了一个叫directExchange的交换机,绑定了directQueue和directQueue2两个队列,路由键是direct.

消息的生产者,我们通过一个Controller来进行模拟,直接引用rabbitTemplate

@RestController
@Slf4j
@RequestMapping("/direct")
public class DirectController {

    private final RabbitTemplate rabbitTemplate;

    public DirectController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * direct交换机为直连模式交换机
     *      根据消息携带的路由键将消息投递给对应队列
     *
     *
     * @return
     */
    @GetMapping("send")
    public Object sendMsg() {
        rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct");
        return "direct消息发送成功!!";
    }
}

当我在浏览器访问对应连接的时候,就会生产一条消息发送到directExchange交换机,路由key为:direct, 消息内容为:发送一条测试消息:direct

接下来我们来看消息的消费者。

package com.lsqingfeng.action.rabbitmq.direct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @className: DirectQueueListener
 * @description: 直连交换机的监听器
 * @author: sh.Liu
 * @date: 2021-08-23 16:03
 */
@Slf4j
@Component
public class DirectQueueListener {

    /**
     * 尽管设置了两个消费者,但是只有一个能够消费成功
     * 多次发送则轮训消费:
     * DirectReceiver消费者收到消息1  : 发送一条测试消息:direct
     * DirectReceiver消费者收到消息2  : 发送一条测试消息:direct
     * DirectReceiver消费者收到消息1  : 发送一条测试消息:direct
     * DirectReceiver消费者收到消息2  : 发送一条测试消息:direct
     *
     * 一个交换机可以绑定多个队列。如果通过路由key可以匹配到多个队列,消费的时候也只能有一个进行消费
     * @param testMessage
     */
    @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)
    public void process(String testMessage) {
        System.out.println("DirectReceiver消费者收到消息1  : " + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)
    public void process2(String testMessage) {
        System.out.println("DirectReceiver消费者收到消息2  : " + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2)
    public void process3(String testMessage) {
        System.out.println("DirectReceiver消费者收到消息3  : " + testMessage);
    }

}

当我们访问浏览器生产消息会,观察控制台结果:

DirectReceiver消费者收到消息1 : 发送一条测试消息:direct DirectReceiver消费者收到消息3 : 发送一条测试消息:direct

在发送一次:

DirectReceiver消费者收到消息3 : 发送一条测试消息:direct DirectReceiver消费者收到消息2 : 发送一条测试消息:direct

由于我们又两个队列都绑定了交换机,且routeKey一样,所以会打印两条。要注意direct只有routeKey完全匹配的时候才能被消费,同时每个队列中的消息只会 被消费一次。

3.2 fanout

配置类:

@Configuration
public class FanoutExchangeConfig {

    public static final String FANOUT_QUEUE = "fanoutQueue";
    public static final String FANOUT_QUEUE2 = "fanoutQueue2";
    public static final String FANOUT_QUEUE3 = "fanoutQueue3";
    public static final String FANOUT_EXCHANGE = "fanoutExchange";
    public static final String FANOUT_ROUTING_KEY = "fanout";

    @Bean
    public Queue fanoutQueue() {
        return new Queue(FANOUT_QUEUE, true);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE2, true);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindingFanoutExchange(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingFanoutExchange2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

这里也是用一个Fanout类型的交换机绑定了两个队列,要注意在这种模式下,是不需要指定routing-Key的,因为所有绑定的队列都会收到消息。

生产者代码如下:

@RestController
@Slf4j
@RequestMapping("/fanout")
public class FanoutController {

    private final RabbitTemplate rabbitTemplate;

    public FanoutController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * fanout交换机为扇形模式交换机
     *      消息会发送到所有绑定的队列上。
     * @return
     */
    @GetMapping("send")
    public Object sendMsg() {
        rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, "发送一条测试消息:fanout");
        return "fanout消息发送成功!!";
    }
}

消息的消费者:

@Slf4j
@Component
public class FanoutQueueListener {

    /**
     * fanout交换机: 扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列
     * 同一个队列监听多次,只会消费一次。
     * 交换机绑定的多个队列都可以收到消息
     * @param testMessage
     */
    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE)
    public void process(String testMessage) {
        System.out.println("FanoutReceiver消费者收到消息1  : " + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE)
    public void process2(String testMessage) {
        System.out.println("FanoutReceiver消费者收到消息2  : " + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE2)
    public void process3(String testMessage) {
        System.out.println("FanoutReceiver消费者收到消息3  : " + testMessage);
    }

}

打印结果:

FanoutReceiver消费者收到消息1 : 发送一条测试消息:fanout FanoutReceiver消费者收到消息3 : 发送一条测试消息:fanout

因为方法1和方法2监听的是同一个队列,只有一个可以消费成功。多次执行,两个方法交替执行。

3.3 topic

主题交换机,会根据routing-Key的匹配规则,将消息发送到符合规则的队列中。

配置类:

/**
 * @className: TopicExchangeConfig
 * @description:
 * *  (星号) 用来表示一个单词 (必须出现的)
 * #  (井号) 用来表示任意数量(零个或多个)单词
 * @author: sh.Liu
 * @date: 2021-08-23 15:49
 */
@Configuration
public class TopicExchangeConfig {

    public static final String TOPIC_QUEUE = "topicQueue";
    public static final String TOPIC_QUEUE2 = "topicQueue2";
    public static final String TOPIC_QUEUE3 = "topicQueue3";
    public static final String TOPIC_EXCHANGE = "topicExchange";
    public static final String TOPIC_ROUTING_KEY = "topic*";

    @Bean
    public Queue topicQueue() {
        return new Queue(TOPIC_QUEUE, true);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2, true);
    }

    @Bean
    public Queue topicQueue3() {
        return new Queue(TOPIC_QUEUE3, true);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindingTopicExchange(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.#");
    }

    @Bean
    public Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("test.#");
    }

    @Bean
    public Binding bindingTopicExchange3(Queue topicQueue3, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue3).to(topicExchange).with("#");
    }
}

这里要注意我们的绑定管关系。分别是topic.#, test.*, #

#: 代表所有,* 代表有且只有一个。

消息的发送者,我们将routingKey作为参数方便我们看效果:

@RestController
@Slf4j
@RequestMapping("/topic")
public class TopicController {

    private final RabbitTemplate rabbitTemplate;

    public TopicController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @GetMapping("send")
    public Object sendMsg(String routingKey) {
        rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, routingKey, "发送一条测试消息:topic");
        return "topic消息发送成功!!";
    }
}

消息的消费者:

/**
 * @className: TopicQueueListener
 * @description: 主题交换机的监听器
 * @author: sh.Liu
 * @date: 2021-08-23 16:03
 */
@Slf4j
@Component
public class TopicQueueListener {

    /**
     * topic: 主题交换机
     * @param testMessage
     */
    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE)
    public void process(String testMessage) {
        System.out.println("TopicReceiver消费者收到消息1  : " + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE)
    public void process2(String testMessage) {
        System.out.println("TopicReceiver消费者收到消息2  : " + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE2)
    public void process3(String testMessage) {
        System.out.println("TopicReceiver消费者收到消息3  : " + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE3)
    public void process4(String testMessage) {
        System.out.println("TopicReceiver消费者收到消息4  : " + testMessage);
    }

}

请求:http://localhost:7890/topic/send?routingKey=test.a

结果:

TopicReceiver消费者收到消息3 : 发送一条测试消息:topic TopicReceiver消费者收到消息4 : 发送一条测试消息:topic

代表: test.* 和 # 与路由key匹配成功

请求:http://localhost:7890/topic/send?routingKey=topic.123

TopicReceiver消费者收到消息1 : 发送一条测试消息:topic TopicReceiver消费者收到消息4 : 发送一条测试消息:topic

代表: topic.# 和 # 匹配成功

请求: http://localhost:7890/topic/send?routingKey=test

TopicReceiver消费者收到消息4 : 发送一条测试消息:topic

test.* 后面必须要有一个单词

请求: http://localhost:7890/topic/send?routingKey=test.aaa

TopicReceiver消费者收到消息4 : 发送一条测试消息:topic TopicReceiver消费者收到消息3 : 发送一条测试消息:topic

test.*和 #匹配成功

请求:http://localhost:7890/topic/send?routingKey=test.aaa.b

TopicReceiver消费者收到消息4 : 发送一条测试消息:topic

只对# 匹配成功, 因为test.*只能匹配一个单词,aaa.b代表两个


 文章来源地址https://www.toymoban.com/news/detail-493932.html

到了这里,关于SpringBoot教程(十五) | SpringBoot集成RabbitMq的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • springboot集成rabbitmq

    RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,

    2024年02月13日
    浏览(31)
  • 【SpringBoot笔记29】SpringBoot集成RabbitMQ消息队列

    这篇文章,主要介绍SpringBoot如何集成RabbitMQ消息队列。 目录 一、集成RabbitMQ 1.1、引入amqp依赖 1.2、添加连接信息 1.3、添加RabbitMQ配置类

    2023年04月08日
    浏览(55)
  • Springboot集成rabbitmq——实现延迟队列

    目录 1.rabbitmq简介 2.延迟队列 3.Springboot集成rabbitmq 4.以死信队列形式实现 5.以插件形式实现  MQ(message queue),从字面意思上看,本质是个队列,遵从先入先出的规则,只不过队列中存放的内容是 message 而已,是一种跨进程的通信机制,用于上下游传递消息。RabbitMq是开发中常用

    2024年02月05日
    浏览(39)
  • SpringBoot集成RabbitMQ(生产者)

    默认读者已经对SpringBoot和RabbitMQ比较熟悉 SpringBoot集成RabbitMQ(生产者)的步骤如下: 创建SpringBoot工程 Maven添加 spring-boot-starter-amqp 编写application.properties配置RabbitMQ的信息 编写交换机、队列、绑定配置类 在业务逻辑代码中注入RabbitTemplate 调用RabbitTemplate的方法,完成消息推送

    2024年02月15日
    浏览(39)
  • 【RabbitMQ与SpringBoot集成测试收发消息】

    安装环境:虚拟机VMWare + Centos7.6 + Maven3.6.3 + JDK1.8 RabbitMQ版本:rabbitmq-server-3.8.8-1.el7.noarch.rpm 编程工具Idea + 运行JDK为17 在RabbitMQ的UI界面或命令行上 创建新的Virtual Host ,取名为 vhtest02 ,如下图所示: 使用Idea的 Spring Initializr 创建生产者工程 springrabbitmqtest ,坐标如下: 配置

    2024年02月13日
    浏览(38)
  • SpringBoot集成常用第三方框架-RabbitMQ

    作者主页:编程指南针 作者简介:Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容:Java项目、Python项目、前端项目、人工智能与大数据、简历模板、学习资料、面试题库

    2024年01月17日
    浏览(57)
  • SpringBoot集成RabbitMQ之ACK确认机制(第三节)

    目录 开始语 📝简述 🗒️模式NONE application配置 生产者 消费者 结果验证 🗒️模式AUTO application配置 生产者 消费者 结果验证 🗒️模式ACK(重点) application配置 生产者 消费者 结果验证 🗒️生产者确认机制 yml添加配置 修改生产者代码 结果验证 结束语 一位普通的程序员,

    2024年02月04日
    浏览(34)
  • 简单的RabbitMQ集成Springboot实现订单异步发送功能示例以及RabbitMQ的相关问题

    引入RabbitMQ的依赖,在pom.xml文件中添加以下代码: 在application.properties文件中配置RabbitMQ的相关信息: 创建消息队列,并定义消息接收者: 定义消息发送者: 在需要发送订单信息的地方调用OrderSender的send方法即可: RabbitMQ是一个开源的消息中间件,主要用于实现应用之间的异

    2024年02月09日
    浏览(33)
  • Springboot实战16 消息驱动:如何使用 RabbitTemplate 集成 RabbitMQ?

    15 讲我们介绍了基于 ActiveMQ 和 JmsTemplate 实现消息发送和消费,并重构了 SpringCSS 案例系统中的 account-service 和 customer-service 服务。 今天,我们将介绍另一款主流的消息中间件 RabbitMQ,并基于 RabbitTemplate 模板工具类为 SpringCSS 案例添加对应的消息通信机制。 AMQP 规范与 RabbitM

    2024年02月02日
    浏览(48)
  • 微服务集成RabbitMq保姆级教程

     本文通过简单的示例代码和说明,让读者能够了解微服务如何集成RabbitMq 之前的教程 https://www.cnblogs.com/leafstar/p/17641358.html 在这里我将介绍Centos中通过docker进行安装RabbitMq   1.首先你已经有一台可以使用的虚拟机(教程很多)   2.yum install docker -y    3.拉取docker镜像   4.开启

    2024年02月12日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包