rabbitMQ大致讲解

这篇具有很好参考价值的文章主要介绍了rabbitMQ大致讲解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 1.基本概况

        MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不 用依赖其他服务。

        RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。

2.使用场景

        学习或者使用一项技术,首先要先知道他能够解决那些痛点,而不要过分关注如何使用它,先理解后使用

  • 异步通信:

    • 将耗时的任务异步执行,以提高系统的响应性。生产者将任务放入消息队列,而消费者从队列中获取任务并执行。
  • 系统解耦:

    • 在分布式系统中,通过消息队列实现各个组件之间的解耦,使得系统更加灵活、可扩展,并降低各个组件之间的依赖性。
  • 削峰填谷:

    • 处理流量峰值,通过消息队列缓冲和平滑大量的请求,以防止系统过载。生产者将请求发送到队列,而消费者按照其处理能力逐渐处理这些请求。
  • 日志处理:

    • 在分布式系统中,将日志异步发送到消息队列,然后由日志处理服务消费,以便进行日志聚合、监控和分析。
  • 事件驱动架构:

    • 构建事件驱动的体系结构,通过消息队列进行组件之间的通信。当一个组件触发事件时,它将消息发布到队列,而其他组件则订阅并处理这些事件。
  • 任务分发和调度:

    • 将任务分发到多个工作者节点,以实现任务的并行处理。生产者将任务放入队列,而多个消费者从队列中获取任务并执行。
  • 订单处理:

    • 在电子商务系统中,通过消息队列处理订单和支付流程。当用户下订单时,将订单信息发送到队列中,而订单处理服务异步处理这些订单。
  • 微服务通信:

    • 在微服务架构中,通过消息队列实现微服务之间的通信。微服务可以通过发布和订阅消息进行协作,而不直接调用彼此的 API。
  • 实时数据处理:

    • 将实时生成的数据通过消息队列传递给分析引擎或仪表板,以便实时监控和分析系统的状态。
  • 发布/订阅模式:

    • 实现发布/订阅模式,使得多个消费者可以订阅并处理同一类事件。这在通知系统和广播消息时非常有用。

 3.大体介绍

        3.1四大核心概念

                生产者(publisher)->交换机(exchange)->队列(queues)->消费者(consumer)

        3.2核心部分

                取自官网rabbitMQ大致讲解,java-rabbitmq,rabbitmq,学习

 3.2.1 生产者

        RabbitMQ 中的生产者负责将消息发送到队列。

创建一个简单的生产者通常需要以下步骤:

  1. 创建连接工厂: 使用 ConnectionFactory 创建连接到 RabbitMQ 服务器的连接。

  2. 创建连接: 调用 newConnection 方法创建一个连接对象。

  3. 创建通道: 调用 createChannel 方法创建一个通道,所有的操作都在通道上进行。

  4. 声明交换器和队列: 使用 exchangeDeclare 方法声明交换器,使用 queueDeclare 方法声明队列。

  5. 发布消息: 使用 basicPublish 方法发布消息到交换器。

import com.rabbitmq.client.*;

public class SimpleProducer {
    private static final String EXCHANGE_NAME = "my_direct_exchange";
    private static final String ROUTING_KEY = "my_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 创建连接
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明直连交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

            // 发布消息
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
            
            System.out.println("Message sent: " + MESSAGE);
        }
    }
}

消息的基本属性

使用 basicPublish 方法发布消息时,可以设置消息的基本属性,例如:

  • 交换器名称(Exchange Name): 指定消息发送的交换器。
  • Routing Key(路由键): 对于 Direct、Topic 类型的交换器,消息的路由键与绑定键进行匹配,以决定将消息路由到哪个队列。
  • BasicProperties(基本属性): 可以设置消息的基本属性,如消息的 ID、内容类型、优先级等。

 

发布确认机制

RabbitMQ 提供了发布确认机制,用于确保消息成功发送到交换器。可以通过调用 confirmSelect 方法开启确认模式,并通过 waitForConfirmswaitForConfirmsOrDie 方法等待确认。

// 开启发布确认模式
channel.confirmSelect();

// 发布消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());

