【RabbitMQ】消息队列-RabbitMQ篇章

这篇具有很好参考价值的文章主要介绍了【RabbitMQ】消息队列-RabbitMQ篇章。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、RabbitMQ是什么

RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用高可扩性易用性等特征。

1.1、RabbitMQ—使用场景

一般场景

像一般的下订单业务如下图:

将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端,
像这样耗时就很大 = 所有服务操作的耗时总和,而且若是这一整条执行链某个环节出了问题触发回滚,得不偿失

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

public void makeOrder(){
    // 1 :保存订单 
    orderService.saveOrder();
    // 2: 发送短信服务
    messageService.sendSMS("order");//1-2 s
    // 3: 发送email服务
    emailService.sendEmail("order");//1-2 s
    // 4: 发送APP服务
    appService.sendApp("order");    
}

那么当我们开辟一个线程池去异步处理的话,也存在缺点:(最大的原因就是自己去实现起来,因素过多,实现复杂)
存在问题:
1:耦合度高
2:需要自己写线程池自己维护成本太高
3:出现了消息可能会丢失,需要你自己做消息补偿
4:如何保证消息的可靠性你自己写
5:如果服务器承载不了,你需要自己去写高可用
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

所以MQ就诞生了
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
只管下单,下单后直接就给用户提示下单成功,别的事交给mq去派发,让别的服务去mq拿消息处理

用户响应耗时 = 下单(主要)耗时(50ms) 别的(次要)处理服务全放到消息队列当中等待处理

public void makeOrder(){
    // 1 :保存订单 
    orderService.saveOrder();   
    rabbitTemplate.convertSend("ex","2","消息内容");
}
解耦

发送方将消息发送到消息队列中,接收方从队列中获取消息进行处理。这种松耦合的通信模式可以提高系统的可扩展性和灵活性。

这样使得下单服务并不受,发短信、发邮件、等等服务的影响(前提是下单不依赖任何一个服务的返回值)
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

削峰

当系统面临突然的请求高峰时,消息队列可以起到缓冲的作用。请求先进入消息队列排队,然后逐个被处理,使得系统能够逐渐消化高峰期的请求压力,避免过载和故障

也就是如果某个时刻有大量的请求,此时都会到mq里面去,而不会瞬间开启很多线程去异步执行,从而达到销峰的效果,使得即便大量的用户请求来了,那系统处理请求还是非常平滑的
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

异步

消息队列支持异步处理,即发送方发送消息后,并不需要等待接收方立即处理完成,而是继续执行其他任务。接收方在合适的时间从队列中获取消息进行处理。这种异步处理可以提高系统的性能和响应速度,尤其适用于处理耗时的操作。

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

好处
1:完全解耦,用MQ建立桥接
2:有独立的线程池和运行模型
3:出现了消息可能会丢失,MQ有持久化功能
4:如何保证消息的可靠性,死信队列和消息转移的等
5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍

2、Dokcer安装RabbitMQ

2.1安装Dokcer

  1. yum 包更新到最新
> yum update
  1. 安装软件包,yum-util提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
> yum install -y yum-utils device-mapper-persistent-data lvm2
  1. 设置yum源为阿里云
> yum-config-manager --add-repo
> http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
  1. 安装docker
> yum install docker-ce-y
  1. 安装后查看docker版本
> docker-v
  1. 安装加速镜像

从阿里云获取镜像加速器:
https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors

sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://spukdfwp.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docke

2.2安装rabbitmq

  1. 路径:https://www.rabbitmq.com/download.html

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  1. 点击上图中标红线的 community Docker
    image,跳转到如下地址:https://registry.hub.docker.com/_/rabbitmq/

当前可以看到安装镜像的时候可以设置用户名,密码,ip。就不用安装完进入容器内部设置
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
3. 官网给的安装案例

$ docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management

4.命令讲解

docker run -id --hostname my-rabbit --name=myrabbit -p 15672:15672 rabbitmq:3-management

--hostname:指定容器主机名称
--name:指定容器名称
-p:将mq端口号映射到本地
-e 设置

5.修改命令创建并安装

docker run -di  --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:3-management

6.阿里云开放上方命令 设置的端口号

-p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

7.安装成功

