RabbitMQ系列(14)--Topics交换机的简介与实现

这篇具有很好参考价值的文章主要介绍了RabbitMQ系列(14)--Topics交换机的简介与实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、Topics交换机的介绍

Topics交换机能让消息只发送往绑定了指定routingkey的队列中去,不同于Direct交换机的是,Topics能把一个消息往多个不同的队列发送;Topics交换机的routingkey不能随意写,必须是一个单词列表,并以点号分隔开,例如“one.two.three”,除此外还有两个替换符,*(星号)能代替一个单词,#(井号)可以代替零个或多个单词,例如“*.one.*”是中间是one的3个单词,“*.*.one”是最后一个是one的3个单词,“one.#”是第一个单词是one的多个单词,若队列绑定键是#,这个队列将接收所有数据,这时候类似fanout交换机,若队列绑定键中没有#和*出现,这时候就类似direct交换机

 2、Topics交换机的实现 

(1)新建一个名为topics的包,用于装发布确认的代码

RabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmq

效果图:

RabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmq

(2)新建一个名为Receive01的类用于编写消费者的代码

RabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

 注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考

RabbitMQ系列(6)--RabbitMQ模式之工作队列(Work queues)的简介及实现_Ken_1115的博客-CSDN博客

package com.ken.topics;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

/**
 * 消息接收
 */
public class Receive01 {

    //声明交换机的名称
    public static  final String EXCHANGE_NAME = "topic_exchange";

    //接收消息
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //声明队列
        String queueName = "Q1";
        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(queueName,false,false,false,null);
        //队列与交换机通过routingkey进行捆绑
        channel.queueBind(queueName,EXCHANGE_NAME,"*.one.*");

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }

}

(3)复制Receive01类并粘贴重命名为Receive02

RabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.topics;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 消息接收
 */
public class Receive02 {

    //声明交换机的名称
    public static  final String EXCHANGE_NAME = "topic_exchange";

    //接收消息
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //声明队列
        String queueName = "Q1";
        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(queueName,false,false,false,null);
        //队列与交换机通过routingkey进行捆绑
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.two");
        //队列与交换机通过routingkey进行捆绑
        channel.queueBind(queueName,EXCHANGE_NAME,"three.#");

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }

}

(4)新建一个名为Emit的类用于编写生产者的代码

RabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmq

代码如下:

package com.ken.topics;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;


/**
 * 发消息
 */
public class Emit {

    //声明交换机的名称
    public static  final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        Map<String,String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("four.one.two","被队列Q1Q2接收");
        bindingKeyMap.put("three.one.five","被队列Q1Q2接收");
        bindingKeyMap.put("four.one.six","被队列Q1接收");
        bindingKeyMap.put("three.seven.six","被队列Q2接收");
        bindingKeyMap.put("three.eight.two","虽然满足两个绑定,但只被队列Q2接收一次");
        bindingKeyMap.put("three.seven.six","不匹配任何绑定,不会被任何队列接收到,会被丢弃");
        bindingKeyMap.put("four.one.nine.two","四个单词,不匹配任何绑定,会被丢弃");
        bindingKeyMap.put("three.one.nine.two","四个单词,但匹配Q2");

        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }

    }

}

 (5)分别先运行Receive01、Receive02、Emit

RabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmq

RabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmqRabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmq

(6)查看Receive01和Receive02接收消息的情况RabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmqRabbitMQ系列(14)--Topics交换机的简介与实现,rabbitmq,rabbitmq,java-rabbitmq

从上述结果可看出topic交换机实现成功文章来源地址https://www.toymoban.com/news/detail-539159.html

到了这里,关于RabbitMQ系列(14)--Topics交换机的简介与实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)

    Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符! Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: snow.com 通配符规则: # :匹配一个或多

    2024年04月12日
    浏览(38)
  • 【初始RabbitMQ】交换机的实现

    RabbitMQ消息传递模型的核心思想就是: 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者不知道这些消息会传递到那些队列中 相反,生产者只能将消息发送到交换机,交换机的工作内容也很简单,一方面是接受来自生产者的消息,另一方面是将它们推入到队列

    2024年02月20日
    浏览(28)
  • RabbitMQ的交换机(原理及代码实现)

    Fanout Exchange(扇形) Direct Exchange(直连) opic Exchange(主题) Headers Exchange(头部) Fanout 扇形的,散开的; 扇形交换机 投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发; 如下图所示 P代表provider提供者,X代表exchange交换机,第三部分代

    2024年02月08日
    浏览(29)
  • RabbitMQ中交换机的应用 ,原理 ,案例的实现

                                    🎉🎉欢迎来到我的CSDN主页!🎉🎉                     🏅我是平顶山大师,一个在CSDN分享笔记的博主。📚📚     🌟推荐给大家我的博客专栏《RabbitMQ中交换机的应用及原理,案例的实现》。🎯🎯                    

    2024年01月24日
    浏览(44)
  • RabbitMQ中交换机的应用及原理,案例的实现

    目录 一、介绍 1. 概述 2. 作用及优势 3. 工作原理 二、交换机Exchange 1. Direct 2. Topic 3. Fanout 三、代码案例 消费者代码   1. 直连direct  生产者代码 测试 2. 主题topic  生产者代码 测试 3. 扇形fanout  生产者代码 测试 每篇一获 RabbitMQ中的交换机(exchange)是消息的分发中心,它

    2024年01月24日
    浏览(28)
  • RabbitMQ 备份交换机和死信交换机

      为处理生产者将消息推送到交换机中,交换机按照消息中的路由键及自身策略无法将消息投递到指定队列中造成消息丢失的问题,可以使用备份交换机。   为处理在消息队列中到达TTL的过期消息,可采用死信交换机进行消息转存。可以通过死信交换机的方式实现延迟队

    2024年02月14日
    浏览(43)
  • golang整合rabbitmq-实现创建生产者绑定交换机-创建消费者消费完整代码

    1,在生产者端初始化mq连接 2,创建生产者 3,另起一个go服务进行消费者消费 后面将会发布golang整合es操作的文章

    2024年01月25日
    浏览(48)
  • RabbitMQ之Exchange(交换机)属性及备用交换机解读

    目录 基本介绍 主要结论 备用交换机  springboot代码实战(备用交换机) 实战架构 工程概述 RabbitConfigDeal 配置类:创建队列及交换机并进行绑定  MessageService业务类:发送消息及接收消息 主启动类RabbitMq01Application:实现ApplicationRunner接口 在 RabbitMQ 中,交换机主要用来将生产

    2024年02月02日
    浏览(44)
  • 【RabbitMQ】RabbitMQ的交换机

    在上文中, 都没有交换机,生产者直接发送消息到队列。 而一旦引入交换机,消息发送的模式会有很大变化:可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化: Publisher:生产者,不再发送消息到队列中,而是发给交换机 Exchange:交换机,一方面,接收生

    2024年03月12日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包