【RabbitMQ六】——RabbitMQ主题模式(Topic)

这篇具有很好参考价值的文章主要介绍了【RabbitMQ六】——RabbitMQ主题模式(Topic)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

通过本篇博客能够简单使用RabbitMQ的主题模式。
本篇博客主要是博主通过官网总结出的RabbitMQ主题模式。其中如果有误欢迎大家及时指正。

什么是Topic模式

Topic模式与Direct模式相比,他们都可以根据Routing key把消息路由到对应的队列上,但是Topic模式相较于Direct来说,它可以基于多个标准进行路由。也就是在队列绑定Routing key的时候使用通配符。这使我们相较于Direct模式灵活性更大。

使用Topic模式的要点

routing key必须是由"."进行分隔的单词列表,最大限制为255字节

通配符规则

  • "*"可以代替一个单词。
  • "#"可以代替零个或多个单词。

示例

创建了三个绑定:Q1绑定了绑定键“.orange”。和Q2的".*.rabbit"和“lazy.#”。
rabbitmq topic模式,# 《消息中间件》,微服务,java-rabbitmq,rabbitmq,java,微服务

1.一个消息的路由键为"quick.orange.rabbit" 时,它将会被送到队列Q1和Q2。
2.一个消息的路由键为"quick.orange.fox"时,它将会背诵到队列Q1
3.一个消息的路由键为"lazy.brown.fox"时,它将被送到队列Q2
4.一个消息的路由键为"quick.brown.fox",没有匹配任何队列,消息将会丢失。
5.一个消息的路由键为"lazy.orange.new.rabbit",它将被送到队列Q2.
6.一个消息的路由键为"orang"或者"quick.orange.new.rabbit"没有匹配到任何队列消息将丢失。

代码示例

Pom文件引入RabbtiMQ依赖

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

RabbitMQ工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : RabbitMQUtils
 * @description : [rabbitmq工具类]
 * @createTime : [2023/1/17 8:49]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:49]
 * @updateRemark : [描述说明本次修改内容]
 */
public class RabbitMQUtils {

