RabbitMQ-消息中间件学习记录(what-how-why)

这篇具有很好参考价值的文章主要介绍了RabbitMQ-消息中间件学习记录(what-how-why)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  1. 什么是消息中间件
    简单的来说就是消息队列中间件,生产者发送消息到中间件,消息中间件用于
    保存消息并发送消息到消费者。

  2. 消息中间件RabbitMQ的基本组件
    1)producer -生产者
    2)customer -消费者
    3)broker (经纪人)- MQ服务器,管理消息对列、消息及相关消息。(接收并存储生产者发送的消息,发送消息到消费者)
    4)exchange-交换机,将生产者的消息按照一定规则发送给对应的消息对列queue
    5)queue-消息对列,队列,消息存放的容器,消息先进先出
    6)Message-消息,程序间的通信的数据

  3. 什么是消息队列queue(生产者生产msg-queue,消费者监听queue-消费)
    消息对列是一种分布式中的通信方式,它通过异步传输消息的方式,来解耦消 息的 生产者和消费者。在消息中间件中,生产者将消息发送到消息对列中,以为先进先出的方式,消费者从对列中取出消息(可以监听对列是否有消息-@RabblitListener和@RabbitHandler)

  4. 消息中间件的作用
    主要有三个作用:分别是服务解耦、实现异步通信、流量削峰
    1). 服务解耦:(场景-用户下订单、库存服务工作)
    例如订单服务-用户下订单,库存服务处理对应减库存,才返回给用户下单成功的消息。如果说库存服务出现了问题,就会造成订单丢失等问题。如果使用消息中间件(消息对列),可以把下的订单信息—> mq就返回用户下单这个,mq再发送给库存服务,这样生产者发送消息和消费者接收处理消息相互不影响,即使宕机了,消息还在中间件中。

2). 异步通信/异步调用:(用户注册新用户,服务发送短信和邮件)
传统的模式,用户注册系统新用户,服务给用户发送短信和邮件,三个操作都完成之后才返回用户下单注册的消息。因为短信和邮箱和注册信息是没有关系的服务,用户注册后消息发送给mq,用户不需要等邮件和短信发送成功,mq直接返回用户注册成功,至此用户注册业务完成。至于短信和邮件交给mq发送给短信业务-去发送。

注意:
异步就是某线程发出请求,不需要等其他线程完成就接着完成操作。用户注册,消息发送给mq,不需要等短信服务完成,短信发布发送都与注册无关,两者是异步关系。异步不是并发,所有操作同时进行,异步是各过各的。

3). 流量削峰:(商品秒杀)
例如商品秒杀的时候,这时候数据库并不能承受这么大的请求。可以把请求下订单的信息暂存在mq中,返回给用户下单成功,之后的操作由mq发送给对应的服务处理。缓存数据减少数据库的压力。

  1. 为什么需要使用消息中间件
    服务解耦、异步通信、流量削峰

  2. 消息中间件在分布式系统中使用场景(异步)
    6.1 服务解耦-订单和库存服务。用户下订单,消息发给mq,mq返回用户下订单成功,消费者-库存服务接收mq消息再去调用减少库存的消息。
    6.2 异步通信-用户注册新账户 用户注册和admin发送短信和邮件异步
    6.3 流量削峰-商品秒杀,先mq先存储订单信息,返回订单服务下单成功,后慢慢处理。减少大并发对数据库的影响/。

  3. RabbitMQ的五种消息模型/工作模式、
    1) simple 简单的一对一模式,producce-queue-customer
    2) word模式,一个消息对列queue—> 多个消费者,消费者争抢消息队列里面消息,注意一个消息只能被一个消费者消费。
    3) fanout-广播、订阅者模式。交换机将消息发送给所有binding的对列,消费端可以有多个customer使用word模式消费对列的消息。
    4) topic-主体模式,生产者的消息按照不同的路由规则,模糊匹配给不同满足条件的消息对列,消费者再去消费对列中消息
    5)routeKey,路由键(exchange-type-direct),按照不同的路由键发送到对应的queue中。

  4. 消息中间件是异步还是同步
    异步,各干各的,互不影响。(异步并不是并发-同时请求一个请求,而是互不影响个干各的,没有约束和先后顺序)。received生产者的message,send消息到消费者。二者是异步,解耦合互不影响。

  5. mq的消息确认机制confirm(MQ如何避免消息丢失?)

    1. . 对于生产者端来说,主要有两种确认机制
      a. message到broker后,mq立马确认confirm并返回消息告知生产者消息发送成功,如果失败也告知生产者,并重新发送。
      b. message到MQ之后,如果消息对列没有received成功(queue存储msg成功),会确认并返回消息接收失败到生产者
      a b 保证了生产者端不会丢失消息。

    2). 对于消费者来说。
    a. 消费者接收到queue的消息后,默认自动确认,queue删除该message。
    b. 消费者接收到msg后,对数据进行逻辑处理,如果直接confirm-queue直接删除msg,处理数据过程中可能会宕机消息丢失。
    ----设置为手动confirm确认收货,数据处理完再收货成功,queue再去删除msg。也可以对数据不满,退回到queue重新入队,也可以直接删除数据。
    c. 接收失败告知queue,不会删除数据,MQ重新发送消息-这种操作很常见
    这样避免数据在消费者端丢失

