RabbitMQ---订阅模型-Direct

这篇具有很好参考价值的文章主要介绍了RabbitMQ---订阅模型-Direct。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、 订阅模型-Direct

• 有选择性的接收消息
• 在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。
• 在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。
例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
• 但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
• 在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
• 消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。
RabbitMQ---订阅模型-Direct,Spring Cloud,rabbitmq,java-rabbitmq,后端,服务器,java-rabbitmq

• P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
• X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
• C1:消费者,其所在队列指定了需要routing key 为 error 的消息
• C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

1.1、生产者

此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete

public class Send {
   private final static String EXCHANGE_NAME = "direct_exchange_test";
   public static void main(String[] argv) throws Exception {
       // 获取到连接
       Connection connection = ConnectionUtil.getConnection();
       // 获取通道
       Channel channel = connection.createChannel();
       // 声明exchange,指定类型为direct
       channel.exchangeDeclare(EXCHANGE_NAME, "direct");
       // 消息内容
       String message = "商品新增了, id = 1001";
       // 发送消息,并且指定routing key 为:insert ,代表新增商品
       channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
       System.out.println(" [商品服务:] Sent '" + message + "'");
       channel.close();
       connection.close();
   }
}

1.2、消费者1

我们此处假设消费者1只接收两种类型的消息:更新商品和删除商品。

public class Recv {
   private final static String QUEUE_NAME = "direct_exchange_queue_1";
   private final static String EXCHANGE_NAME = "direct_exchange_test";
   public static void main(String[] argv) throws Exception {
       // 获取到连接
       Connection connection = ConnectionUtil.getConnection();
       // 获取通道
       Channel channel = connection.createChannel();
       // 声明队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
       // 定义队列的消费者
       DefaultConsumer consumer = new DefaultConsumer(channel) {
           // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                   byte[] body) throws IOException {
               // body 即消息体
               String msg = new String(body);
               System.out.println(" [消费者1] received : " + msg + "!");
           }
       };
       // 监听队列,自动ACK
       channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

1.3、 消费者2

我们此处假设消费者2接收所有类型的消息:新增商品,更新商品和删除商品。

public class Recv2 {
   private final static String QUEUE_NAME = "direct_exchange_queue_2";
   private final static String EXCHANGE_NAME = "direct_exchange_test";
   public static void main(String[] argv) throws Exception {
       // 获取到连接
       Connection connection = ConnectionUtil.getConnection();
       // 获取通道
       Channel channel = connection.createChannel();
       // 声明队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
       // 定义队列的消费者
       DefaultConsumer consumer = new DefaultConsumer(channel) {
           // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                   byte[] body) throws IOException {
               // body 即消息体
               String msg = new String(body);
               System.out.println(" [消费者2] received : " + msg + "!");
           }
       };
       // 监听队列,自动ACK
       channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

1.4、测试

我们分别发送增、删、改的RoutingKey,发现结果:

RabbitMQ---订阅模型-Direct,Spring Cloud,rabbitmq,java-rabbitmq,后端,服务器,java-rabbitmq

RabbitMQ---订阅模型-Direct,Spring Cloud,rabbitmq,java-rabbitmq,后端,服务器,java-rabbitmq文章来源地址https://www.toymoban.com/news/detail-681307.html

到了这里,关于RabbitMQ---订阅模型-Direct的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot整合RabbitMQ之发布与订阅模式

    RabbitMQ的模式中,常用的模式有:简单模式,发布与订阅模式,工作模式,路由模式,主题模式。简单模式不太会运用到工作中,我们可以使用 RabbitMQ 的发布订阅模式,实现: 用户发布动态,其“粉丝”收到其发布动态的消息 用户下订单,库存模块、支付模块等收到消息并

    2024年02月12日
    浏览(41)
  • Spring RabbitMQ那些事(3-消息可靠传输和订阅)

    在有些业务场景中,消息是不能丢的,比如 分布式事务资金动账 ,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。 消息是有可能丢的,比如生产者在发送消息时 broker 服务挂了,消息没有来得及落盘,这时消

    2024年01月21日
    浏览(42)
  • 【图解RabbitMQ-7】图解RabbitMQ五种队列模型(简单模型、工作模型、发布订阅模型、路由模型、主题模型)及代码实现

    🧑‍💻作者名称:DaenCode 🎤作者简介:CSDN实力新星,后端开发两年经验,曾担任甲方技术代表,业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅,闭关学习中······ 😎人生感悟:尝尽人生百味,方知世间冷暖。

    2024年02月07日
    浏览(49)
  • Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)

    业务开发中有很多延时操作的场景,比如最常见的 超时订单自动关闭 、 延时异步处理 ,我们常用的实现方式有: 定时任务轮询 (有延时)。 借助Redission的延时队列 。 Redis的key过期事件通知机制 (需开启key过期事件通知,对Redis有性能损耗)。 RocketMQ中定时消息推送 (支

    2024年02月04日
    浏览(39)
  • Spring RabbitMQ那些事(1-交换机配置&消息发送订阅实操)

    在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。 我们先上应用启动配置文件 application.yml ,如下: 备注:这里我们指定了 RabbitListenerContainerFactory 的类型

    2024年01月17日
    浏览(46)
  • Spring cloud stream 结合 rabbitMq使用

    之前的开发主要是底层开发,没有深入涉及到消息方面。现在面对的是一个这样的场景: 假设公司项目A用了RabbitMQ,而项目B用了Kafka。这时候就会出现有两个消息框架,这两个消息框架可能编码有所不同,且结构也有所不同,而且之前甚至可能使用的是别的框架,造成了一个

    2024年02月04日
    浏览(49)
  • Spring Cloud 之RabbitMQ的学习【详细】

    分布式系统通信两种方式: 直接远程调用(同步) 借助第三方间接通信(异步) Feign就属于同步通讯。存在的如下问题 耦合度高,每次添加新的模块就要修改原有模块的代码 性能下降,调用者需要等待服务者返回的结果,如果调用链过长,则响应的时间越长 资源浪费,在

    2024年02月08日
    浏览(31)
  • spring cloud 搭建消息中间件 RabbitMQ 环境、Mac/Windows下载安装RabbitMQ、配置RabbitMQ环境变量

    spring boot、spring cloud工程:Mac/Windows下载安装Erlang、RabbitMQ,并配置环境变量。 这里学习如何安装 RabbitMQ,因为远程配置中心的动态更新需要结合 RabbitMQ 来使用。 这里给出自己下载和使用的百度网盘链接:Erlang 版本为25.3.2、RabbitMQ版本为3.12.1 : 链接:百度网盘链接 提取码:

    2024年02月15日
    浏览(70)
  • 实战:Spring Cloud Stream消息驱动框架整合rabbitMq

    相信很多同学都开发过WEB服务,在WEB服务的开发中一般是通过缓存、队列、读写分离、削峰填谷、限流降级等手段来提高服务性能和保证服务的正常投用。对于削峰填谷就不得不用到我们的MQ消息中间件,比如适用于大数据的kafka,性能较高支持事务活跃度高的rabbitmq等等,MQ的

    2024年02月08日
    浏览(46)
  • 微服务Spring Cloud Config配置中心与RabbitMQ安装指南

    本文档详细描述了如何在Spring Cloud微服务架构中设置Config配置中心,将项目配置文件存储在Git服务器上(如GitHub或Gitee),并在微服务启动时从Config配置中心获取配置文件。同时,提供了RabbitMQ消息队列的安装指南,为微服务之间的通信提供可靠的消息传递机制。

    2024年02月11日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包