RabbitMQ灵活运用,怎么理解五种消息模型

这篇具有很好参考价值的文章主要介绍了RabbitMQ灵活运用,怎么理解五种消息模型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


简介

上次我们介绍了,为什么rabbitMQ会被很多人中意选型,从而成为火热的MQ组件,今天就先来说一说MQ的基础使用 ———— 其五种消息模型


一、AMQP协议

我们都知道,RabbitMQ是一个使用Erlang语言,基于AMQP协议的MQ组件,那什么是AMQP协议呢,我们就从这开始今天的学习。

AMQP全称为 Advanced Message Queuing Protocol(高级消息队列协议),是一个面向消息的中间件传输协议,用于在应用程序之间进行异步消息通信。

AMQP协议定义了多种角色和服务,包括生产者、消费者、交换器、队列等。其中生产者负责生成消息,消费者负责接收和处理消息,交换器则负责将消息路由到队列中。如下图
RabbitMQ灵活运用,怎么理解五种消息模型

AMQP协议的消息传输基于消息队列,支持消息的确认、持久化、事务等功能,同时支持不同的消息传输模式,如点对点(point-to-point)和发布-订阅(publish-subscribe)模式。

我们今天要介绍的RabbitMQ的五种模型,其实都是基于AMQP的基础模型或其演变而来

二、交换机类型与默认交换机

1. 交换机的四种类型

在进一步讲解之前,我们需要知道RabbitMQ,交换机有四种类型,分别是:Direct、Fanout、Topic和Headers。

  • direct:
    按照消息的路由键(routing key)完全匹配来投递消息。直接匹配模式,将消息发送到与路由键完全匹配的队列中。direct模式可以使用RabbitMQ自带的默认交换机,所以不需要将交换机进行任何绑定操作

  • topic:
    使用通配符进行模糊匹配,消息会带有一个路由键(routing key),而队列绑定到交换机上时,也可以指定主题,而且还能使用一定的正则形式,只要主题匹配上消息的路由键,该消息就会发送至该队列
    符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’
    符号“ * ” 匹配不多不少一个词 eg:“ log.* ”只能匹配到“log.erro”

  • headers:
    按照消息头(header)匹配来投递消息。根据消息头的键值对匹配,当消息头与某个绑定的headers完全匹配时,才会将消息发送到该队列中。

  • fanout :
    将接收到的所有消息广播到它知道的所有队列中,如果routing_key 有指定也不会生效

不难发现,这四种类型本质上是告诉交换机应该把消息发送给哪些那些队列的,四种类别对应着四种判断角度。fanout —— 相当于广播,不作任何选择,发送给所有连接的队列;direct —— 消息发送时都附带一个字段叫routing_key,direct 模式的交换机就会直接把该字段理解成队列名,找到对应的队列并发送;topic —— 交换机会把routing_key理解成一个主题,恰好,队列绑定交换机时也可以以缩略形式指定主题,所以找到匹配主题的队列就发送;headers —— 这种模式的交换机不再以消息的routing_key作为判断依据,而是在队列绑定交换机时,每个队列需提供一个Map,当消息发送给交换机时,交换机会解析消息头,看看有没有能和各队列Map吻合的属性,有则发给该队列。

2. 默认交换机

RabbitMQ有一个自带的交换机,也被称为AMQP default exchange。当消息发送到RabbitMQ时,如果没有指定交换机,就会被发送到默认交换机。默认交换机的类型为direct类型,路由键与队列名相同

如果消息的路由键和某个队列的名称一致,那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致,那么消息会被丢弃。默认交换机可以通过设置routing_key来指定消息的目的地,例如:

//  将消息发送到名称为test_queue的队列中,空字符串代表默认交换机
channel.basic_publish(exchange="", routing_key="test_queue", body="Hello, RabbitMQ!")

但是,建议应用程序在发送消息时显式地指定交换机,以避免不必要的麻烦或错误。默认交换机只是一个简单的机制,不应被用于复杂的应用程序。

三、五种模式速览

1. 一对一简单模式

RabbitMQ灵活运用,怎么理解五种消息模型

  • 概念
    生产者将消息发送到“ hello”队列。使用者从该队列接收消息。

  • 图解
    “ P”是我们的生产者
    “ C”是我们的消费者
    中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。需要注意的是,虽然看起来生产者直接连接了队列,但是实际上,它连接的是rabbitMQ的默认交换机