[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker images
REPOSITORY   TAG            IMAGE ID       CREATED        SIZE
rabbitmq     3-management   6c3c2a225947   7 months ago   253MB
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                                                                                                                                                                                                                                                                               NAMES
1de1f1e10cb0   rabbitmq:3-management   "docker-entrypoint.s…"   6 minutes ago   Up 6 minutes   4369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp, 0.0.0.0:25672->25672/tcp, :::25672->25672/tcp, 0.0.0.0:61613->61613/tcp, :::61613->61613/tcp, 15691-15692/tcp   myrabbit
[root@iZbp1av1izm1qqcdfa0nndZ ~]#

8.停掉手动安装的rabbimq

systemctl stop rabbitmq-server

9.启动docker的rabbitmq容器

##查看容器
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker ps
CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                                                                                                                                                                                                                                                                               NAMES
1de1f1e10cb0   rabbitmq:3-management   "docker-entrypoint.s…"   9 minutes ago   Up 9 minutes   4369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp, 0.0.0.0:25672->25672/tcp, :::25672->25672/tcp, 0.0.0.0:61613->61613/tcp, :::61613->61613/tcp, 15691-15692/tcp   myrabbit
##启动容器 docker start 容器id(CONTAINER ID)
[root@iZbp1av1izm1qqcdfa0nndZ ~]# docker start 1de1f1e10cb0
1de1f1e10cb0
[root@iZbp1av1izm1qqcdfa0nndZ ~]#

10.通过服务器(虚拟机ip+端口号(15672))访问RabbitMQ主页http://192.168.157.128:15672

默认登录账号和密码都是admin
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

并且在admin账号下可以通过增加用户,给用户不同角色,也就对应不同的操作权限:
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
详情如下:
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

3、RabbitMQ入门案例 - Simple 简单模式

1.实现步骤:

1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
4:启动rabbitmq-server服务
5:定义生产者
6:定义消费者
7:观察消息的在rabbitmq-server服务中的过程

2.构建一个maven工程
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
3.导入rabbitmq的maven依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>


<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>



<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>



4.启动rabbitmq-server服务

systemctl start rabbitmq-server
或者
docker start myrabbit

5、定义生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            channel.queueDeclare("queue1", false, false, true, null);
            // 6: 准备发送消息的内容
            String message = "你好,学相伴!!!";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routing
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            // 8: 关闭连接
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

1:执行发送,这个时候可以在web控制台查看到这个队列queue的信息

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

2:我们可以进行对队列的消息进行预览和测试如下:

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

3:进行预览和获取消息进行测试

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

NACK 只是做消息预览,不会吧消息从队列移除
ACK相当于手动的把消息处理了,这个时候就会把消息从队列剔除,导致消息丢失

6、定义消费者

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) {
        // 所有的中间件技术都是基于tcp/ip协议基础上构建新型协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();

            channel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到的消息是:" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                public void handle(String s) throws IOException {
                    System.out.println("接收失败了。。。");
                }
            });
            System.out.println("开始接收消息");
            System.in.read();
        }catch (Exception e){
            e.printStackTrace();
            System.out.println("发送消息出现异常...");
        }finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者和生产者的区别在于,消费者是从mq中取消息,而生产者是从mq中存消息
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

4、RabbitMQ的核心组成部分

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
核心概念:

  1. Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
  2. Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
  3. Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
  4. Message:消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
  5. Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
  6. Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
  7. Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
  8. Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
  9. Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

4.1 RabbitMQ整体架构

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

4.2RabbitMQ的运行流程

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

所以发送消息的时候没有设置交换机,rabbitmq发送消息一定会有默认一个交换机,并且消息不是直接到队列当中的,而是由交换机根据路由键发送消息到绑定的队列

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

5、RabbitMQ的模式

5.1 发布订阅模式–fanout

特点:Fanout—发布与订阅模式,是一种广播机制,它是没有路由key的模式。

也就是只要生产者发送一条消息经过交换机加入队列中,左右的消费者都能拿到消息

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
这里就直接用web界面演示

  1. 新建一个fanout模式的交换机(让交换机代替生产者去发消息)
    【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
    【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  2. 创建3个消息队列q1、q2、q3
    【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  3. 将队列绑定到交换机上
    【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  4. 由交换机代替生产者发送消息
    【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  5. 然后三个队列都会有一个交换机发来的消息
    【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  6. q1队列消息正常被消费者拾取(其他队列一样)
    【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

  7. q1队列消息正常被消费者拾取之后,队列消息-1

ACK后  页面在自动会更新队列消息条目,默认5

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

5.2路由模式-direct模式

Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

这样就可以给指定设置了路由key的队列发送消息,并且一个队列可以有多个路由key,当发送消息指定了路由key,则只有设置了相对应的路由key的队列才能接收到消息

5.3路由模式-Topic模式

Topic模式是direct模式上的一种叠加,增加了模糊路由RoutingKey的模式。

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

