RabbitMQ 发布订阅模式,routing路由模式,topic模式

这篇具有很好参考价值的文章主要介绍了RabbitMQ 发布订阅模式,routing路由模式,topic模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

发布订阅模式

一个消息可以由多个消费者消费同一个消息

 消费者1和2同时消费了该消息

举例

RabbitMQ 发布订阅模式,routing路由模式,topic模式

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    /**
     * 发布订阅模式需要指定交换机和类型,不能用上面的模式
     * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定
     * 或者没有符合路由规则的队列,那么消息会丢失
     * 第一个参数:交换机名字
     * 第二个参数:交换机类型
     *  fanout:广播,将消息交给所有绑定到交换机的队列
     *  direct:定向,把消息交给符合指定routing key的队列
     *  topic:通配符,把消息交给符合routing pattern(路由模式)的队列
     */
    channel.exchangeDeclare("03-pubsub1", "fanout");
    //6 发送消息
    /**
     * 第一个参数:交换机名称 没有交换机就设置""
     * 第二个参数:路由key
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */


    channel.basicPublish("03-pubsub1","", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes());

    //7 关闭消息
    //channel.close();
    connection.close();
}

RabbitMQ 发布订阅模式,routing路由模式,topic模式

RabbitMQ 发布订阅模式,routing路由模式,topic模式

 消费者1和2同时消费了该消息,比如说消息是发短信,发邮件,  那么1和发短息  2可以发邮件RabbitMQ 发布订阅模式,routing路由模式,topic模式

RabbitMQ 发布订阅模式,routing路由模式,topic模式

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("03-pubsub1", "fanout");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    channel.queueBind(queue, "03-pubsub1", "");
    channel.basicConsume(queue, false,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 1  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

routing路由模式

就是说哪些让谁干

哪些让谁干区分出来

也可以让所有消费者都消费

选择性的让某个消费者消费,或者都消费

RabbitMQ 发布订阅模式,routing路由模式,topic模式

 生产者RabbitMQ 发布订阅模式,routing路由模式,topic模式

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    /**
     * 发布订阅模式需要指定交换机和类型,不能用上面的模式
     * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定
     * 或者没有符合路由规则的队列,那么消息会丢失
     * 第一个参数:交换机名字
     * 第二个参数:交换机类型
     *  fanout:广播,将消息交给所有绑定到交换机的队列
     *  direct:定向,把消息交给符合指定routing key的队列
     *  topic:通配符,把消息交给符合routing pattern(路由模式)的队列
     */
    channel.exchangeDeclare("04-routing1", "direct");
    //6 发送消息
    /**
     * 第一个参数:交换机名称 没有交换机就设置""
     * 第二个参数:路由key  routing模式需要路由key
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */


    channel.basicPublish("04-routing1","info", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes());

    //7 关闭消息
    //channel.close();
    connection.close();
}

 消费者1

RabbitMQ 发布订阅模式,routing路由模式,topic模式

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("04-routing1", "direct");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个
    channel.queueBind(queue, "04-routing1", "info");
    channel.queueBind(queue, "04-routing1", "error");
    channel.queueBind(queue, "04-routing1", "waring");
    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 1  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

消费者2

RabbitMQ 发布订阅模式,routing路由模式,topic模式

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("04-routing1", "direct");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个

    channel.queueBind(queue, "04-routing1", "trace");


    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 2  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

上面的只有消费者1消费了消息 

可以根据channel.queueBind(queue, "04-routing1", "trace"); 绑定消息  也可以让1和2都消费,

 

topic模式和Routing模式高度相识,用通配符的形式指定让谁消费,或者都消费

RabbitMQ 发布订阅模式,routing路由模式,topic模式

 生产者

RabbitMQ 发布订阅模式,routing路由模式,topic模式

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    /**
     * 发布订阅模式需要指定交换机和类型,不能用上面的模式
     * 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定
     * 或者没有符合路由规则的队列,那么消息会丢失
     * 第一个参数:交换机名字
     * 第二个参数:交换机类型
     *  fanout:广播,将消息交给所有绑定到交换机的队列
     *  direct:定向,把消息交给符合指定routing key的队列
     *  topic:通配符,把消息交给符合routing pattern(路由模式)的队列
     */
    channel.exchangeDeclare("05-topic1", "topic");
    //6 发送消息
    /**
     * 第一个参数:交换机名称 没有交换机就设置""
     * 第二个参数:路由key  routing模式需要路由key
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */


    channel.basicPublish("05-topic1","employee.save", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitMQ".getBytes());

    //7 关闭消息
    //channel.close();
    connection.close();
}

消费者1

RabbitMQ 发布订阅模式,routing路由模式,topic模式

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("05-topic1", "topic");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个
    channel.queueBind(queue, "05-topic1", "employee.*");

    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 1  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

消费者2

RabbitMQ 发布订阅模式,routing路由模式,topic模式

