利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

这篇具有很好参考价值的文章主要介绍了利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、前期项目环境准备

1.1父项目以及子项目

1.2配置pom.xml

1.3配置application.yml

二、扇出(Fanout) 交换机实现消息的发送和接收

2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息

2.1.1consumer子项目结构

2.1.2FanoutConfig类的实现扇出(Fanout)交换机、队列以及交换机和队列的绑定的创建

2.1.3springRabbitListener类实现Fanout消息的接收

2.1.4运行consumer子项目

2.2编写子项目publisher的代码实现扇出(Fanout)交换机的发送消息

2.2.1publisher子项目结构

2.2.2编写SpringAmqpTest测试类代码

2.2.3运行该测试类,然后回到consumer子项目的两个队列是否都能够接收到该消息

三、订阅(Direct)交换机实现消息的发送和接收

3.1编写子项目consumer(消费者,接收消息)的代码实现扇出订阅(Direct)交换机接收消息

3.1.1在springRabbitListener类添加以下两个Direct队列方法

3.1.2再次重新运行consumer子项目

​3.2编写子项目publisher的代码实现扇出(Direct)交换机的发送消息

3.2.1在SpringAmqpTest测试类继续添加下面代码

3.2.2运行该测试类,然后回到consumer子项目的key拥有red的监听队列是否都够接收到该消息

四、主题(Topic)交换机实现消息的发送和接收

4.1编写子项目consumer(消费者,接收消息)的代码实现扇出主题(Topic)交换机接收消息

4.1.1在springRabbitListener类添加以下两个Topic队列方法

4.1.2再次重新运行consumer子项目

4.2编写子项目publisher的代码实现扇出(Topic)交换机的发送消息

4.2.1在SpringAmqpTest测试类继续添加下面代码

4.2.2运行该测试类,然后回到consumer子项目的key拥有.news的监听队列是否都够接收到该消息

五、总结

5.1交换机的作用是什么?

5.2声明队列、交换机、绑定关系的Bean是什么?

5.3描述下Direct交换机与Fanout交换机的差异?

5.4基于@RabbitListener注解声明队列和交换机有哪些常见注解?

5.5描述下Direct交换机与Topic交换机的差异?


一、前期项目环境准备

1.1父项目以及子项目

创建父项目mq-demo,以及两个子项目publisher(生产者,发送消息)和consumer(消费者,接收消息

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

1.2配置pom.xml

在父项目mq-demo的pom.xml引入以下的amqp依赖

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

1.3配置application.yml

在两个子项目的application.yml都进行以下的配置(设置连接参数,分别是:主机名、端口号、用户名、密码、virtual-host):

spring:
  rabbitmq:
    host: 192.168.22.134
    port: 5672 #发送信息的端口,不是启动rabbitmq管理页面的端口(15672)
    username: itcast
    password: 123321
    virtual-host: /

二、扇出(Fanout) 交换机实现消息的发送和接收

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

在扇出(广播)模式下,消息发送流程是这样的:

  • 1) 可以有多个队列

  • 2) 每个队列都要绑定到Exchange(交换机)

  • 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定

  • 4) 交换机把消息发送给绑定过的所有队列

  • 5) 订阅队列的消费者都能拿到消息  

2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息

2.1.1consumer子项目结构

其中config包是配置包,listener包是监听消息以便接收的包

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

2.1.2FanoutConfig类的实现扇出(Fanout)交换机、队列以及交换机和队列的绑定的创建

分别创建一个名为itcast.fanout的交换机、一个名为fanout.queue1的队列、一个名为fanout.queue2d1队列以及交换机和两个队列之间的绑定。

@Configuration
public class FanoutConfig {
    // 声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    // 声明第1个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //绑定队列1和交换机
    @Bean
    public Binding fanoutBing1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    // 声明第2个队列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    //绑定队列2和交换机
    @Bean
    public Binding fanoutBing2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

2.1.3springRabbitListener类实现Fanout消息的接收

