RabbitMQ入门案例之Topic模式

这篇具有很好参考价值的文章主要介绍了RabbitMQ入门案例之Topic模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:

本文章将介绍RabbitMQ中的Topic(主题)模式,其中还会涉及 ‘#’ 和 ‘*’ 两个通配符在RabbitMQ中的区别。

官网文档地址:https://rabbitmq.com/getstarted.html

什么是Topic模式

RabbitMQ的Topic模式是一种基于主题的消息传递模式。它允许发送者向一个特定的主题(topic)发布消息,同时,订阅者也可以针对自己感兴趣的主题进行订阅。
RabbitMQ入门案例之Topic模式

在Topic模式中,主题通过一个由单词和点号组成的字符串来描述。例如,“*.china.#”表示匹配所有以“china”为结尾的主题,比如“bj.china”或“shanghai.china.weather”等。( ‘ # ’ 和 ‘ * ’ 会再后面介绍)

当一个消息被发布到Topic交换机(Exchange)时,交换机会将消息转发给所有与该主题匹配的队列。消费者(即订阅者)可以对队列进行绑定,通过指定自己感兴趣的主题进行绑定。

通过使用Topic模式,我们可以实现高度灵活的信息交换模式,同时,确保只有感兴趣的消费者才会收到消息,提高了系统的效率和可靠性。

‘ # ’ 和 ‘ * ’二者的区别

在RabbitMQ的Topic模式中,符号“#”和“*”都用于匹配主题,但它们的意义是不同的。

符号“#”表示通配符可以匹配0个或者多个单词。例如,“china.#”可以匹配所有的以“china”为前缀的主题,例如“china.beijing”,“china.shanghai.weather”等等。

符号“ * ”表示通配符:可以匹配一个单词。例如,“china.*”可以匹配所有的以“china”为前缀并且后面只有一个单词的主题,例如“china.beijing”,“china.shanghai”,但是“china.shanghai.weather”不会被匹配。

总的来说,“#”更加灵活,可以匹配更多的情况,而“*”则更加具体,只能匹配一个单词。但是,使用通配符需要注意,因为它可能会匹配到不可预测的主题,可能会导致消息被传递到错误的队列或者丢失。因此,在设计主题时需要慎重考虑,并尽量减少通配符的使用。

Topic模式实操

老规矩,我们先到RabbitMQ的web管理界面去创建一个Topic的交换机
RabbitMQ入门案例之Topic模式
效果如下:
RabbitMQ入门案例之Topic模式
点击该topic_exchange,进入到下图界面,并绑定消息队列,如果队列不存在需要先创建在过来绑定
RabbitMQ入门案例之Topic模式
最终效果:
RabbitMQ入门案例之Topic模式
接下来就是代码部分了,我们需要创建一个maven项目,然后将下面的依赖导入:

	<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--RabbitMQ依赖-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
    </dependencies>

然后创建生产者,代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @description: Producer 简单队列生产者
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            String message = "超级无敌爱学习";
            String  exchangeName = "topic-exchange";
            String routingKey1 = "pz.class.student";
            String routingKey2 = "class.user.student";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

创建消费者,代码如下:

import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @description: Consumer
 * @Date : 2021/3/2
 */