1、2两种方式避免了mq的消息丢失。

  1. MQ重复消费
    1)如何造成重复消费
    (1) 生产者端,传输到MQ-queue消息对列接收成功,MQ因为网络问题没有ack->producer,导致生产者又发送了一次消息到MQ。queue-customer-这样msg就被消费了两次。
    (2)消费者端,MQ-queue消息对列消息传到customer。一种是消费者没有接收成功,因为网络问题没有ack queue,queue重复发送,这种不会造成msg重复消费。另一种是消费者消费成功,但是因为不可控因素没有ack queue,消息对列重复发送mgs-to-customer-重复消费。

    2)解决方法
    对于幂等性消息(查询),消费者重复消费也没有关系。
    对于非幂等性消息,消费者重复消费就会有影响了。
    方法:消费者在消费消息之前,获取msg唯一id,到redis进行存储判断setnx(判断是已经存在并存储key-value)。

Boolen flag=stringRadisTemplate.opsForValue.setAbsent(id,value);
1-flag=true,key不存在,未被消费,c正常消费msg
2-false,key存在,已经被消费(两种可能-正在消费或者完成消费-忘记告知ack-queue了),无论哪种情况都直接丢弃。

注意一个问题:如果redis显示有消费记录-且消费者正在消费,此时消费者执行业务宕机了,redis分布式锁会成死锁-解决方法在IfAbsent方法加上过期时间和单位。

一句话就是:消费之前,缓存中有消费记录则丢弃消息,不二次消费。
redis缓存中没有消费记录则,重复存入缓存并消费(设计锁过期时间)。

以下是消息中间件MQ的相关代码和配置信息

  1. 使用MQ的步骤
    1)在pom文件中加上依赖amqp
 <dependency>
       <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency
2) 配置文件配置rabbit服务器的对应信息(spring.rabbitmq host、port,username,ps等)
spring.rabbitmq.host=rabbitmq服务器地址信息
spring.rabbitmq.port=端口号
spring.rabbitmq.username=账户name
spring.rabbitmq.password=密码
spring.rabbitmq.virtual-host=/
#1. 生产者发送message, mq收到消息就确认回复到生产者
spring.rabbitmq.publisher-confirms=tr
#2. queue消息对列接收生产者的消息失败,就确认返回消息到生产操者
spring.rabbitmq.publisher-returns=true
#3. 消费者接收queue消息对列的消息之后,手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3) 服务启动类上面加上注解@EnableRabbit-开启MQ
在springboot启动类加上 @EnableRabbit-开启MQ

4) 业务使用消息中间件存储消息的时候
(1) 创建交换机(注意有不同类型的交换机 direct-fanout-topic)

        public void createExchange() {
//        1. 创建direct类型的exchange     交换机的名字-hello.java.exchange
        DirectExchange directExchange = new DirectExchange("hello.java.exchange", true, false);
//       2. 声明交换机
        amqpAdmin.declareExchange(directExchange);
        log.info("exchange创建成功1111", "hello.java.exchange");
    }

(2)创建消息队列queue

public void createQueue() {
//       1. 创建队列-queue  队列名称-hello-java-queue
        Queue queue = new Queue("hello.java.queue", true, false, false);
//       2. 声明mq队列
        amqpAdmin.declareQueue(queue);
        log.info("queue创建成功1111", "hello.java.queue");
    }
   

(3)交换机和消息队列直接关系绑定

    public void bindEQ() {
//        1. 创建绑定对象( "hello.java.queue"--消息对列, "hello.java.exchange"--交换机,"hello.java"-绑定关系的route-key)
        Binding binding = new Binding(
                "hello.java.queue",
                Binding.DestinationType.QUEUE,
                "hello.java.exchange",
                "hello.java",
                null);
                
//       2. 声明绑定关系(这个关系实际也是一个对象)
        amqpAdmin.declareBinding(binding);
        log.info("Binding创建成功1111", "hello.java.binding");
    }

(4)使用MQ的操作工具类 RabbitTemplate-操作发送消息
对象注入

  @Autowired
    RabbitTemplate rabbitTemplate;