 利用@RabbitListeneer注解的queues属性="队列名称",来对队列的消息进行监听。

@Component
public class springRabbitListener {

    @RabbitListener(queues = "fanout.queue1")
    public void ListenerFanoutQueue1(String msg) {
        System.out.println("消费者接收到fanout.queue1的消息【"+msg+"】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void ListenerFanoutQueue2(String msg) {
        System.out.println("消费者接收到fanout.queue2的消息【"+msg+"】");
    }

}

2.1.4运行consumer子项目

在http://192.168.22.134:15672/(这里输入自己的ip地址和自己的RabbitMQ端口号)查看名为itcast.fanout交换机、名为fanout.queue1队列和名为fanout.queue2是否创建成功。

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

2.2编写子项目publisher的代码实现扇出(Fanout)交换机的发送消息

2.2.1publisher子项目结构

 publisher子项目结构比consumer子项目结构而言相对简单了,在publisher子项目结构中我们只需编写@Test类来完成消息的发送,然后看看consumer子项目是否能够接收到该消息即可。

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

2.2.2编写SpringAmqpTest测试类代码

需要指定交换机的名称和发送的消息,然后利用rabbitTemplate.convertAndSend()方法进行消息的发送

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendFanoutExchange(){
        //交换机名称
        String exchangeName ="itcast.fanout";
        //消息
        String message = "hello, every one!";
        // 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }


}

2.2.3运行该测试类,然后回到consumer子项目的两个队列是否都能够接收到该消息

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

三、订阅(Direct)交换机实现消息的发送和接收

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

3.1编写子项目consumer(消费者,接收消息)的代码实现扇出订阅(Direct)交换机接收消息

 为了编码的方便,我们就直接在子项目consumer下的listener包里面的springRabbitListener类使用注解的方式进行交换机、队列的创建和消息的监听了

3.1.1在springRabbitListener类添加以下两个Direct队列方法

这里我们直接使用@RabbitListener注解进行交换机、队列的创建和消息的监听。这里不是使用binding而是使用bindings进行构建,其中:

@Queue里面的name属性是队列的名称

@Exchange里面的name属性是交换机的名称,type属性是交换机的类型(这里是direct类型)

 key属性是指定消息的 RoutingKey,接收拥有相同的RoutingKey队列的消息

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct",
                                 type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public  void ListenerDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息【"+msg+"】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct",
                                 type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public  void ListenerDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息【"+msg+"】");
    }

3.1.2再次重新运行consumer子项目

 在http://192.168.22.134:15672/(这里输入自己的ip地址和自己的RabbitMQ端口号)查看名为itcast.direct交换机、名为direct.queue1队列和名为direct.queue2是否创建成功。

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

3.2编写子项目publisher的代码实现扇出(Direct)交换机的发送消息

  为了编码的方便,我们就直接在子项目publisher下的SpringAmqpTest测试类进行编码

3.2.1在SpringAmqpTest测试类继续添加下面代码

这里的 rabbitTemplate.convertAndSend()方法相对于Fanout的测试类来说,要在中间多填一个参数的名称,该名称就是RoutingKey(指定队列key),向监听消息队列也是该key的发送消息,比如这里的发送消息的RoutingKey为red,那么只有在监听消息队列的key也拥有red才能接收到这个消息

    @Test
    public void testSendDirectExchange(){
        //交换机名称
        String exchangeName ="itcast.direct";
        //消息
        String message = "hello, red!";
        // 发送消息,参数分别是:交互机名称、RoutingKey(指定队列key)、消息
        rabbitTemplate.convertAndSend(exchangeName,"red",message);
    }

3.2.2运行该测试类,然后回到consumer子项目的key拥有red的监听队列是否都够接收到该消息

从上面的3.1.1编码可知,我们两个队列的key都拥有red,所以两个队列都接受到了该消息

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

四、主题(Topic)交换机实现消息的发送和接收

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

