RabbitMQ交换机与队列

这篇具有很好参考价值的文章主要介绍了RabbitMQ交换机与队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

交换机

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反, 生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单, 一方面它接收来自生产者的消息,另一方面将它们推入队列。 交换机必须确切知道如何处理收到的消息。 是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。 这就的由交换机的类型来决定。

Exchanges 的类型

  1. 直接(direct)
  2. 主题(topic)
  3. 标题(headers) ,
  4. 扇出(fanout)

声明交换机

//参数:(交换机名称,交换机类型)
channel.exchangeDeclare(EXCHANEG_NAME,type);

默认 exchange

默认交换机使用空串标识,消息发送到交换机,交换机根据路由发送到队列中。
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

channel.basicPublic("",queueName,props,message);

临时队列

临时队列,当消费者断开连接后,临时队列将被删除
临时队列声明

String queueName=channel.queueDeclare().getQueue();

绑定(bindings)

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。
将队列绑定到交换机。

//参数:(队列,交换机,路由)
channel.queueBind(queueName,EXCHANGE_NAME,RouterKey);

Fanout

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:

  • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
  • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
  • 分发系统使用它来广播各种状态和配置更新
  • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)

Fanout 这种类型它是将接收到的所有消息广播到它知道的所有队列中。
RabbitMQ交换机与队列,Spring,SpringBoot,rabbitmq

public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机 参数(交换机,交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明临时队列
        String queue = channel.queueDeclare().getQueue();
        //队列绑定交换机 参数(队列,交换机,routingKey)
        channel.queueBind(queue,EXCHANGE_NAME,"");
        System.out.println("Receive02等待消息....");
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("Receive02接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(queue,true,askCallback,consumerTag->{});
    }
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机 参数(交换机,交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明临时队列
        String queue = channel.queueDeclare().getQueue();
        //队列绑定交换机 参数(队列,交换机,routingKey)
        channel.queueBind(queue,EXCHANGE_NAME,"");
        System.out.println("Receive02等待消息....");
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("Receive02接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(queue,true,askCallback,consumerTag->{});
    }
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("发出消息:"+message);
        }

消费者01和消费者02都将接收到生产者发送的消息

Direct exchange

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:

  • 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
  • 当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。

直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。
RabbitMQ交换机与队列,Spring,SpringBoot,rabbitmq
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。 绑定键为 blackgreen 和的消息会被发布到队列 Q2, 其他消息类型的消息将被丢弃

多重绑定

当然如果 exchange 的绑定类型是 direct, 但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多
RabbitMQ交换机与队列,Spring,SpringBoot,rabbitmq

Topics

主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。
这些单词可以是任意单词,比如说: “stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”.这种类型的。
当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中
(星号)可以代替一个单词
#(井号)可以替代零个或多个单词

RabbitMQ交换机与队列,Spring,SpringBoot,rabbitmq
当队列绑定关系是下列这种情况时需要引起注意
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和
出现,那么该队列绑定类型就是 direct 了

	private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        String queueName="Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        //队列绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        System.out.println("等待接受消息...");
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("接收到的消息为:"+ new String(message.getBody(),"UTF-8"));
            System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+message.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(queueName,true,askCallback,consumerTag->{});
    }
	private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        String queueName="Q2";
        channel.queueDeclare(queueName,false,false,false,null);
        //队列绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
        System.out.println("等待接受消息...");
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("接收到的消息为:"+ new String(message.getBody(),"UTF-8"));
            System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+message.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(queueName,true,askCallback,consumerTag->{});
    }
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        Map<String,String> binding=new HashMap<>();
        binding.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
        binding.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
        binding.put("quick.orange.fox","被队列 Q1 接收到");
        binding.put("lazy.brown.fox","被队列 Q2 接收到");
        binding.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
        binding.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        binding.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        binding.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
        for (Map.Entry<String, String> stringEntry : binding.entrySet()) {
            String routingKey = stringEntry.getKey();
            String message = stringEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息:"+message);
        }
    }

队列

声明队列

channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete,arguments);

