RabbitMQ(基于AMQP的开源消息代理软件)

这篇具有很好参考价值的文章主要介绍了RabbitMQ(基于AMQP的开源消息代理软件)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、AMQP高级消息队列协议

(1)介绍

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

(2)工作流程

发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

二、RabbitMQ开源消息代理软件

 (1)介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的。所有主要的编程语言均有与代理接口通讯的客户端库。

消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。

(2)特点

RabbitMQ两大核心特性:异步消息、队列。 异步消息:只要异步消息就不阻塞线程,减少了主线程执行时间。所有需要这种效果场景都可以使用MQ。 队列:进入队列的数据一定有先后之分。只要应用程序要对内容分先后的场景都可以使用MQ。 

(3)适用场景

1.应用解耦

2. 排队算法

使用队列特性。把数据发送给MQ,进入到队列就有了排队的效果

3.秒杀活动

使用队列特性。例如:抢红包、限时秒杀、直播卖货时抢商品。使用了MQ按照顺序一个一个操作,当商品库存操作到0个时,秒杀结束。

4.消息分发

在程序中同时向多个其他程序发送消息。应用了AMQP中交换机,实现消息分发

5.异步处理

利用MQ异步消息特性。大大提升主线程效率。

6.数据同步

利用异步特性。我们电商中使用RabbitMQ绝大多数的事情就是在实现数据同步。

7.处理耗时任务

利用异步特性。可以把程序中耗时任务(例如:发送邮件、发送验证码)交给MQ去处理,减少当前项目的耗时时间。

8.流量削峰

在互联网项目中,可能会出现某一段时间范围内,访问流量骤增的情况(双11、品牌促销,10点抢购),如果使用监控工具,会发现这段时间访问出现顶峰。使用MQ可以把这些访问分摊到多个项目中,把流量分摊,去除了顶峰效果,这就叫做流量削锋。是利用RabbitMQ中交换机实现的。

三、RabbitMQ核心原理

 RabbitMQ(基于AMQP的开源消息代理软件)

 发送者Publisher向RabbitMQ发送消息Message,在Message会包含路由键Routing Key、交换器名称、消息内容。交换器Exchange接收到消息Message后会根据交换器类型Exchange Type判断把消息如何发送给绑定的队列Queue中,如果交换器类型是Direct这个消息只放入到路由键对应的队列中,如果是topic交换器消息放入到routing key匹配的多个队列中,如果是fanout交换器消息会放入到所有绑定到交换器的队列中。放入到队列成功后会返回给发送者Publisher一个ACK确认消息,表示消息发送成功了。剩下的事情是由Consumer进行完成,Consumer一直在监听队列,当队列里面有消息就会把消息取出,取出后根据程序的逻辑对消息进行处理,处理完成后会返回给RabbitMQ一个ACK,表示消息处理完成,RabbitMQ会删除这个消息。以上这些就是RabbitMQ的运行原理。  

核心概念

 1. Message

消息。它由消息头消息体组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。

2.Publisher

消息的生产者。也是一个向交换器发布消息的客户端应用程序。 ​ 通俗说明:哪些项目向RabbitMQ发送消息,哪些项目就是Publisher

3. Consumer

消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。Consumer会一直监听指定的队列,只要队列中有消息,就会按照顺序依次取出。使用MQ做耗时任务时,耗时任务就交给Consumer进行完成。

4. Exchange

交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。一共支持四种的交换器类型:

  1. direct(发布与订阅 完全匹配)

  2. fanout(广播)

  3. topic(主题,规则匹配)

  4. header(使用较少,相比direct就多了一些头信息)

5. Binding

绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 ​ 一个交换器里面可以绑定多个队列。一个队列一般都是只绑定到一个交换器上。消息发送给交换器,交换器会把效果按照特定规则发送给绑定的队列。

6. Queue

消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

7. Routing-key

路由键。RabbitMQ决定消息该投递到哪个队列的规则。(也可以理解为队列的映射,路由键是key,队列是value)。队列通过路由键绑定到交换器。 ​ 消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。 ​ 如果相匹配,消息将会投递到该队列。如果不匹配,消息将会进入黑洞(相当于丢弃)。 ​ 通俗理解:队列绑定到交换器时有路由键,这个路由键就相当于key-value中的key,value则是队列。当Publisher发送消息时一定会携带路由键(即使路由键是Null),有了路由键就让交换器知道了这个消息要发送给哪个队列。

8. Connection

链接。指Rabbit服务器和客户端建立的TCP链接。

9. Channel

Channel中文叫做信道,是TCP里面的虚拟链接。例如:电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。 ​ 在RabbitMQ中,TCP链接一旦打开,就会创建AMQP信道。无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。