// 等待确认
if (channel.waitForConfirms()) {
    System.out.println("Message sent successfully!");
} else {
    System.err.println("Failed to send message!");
}
3.2.2 exchange(交换器)
//声明交换器的方法
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException {
        return this.exchangeDeclare(exchange, type, durable, autoDelete, false, arguments);
    }
  1. exchange(交换器名称):

    • 类型:String
    • 描述:要声明的交换器的名称。
  2. type(交换器类型,下方有对交换器类型的详解):

    • 类型:String
    • 描述:交换器的类型,有四种类型可以选择:
      • direct: 将消息路由到与消息的 routing key 完全匹配的队列。
      • fanout: 将消息广播到所有与交换器绑定的队列。
      • topic: 将消息路由到与消息的 routing key 匹配的队列,支持通配符。
      • headers: 使用消息的 headers 属性进行匹配。
  3. durable(持久性):

    • 类型:boolean
    • 描述:如果设置为 true,交换器将在服务器重启后仍然存在(持久性交换器)。如果设置为 false,交换器将在服务器重启后被删除(非持久性交换器)。
  4. autoDelete(自动删除):

    • 类型:boolean
    • 描述:如果设置为 true,交换器将在至少有一个队列与之绑定时存在。当所有与交换器绑定的队列都解绑时,交换器将被删除。
  5. arguments(其他参数):

    • 类型:Map<String, Object>
    • 描述:其他的一些交换器参数,以键值对的形式传递。例如,在使用 Headers Exchange 类型时,可以通过设置 x-match 参数来定义匹配规则。

         生产者推送到具体交换器,交换器推送到零个或多个队列中,具体推送规则取决于交换器类型以及相应的路由规则

Exchange type(交换器类型) Default pre-declared names
Direct exchange(直接) (Empty string) and amq.direct
Fanout exchange(扇出) amq.fanout
Topic exchange(主题) amq.topic
Headers exchange(标头) amq.match (and amq.headers in RabbitMQ)

        交换器还有许多其他属性,例如名字,持久性(rabbitMQ重启之后是否仍然存在该交换器),自动删除(最后一个队列与交换器解除绑定之后删除),参数(可选,由插件或其他特定功能使用)

  1.   默认交换器

                他是一个没有名称的直接交换器(名字为空字符串),它会使默认没有绑定的队列自动绑定到该交换器上,默认交换器不支持绑定/解绑操作,如果进行该操作会报错

     2. 直接交换器

                他会根据路由key直接将消息推送到响应名称队列中

rabbitMQ大致讲解,java-rabbitmq,rabbitmq,学习

rabbitMQ大致讲解,java-rabbitmq,rabbitmq,学习 

         如上图,a队列绑定了aa,b队列绑定了bb,cc这两个键值,exchange就会根据消息所包含的路由来进行消息的推送选择

          3.fanout扇出交换器

                该交换器会将消息发送到所有与他绑定的队列中,从而忽视路由键的关联

rabbitMQ大致讲解,java-rabbitmq,rabbitmq,学习

         4.topic主题交换器

                该交换器可以更加方便的进行自定义队列推送,适用于比较复杂的业务场景

        rabbitMQ大致讲解,java-rabbitmq,rabbitmq,学习rabbitMQ大致讲解,java-rabbitmq,rabbitmq,学习

                注意:1.当一个队列绑定键是#,那么他将接收所有交换器推送的消息

                         2.如果队列中没有#与*好出现,那么该队列就如同direct一样的效果

            5.标头交换器

                他与直接交换器功能个差不多一致只是传参不一样,从下面源码可以看出,标头交换器是根据arguments进行路由选择的 rabbitMQ大致讲解,java-rabbitmq,rabbitmq,学习

                 声明标头交换器的时候可以设置是全部匹配还是部分匹配(下述为示例代码)

// 声明 Header Exchange,并设置匹配模式为 "any"(部分匹配即可),"all"(需要全部匹配才可)
            Map<String, Object> exchangeArgs = new HashMap<>();
            exchangeArgs.put("x-match", "any");
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, false, false, exchangeArgs);
3.2.3 queue(队列)

         相关属性name(队列名称),Durable(队列在服务端重启之后是否仍然存在),Exclusive(独占,只由一个队列连接使用,当该链接关闭时,删除连接),Auto-delete(自动删除,如果没有消费者连接到这个队列,那么队列将被自动删除),Arguments(可选;由插件和代理特定功能(如消息 TTL、队列长度限制等)使用)

