SpringCloud源码探析(六)-消息队列RabbitMQ

这篇具有很好参考价值的文章主要介绍了SpringCloud源码探析(六)-消息队列RabbitMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.概述

RabbitMQ是一个开源的消息代理和队列服务器,它是基于Erlang语言开发,并且是基于AMQP协议的。由于Erlang语言最初使用与交换机领域架构,因此使得RabbitMQ在Broker之间的数据交互具有良好的性能。AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种消息队列应用层协议,专门面向消息的中间件而设计,类似于JAVA的JMS协议,基于此规范能够开发出各种各样的消息中间件。RabbitMQ能够与SpringAMQP完美整合,具有较强的扩展性和丰富的API,且具有高可靠性和低延时的优点,被业界广泛使用。本文将介绍SpringAMQP与Rabbit的使用,帮助读者更好地理解消息队列。

2.消息队列RabbitMQ使用

2.1 MQ对比

SpringCloud源码探析(六)-消息队列RabbitMQ
由上图可知,从可靠性和消息延迟的角度来说,RabbitMQ都是较为突出的,从吞吐量的角度来说可能一般。因此在选用消息队列中间件时,应根据使用场景选择更为合适的。

2.2 AMQP核心概念

名称 概念
Server/Broker 可以理解为消息队列实体
VHost 虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离
Exchange 消息交换机,它指定消息按一定规则,路由到对应队列
Queue 消息队列载体,每个消息可能会被投到一个或多个队列
Producer 消息生产者,消息投递方
Consumer 消息消费者,接收消息的程序
Binding 绑定器,它将exchange和queue按照路由规则绑定起来
channel 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
Routing Key 路由关键字,exchange根据关键字将消息投递到指定queue

2.3 RabbitMQ架构图

SpringCloud源码探析(六)-消息队列RabbitMQ

由上图可知,生产者通过channel连接到Broker,将消息投送到Exchange,Exchange再按照指定规则将消息投送到对应的queue,消费者通过监听指定的queue来获取消息。生产者不需要关注投递到那个队列,消费者也不关心消息是从哪个Exchange发送而来,两者之间是松耦合的关系。Exchange的类型有:Fanout交换机、Direct交换机、Topic交换机。Fanout交换机是一种广播模式,消息进来时会投递到所有与之绑定的队列。Direct交换机是完全根据key进行匹配队列,key一致时才会投送。Topic交换机可以按一定的规则进行匹配key,匹配成功进行投递。

2.4 SpringBoot中使用RabbitMQ

2.4.1 引入pom文件

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.4.2 添加配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

2.4.3 编写生产者

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitApplication.class)
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, world!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }

}

2.4.3 编写消费者

@Component
public class RabbitMQListener {

    @RabbitListener(queues = "simple.queue")
    public void receiveMessage(String message) {
        System.out.println("接收到消息:" + message);
    }
}

2.4.4 测试结果

SpringCloud源码探析(六)-消息队列RabbitMQ
运行完成测试代码后,消费者收到消息。上述代码展示的是RabbitMQ最基础的发送和接收模型,生成者将消息发送到指定队列,消费者订阅指定队列获取消息。

2.5 RabbitMQ核心消息发送模型

2.5.1 WorkQueue模型

该模型一个队列可以对应多个消费者,适用于发送者发送大量消息,容易发送积压,因此需要多个消费者来消费。
SpringCloud源码探析(六)-消息队列RabbitMQ
2.workQueue测试代码
测试类:

    @Test
    public void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "work.queue";
        // 消息
        String message = "hello, world";

        for (int i = 1; i <= 50; i++) {
            // 发送消息
            String messageInfo = message + "..." + i;
            rabbitTemplate.convertAndSend(queueName, messageInfo);
            log.info("消息发送成功:{}", messageInfo);
            Thread.sleep(50);
        }
    }