参数介绍:
1、QUEUE_NAME: 队列的名称;
2、durable: 是否持久化;
3、exclusive: 是否独享、排外的;
4、autoDelete: 是否自动删除;
5、arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:
(1)x-message-ttl:消息的过期时间,单位:毫秒;
(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

死信队列

死信,顾名思义就是无法被消费的消息 一般来说, producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源 :

  1. 消息 TTL 过期
  2. 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
public class Consumer01 {
    public static final String NORMAL_EXCHANGE="normal_exchange";
    public static final String DEAD_EXCHANGE="dead_exchange";
    public static final String NORMAL_QUEUE="normal_queue";
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        //正常队列绑定死信队列信息
        Map<String, Object> arguments=new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        arguments.put("x-dead-letter-routing-key","lisi");
        //设置队列最大长度
        //arguments.put("x-max-length",6);
        //声明普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //绑定普通队列和死信队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        //成功时的回调
        DeliverCallback askCallback=(consumerTag,message)->{
            //拒绝消息 需要手动应答
            //channel.basicReject();
            System.out.println("Consumer01接收到的消息为:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,askCallback,consumerTag->{});
    }
}
public class Consumer02 {
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("Consumer02接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(DEAD_QUEUE,true,askCallback,consumerTag->{});
    }
}

延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。文章来源地址https://www.toymoban.com/news/detail-820236.html

Map<String,Object> arguments=new HashMap<>();
arguments.put("x-message-ttl",5000)
channel.queueDeclare(queueName,durable,exclusive,autoDelete,arguments);

到了这里,关于RabbitMQ交换机与队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ队列及交换机的使用

    目录 一、简单模型 1、首先控制台创建一个队列 2、父工程导入依赖  3、生产者配置文件  4、写测试类 5、消费者配置文件 6、消费者接收消息 二、WorkQueues模型 1、在控制台创建一个新的队列 2、生产者生产消息 3、创建两个消费者接收消息 4、能者多劳充分利用每一个消费者

    2024年02月04日
    浏览(43)
  • RabbitMQ-死信交换机和死信队列

    DLX: Dead-Letter-Exchange 死信交换器,死信邮箱 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 如下图所示: 其实死信队列就是一个普通的交换机,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理

    2024年02月08日
    浏览(47)
  • 消息队列RabbitMQ.02.交换机的讲解与使用

    目录 RabbitMQ中交换机的基本概念与作用解析 交换机的作用: 交换机的类型: 直连交换机(Direct Exchange): 将消息路由到与消息中的路由键(Routing Key)完全匹配的队列。 主题交换机(Topic Exchange): 使用通配符匹配路由键,允许更灵活的消息路由。 扇形交换机(Fanout E

    2024年01月24日
    浏览(57)
  • 【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

    消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。 本文将介绍

    2024年02月05日
    浏览(79)
  • RabbitMQ之动态创建队列与绑定交换机和监听器

    为什么需要动态创建队列与绑定交换机?我在写项目的时候遇到这么个问题,我数据库中存在一个字段messageType指定为消息类型,消息类型存在三种,一种是通知类,一种是验证码类,一种是活动类。并且对应的,要将消息进行不同渠道的分发,还存在一个channelType,而他又存

    2024年02月03日
    浏览(37)
  • RabbitMQ入门 消息队列快速入门 SpringAMQP WorkQueue 队列和交换机 Fanout Direct exchange RAbbitMQ单体部署

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年04月08日
    浏览(71)
  • .NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

    目录 一、安装mq 二、实操 1、简单模式 2、工作模式 3、fanout扇形模式(发布订阅) 4、direct路由模式也叫定向模式 5、topic主题模式也叫通配符模式(路由模式的一种) 6、header 参数匹配模式 7、延时队列(插件方式实现) 参考资料: 1、我的环境是使用VMware安装的Centos7系统。MQ部署

    2023年04月09日
    浏览(103)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)

    Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符! Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: snow.com 通配符规则: # :匹配一个或多

    2024年04月12日
    浏览(48)
  • RabbitMQ:概念和安装,简单模式,工作,发布确认,交换机,死信队列,延迟队列,发布确认高级,其它知识,集群

    1.1.1.什么是MQ MQ(message queue:消息队列) ,从字面意思上看,本质是个 队列 , FIFO 先入先出 ,只不过队列中存放的 内容是message 而已 ,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “逻辑解耦+物理解耦” 的消息通信服

    2024年01月20日
    浏览(75)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包