10. Virtual Host

虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 '/'。 ​ 通俗理解:一个RabbitMQ可以包含多个虚拟主机,每个虚拟主机都是一个RabbitMQ。平时我们没有去创建虚拟主机,都是使用RabbitMQ里面默认的'/'虚拟主机,单实际上一个RabbitMQ可以包含多个虚拟主机主机的,也就是说一个RabbitMQ可以包含多个实例。就像MySQL可以创建多个数据库一样。

11. Borker

表示消息队列服务器实体。就是RabbitMQ服务器进程。

12. 交换器和队列的关系

交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟交换器中绑定的路由键匹配,那么消息就会被路由到该绑定的队列中。 ​ 也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。 ​ 路由键可以理解为匹配的规则。

13. RabbitMQ为什么需要信道?为什么不是TCP直接通信?

  1. TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。

  2. 如果不用信道,那应用程序就会以TCP链接RabbitMQ,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。

  3. 信道的原理是一条线程一条通道,多条线程多条通道共用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

这也是为什么使用RabbitMQ去处理秒杀、流量削锋、海量请求时依然对RabbitMQ比较有信心的原因。

四、安装RabbitMQ

 (1)拉取镜像

docker pull rabbitmq:management

(2)创建并启动容器

docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 --restart=always -e DEFAULT_USER=bjsxt -e DEFAULT_PASS=bjsxt rabbitmq:management

 五、Spring AMQP介绍

Spring AMQP是Spring的顶级项目。是基于AMQP协议的消息传递解决方案。此框架提供顶级抽象模板AmqpTemplate接口,用于抽象消息传递标准。提供基于容器的监听处理。暂时Spring AMQP只提供了基于RabbitMQ处理消息传递的解决方案。其具体接口是RabbitOperations、实现是RabbitTemplate

六、使用

(1)引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

(2)编写配置文件

# 配置RabbitMQ相关信息
# 当创建RabbitMQ容器的时候,不提供用户名和密码配置,自动创建用户guest,密码guest。
# guest用户只能本地访问RabbitMQ。
spring:
  rabbitmq:
    host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
    port: 5672 # RabbitMQ服务器的端口。
    username: bjsxt # RabbitMQ的访问用户名。默认guest。
    password: bjsxt # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /

(3)发送消息

/**
 * 测试Spring AMQP框架中发送消息的方式。
 */
@SpringBootTest
public class TestPublisher {
    /**
     * 注入客户端对象。
     * 类型可以是: AmqpTemplate(顶级接口), RabbitOperations(专用子接口),RabbitTemplate(具体实现)
     * 建议使用接口: 优先级是 RabbitOperations > AmqpTemplate
     */
    @Autowired
    private RabbitOperations rabbitOperations;
    
    /**
     * 测试发送消息
     * 消息内容是字符串。
     * 注意:
     * Spring AMQP可以发送的消息类型必须是Message类型。
     * Spring AMQP可以帮助程序员自动封装消息类型Message对象。
     * 只要提供消息具体内容(消息体)即可实现默认封装。
     * Spring AMQP可以自动转换封装的消息体类型是Object。只要类型可序列化即可。
     */
    @Test
    public void testSendStringMessage(){
        String messageContent = "第一个字符串消息";
        String exchangeName = "direct.first.ex";
        String routingKey = "routing.key.1";

        // 发送消息的时候,只要指定要发送到的具体交换器名称,路由键,和消息内容即可。
        rabbitOperations.convertAndSend(exchangeName, routingKey, messageContent);

        System.out.println("消息发送完毕");
    }
}

(4)基于Configuration配置创建交换器、队列及完成绑定

@Configuration
public class RabbitMQConfig {
    // 发送消息时如果不存在这个队列,会自动创建这个队列。
    // 注意:是发送消息时,而不是启动项目时。
    // 相当于:可视化操作时创建一个队列
    // 如果队列创建完成后,没有绑定(没有另外两个方法),默认绑定到AMQP default交换器
    @Bean
    public Queue queue(){
        return new Queue("queue.second");
    }

    // 如果没有这个交换器,在发送消息创建这个交换器
    // 配置类中方法名就是这个类型的实例名。相当于<bean id="" class="">的id属性,返回值相当于class
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct.first.ex");
    }

    // 配置类中方法参数,会由Spring 容器自动注入
    @Bean
    public Binding directBingding(DirectExchange directExchange,Queue queue){
        // with(“自定义路由键名称”)
        return BindingBuilder.bind(queue).to(directExchange).with("routing.key.2");
        // withQueueName() 表示队列名就是路由键名称
        // return BindingBuilder.bind(queue).to(directExchange).withQueueName();
    }
}

