MQ-消息队列-RabbitMQ

这篇具有很好参考价值的文章主要介绍了MQ-消息队列-RabbitMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一.概念:

  • MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息
  • MQ的作用
    • 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。 
    • 解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
    • 异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。
    • 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪
  • MQ的缺点
    • 系统可用性降低。依赖服务越多,服务越容易挂掉。需要考虑MQ瘫痪的情况
    • 系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性
    • 业务一致性。主业务和从属业务一致性的处理
  • 主要的MQ产品

    • 主要的MQ产品包括:
      • ​​​​​​RabbitMQ、   erlang的          可用性很高     单击吞吐量一般   微秒级  可靠性高    
      • ActiveMQ、   apache ,java     可用性一般      单击吞吐量差         毫秒级       一般
      • RocketMQ、  阿里的,Java       高                   单击吞吐量高          毫秒级       高
      • ZeroMQ、
      • Kafka、      Scala  ,Java        高                单击吞吐量非常高       毫秒级     一般
      • IBM WebSphere 等
  • RabbitMQ:

    • 概念:RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成
    • docker下载和使用
      • 拉取
        •  docker pull rabbitmq:3.8-management
        • MQ-消息队列-RabbitMQ
      • 查看镜像
        • docker images
      • 给rabbitmq设置容器
        •  
          docker run \
          -e RABBITMQ_DEFAULT_USER=rabbit \
          -e RABBITMQ_DEFAULT_PASS=rabbit \
          -v mq-plugins:/plugins \
          --name mq \
          --hostname mq \
          -p 15672:15672 \
          -p 5672:5672 \
          -d \
          rabbitmq:3.8-management
          
        • MQ-消息队列-RabbitMQ
        • MQ-消息队列-RabbitMQ
      • 然后直接在地址栏访问rabbitmq的后台管理界面
        • 192.168.8.171:15672
        • 登录密码是刚刚设置的rabbit,rabbit
        • MQ-消息队列-RabbitMQ
      • 用idea使用MQ
      • 新建maven项目,改成2.3.9
      • 导入需要用到的依赖
      •         <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-amqp</artifactId>
                </dependency>
      • 然后我们在这个mq1父项目当中建两个子模块,一个充当消息的发送者,另一个充当接收者(消费者):pub,consumer

      • 先建发送者,我们选择的是手动配置模块,建maven模块

      • MQ-消息队列-RabbitMQ

      • 在模块当中,因为我们未选择自动配置springboot项目,我们需要将父项目的主配置文件放入pub这个项目的resource目录下,再建一个启动类,和一个测试类

        MQ-消息队列-RabbitMQ

         
        package com.pro;
        
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        
        @SpringBootApplication
        public class PubApp {
            public static void main(String[] args) {
                SpringApplication.run(PubApp.class,args);
            }
        }
        

         发送者测试类:

        • 建立连接

        • 建立通道

        • 创建队列

        • 发送消息

        • 关闭资源

        • package com.pro.test;
          
          
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import org.junit.Test;
          import org.junit.runner.RunWith;
          import org.springframework.boot.test.context.SpringBootTest;
          import org.springframework.test.context.junit4.SpringRunner;
          
          @RunWith(SpringRunner.class)
          @SpringBootTest
          public class PubTest {
              @Test
              public void testSendMeg() throws Exception {
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("192.168.8.171");
                  factory.setPort(5672);
                  factory.setUsername("rabbit");
                  factory.setPassword("rabbit");
          
                  //建立连接
                  Connection connection = factory.newConnection();
          
                  //建立通道
                  Channel channel = connection.createChannel();
          
                  //创建队列
                  String queueName="xjj";
                  channel.queueDeclare(queueName,false,false,false,null);
          
                  for (int i = 0; i < 5; i++) {
                      //发送消息
                      String message = "杨星大帅哥";
                      channel.basicPublish("",queueName,null,message.getBytes());
                      System.out.println("消息发送成功"+message);
                  }
          
          
                  //关闭资源
                  channel.close();
                  connection.close();
                  //通道的声明
          
              }
          }
          

          测试发送消息之后,我们就可以在管理平台上很清晰的看到有待消费的消息

          MQ-消息队列-RabbitMQ

          • 在第一次测试的时候,报了这样的错
          • MQ-消息队列-RabbitMQ
          • MQ-消息队列-RabbitMQ
          • 但是正常是不会出现这种情况的,发现配置文件都不正常,所以就重新建立一个项目mq2
          • MQ-消息队列-RabbitMQ
          • 步骤就是上面一样的
    • 现在,我们来建另一个消费者模块consumer,需要将父项目的主配置文件放入pub这个项目的resource目录下,再建一个启动类,和一个测试类,
    • MQ-消息队列-RabbitMQ
    • package com.pro.test;
      
      import com.rabbitmq.client.*;
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.springframework.boot.test.context.SpringBootTest;
      import org.springframework.test.context.junit4.SpringRunner;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      @RunWith(SpringRunner.class)
      @SpringBootTest
      public class ConsumerTest {
      
          @Test
          public void testConsumerMsg() throws IOException, TimeoutException {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("192.168.8.171");
              factory.setPort(5672);
              factory.setUsername("rabbit");
              factory.setPassword("rabbit");
      
              //建立连接
              Connection connection = factory.newConnection();
              //建立通道
              Channel channel = connection.createChannel();
              //创建队列
              String queueName="xjj";
              channel.queueDeclare(queueName,false,false,false,null);
      
              //订阅消息:从队列当中取数据
              channel.basicConsume(queueName,true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      //body :就是发过来的消息
                      //处理消息
                      String msg = new String(body);
                      System.out.println("收到"+queueName+"的消息:"+msg);
                  }
              });
              System.out.println("消费者,一直在这里等消息");
          }
      }
      

      MQ-消息队列-RabbitMQ

    • 如果想实现发送者已发送消息,消费者立马可以看到的效果,就需要写一个测试类ConnTest,来运行刚刚测试类当中的代码,但是需要用到main方法,作为程序运行的主入口,让获取消息队列的消息这个方法一直在后台运行,然后,我们再去发送者模块,发送消息,立马就能获取并打印出来
    • MQ-消息队列-RabbitMQ
    • MQ-消息队列-RabbitMQ
    • package com.pro.com.pro.test;
      
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      public class ConnTest {
          public static void main(String[] args) throws IOException, TimeoutException {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("192.168.8.171");
              factory.setPort(5672);
              factory.setUsername("rabbit");
              factory.setPassword("rabbit");
      
              //建立连接
              Connection connection = factory.newConnection();
              //建立通道
              Channel channel = connection.createChannel();
              //创建队列
              String queueName="xjj";
              channel.queueDeclare(queueName,false,false,false,null);
      
              //订阅消息:从队列当中取数据
              channel.basicConsume(queueName,true,new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      //body :就是发过来的消息
                      //处理消息
                      String msg = new String(body);
                      System.out.println("收到"+queueName+"的消息:"+msg);
                  }
              });
              System.out.println("消费者,一直在这里等消息");
          }
      }
      
    • 通过这个案例的练习,我们可以知道,消息中间件可以实现
      • 发送者发送完消息,消费者不去取,中间件会一直保存(快递驿站)
      • 取完之后,驿站里面的快递(消息)就不存在消息队列当中了
      • 可以应用到发短信上面
  • 10月17日使用AMQP简化冗长的代码

  • amqp:
    • 首先在配置文件当中配置好连接消息队列
    • MQ-消息队列-RabbitMQ

    • 然后先来简化发送者的代码,先写一个配置类来配置队列queue(也可以在pubApp启动类当中来配置),不然后面你发送的时候,还没有创建队列,就无法发送到队列当中
    • MQ-消息队列-RabbitMQ
    • 然后我们再执行这个测试类来发消息(很清晰的发现代码量相比昨天少了很多)
    • MQ-消息队列-RabbitMQ
    • 有了发送者,现在我们需要修改接收者

    • 修改这也是一样需要在配置文件当中配置连接地址
    • 然后写一个监听类来监听队列名为“Java”的队列,然后读取里面的消息
    • MQ-消息队列-RabbitMQ
    • 然后启动启动类就可以发现效果了
    • MQ-消息队列-RabbitMQ

      

  • work工作队列

  • MQ-消息队列-RabbitMQ
  • 可以共享队列,实现消息分摊
  • 假设发送者发了50条消息
  • MQ-消息队列-RabbitMQ
  • MQ-消息队列-RabbitMQ
  • 现在写两个消费者来消费,监听器里面配置两个消费者,通过启动类启动后观察消费情况
  • 下图中搞错了,是两个消费者平摊了50条数据,可以看到按单双消费
  • MQ-消息队列-RabbitMQ
  • 但是我们现在想要实现按需分配,能者多劳,速度快的做的多
  • 那么我们需要在消费者的配置文件当中加上一个配置prefetch
  • MQ-消息队列-RabbitMQ
  • #表示处理完一个才再分配一个
    spring.rabbitmq.listener.simple.prefetch=1
  • MQ-消息队列-RabbitMQ
  • 广播

  • MQ-消息队列-RabbitMQ
  • 我们先把交换机绑定两个队列
  • MQ-消息队列-RabbitMQ
  • MQ-消息队列-RabbitMQ
  • MQ-消息队列-RabbitMQ
  • 然后我们要在监听器里面监听这两个队列,来获取消息

  •   @RabbitListener(queues = "fanout.queue1")
        public void listenerFanoutQueue1(String msg)throws Exception{
            System.out.println("消费者接受到了fanout.queue1的消息:"+msg);
        }
        @RabbitListener(queues = "fanout.queue2")
        public void listenerFanoutQueue2(String msg)throws Exception{
            System.out.println("消费者接受到了fanout.queue2的消息:"+msg);
        }
  • 然后,我们现在要到生产者这边生产并发送消息:指定交换机和消息

  • @Test
        //发送50条消息到Java队列中
        public void testSendFanoutExchange() throws InterruptedException {
            String exchange = "ycznz.fanout";
            String msg = "hello,fanout 测试交换机发到两个队列中";
            //指定交换机,和指定消息
            rabbitTemplate.convertAndSend(exchange, "", msg);
        }
  • 可以发现两个消费者都收到了这条消息,这样测试的目的在于知道,通过交换机指定队列,消费者再从队列当中得到消息,两个消费者都是均衡的得到发送者发送的消息
  • MQ-消息队列-RabbitMQ

 文章来源地址https://www.toymoban.com/news/detail-494138.html