//该方法为声明队列的方法,关于其中的参数下述进行详解
public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
        validateQueueNameLength(queue);
        return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
    }
  1. queue(队列名称):

    • 类型:String
    • 描述:要声明的队列的名称。如果不指定队列名称,RabbitMQ 将会为队列生成一个唯一的名称。
  2. durable(持久性):

    • 类型:boolean
    • 描述:如果设置为 true,队列将在服务器重启后仍然存在(持久性队列)。如果设置为 false,队列将在服务器重启后被删除(非持久性队列)。
  3. exclusive(排他性):

    • 类型:boolean
    • 描述:如果设置为 true,队列将只允许当前连接创建的消费者使用。一旦连接关闭,队列将被删除。
  4. autoDelete(自动删除):

    • 类型:boolean
    • 描述:如果设置为 true,队列将在至少有一个消费者连接到它时存在。当所有消费者断开连接时,队列将被删除。
  5. arguments(其他参数):

    • 类型:Map<String, Object>
    • 描述:其他的一些队列参数,以键值对的形式传递。例如,可以通过设置 x-message-ttl 参数来定义消息的过期时间。

返回值为 AMQP.Queue.DeclareOk,其中包含有关已声明队列的一些信息。

        3.2.4Bindings (绑定)

                用来绑定交换机与队列的路由关系,也可以进行交换器与交换器关系绑定(一般用不上)

//队列绑定
public com.rabbitmq.client.impl.AMQImpl.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {
        validateQueueNameLength(queue);
        return (com.rabbitmq.client.impl.AMQImpl.Queue.BindOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Bind.Builder()).queue(queue).exchange(exchange).routingKey(routingKey).arguments(arguments).build()).getMethod();
    }
//交换器绑定
public BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException {
        return (BindOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Exchange.Bind.Builder()).destination(destination).source(source).routingKey(routingKey).arguments(arguments).build()).getMethod();
    }
        3.2.5 消费者

                rabbitMQ是进行消息转发,不涉及业务处理,需要创建消费者来处理队列中的数据

                 主要使用步骤为:

  1. 创建连接工厂: 使用 ConnectionFactory 创建连接到 RabbitMQ 服务器的连接。

  2. 创建连接: 调用 newConnection 方法创建一个连接对象。

  3. 创建通道: 调用 createChannel 方法创建一个通道,所有的操作都在通道上进行。

  4. 声明队列: 使用 queueDeclare 方法声明要消费的队列。

  5. 创建消费者: 创建 Consumer 对象,实现 DefaultConsumer 抽象类的 handleDelivery 方法,该方法在接收到消息时被调用。

  6. 订阅队列: 调用 basicConsume 方法订阅队列,传入队列名称和消费者对象。

import com.rabbitmq.client.*;

public class SimpleConsumer {
    private static final String QUEUE_NAME = "my_queue";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 创建连接
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 处理接收到的消息
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                }
            };

            // 订阅队列
            channel.basicConsume(QUEUE_NAME, true, consumer);

            // 等待接收消息,此处可以添加逻辑来保持程序运行
            Thread.sleep(10000);
        }
    }
}

消费者的基本属性

  • Consumer Tag(消费者标签): 在调用 basicConsume 时,可以传入一个消费者标签。消费者标签用于标识消费者,可以在取消订阅时使用。如果不提供标签,RabbitMQ 将生成一个唯一的标签。

  • Auto Acknowledgment(自动应答): 在调用 basicConsume 时,可以传入一个布尔值表示是否自动应答。如果设置为 true,一旦消息被接收,就会自动向 RabbitMQ 发送应答。如果设置为 false,需要在处理完消息后手动调用 basicAck 方法发送应答。

  • Delivery(传送信息): handleDelivery 方法的参数 Delivery 包含有关消息的信息,如交付标签、路由键、交换器等。

  • BasicProperties(基本属性): handleDelivery 方法的参数 BasicProperties 包含有关消息的基本属性,如消息的 ID、内容类型、优先级等。

消费者的应答机制

RabbitMQ 提供了两种消费者应答机制:

  • 自动应答: 设置 autoAck 参数为 true,一旦消息被接收,就会自动向 RabbitMQ 发送应答。

  • 手动应答: 设置 autoAck 参数为 false,在处理完消息后,需要手动调用 basicAck 方法发送应答。

自动应答简化了应用程序的逻辑,但在某些情况下可能导致消息的丢失。手动应答允许应用程序更细粒度地控制消息的确认。选择使用哪种应答机制取决于应用程序的需求和消息处理的语义。

消费者的错误处理

在实际应用中,消费者应该具备良好的错误处理机制。可以捕获异常并根据需要执行适当的操作,例如记录日志、重试、拒绝消息等。文章来源地址https://www.toymoban.com/news/detail-790345.html