(5)编辑消息消费者

/**
 * 字符串类型消息体处理消费者
 * Spring AMQP和Spring Boot配合的启动器,可以自动注册监听。
 * 要求,当前类型的bean对象,必须被spring容器管理。
 */
@Component
public class StringMessageConsumer {

/**
     * 常规开发中,都会先定义消息的消费者。后定义消息的发布者。
     * 典型的观察者设计模式。先有监听,后有事件。
     *
     * 注解RabbitListener
     *  可选属性: 常用属性
     *   bindings - 定义绑定规则。属性类型是: QueueBinding[]
     * 注解QueueBinding,描述具体的绑定规则。就是交换器和队列的绑定规则。
     *  必要属性:
     *   value - 监听的队列,属性类型是Queue
     *   exchange - 队列绑定的交换器,属性是Exchange
     *  可选属性:
     *   key - 绑定的路由键都是什么,类型是String[]
     * 注解Queue,描述一个具体的队列,如果队列在RabbitMQ中存在,直接使用并监听;如果不存在,
     *  创建队列并监听。
     *  可选属性:
     *   name - 队列名称,String类型
     *   autoDelete - 是否是自动删除的队列,String类型。可选值: "true" | "false"
     *
     * 注解Exchange,描述一个具体的交换器,如果交换器存在,直接使用;如果不存在,则创建,
     *  并基于key绑定队列
     *  可选属性:
     *   name - 交换器名称,String类型
     *   autoDelete - 是否是自动删除的交换器,String类型。可选值: "false" | "true"。默认false
     *   type - 交换器的类型,String类型。默认是direct。可选: direct,fanout,topic,headers
     *    可以使用枚举类型中的常量赋值,具体是ExchangeTypes.XXX
     * @param messageBody
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.second", autoDelete = "false", durable = "true"),
                    exchange = @Exchange(name = "direct.second.ex",
                            autoDelete = "false", type = ExchangeTypes.DIRECT),
                    key = {"routing.key.second.1", "routing.key.second.2"}
            )
    })
    public void onMessage(String messageBody){
        System.out.println("第二个消息消费者监听,处理消息:" + messageBody);
    }
}

七、传递自定义消息

/**
 * 处理自定义类型消息体
 */
@Component
public class MyMessageConsumer {
    /**
     * Spring AMQP可以自动实现消息体类型转换。
     * 使用的方式是强制类型转换。只要包装传输的消息体数据类型和方法参数类型匹配即可。
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "pojo.queue1", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.pojo.ex", type = "topic"),
                    key = {"routing.key.1"}
            )
    })
    public void onMessage1(MyMessage myMessage){
        System.out.println("处理自定义消息:" + myMessage);
    }

    /**
     * 可以通过统一消息类型Message处理消息内容
     * @param message
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "pojo.queue2", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.pojo.ex", type = "topic"),
                    key = {"routing.key.2"}
            )
    })
    public void onMessage2(Message message) throws Exception{
        // 获取消息体,消息体是字节数组。根据具体类型进行处理。
        byte[] body = message.getBody();
        ByteArrayInputStream byteArrayInputStream =
                new ByteArrayInputStream(body);
        ObjectInputStream inputStream =
                new ObjectInputStream(byteArrayInputStream);
        Object obj = inputStream.readObject();
        if(obj.getClass() == MyMessage.class){
            MyMessage myMessage = (MyMessage) obj;
            System.out.println(myMessage);
        }
        System.out.println("消息体中的对象类型是:" + obj.getClass().getName());
    }
}

八、ACK确认机制

发送消息确认

(1)修改配置

spring:
  rabbitmq:
    host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
    port: 5672 # RabbitMQ服务器的端口。
    username: bjsxt # RabbitMQ的访问用户名。默认guest。
    password: bjsxt # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
    publisher-confirm-type: correlated # 开启到达交换器确认机制。默认值:none,不开启确认机制。
    publisher-returns: true # 开启路由失败确认机制。默认值:false

(2)实现RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnsCallback接口重写不能到达交换器或路由失败时的回调处理逻辑

@Component
public class PublisherHandler implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 构造方法执行结束后立刻执行此方法。即初始化逻辑。
     */
    @PostConstruct
    public void init(){
        // 设置RabbitTemplate中的回调逻辑
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 消息路由失败回调逻辑
     * @param returned 路由失败的消息
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("交换器 : " + returned.getExchange());
        System.out.println("路由键 : " + returned.getRoutingKey());
        System.out.println("路由失败编码 : " + returned.getReplyCode());
        System.out.println("路由失败描述 : " + returned.getReplyText());
        System.out.println("消息 : " + returned.getMessage());
    }