        #:匹配一个或多个词

        *:匹配不多不少恰好1个词

举例:

         item.#:能够匹配item.spu.insert 或者 item.spu

         item.*:只能匹配item.spu 

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

解释:

        Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather

        Queue4:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news

4.1编写子项目consumer(消费者,接收消息)的代码实现扇出主题(Topic)交换机接收消息

  为了编码的方便,我们就直接在子项目consumer下的listener包里面的springRabbitListener类使用注解的方式进行交换机、队列的创建和消息的监听了

4.1.1在springRabbitListener类添加以下两个Topic队列方法

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic",
                                 type = ExchangeTypes.TOPIC),
            key = "China.#"
    ))
    public  void ListenerTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息【"+msg+"】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic",
                                 type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public  void ListenerTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息【"+msg+"】");
    }

这里我们直接使用@RabbitListener注解进行交换机、队列的创建和消息的监听。这里不是使用binding而是使用bindings进行构建,其中:

@Queue里面的name属性是队列的名称

@Exchange里面的name属性是交换机的名称,type属性是交换机的类型(这里是topic类型)

 key属性是指定消息的 RoutingKey,接收拥有相同的RoutingKey队列的消息,但是不同的是,这里我们可以使用#或者*的通配符进行更方便的匹配发送和接收消息的队列

4.1.2再次重新运行consumer子项目

 在http://192.168.22.134:15672/(这里输入自己的ip地址和自己的RabbitMQ端口号)查看名为itcast.topic交换机、名为topic.queue1队列和名为topic.queue2是否创建成功。 

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

4.2编写子项目publisher的代码实现扇出(Topic)交换机的发送消息

  为了编码的方便,我们就直接在子项目publisher下的SpringAmqpTest测试类进行编码

4.2.1在SpringAmqpTest测试类继续添加下面代码

这里的 rabbitTemplate.convertAndSend()方法相对于Fanout的测试类来说,要在中间多填一个参数的名称,该名称就是RoutingKey(指定队列key),向监听消息队列也是该key的发送消息,比如这里的发送消息的RoutingKey为USA.news,那么只有在监听消息队列的key也拥有.news才能接收到这个消息

    @Test
    public void testSendTopicExchange(){
        //交换机名称
        String exchangeName ="itcast.topic";
        //消息
        String message = "今天是个好日子!";
        // 发送消息,参数分别是:交互机名称、RoutingKey(通配符(*代表一个,#代码0个或多个))、消息
        //rabbitTemplate.convertAndSend(exchangeName,"China.news",message);//两个队列都收到消息
        //rabbitTemplate.convertAndSend(exchangeName,"China.weather",message);//队列1收到消息
        rabbitTemplate.convertAndSend(exchangeName,"USA.news",message);//队列2收到消息
    }

4.2.2运行该测试类,然后回到consumer子项目的key拥有.news的监听队列是否都够接收到该消息

从上面的3.1.1编码可知,只有队列2是#.news,所以只有对了2接受到了该消息

rabbitmq 创建队列,java-rabbitmq,rabbitmq,java

五、总结

5.1交换机的作用是什么?

       接收publisher发送的消息

       将消息按照规则路由到与之绑定的队列

       不能缓存消息,路由失败,消息丢失

       FanoutExchange的会将消息路由到每个绑定的队列

5.2声明队列、交换机、绑定关系的Bean是什么?

        Queue、FanoutExchange、Binding 

5.3描述下Direct交换机与Fanout交换机的差异?

        Fanout交换机将消息路由给每一个与之绑定的队列

        Direct交换机根据RoutingKey判断路由给哪个队列

        如果多个队列具有相同的RoutingKey,则与Fanout功能类似

5.4基于@RabbitListener注解声明队列和交换机有哪些常见注解?

        @Queue和@Exchange