public static void main(String[] args) throws IOException, TimeoutException {
    //1 创建连接工厂
    ConnectionFactory connectionFactory=new ConnectionFactory();
    //2 设置rabbitmq  ip地址
    connectionFactory.setHost("localhost");
    //3 创建连接对象   Conection对象
    Connection connection=connectionFactory.newConnection();
    //4 创建管道  Chanel
    Channel channel=connection.createChannel();
    //5 设置队列属性
    /**
     * 第一个参数:队列的名称
     * 第二个参数:队列是否要持久化
     * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)
     * 第四个参数:是否自动删除消息
     * 第五个参数:是否要设置一些额外的参数
     */
    //channel.queueDeclare("02-work",false,false,true,null);
    //channel.basicQos(1);
    //6 使用chanel 去 rabbitmq 获取消息进行消费
    /**
     * 第一个参数:队列的名称
     * 第二个参数:是否自动签收
     * 第三个参数:消息属性
     * 第四个参数:消息内容
     */
    channel.exchangeDeclare("05-topic1", "topic");
    //绑定
    String queue = channel.queueDeclare().getQueue();
    //可与绑定多个
    channel.queueBind(queue, "05-topic1", "user.*");

    channel.basicConsume(queue, true,new DeliverCallback(){
        /**
         * 当消息从mq 中取出来了会回调这个方法
         * 消费者消费消息就在这个  handle中进行处理
         */
        @Override
        public void handle(String s, Delivery delivery){
            System.out.println("消费者 2  消息中的内容为:"+new String(delivery.getBody()));
        }
    },new CancelCallback(){
        /**
         * 当消息取消了会回调这个方法
         */
        @Override
        public void handle(String s) throws IOException {
            System.out.println(111);
        }
    });
    //7 关闭消息   注意消费者 需要持续监听,不要关闭
    //channel.close();
    //connection.close();
}

结果就是消费者1消费了消息

所有工作模式总结

RabbitMQ 发布订阅模式,routing路由模式,topic模式

 文章来源地址https://www.toymoban.com/news/detail-430803.html

到了这里,关于RabbitMQ 发布订阅模式,routing路由模式,topic模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ入门案例之发布订阅模式

    本文章主要介绍RabbitMQ的发布订阅模式,该模式下,消息为广播形式,一经发布则会进入交换机绑定的队列中,详细介绍可以阅读官方文档。 官网文档地址:https://rabbitmq.com/getstarted.html RabbitMQ中的发布与订阅模式是一种消息传递的方式,用于在分布式系统中传递消息。 在该模

    2024年02月09日
    浏览(57)
  • springboot rabbitmq 发布订阅 广播模式

    根据amqp协议、rabbitmq入门、springboot集成rabbitmq 可知,rabbitmq的广播模式关键是使用fanout类型的exchange,fanout exchange会忽略message中的routing-key、queue中的binding-key,发给绑定exchange的全部queue。 实现发布订阅(广播模式)的关键在于对exchange类型的理解,可参考amqp协议、rabbitmq入门

    2024年02月02日
    浏览(40)
  • 【RabbitMQ四】——RabbitMQ发布订阅模式(Publish/Subscribe)

    通过本篇博客能够简单使用RabbitMQ的发布订阅模式。 本篇博客主要是博主通过官网以及学习他人的博客总结出的RabbitMQ发布订阅模式。其中如果有误欢迎大家及时指正。 发布订阅模式的核心是生产者生产的消息,其他消费者都可以收到该生产者生产的消息。 由于发布订阅模式

    2024年02月02日
    浏览(35)
  • RabbitMQ--04--发布订阅模式 (fanout)-案例

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 @RabbitListener和@RabbitHandler的使用 OrderService OrderServiceImpl 在项目的test中发送请求 访问网址: http://localhost:15672/#/queues yml配置 SmsConsumerService、SmsConsumerServiceImpl EmailConsumerService、EmailConsumerServiceImpl DuanxinCo

    2024年04月14日
    浏览(44)
  • RabbitMQ的Publish/Subscribe发布订阅模式详解

    各位小伙伴很久不见了,今儿又要给大家分享干货了。我们知道RabbitMQ有简单模式、工作队列模式、发布订阅模式、路由模式、主题模式、远程过程调用模式、发布者确认模式等。这么多模式,你可能一下子很难全部吸收,今天袁老师主要给大家介绍发布订阅模式Publish/Subsc

    2024年02月10日
    浏览(47)
  • Spring Boot整合RabbitMQ之发布与订阅模式

    RabbitMQ的模式中,常用的模式有:简单模式,发布与订阅模式,工作模式,路由模式,主题模式。简单模式不太会运用到工作中,我们可以使用 RabbitMQ 的发布订阅模式,实现: 用户发布动态,其“粉丝”收到其发布动态的消息 用户下订单,库存模块、支付模块等收到消息并

    2024年02月12日
    浏览(41)
  • C#使用RabbitMQ-3_发布订阅模式(扇形交换机)

    发布订阅模式允许一个生产者向多个消费者发送消息。在RabbitMQ中实现发布订阅模式通常涉及以下几个关键组件: 生产者 :负责生产并发送消息到RabbitMQ的Exchange(路由器)。 Exchange :作为消息的中转站,根据不同的规则将消息路由到一个或多个队列。 队列 :存储消息的缓

    2024年02月21日
    浏览(41)
  • RabbitMQ(一) - 基本结构、SpringBoot整合RabbitMQ、工作队列、发布订阅、直接、主题交换机模式

    Publisher : 生产者 Queue: 存储消息的容器队列; Consumer:消费者 Connection:消费者与消息服务的TCP连接 Channel:信道,是TCP里面的虚拟连接。例如:电缆相当于TCP,信道是一条独立光纤束,一条TCP连接上创建多少条信道是没有限制的。TCP一旦打开,就会出AMQP信道。无论是发布消息

    2024年02月14日
    浏览(55)
  • 【图解RabbitMQ-7】图解RabbitMQ五种队列模型(简单模型、工作模型、发布订阅模型、路由模型、主题模型)及代码实现

    🧑‍💻作者名称:DaenCode 🎤作者简介:CSDN实力新星,后端开发两年经验,曾担任甲方技术代表,业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅,闭关学习中······ 😎人生感悟:尝尽人生百味,方知世间冷暖。

    2024年02月07日
    浏览(48)
  • Python如何操作RabbitMQ实现fanout发布订阅模式?有录播直播私教课视频教程

    生产者 消费者 生产者 消费者 生产者 消费者

    2024年01月17日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包