生产者发送消息,需要携带消息-mgs和发送给哪个queue的route-key。注意发送消息需要一个唯一id,后面防止重复发送需要此id判断

    public void sendMessageStr() throws InterruptedException {
        String msg = "测试数据测试数";
//        发送10条message到exchange中
//        new CorrelationData(UUID.randomUUID().toString()  发送的消息的唯一id mq可以接收并处理
        rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java" , msg+ "11111111111111", new CorrelationData(UUID.randomUUID().toString()));
        rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", msg + "222222222222", new CorrelationData(UUID.randomUUID().toString()));
        log.info("交换机消息发送成功----------->");
    }
   (4)消费者监听消息对列消息,消费消息
   使用@RabbitListener监听消息对列,使用RabbitHandler接收对应类型的消息。前者放在类上面,后者放到监听方法上面。
queues是消息对列名称的集合
@RabbitListener(queues = {"hello.java.queue"})

使用@RabbitHandler监听不同类型的消息文章来源地址https://www.toymoban.com/news/detail-649186.html

// 消息是TestEntity2 类型,会自动匹配到对应方法接收
@RabbitHandler
public void receiveOfSecond(TestEntity2 testEntity2) throws InterruptedException {
    System.out.println("receiveOfSecond-监听接受queue的数据是----->" + testEntity2);
}

@RabbitHandler
public void receiveOfFirst(TestEntity testEntity) throws InterruptedException {
    System.out.println("receiveOfFirst-监听接受queue的数据是----->" + testEntity);
}

到了这里,关于RabbitMQ-消息中间件学习记录(what-how-why)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息中间件RabbitMQ

    1.1.1. 什么是MQ MQ(message queue) ,从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,

    2024年01月17日
    浏览(53)
  • 消息中间件之RabbitMQ

    1.基于AMQP协议Erlang语言开发的一款消息中间件,客户端语言支持比较多, 比如Python,Java,Ruby,PHP,JS,Swift.运维简单,灵活路由,但是性能不高, 可以满足一般场景下的业务需要,三高场景下吞吐量不高,消息持久化没有采取 零拷贝技术,消息堆积时,性能会下降 2.消息吞吐量在

    2024年01月19日
    浏览(71)
  • 消息中间件RabbitMQ详解

    消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。 消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件机制的系统中

    2024年02月16日
    浏览(34)
  • 中间件RabbitMQ消息队列介绍

    1.1 什么是 MQ MQ ( message queue ),从字面意思上看,本质是个队列, FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中, MQ 是一种非常常 见的上下游 逻辑解耦+物理解耦 的消息通信服务。使用了 MQ 之

    2024年02月13日
    浏览(44)
  • RabbitMQ:可靠消息传递的强大消息中间件

     消息中间件在现代分布式系统中起着关键作用,它们提供了一种可靠且高效的方法来进行异步通信和解耦。在这篇博客中,我们将重点介绍 RabbitMQ,一个广泛使用的开源消息中间件。我们将深入探讨 RabbitMQ 的特性、工作原理以及如何在应用程序中使用它来实现可靠的消息传

    2024年02月12日
    浏览(56)
  • 高性能消息中间件 RabbitMQ

    消息队列 MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的 异步通信 。 同步通信相当于两个人当面对话,你一言我一语。必须及时回复: 异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系。 消息

    2024年02月11日
    浏览(86)
  • Springboot整合RabbitMQ消息中间件

    spring-boot-rabbitmq–消息中间件整合 前言:RabbitMQ的各种交换机说明 1、直连交换机 生产者发布消息时必须带着routing-key,队列绑定到交换机时必须指定binding-key ,且routing-key和binding-key必须完全相同,如此才能将消息路由到队列中 直连交换机通常用来循环分发任务给多个workers,

    2024年02月11日
    浏览(30)
  • 消息队列中间件(二)- RabbitMQ(一)

    接收,存储,转发消息 生产者 交换机 队列 消费者 简单模式 工作模式 发布 路由模式 主题模式 发布订阅模式 Broker 接收和分发消息的应用 Virtual host 虚拟分组 Connection: TCP连接 Channel: 节省连接,每次访问建立一次Connection消耗太大,所以使用信道代替连接 交换机 队列 www.r

    2024年02月11日
    浏览(43)
  • 中间件_RabbitMQ五种消息模型

    RabbitMQ官方文档 RabbitMQ 提供了5种常用消息模型。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。 简单消息队列官方文档 1、创建简单消息队列 2、导入依赖 3、编写生产者测试类SpringAmqpTest,并利用 RabbitTemplate 实现消息发送 4、编写消费者,监听队列消息

    2024年02月06日
    浏览(28)
  • 快速掌握MQ消息中间件rabbitmq

    Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive. happy for hardess to solve denpendies. 需求: 1.video A https://www.bilibili.com/video/BV1cb4y1o7zz?p=12vd_source=533ee415c42b820b0f4105acb4932a02 参考资料 官方文档 开源社区 博客文

    2024年02月11日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包