* 代表一级(必须有一级)
# 代表0级或者多级

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

注意最好用代码的形式来进行绑定

在实际开发中,我们既可以在RabbitMq的web界面进行交换机的创建,队列的创建,绑定路由key等等操作。
还可以在生产者代码里面通过channel.XXX的方式设置交换机,设置队列,设置路由key,等等,效果是一样的

例如下面代码:生产者(消费者也可以声明) 代码实现【交换机和队列】的声明和绑定

package com.xxx.rabbitmq.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ入门案例 - 完整的声明方式创建
 * 代码实现创建交换机和队列,并绑定关系
 * 生产者
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("121.196.153.197");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 准备发送消息的内容
            String message = "你好,交换机";
            // 6:准备交换机。取名规范 :  类型_业务模块_交换机
            String exchangeName = "direct_order_exchange";
            // 7: 定义路由key
            String routeKeyOrder = "order";
            String routeKeyCourse = "course";
            // 8: 指定交换机的类型
            String exchangeType = "direct";
            // 9: 声明交换机/注册交换机
            // @params1: 交换机名称
            // @params2: 交换机类型
            // @params3: 是否持久化 所谓的持久化就是值:交换机不会随着服务器的重启造成丢失,如果是true代表不丢失,false重启丢失
            channel.exchangeDeclare(exchangeName, exchangeType, true);
            // 10: 声明队列/注册队列
            // @params1: 队列名称
            // @params2: 是否持久化
            // @params3: 是不是排他性,是否是独占独立
            // @params4: 是不是自动删除 随着最后一个消费者消息完毕消息以后是否把队列自动删除
            // @params5: 是不是有参数 参数携带可能会引发headers模式
            channel.queueDeclare("queue5", true, false, false, null);
            channel.queueDeclare("queue6", true, false, false, null);
            channel.queueDeclare("queue7", true, false, false, null);
            // 11: 绑定队列
            // @params1: 队列名称
            // @params2: 交换机名称
            // @params3: routeKey
            channel.queueBind("queue5", exchangeName, routeKeyOrder);
            channel.queueBind("queue6", exchangeName, routeKeyOrder);
            channel.queueBind("queue7", exchangeName, routeKeyCourse);
            // 12: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routeKeyOrder, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 13: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }

    }
}


消费者去队列拿消息:

package com.xxx.rabbitmq.all;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * RabbitMQ入门案例 - 完整的声明方式创建
 * 消费者
 */
public class Consumer {
    private static Runnable runnable = new Runnable() {
        public void run() {
            // 1: 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2: 设置连接属性
            connectionFactory.setHost("121.196.153.197");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            //获取队列的名称
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 3: 从连接工厂中获取连接
                connection = connectionFactory.newConnection();
                // 4: 从连接中获取通道channel
                channel = connection.createChannel();
                // 5: 申明队列queue存储消息
                /*
                 *  如果队列不存在,则会创建
                 *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
                 * */
                // 这里如果queue已经被创建过一次了,可以不需要定义
                //channel.queueDeclare("queue1", false, false, false, null);
                // 6: 定义接受消息的回调
                Channel finalChannel = channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String s) throws IOException {
                    }
                });
                System.out.println(queueName + ":开始接受消息");
                System.in.read();
            } catch (Exception ex) {
                ex.printStackTrace();
                System.out.println("发送消息出现异常...");
            } finally {
                // 7: 释放连接关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    };


    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "queue5").start();
        new Thread(runnable, "queue6").start();
        new Thread(runnable, "queue7").start();
    }

}


5.4轮询模式 - Work模式

5.4.1Work模式 - 轮询模式(Round-Robin)

轮询模式的分发:一个消费者一条,按均分配;

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

生产者:

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author: 学相伴-飞哥
 * @description: Producer 简单队列生产者
 * @Date : 2021/3/2
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            //===============================end topic模式==================================
            for (int i = 1; i <= 20; i++) {
                //消息的内容
                String msg = "学相伴:" + i;
                // 7: 发送消息给中间件rabbitmq-server
                // @params1: 交换机exchange
                // @params2: 队列名称/routingkey
                // @params3: 属性配置
                // @params4: 发送消息的内容
                channel.basicPublish("", "queue1", null, msg.getBytes());
            }
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者work1:

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author: 学相伴-飞哥
 * @description: Consumer
 * @Date : 2021/3/2
 */