    /*
     * @version V1.0
     * Title: getConnection
     * @author Wangwei
     * @description 创建rabbitmq连接
     * @createTime  2023/1/17 8:52
     * @param []
     * @return com.rabbitmq.client.Connection
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setVirtualHost("虚拟主机");
        factory.setUsername("用户名");
        factory.setPassword("密码");
        //创建连接
        Connection connection=factory.newConnection();
        return  connection;
    }

    /*
     * @version V1.0
     * Title: getChannel
     * @author Wangwei
     * @description 创建信道
     * @createTime  2023/1/17 8:55
     * @param []
     * @return com.rabbitmq.client.Channel
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        Connection connection=getConnection();
        Channel channel=connection.createChannel();
        return channel;
    }
}



生产者

import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        RabbitMQUtils.getConnection();
        //声明通道
        Channel channel = RabbitMQUtils.getChannel();
        //创建topic类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //声明routingKey
        String severityInfo="info.log.test";
        String severityError="error.test";
        String severityError2="log.error.test";
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="info.log.test:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="主题模式error.test:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="log.error.test:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityError2,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
    }
}

消费者1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/2/1 9:39]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:39]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String queueName = channel.queueDeclare().getQueue();
        //声明routingKey (error)
        String severityError="error.*";
        //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失
        //queueName绑定了direct_logs交换机并且绑定了routingKey
        channel.queueBind(queueName, EXCHANGE_NAME,severityError );

        //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

消费者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        //创建fanout类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称
        String queueName = channel.queueDeclare().getQueue();
        //声明routingKey (info,error,warning)
        String severityInfo="info.#";
        String severityError="*.error.*";
        //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失
        //queueName绑定了direct_logs交换机并且绑定了3个routingKey
        channel.queueBind(queueName, EXCHANGE_NAME,severityInfo );
        channel.queueBind(queueName, EXCHANGE_NAME,severityError );
        //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }

}

效果

rabbitmq topic模式,# 《消息中间件》,微服务,java-rabbitmq,rabbitmq,java,微服务
rabbitmq topic模式,# 《消息中间件》,微服务,java-rabbitmq,rabbitmq,java,微服务
rabbitmq topic模式,# 《消息中间件》,微服务,java-rabbitmq,rabbitmq,java,微服务

总结

通过使用通配符实现灵活性的应用有很多,例如nginx的请求转发,gateway为请求过滤等等都是使用了统配符的技术。通过这种联想来对知识进行结构化,找相同和不同,思考能力和学习力也会有很大的提高。文章来源地址https://www.toymoban.com/news/detail-696423.html

到了这里,关于【RabbitMQ六】——RabbitMQ主题模式(Topic)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMq:Topic exchange(主题交换机)的理解和使用

    在RabbitMq中,生产者的消息都是通过交换机来接收,然后再从交换机分发到不同的队列中去,在分发的过程中交换机类型会影响分发的逻辑,下面主要讲解一下主题交换机。 ​ 主题交换机核心是可以以范围的行为向队列发送消息,它和直连交换机区别在于,直连交换机一个队

    2024年02月12日
    浏览(34)
  • RabbitMQ入门案例之Topic模式

    本文章将介绍RabbitMQ中的Topic(主题)模式,其中还会涉及 ‘#’ 和 ‘*’ 两个通配符在RabbitMQ中的区别。 官网文档地址:https://rabbitmq.com/getstarted.html RabbitMQ的Topic模式是一种基于主题的消息传递模式。它允许发送者向一个特定的主题(topic)发布消息,同时,订阅者也可以针

    2024年02月10日
    浏览(42)
  • RabbitMQ 发布订阅模式,routing路由模式,topic模式

    发布订阅模式 一个消息可以由多个消费者消费同一个消息  消费者1和2同时消费了该消息 举例  消费者1和2同时消费了该消息,比如说消息是发短信,发邮件,  那么1和发短息  2可以发邮件 routing路由模式 就是说哪些让谁干 哪些让谁干区分出来 也可以让所有消费者都消费 选择

    2024年02月02日
    浏览(49)
  • RabbitMQ学习——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月13日
    浏览(73)
  • RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计

    1.rabbitmq队列方式的梳理,点对点,一对多; 2.发布订阅模式,交换机到消费者,以邮箱和手机验证码为例; 3.topic模式,根据规则决定发送给哪个队列; 4.rabbitmq回调确认,setConfirmCallback和setReturnsCallback; 5.死信队列,延迟队列,创建方法,正常—死信,设置延迟时间; 点对

    2024年02月10日
    浏览(54)
  • RabbitMQ: topic 结构

    2024年02月09日
    浏览(28)
  • Kafka - 主题Topic与消费者消息Offset日志记录机制

    可以根据业务类型,分发到不同的Topic中,对于每一个Topic,下面可以有多个分区(Partition)日志文件: kafka 下的Topic的多个分区,每一个分区实质上就是一个队列,将接收到的消息暂时存储到队列中,根据配置以及消息消费情况来对队列消息删除。 可以这么来理解Topic,Partitio

    2024年02月03日
    浏览(52)
  • RabbitMQ交换机(3)-Topic

    RabbitMQ的Topic模式是一种基于主题的消息传递模式。它允许发送者向一个特定的主题(topic)发布消息,同时,订阅者也可以针对自己感兴趣的主题进行订阅。 在Topic模式中, 主题通过一个由单词和点号组成的字符串 来描述。例如,“#.china”表示匹配所有以“china”为结尾的

    2024年01月19日
    浏览(44)
  • rabbitmq基础-java-5、Topic交换机

    Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。 只不过 Topic 类型 Exchange 可以让队列在绑定 BindingKey 的时候使用通配符! BindingKey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如: item.insert 通配符规则: # :匹配一个或多个

    2024年01月25日
    浏览(36)
  • 消息中间件学习笔记--RabbitMQ(二、模式,一次违反常规的Java大厂面试经历

    .Fanout:转发消息到所有绑定队列 比较常用的是Direct、Topic、Fanout. Fanout 这种Fanout模式不处理路由键,只·需要简单的将队列绑定到exchange上,一个发送到exchange的消息都会被转发到与该exchange绑定的所有队列上。很像广播子网,每台子网内的主机都获得了一份复制的消息。Fan

    2024年04月09日
    浏览(96)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包