public class Consumer {
    private static Runnable runnable = () -> {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("43.139.42.244");
        connectionFactory.setPort(5678);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //获取队列的名称
        final String queueName = Thread.currentThread().getName();
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
            //channel.queueDeclare("queue1", false, false, false, null);
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println(queueName + ":开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    };
    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
    }
}

接下来执行生产者代码,在这段代码中,我们先对路由key1进行发送消息并通过web管理界面查看效果:

RabbitMQ入门案例之Topic模式

执行结果:
RabbitMQ入门案例之Topic模式
web管理界面查看结果:
RabbitMQ入门案例之Topic模式

通过上面的图,我们可以发现,我们消息通过topic_exchange这个交换机通过指定路由key发送到了绑定的消息队列中,由于routingkey使用的是通配符发方式,其中“queue2 -> #.class.* ” , “ queue3 -> #.student.#”,又由于通配符,# 号是指0个及以上,* 号是仅匹配一个,那么结果就是符合预期的,因为routingkey1= pz.class.student,class前有一个,后面有一个,会映射到queue2,student前面有多个,后面没有可以映射到queue3,结果就和图示一样啦~~

接下来就执行routekey2的路由key来看看会发生什么效果:
RabbitMQ入门案例之Topic模式
执行结果
RabbitMQ入门案例之Topic模式
web管理界面查看结果:
RabbitMQ入门案例之Topic模式
可以看到这次queue1和queue3,接收到了消息,可以自己尝试分析一下,这里不做过多赘述。文章来源地址https://www.toymoban.com/news/detail-495321.html

以上便是本章全部内容,感谢阅读ovo
如有错误,感谢指正

到了这里,关于RabbitMQ入门案例之Topic模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ六】——RabbitMQ主题模式(Topic)

    通过本篇博客能够简单使用RabbitMQ的主题模式。 本篇博客主要是博主通过官网总结出的RabbitMQ主题模式。其中如果有误欢迎大家及时指正。 Topic模式与Direct模式相比,他们都可以根据Routing key把消息路由到对应的队列上,但是Topic模式相较于Direct来说,它可以基于多个标准进行

    2024年02月09日
    浏览(33)
  • RabbitMQ 发布订阅模式,routing路由模式,topic模式

    发布订阅模式 一个消息可以由多个消费者消费同一个消息  消费者1和2同时消费了该消息 举例  消费者1和2同时消费了该消息,比如说消息是发短信,发邮件,  那么1和发短息  2可以发邮件 routing路由模式 就是说哪些让谁干 哪些让谁干区分出来 也可以让所有消费者都消费 选择

    2024年02月02日
    浏览(53)
  • RabbitMQ学习——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月13日
    浏览(78)
  • RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月10日
    浏览(59)
  • RabbitMQ详解(三):消息模式(fanout、direct、topic、work)

    参考官网:https://www.rabbitmq.com/getstarted.html 简单模式 Simple, 参考RabbitMQ详解(二):消息模式 Simple(简单)模式 简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。 发布订阅模式 fanout 同时向

    2024年02月10日
    浏览(48)
  • SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费

    1、simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端

    2024年02月02日
    浏览(47)
  • rabbitmq topic模式设置#通配符情况下 消费者队列未接收消息问题排查解决

    生产者配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.# 消费者代码配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.user 其实以上代码看着没有问题,意思是代码生成一个队列,并把【topic.shcool.user】队列和生产者的【topic_exchange_shcool】exchange绑定,但是生产者发送消息是

    2024年02月11日
    浏览(48)
  • (五)kafka从入门到精通之topic介绍

    Kafka是一个流行的分布式消息系统,它的核心是一个由多个节点组成的分布式集群。在Kafka中,数据被分割成多个小块,并通过一些复杂的算法在节点之间传递。这些小块被称为Kafka Topic。 一个Topic是一组具有相同主题的消息。可以将Topic看作是一个数据仓库,在这个仓库中存

    2024年02月12日
    浏览(36)
  • RabbitMQ入门到实战一篇文章就够了

    课程内容 认识RabbitMQ 安装RabbitMQ SpringBoot使用RabbitMQ 其他特性 1.RabbitMQ认识 1.1.RabbitMQ是什么 MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程

    2024年03月09日
    浏览(50)
  • rabbitmq的介绍、使用、案例

    rabbitmq简单来说就是个消息中间件,可以让不同的应用程序之间进行异步的通信,通过消息传递来实现解耦和分布式处理。 消息队列:允许将消息发到队列,然后进行取出、处理等操作,使得生产者和消费者之间能够解耦,异步地进行通信。 持久性,可靠性的消息传递机制。

    2024年01月20日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包