5.5描述下Direct交换机与Topic交换机的差异?

        Topic交换机接收的消息RoutingKey必须是多个单词,以.分割

        Topic交换机与队列绑定时的bindingKey可以指定通配符

   #:代表0个或多个词

   *:代表1个词

今天的知识就分享到这了,希望能够帮助到你~ 文章来源地址https://www.toymoban.com/news/detail-754307.html

到了这里,关于利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 「中间件」rabbitmq 消息队列基础知识

    RabbitMQ是一个消息队列软件,用于在应用程序之间转发消息。以下是RabbitMQ的基本概念: 消息:RabbitMQ中的消息是传递的基本单位,它由消息头和消息体组成。 队列(Queue):队列是消息的缓冲区,用于存储待处理的消息。 交换器(Exchange):交换器是接收生产者发送的消息并

    2024年02月07日
    浏览(40)
  • 深入详解高性能消息队列中间件 RabbitMQ

      目录 1、引言 2、什么是 RabbitMQ ? 3、RabbitMQ 优势 4、RabbitMQ 整体架构剖析 4.1、发送消息流程 4.2、消费消息流程 5、RabbitMQ 应用 5.1、广播 5.2、RPC VC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...) https://blog.csdn.net/chenlycly/article/details/124272585 C++软件异常排查从入

    2024年02月05日
    浏览(35)
  • 消息队列中间件 - Docker安装RabbitMQ、AMQP协议、和主要角色

    不管是微服务还是分布式的系统架构中,消息队列中间件都是不可缺少的一个重要环节,主流的消息队列中间件有RabbitMQ、RocketMQ等等,从这篇开始详细介绍以RabbitMQ为代表的消息队列中间件。 AMQP协议 AMQP协议是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与

    2024年02月03日
    浏览(38)
  • 基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

    场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中

    2024年02月14日
    浏览(40)
  • 消息队列(中间件)

    通信协议: 为了实现客户端和服务器之间的通信来完成的逻辑,基于TCP实现的自定义应用层协议。通过这个协议,完成客户端–服务器远程方法调用。 序列化/反序列化: 通过网络传输对象把对象存储到硬盘上。 序列化:把对象转化为二进制的数据序列,反序列化:把二进制数

    2024年02月07日
    浏览(37)
  • 消息队列中间件(一)

    流量削峰 应用解耦 异步处理 ActiveMQ 优:单机吞吐万级,时效性ms级,可用性高(主从架构),可靠性高(丢失率低) 缺:官方维护少,高吞吐场景较少使用 Kafka 大数据 - 数据采集,传输,存储 优:高吞吐量(百万级),时效性ms级,可用性高,日志成熟 缺:短轮询,失败

    2024年02月11日
    浏览(35)
  • 消息队列中间件介绍

    消息队列介绍   消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。 目前常见的消息中间件有ActiveMQ、Rabbi

    2024年02月04日
    浏览(33)
  • 消息中间件RabbitMQ

    1.1.1. 什么是MQ MQ(message queue) ,从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,

    2024年01月17日
    浏览(56)
  • RabbitMQ消息中间件

    RabbitMQ消息中间件 RabbitMQ简介 windows下安装RabbitMQ RabbitMQ基本概念 RabbitMQ简单模式 RabbitMQ工作队列模式 RabbitMQ发布订阅模式 RabbitMQ路由模式 RabbitMQ主题模式 RabbitMQ RPC模式 RabbitMQ发布确认模式

    2024年02月10日
    浏览(40)
  • 消息中间件之RabbitMQ

    1.基于AMQP协议Erlang语言开发的一款消息中间件,客户端语言支持比较多, 比如Python,Java,Ruby,PHP,JS,Swift.运维简单,灵活路由,但是性能不高, 可以满足一般场景下的业务需要,三高场景下吞吐量不高,消息持久化没有采取 零拷贝技术,消息堆积时,性能会下降 2.消息吞吐量在

    2024年01月19日
    浏览(74)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包