消费代码:

    @RabbitListener(queues = "work.queue")
    public void receiveWorkMessage1(String message) throws InterruptedException {
        System.out.println("消息队列1...接收到消息:" + message);
        Thread.sleep(10);
    }

    @RabbitListener(queues = "work.queue")
    public void receiveWorkMessage2(String message) throws InterruptedException {
        System.out.println("消息队列2...接收到消息:" + message);
        Thread.sleep(15);
    }

3.workQueue运行结果
SpringCloud源码探析(六)-消息队列RabbitMQ
SpringCloud源码探析(六)-消息队列RabbitMQ

2.5.2 FanoutExchange模型

1.FanoutExchange消息发送模型
SpringCloud源码探析(六)-消息队列RabbitMQ
FanoutExchanger模型比简单模型和WorkQueue模型多了一个交换机(Exchange),消息先会发送到Exchange,然后Exchange将消息路由到每一个与其绑定的queue。
2.代码配置

@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }

    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }

}

发送部分测试代码:

  @Test
    public void testFanout() {
        // 队列名称
        String exchangeName = "fanout.exchange";
        // 消息
        String message = "hello, world!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"", message);
    }

3.运行结果
SpringCloud源码探析(六)-消息队列RabbitMQ

2.5.3 DirectExchange模型

DirectExchange的消息发送模式与FanoutExchange模型基本一致,区别在于它会将收到的消息根据规则路由到指定的Queue,也就是说中间多了一层根据规则筛选发送队列,它的每一个Queue都与Exchange设置一个BindingKey,发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
1.代码如下
监听者部分代码如下:

 @RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue1"),exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),key = {"red","blue"}))
    public void receiveDirectQueue1(String message) throws InterruptedException {
        System.out.println("消息队列1...接收到消息:" + message);
    }


    @RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue2"),exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))
    public void receiveDirectQueue2(String message) throws InterruptedException {
        System.out.println("消息队列2...接收到消息:" + message);
    }

测试代码如下:

 @Test
    public void testDirect() {
        // 队列名称
        String exchangeName = "direct.exchange";
        // 消息
        String message = "hello, world!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"red", message);
    }

    @Test
    public void testDirectYellow() {
        // 队列名称
        String exchangeName = "direct.exchange";
        // 消息
        String message = "hello, world!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"yellow", message);
    }

2.测试结果
SpringCloud源码探析(六)-消息队列RabbitMQ
SpringCloud源码探析(六)-消息队列RabbitMQ
由上述结果可知,当队列订阅同一个Exchange时,向指定Exchange并携带RoutingKey,只有包含对应RoutingKey的队列才能收到消息。

2.5.4 TopicExchange模型

TopicExchange与DirectExchange类似,区别在于RoutingKey必须是多个单词的列表,并且以.分割,Queue与Exchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词
*:代指一个单词。

1.测试代码

 @RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC),key = {"#.news"}))
    public void receiveTopicQueue2(String message) throws InterruptedException {
        System.out.println("消息队列2...接收到消息:" + message);
    }

发送消息部分代码:

@Test
    public void testTopicQueue() {
        // 队列名称
        String exchangeName = "topic.exchange";
        // 消息
        String message = "湖人总冠军";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"sports.news", message);
    }

2.测试结果
SpringCloud源码探析(六)-消息队列RabbitMQ

3.小结

1.RabbitMQ是一个遵循AMQP协议的消息中间件,消息从生产者发送到消费者,他能根据规则指定消息发送对象,缓存或进行持久化;
2.Spring AMQP是基于AMQP协议开放的接口,能够无缝对接各种基于AMQP的协议,快速利用Spring进行开发;
3.RabbitMQ延时低,而可靠性高,适用于吞吐量较大且对信息实时性要求较高的场景。

4.参考文献

1.https://www.bilibili.com/video/BV1LQ4y127n4
2.https://juejin.cn/post/6844903903637536775
3.https://www.cnblogs.com/tinmh/p/6026726.html