RabbitMQ灵活运用,怎么理解五种消息模型


2. work模式(轮询)

RabbitMQ灵活运用,怎么理解五种消息模型

  • 概念
    work模式是一个生产者,一个队列,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列,C1\C2是竞争关系,一个消息被C1消费,C2将没有消息

  • 优点及作用
    一个生产者一个队列多个消费者模式能够并行化工作,解决了消息积压问题

  • 工作原理
    默认情况下,RabbitMQ将每个消息按顺序发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

  • 注意
    轮询分发采用平均,导致机器性能的浪费,可以将消费者信道设置如下,代表不公平分发

int preFetchCount = 1;
// 该参数的意思是消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者,该设置需要和手动ACK配合使用
channel.basicQos(preFetchCount )


3. 发布/订阅模式

前两种发布模式,在图中都没有画出 Exchange交换机,属于是一种模型的简化,实际上上两种模式,都将消息发布给了rabbitMQz自带的默认交换机,然后默认交换机再将消息转发给消息队列
(PS:每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同)
RabbitMQ灵活运用,怎么理解五种消息模型

  • 概念
    和模式二不同的是,模式三是没有路由规则,多个队列,多个消费者生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,因此一个消息可以被多个消费者消费

4. 路由模式(自称direct模式)

RabbitMQ灵活运用,怎么理解五种消息模型

  • 概念
    交换机连接接队列发送消息需要指定routing_key,所以模式四就是生产者发送消息到交换机并且要指定routing_key,消费者将队列绑定到交换机时需要指定路由key
  • 算法
    背后的路由算法很简单:消息自带一个routing_key(亦称路由键),交换机会把该消息路由给与其routing_key完全匹配的队列

RabbitMQ灵活运用,怎么理解五种消息模型
在direct模式中,第一个队列以 orange为 key 绑定至交换机 x,第二个队列以black和green绑定。注意:如果有消息没有使用上述key,比如某个消息的key 是 red ,那么该消息将会被丢弃

另外,用相同的路由键可以绑定多个队列
RabbitMQ灵活运用,怎么理解五种消息模型


5. Topic模式

RabbitMQ灵活运用,怎么理解五种消息模型

  • 概念
    topic模式的routing_key使用通配符组成进行模糊匹配

符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’
符号“ * ” 只匹配一个词 eg:“ log.* ”只能匹配到“log.erro”

举例说明,当使用下列 routing_key 发送消息时:

  1. “quick.orange.rabbit”的消息将传递到两个队列;
  2. “lazy.orange.elephant ”也将发送给他们两个;
  3. “quick.orange.fox ”只会进入Q1,而“ lazy.brown.fox ”只会进入Q2;
  4. “lazy.pink.rabbit ”将被传递到Q2只有一次,即使两个绑定都匹配;
  5. “quick.brown.fox ”与任何绑定都不匹配,因此将被丢弃;
  6. “orange“ 和“”quick.orange.male.rabbit“,因为单词个数对不上,则不会被匹配
  7. “lazy.orange.male.rabbit ”即使有四个单词,也将匹配最后一个绑定,并将其传送到Q2队列。

四、实例

我们以最复杂的Topic模式为例,实际写一段java代码

1. 生产者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicProducer {

    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建Topic类型的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 待发送的消息
        String routingKey = "topic.key1";
        String message = "hello rabbitmq, this is topic message";

        // 发送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("Sent message: " + message + ", routingKey: " + routingKey);

        channel.close();
        connection.close();
    }
}