public class Work1 {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work1");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
//            channel.queueDeclare("queue1", false, false, false, null);
            // 同一时刻,服务器只会推送一条消息给消费者
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try{
                        System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                    }catch(Exception ex){
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println("Work1-开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

work2

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author: 学相伴-飞哥
 * @description: Consumer
 * @Date : 2021/3/2
 */
public class Work2 {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("192.168.157.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work2");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
            //channel.queueDeclare("queue1", false, true, false, null);
            // 同一时刻,服务器只会推送一条消息给消费者
            //channel.basicQos(1);
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try{
                        System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(200);
                    }catch(Exception ex){
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println("Work2-开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

work1和work2的消息处理能力不同,但是最后处理的消息条数相同,是“按均分配”。

往队列发送20条消息,
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

结果就是轮询给消费者拿消息,即便有的消费者消费很快(也只能按照新顺序拿消息),也只能按照轮询一个一个拿,

也就是说,不会因为某个消费者所在的服务器满,而导致少消费,一定是公平消费
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

5.4.1Work模式 - 公平分发模式(Round-Robin)

根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配

相比较轮询模式,公平分发的不同在于:修改应答方式为手动
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

qos设置为1,就代表消费者拿到cpu的执行权就每次从只拿走一条消息,一条一条的拿。若不设置,就是默认轮询拿一条
所以根据队列堆积的消息条数以及内存和磁盘空间来合理设置qos

【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
这个时候,性能好的消费者就会消费得多,而性能差的消费者就消费得少,能者多劳
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java
【RabbitMQ】消息队列-RabbitMQ篇章,RabbitMq,rabbitmq,java

更新中------
参考来自:狂神文章来源地址https://www.toymoban.com/news/detail-659284.html

到了这里,关于【RabbitMQ】消息队列-RabbitMQ篇章的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息队列实现异步处理。下面是一个简单的示例代码,用于说明如何使用 ActiveMQ 实现消息队列异步处理: 添加 ActiveMQ 依赖 在 pom.xml 文件中添加以下依赖: 创建消息队列 创建一个名为 “TestQueue” 的消息队列,并配置 ActiveMQ 连接信息: 创建消息消费者

    2024年02月16日
    浏览(57)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(79)
  • 3.精通RabbitMQ—消息队列、RabbitMQ

    RabbitMQ面试题 (总结最全面的面试题) 入门RabbitMQ消息队列,看这篇文章就够了 消息队列 是一种基于 队列 ,用于解决 不同进程或应用 之间 通讯 的 消息中间件 。 支持多种 消息传递模式 ,如 队列模型 、 发布/订阅模型 等。 业务解耦 :通过 发布/订阅 模式,减少系统的 耦

    2024年02月15日
    浏览(79)
  • 【RabbitMQ笔记10】消息队列RabbitMQ之死信队列的介绍

    这篇文章,主要介绍消息队列RabbitMQ之死信队列。 目录 一、RabbitMQ死信队列 1.1、什么是死信队列 1.2、设置过期时间TTL 1.3、配置死信交换机和死信队列(代码配置) (1)设置队列过期时间 (2)设置单条消息过期时间 (3)队列设置死信交换机 (4)配置的基本思路 1.4、配置

    2024年02月16日
    浏览(82)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(54)
  • RabbitMq消息模型-队列消息

    基本模型(SimpleQueue)、工作模型(WorkQueue) 队列消息特点: 消息不会丢失 并且 有先进先出的顺序。 消息接收是有顺序的,不是随机的,仅有一个消费者能拿到数据,而且不同消费者拿不到同一份数据。 基本模型: SimpleQueue 在上图的模型中,有以下几个概念: P:为生产

    2024年02月09日
    浏览(48)
  • 【RabbitMQ】RabbitMQ 消息的堆积问题 —— 使用惰性队列解决消息的堆积问题

    消息的堆积问题是指在消息队列系统中,当生产者以较快的速度发送消息,而消费者处理消息的速度较慢,导致消息在队列中积累并达到队列的存储上限。在这种情况下,最早被发送的消息可能会在队列中滞留较长时间,直到超过队列的容量上限。当队列已满且没有更多的可

    2024年02月05日
    浏览(50)
  • 【图解RabbitMQ-3】消息队列RabbitMQ介绍及核心流程

    🧑‍💻作者名称:DaenCode 🎤作者简介:CSDN实力新星,后端开发两年经验,曾担任甲方技术代表,业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅,闭关学习中······ 😎人生感悟:尝尽人生百味,方知世间冷暖。

    2024年02月09日
    浏览(43)
  • 消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

    Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成 。 我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务 。 轮训分发消

    2024年02月21日
    浏览(55)
  • 【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

    消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。 本文将介绍

    2024年02月05日
    浏览(78)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包