5.附录

https://gitee.com/Marinc/nacos.git文章来源地址https://www.toymoban.com/news/detail-446356.html

到了这里,关于SpringCloud源码探析(六)-消息队列RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringCloud-基于SpringAMQP实现消息队列

    Spring AMQP作为Spring框架的一部分,是一套用于支持高级消息队列协议(AMQP)的工具。AMQP是一种强大的消息协议,旨在支持可靠的消息传递,特别适用于构建分布式系统。Spring AMQP构建在RabbitMQ之上,提供了在微服务架构中进行异步通信和消息传递的强大机制。 这个框架的设计

    2024年03月13日
    浏览(35)
  • RabbitMq消息模型-队列消息

    基本模型(SimpleQueue)、工作模型(WorkQueue) 队列消息特点: 消息不会丢失 并且 有先进先出的顺序。 消息接收是有顺序的,不是随机的,仅有一个消费者能拿到数据,而且不同消费者拿不到同一份数据。 基本模型: SimpleQueue 在上图的模型中,有以下几个概念: P:为生产

    2024年02月09日
    浏览(32)
  • 【RabbitMQ】消息队列-RabbitMQ篇章

    RabbitMQ是一个开源的 遵循AMQP协议 实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中 存储消息,转发消息 ,具有 高可用 , 高可扩性 , 易用性 等特征。 1.1、RabbitMQ—使用场景 一般场景 像一般的下订单业务如下图: 将订单信息写入数据库成功后,发

    2024年02月12日
    浏览(35)
  • 【RabbitMQ笔记10】消息队列RabbitMQ之死信队列的介绍

    这篇文章,主要介绍消息队列RabbitMQ之死信队列。 目录 一、RabbitMQ死信队列 1.1、什么是死信队列 1.2、设置过期时间TTL 1.3、配置死信交换机和死信队列(代码配置) (1)设置队列过期时间 (2)设置单条消息过期时间 (3)队列设置死信交换机 (4)配置的基本思路 1.4、配置

    2024年02月16日
    浏览(49)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(43)
  • RabbitMQ 消息中间件 消息队列

    RabbitMQ 1、RabbitMQ简介 RabbiMQ是⽤Erang开发的,集群⾮常⽅便,因为Erlang天⽣就是⼀⻔分布式语⾔,但其本身并 不⽀持负载均衡。支持高并发,支持可扩展。支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 2、RabbitMQ 特点 可

    2024年02月03日
    浏览(44)
  • 3.精通RabbitMQ—消息队列、RabbitMQ

    RabbitMQ面试题 (总结最全面的面试题) 入门RabbitMQ消息队列,看这篇文章就够了 消息队列 是一种基于 队列 ,用于解决 不同进程或应用 之间 通讯 的 消息中间件 。 支持多种 消息传递模式 ,如 队列模型 、 发布/订阅模型 等。 业务解耦 :通过 发布/订阅 模式,减少系统的 耦

    2024年02月15日
    浏览(61)
  • 消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

    Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成 。 我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务 。 轮训分发消

    2024年02月21日
    浏览(40)
  • 【RabbitMQ】RabbitMQ 消息的堆积问题 —— 使用惰性队列解决消息的堆积问题

    消息的堆积问题是指在消息队列系统中,当生产者以较快的速度发送消息,而消费者处理消息的速度较慢,导致消息在队列中积累并达到队列的存储上限。在这种情况下,最早被发送的消息可能会在队列中滞留较长时间,直到超过队列的容量上限。当队列已满且没有更多的可

    2024年02月05日
    浏览(37)
  • 消息队列|RabbitMQ入门概述

    MQ(message queue) ,在互联网架构中,MQ 是一种非常常见的上下游“ 逻辑解耦+物理解耦 ”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。MQ多用于分布式系统之间进行通信。 换句话说: 有一个大的系统由A系统和B系统组成,A系统先将数据发

    2024年02月02日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包