2. 消费者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicConsumer {

    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建Topic类型的交换机,由于发送者已创建,此步其实可省略,一般由发送方建立交换机
        // channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个非持久、独占、自动删除的队列
        String queueName = channel.queueDeclare().getQueue();

        // 绑定路由键为 "topic.#" 的消息
        channel.queueBind(queueName, EXCHANGE_NAME, "topic.#");

        System.out.println("Waiting for messages. To exit press CTRL+C");

        // 消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message + ", routingKey: " + routingKey);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}


五、总结

rabbitMQ的五种模式就介绍完了,其实后三种模式非常相像,我们可以这么理解Topic模式

当队列用‘#’绑定时它将接收所有消息,与routing_key无关,此时就像fanout模式一样
当队列绑定中不使用‘*’和‘#’时,主题交换就像direct模式一样。

而至于前两种模式,由于模型中不带有交换机,所以生产者和消费者都是直接连接的Queue,则就是直肠子的模式文章来源地址https://www.toymoban.com/news/detail-507300.html

到了这里,关于RabbitMQ灵活运用,怎么理解五种消息模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ - 基于 SpringAMQP 带你实现五种消息队列模型

    目录 一、SpringAMQP 1.1、概念 1.2、前置知识(实现案例前必看!) 1.2.1、创建队列 1.2.2、创建交换机 1.2.3、创建绑定 1.2.4、@RabbitListener 注解 a)情况一:queue 存在 b)情况二:queue 不存在  1.2.5、为什么更建议使用 @Bean 注解创建,而不是 @RabbitListener 注解创建? 1.3、案例实现

    2024年04月12日
    浏览(46)
  • 五种消息模型简单说明

    RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。  我们通过一个demo工程来了解下RabbitMQ的工作方式,导入工程:  依赖:  我们抽取一个建立RabbitMQ连接的

    2024年02月12日
    浏览(35)
  • RabbitMQ的五种常见消费模型

    RabbitMQ是一个流行的消息队列中间件,它确保了不同应用程序之间的可靠消息传递。由于其高性能、轻量级和灵活性,RabbitMQ在许多应用程序中被广泛使用,例如 异步任务处理、负载均衡、事件通知 等。在RabbitMQ中,消息的生产和消费是通过一系列的消费模型来管理的。每个

    2024年02月08日
    浏览(39)
  • 【图解RabbitMQ-7】图解RabbitMQ五种队列模型(简单模型、工作模型、发布订阅模型、路由模型、主题模型)及代码实现

    🧑‍💻作者名称:DaenCode 🎤作者简介:CSDN实力新星,后端开发两年经验,曾担任甲方技术代表,业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅,闭关学习中······ 😎人生感悟:尝尽人生百味,方知世间冷暖。

    2024年02月07日
    浏览(46)
  • RabbitMQ消息拒绝怎么解决

    RabbitMQ是一个可靠的、高效的、易于使用的分布式消息队列系统。它支持多种消息协议,如AMQP、STOMP、MQTT等。RabbitMQ被广泛应用于企业级应用中,尤其是在异步通信、解耦合和负载均衡方面。 在使用RabbitMQ时,有时候我们会遇到消息被拒绝的情况。这种情况不仅会影响系统的

    2024年02月16日
    浏览(51)
  • RabbitMQ怎么处理消息事务

    在 RabbitMQ 中,可以通过以下两种方式实现消息事务: 发送方确认(Publisher Confirms) :这是 RabbitMQ 提供的一种轻量级事务机制。在发送消息之前,发送方可以要求 RabbitMQ 确认消息是否成功投递到交换机(Exchange)中。如果确认失败,发送方可以选择重试或者处理发送失败的情

    2024年02月07日
    浏览(40)
  • RabbitMQ管理页面怎么发送消息

    1.登录RabbitMQ 后台管理系统 点击Queues 2.Add a new queue 创建 一个 queue (h5) 3.进入queue (h5) 4.编辑header和发送内容 5.消费者消费

    2024年02月10日
    浏览(33)
  • RabbitMq消息模型-队列消息

    基本模型(SimpleQueue)、工作模型(WorkQueue) 队列消息特点: 消息不会丢失 并且 有先进先出的顺序。 消息接收是有顺序的,不是随机的,仅有一个消费者能拿到数据,而且不同消费者拿不到同一份数据。 基本模型: SimpleQueue 在上图的模型中,有以下几个概念: P:为生产

    2024年02月09日
    浏览(46)
  • rabbitMq怎么查看队列消息-Tracing日志

    Trace 是Rabbitmq用于记录每一次发送的消息,方便使用Rabbitmq的开发者调试、排错。 1、启动Tracing插件 在RabbitMQ中默认是关闭的,需手动开启。此处rabbitMQ是使用docker部署的 开启了插件后,无需重启,rabbitMq管理界面就会出现Tracing项,可新建追踪。 2、新建trace 新建trace时,JSON模

    2024年02月12日
    浏览(35)
  • Rabbitmq怎么保证消息的可靠性?

    一、消费端消息可靠性保证 : 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费者在处理过程中发生异常或者未完成

    2024年04月14日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包