到了这里,关于MQ-消息队列-RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MQ消息队列(主要介绍RabbitMQ)

    消息队列概念:是在消息的传输过程中保存消息的容器。 作用:异步处理、应用解耦、流量控制..... RabbitMQ:     SpringBoot继承RabbitMQ步骤:         1.加入依赖          2.配置         3.开启(如果不需要监听消息也就是不消费就不需要该注解开启)         4.创建队列、

    2024年02月11日
    浏览(35)
  • 消息队列-RabbitMQ:MQ作用分类、RabbitMQ核心概念及消息生产消费调试

    1)什么是 MQ MQ (message queue),从字面意思上看, 本质是个队列,FIFO 先入先出 ,只不过队列中存放的内容是 message 而已,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “ 逻辑解耦 + 物理解耦” 的消息通信服务 。 使用了

    2024年02月20日
    浏览(29)
  • RaabitMQ(三) - RabbitMQ队列类型、死信消息与死信队列、懒队列、集群模式、MQ常见消息问题

    这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性。 经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。 Durability有两个选项,Durable和Transient。 Durable表示队列会将消息保存到硬盘,这样消息的安全性更高。但是同时,由于需

    2024年02月14日
    浏览(33)
  • MQ消息队列,以及RabbitMQ详细(中1)五种rabbitMQ实用模型

    书接上文,展示一下五种模型我使用的是spring could 微服务的框架 文章说明:         本文章我会分享总结5种实用的rabbitMQ的实用模型 1、hello world简单模型 2、work queues工作队列 3、Publish/Subscribe发布订阅模型 4、Routing路由模型 5、Topics 主题模型 (赠送) 6、消息转换器 Rabbi

    2024年02月05日
    浏览(38)
  • mq 消息队列 mqtt emqx ActiveMQ RabbitMQ RocketMQ

    十几年前,淘宝的notify,借鉴ActiveMQ。京东的ActiveMQ集群几百台,后面改成JMQ。 Linkedin的kafka,因为是scala,国内很多人不熟。淘宝的人把kafka用java写了一遍,取名metaq,后来再改名RocketMQ。 总的来说,三大原因,语言、潮流、生态。 MQ这种东西,当你的消息量不大的时候,用啥

    2024年02月12日
    浏览(35)
  • .NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

    目录 一、安装mq 二、实操 1、简单模式 2、工作模式 3、fanout扇形模式(发布订阅) 4、direct路由模式也叫定向模式 5、topic主题模式也叫通配符模式(路由模式的一种) 6、header 参数匹配模式 7、延时队列(插件方式实现) 参考资料: 1、我的环境是使用VMware安装的Centos7系统。MQ部署

    2023年04月09日
    浏览(74)
  • 【学习日记2023.6.19】 之 RabbitMQ服务异步通信_消息可靠性_死信交换机_惰性队列_MQ集群

    消息队列在使用过程中,面临着很多实际问题需要思考: 消息从发送,到消费者接收,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收

    2024年02月11日
    浏览(33)
  • 消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序

    目录 基本概念  MQ 的优势  1.应用解耦  2.异步提速  3.削峰填谷  MQ 的劣势 使用mq的条件  常见MQ产品  RabbitMQ简介 RabbitMQ的六种工作模式   JMS RabbitMQ安装和配置。 RabbitMQ控制台使用。 RabbitMQ快速入门——生产者 需求: RabbitMQ快速入门——消费者 小结  多个系统之间的通信方

    2024年02月16日
    浏览(33)
  • MQ消息队列详解以及MQ重复消费问题

    https://blog.csdn.net/qq_44240587/article/details/104630567 核心的就是:解耦、异步、削锋 现有ABCDE五个系统,最初的时候BCD三个系统都要调用A系统的接口获取数据,一切都很正常,但是突然,D系统说:我不要了,你不用给我传数据了,A系统无奈,只能修改代码,将调用D系统的代码删除

    2024年04月13日
    浏览(39)
  • 消息队列MQ

    MQ的原理可以简单概括为生产者将消息发送到队列中,消费者从队列中获取消息进行处理。具体来说,MQ的原理包括以下几个方面: 生产者:生产者将消息发送到MQ服务器中,消息可以是文本、对象、文件等形式。生产者可以使用API或者其他工具将消息发送到MQ服务器,同时可

    2024年02月03日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包