    /**
     * 当交换器不能到达时,具体的处理方案。
     * @param correlationData 消息唯一标记
     * @param ack 是否确认
     * @param cause 不能到达交换器(即ack为false)的具体原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息唯一标记 : " + correlationData);
        System.out.println("是否确认到达交换器 : " + ack);
        System.out.println("不能到达交换器的原因 : " + cause);
    }
}

消费消息确认

在Spring AMQP中,消费者(Consumer)默认的ACK机制是自动确认,即消费代码正常执行结束,立刻确认消息已消费;消费代码发送异常,相当于消息未消费。如果希望关闭自动ACK机制,可使用两种处理方案实现。

(1)重试消费

可以在消费者中基于配置开启重试机制,并设置重试消费次数。当消费消息发生错误,导致未确认(NACK)时,消费者尝试重复消费消息;当重复消费次数到达设置阈值后,强制确认(ACK),RabbitMQ会移除队列中的消息。

修改配置文件

spring:
  rabbitmq:
    host: 192.168.91.128
    port: 5672
    username: bjsxt
    password: bjsxt
    listener:
      simple:
        retry:
          enabled: true # 开启重试机制
          max-attempts: 1  # 重试消费1次

(2)手工确认ACK

1.修改配置文件

spring:
  rabbitmq:
    host: 192.168.91.128
    port: 5672
    username: bjsxt
    password: bjsxt
    listener:
      simple:
        acknowledge-mode: manual # 手工确认。 默认AUTO,自动确认

2.正常消费

@Component
public class StringMessageConsumer {
    /**
     * 消费方法。实现手工ACK确认
     * @param messageBody 消息内容
     * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
     *                RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
     * @param deliveryTag  RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
     *                     是一个递增的正整数,delivery tag 的范围仅限于 Channel
     */
    @RabbitListener(queues = {"queue.first"})
    public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
        System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
        try {
            /*
             * 确认消息
             *  参数1 - 消息的唯一标识
             *  参数2 - 是否批量提交。为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,
             *         则可以一次性确认 deliveryTag 小于等于传入值的所有消息
             */
            channel.basicAck(deliveryTag, false);
            System.out.println("消息已确认");
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

3.错误消费

@Component
public class StringMessageConsumer {
    /**
     * 消费方法。实现手工NACK确认
     * @param messageBody 消息内容
     * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
     *                RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
     * @param deliveryTag  RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
     *                     是一个递增的正整数,delivery tag 的范围仅限于 Channel
     */
    @RabbitListener(queues = {"queue.first"})
    public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
        System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
        try {
            /*
             * 参数1 - 消息的唯一标识
             * 参数2 - 是否批量提交
             * 参数3 - 是否重新发出消息。false则废弃此消息。
             */
            channel.basicNack(deliveryTag, false, true);

            System.out.println("消息未确认-重复消费-deliveryTag=" + deliveryTag);
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

4.Reject确认(丢弃)

@Component
public class StringMessageConsumer {
    /**
     * 消费方法。实现手工ACK确认
     * @param messageBody 消息内容
     * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
     *                RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
     * @param deliveryTag  RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
     *                     是一个递增的正整数,delivery tag 的范围仅限于 Channel
     */
    @RabbitListener(queues = {"queue.first"})
    public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
        System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
        try {
            /*
             * 参数1 - 消息的唯一标识
             * 参数3 - 是否重新发出消息。false则废弃此消息。
             */
            channel.basicReject(deliveryTag, false);

            System.out.println("消息未确认-重复消费-deliveryTag=" + deliveryTag);
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

九、同步消息

(1)修改配置文件

spring:
  rabbitmq:
    host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
    port: 5672 # RabbitMQ服务器的端口。
    username: bjsxt # RabbitMQ的访问用户名。默认guest。
    password: bjsxt # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
    template:
      reply-timeout: 100000 # 配置同步消息超时时长,单位毫秒

(2)消费者

/**
 * 同步消息消费者。如:抢红包。
 */
@Component
public class SyncMessageConsumer {
    /**
     * 同步消息消费方法。和异步消息消费方法的唯一区别就是有返回值。类型不限。
     * @param message
     * @return
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.sync", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.sync", type = "topic"),
                    key = {"routing.key"}
            )
    })
    public String onMessage(String message){
        System.out.println("处理同步消息:" + message);
        return "消息已处理";
    }
}

(3)发送者文章来源地址https://www.toymoban.com/news/detail-433010.html

    /**
     * 转换,处理,发送消息,并等待接收消费者反馈。方法返回值就是消费者端返回的确认消息,即消费方法返回结果
     * Object convertSendAndReceive(String exchange, String routingKey, Object messageBody)
     * 如果消费者超时未返回,代码自动结束,并向下继续运行。
     * 消费者反馈结果使用null填充。
     */
    @Test
    public void testSyncMessage(){
        String message = "测试同步消息";
        Object result = rabbitOperations.convertSendAndReceive(
                "topic.sync",
                "routing.key",
                message
        );
        if(result == null){
            System.out.println("超时未返回");
        }else {
            System.out.println(result.getClass().getName());
            System.out.println(result);
        }
    }

到了这里,关于RabbitMQ(基于AMQP的开源消息代理软件)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ高级篇---消息可靠性

    1、消息可靠性: 消息从发送到消费者接受,会经历多个过程,每个消息传递的过程都可能导致消息的丢失: 常见的丢失原因: 发送时消息丢失原因: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 Rab

    2024年01月20日
    浏览(53)
  • 4.RabbitMQ高级特性 幂等 可靠消息 等等

    保障消息的成功发出 保障MQ节点的成功接收 发送端收到MQ节点(Broker)确认应答 完善的消息进行补偿机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。 生产者进行接收应答,用来确定这条消息是否正常的发送到了Broker,这种方式也是

    2024年02月11日
    浏览(45)
  • RabbitMQ(三):AMQP协议_rabbitmq的amqp协议(2)

    1.1 AMQP协议介绍 因为RabbitMQ是一种遵循AMQP协议的分布式消息中间件,RabbitMQ实现的AMQP版本是0.9.1,所以在此处简单了解一下AMQP-0-9-1 协议。 1、AMQP是什么 AMQP,全称Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议。它支持符合要求的客户端应用(application)和

    2024年04月16日
    浏览(38)
  • RabbitMQ---Spring AMQP

    Spring有很多不同的项目,其中就有对AMQP的支持: Spring AMQP的页面:http://spring.io/projects/spring-amqp 注意这里一段描述: Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。 添加AMQP的启动器: 在application.yml中添加

    2024年02月11日
    浏览(28)
  • RabbitMQ高级特性解析:消息投递的可靠性保证与消费者ACK机制探究

    学习RabbitMQ高级特性,涵盖消息的持久化、确认模式、退回模式以及消费者ACK机制等方面,助您构建高可靠性的消息队列系统。

    2024年01月16日
    浏览(66)
  • 【云原生 | 27】Docker部署运行开源消息队列实现RabbitMQ

    作者简介:🏅云计算领域优质创作者🏅新星计划第三季python赛道第一名🏅 阿里云ACE认证高级工程师🏅 ✒️个人主页:小鹏linux 💊个人社区:小鹏linux(个人社区)欢迎您的加入! 目录 1. 关于MQ 1.1 什么是MQ? 1.2 MQ是干什么用的?  1.3 MQ衡量标准  1.4 主流竞品分析  2. 关

    2024年01月20日
    浏览(47)
  • .NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

    目录 一、安装mq 二、实操 1、简单模式 2、工作模式 3、fanout扇形模式(发布订阅) 4、direct路由模式也叫定向模式 5、topic主题模式也叫通配符模式(路由模式的一种) 6、header 参数匹配模式 7、延时队列(插件方式实现) 参考资料: 1、我的环境是使用VMware安装的Centos7系统。MQ部署

    2023年04月09日
    浏览(103)
  • 2023-07-06:RabbitMQ中的AMQP是什么?

    2023-07-06:RabbitMQ中的AMQP是什么? 答案2023-07-06: AMQP AMQP(Advanced Message Queuing Protocol)是一个应用层协议的开放标准,旨在设计面向消息的中间件。基于AMQP协议的客户端和消息中间件可以自由地传递消息,不受客户端、中间件产品或开发语言的限制。其目标是实现一种被广泛

    2024年02月12日
    浏览(51)
  • (四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

    Lison dreamlison@163.com , v1.0.0 , 2023.06.23 之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。 1、 生产者批量发送消息 2、消费端配置限流机制 3、消费者监听队列 在RabbitMQ中,多个消费者监听同一条队列,则队列

    2024年02月15日
    浏览(43)
  • (一)RabbitMQ概念-优势、劣势、应用场景 、AMQP、工作原理

    Lison dreamlison@163.com , v1.0.0 , 2023.06.22 RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件,不管是互联网行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。 高可靠性、易扩展、高可用、功能丰富等 支持

    2024年02月15日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包