@Override
public void handleDelivery(String consumerTag, Envelope envelope,
                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    try {
        // 处理接收到的消息
        String message = new String(body, "UTF-8");
        System.out.println("Received message: " + message);

        // 模拟处理中的异常
        if (message.contains("error")) {
            throw new RuntimeException("Simulated error during message processing");
        }

        // 手动应答
        channel.basicAck(envelope.getDeliveryTag(), false);
    } catch (Exception e) {
        // 异常处理逻辑
        System.err.println("Error processing message: " + e.getMessage());

        // 可以选择拒绝消息并重新入队,或者进行其他错误处理
        channel.basicNack(envelope.getDeliveryTag(), false, true);
    }
}

到了这里,关于rabbitMQ大致讲解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息中间件学习笔记--RabbitMQ(二、模式,一次违反常规的Java大厂面试经历

    .Fanout:转发消息到所有绑定队列 比较常用的是Direct、Topic、Fanout. Fanout 这种Fanout模式不处理路由键,只·需要简单的将队列绑定到exchange上,一个发送到exchange的消息都会被转发到与该exchange绑定的所有队列上。很像广播子网,每台子网内的主机都获得了一份复制的消息。Fan

    2024年04月09日
    浏览(99)
  • 消息队列RabbitMQ.02.交换机的讲解与使用

    目录 RabbitMQ中交换机的基本概念与作用解析 交换机的作用: 交换机的类型: 直连交换机(Direct Exchange): 将消息路由到与消息中的路由键(Routing Key)完全匹配的队列。 主题交换机(Topic Exchange): 使用通配符匹配路由键,允许更灵活的消息路由。 扇形交换机(Fanout E

    2024年01月24日
    浏览(57)
  • 【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程

    🎉🎉 欢迎光临,终于等到你啦 🎉🎉 🏅我是 苏泽 ,一位对技术充满热情的探索者和分享者。🚀🚀 🌟持续更新的专栏 《Spring 狂野之旅:从入门到入魔》 🚀 本专栏带你从Spring入门到入魔   这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.

    2024年03月15日
    浏览(53)
  • RabbitMQ学习(五):RabbitMQ持久化

    在上一章内容中我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉后消 息生产者发送过来的消息不丢失呢?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它将忽视队列 和消息,除非告知它不要这样做。 确保消息不会丢失需要做两件事:我们需

    2024年02月16日
    浏览(41)
  • RabbitMQ学习(二)——Linux下安装RabbitMQ

    1、 先去官网下载RabbitMQ 下载地址 :Downloading and Installing RabbitMQ — RabbitMQ 选择对应的系统版本点击下载,下载后会得到 .rpm 文件   2、下载Erlang RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境,需要是安装 Erlang Erlang 和 RabbitMQ 版本对照:RabbitMQ Erlang Version

    2024年02月08日
    浏览(39)
  • 【Docker】Docker安装MySQL、Redis、RabbitMQ、Elasticsearch、Nacos等常见服务(质量有保证,详情讲解)

    本文描述的是使用Docker来安装我们常用的一些服务,以下示例都是作者自己在用的,质量有保证。 至于为什么使用Docker,因为方便,不需要下载、配置乱七八糟的, 安装常见服务之前先讨论下Docker容器间的连接或通信方式。随着安装的Docker容器越来越多,会发现存在一些复

    2024年02月15日
    浏览(53)
  • RabbitMQ系列(3)--创建RabbitMQ的Java项目

    1、新建空项目 2、给项目起名称 3、创建空项目后为空项目创建Maven模块 (1)新建模块 (2)选择Maven模块 (3)为模块创建名称 4、给新建的项目设置JDK (1)打开项目结构 (2)把项目的JDK版本设置为JDK1.8 (3)把模块的JDK版本设置为JDK1.8 5、为项目设置Maven依赖和Maven仓库 (1)打开 设置 (2)搜索

    2023年04月09日
    浏览(43)
  • 【Docker】Docker安装启动MySQL、Redis、RabbitMQ、Elasticsearch、Nacos等等常见服务(质量有保证,详情讲解)

    本文描述的是使用Docker来安装我们常用的一些服务,以下示例都是作者自己在用的,质量有保证。 至于为什么使用Docker,因为方便,不需要下载、配置乱七八糟的, 安装常见服务之前先讨论下Docker容器间的连接或通信方式。随着安装的Docker容器越来越多,会发现存在一些复

    2024年02月16日
    浏览(48)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包