RabbitMQ 的基本概念

这篇具有很好参考价值的文章主要介绍了RabbitMQ 的基本概念。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一 MQ 的基本概念

1 MQ概述

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。消息队列就是所谓的存放消息的队列。 消息队列解决的不是存放消息的队列的⽬的,解决的是通信问题

  • 传统方式,系统之间直接调用 (http协议 httpclient/openFeign)

RabbitMQ 的基本概念,rabbitmq,分布式

  • 中间件

RabbitMQ 的基本概念,rabbitmq,分布式

2 MQ 的优势

异步、 解耦、 削峰

1 应用解耦

系统的耦合性越高,容错性就越低,可维护性就越低。以购物为例子

RabbitMQ 的基本概念,rabbitmq,分布式

使用 MQ 使得应用间解耦,提升容错性和可维护性。

RabbitMQ 的基本概念,rabbitmq,分布式

2 异步提速

一个下单操作耗时:20 + 300 + 300 + 300 = 920ms,用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

RabbitMQ 的基本概念,rabbitmq,分布式

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

RabbitMQ 的基本概念,rabbitmq,分布式

3 削峰填谷

RabbitMQ 的基本概念,rabbitmq,分布式

RabbitMQ 的基本概念,rabbitmq,分布式

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。使用MQ后,可以提高系统稳定性。

RabbitMQ 的基本概念,rabbitmq,分布式

3 MQ 的劣势

1 系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

2系统复杂度提高

MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息不被丢失等情况?

4 常见的 MQ 产品

RabbitMQ 的基本概念,rabbitmq,分布式

二 RabbitMQ安装

1 上传软件

erlang­18.3­1.el7.centos.x86_64.rpm

socat­1.7.3.2­5.el7.lux.x86_64.rpm

rabbitmq­server­3.6.5­1.noarch.rpm

2 安装Erlang

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

3 安装RabbitMQ

rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

4 开启管理界面及配置

rabbitmq-plugins enable rabbitmq_management

5 启动

service rabbitmq-server start # 启动服务

service rabbitmq-server stop # 停止服务

service rabbitmq-server restart # 重启服务

6 登录

需要关闭防火墙, 远程服务器开启15672和5672开启

http://192.168.56.140:15672/

如果登录报错, 这是因为rabbitmq从3.3.0开始禁止使用guest/guest权限通过除localhost外的访问

vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

RabbitMQ 的基本概念,rabbitmq,分布式

删除loopback_users 中的 <<"guest">>

RabbitMQ 的基本概念,rabbitmq,分布式

云服务器记得开放15672端口

默认账号和密码都是 guest

三 界面介绍和操作

1 添加用户

RabbitMQ 的基本概念,rabbitmq,分布式

# 角色说明: 
	1、 超级管理员(administrator) 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 
	2、 监控者(monitoring) 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情 况,磁盘使用情况等) 
	3、 策略制定者(policymaker) 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红 框标识的部分)。 
	4、 普通管理者(management) 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 
	5、 其他 无法登陆管理控制台,通常就是普通的生产者和消费者。

2 创建虚拟机

1 点击图中的Virtual Hosts

2 创建虚拟机路径,记得要带 / 

RabbitMQ 的基本概念,rabbitmq,分布式

3 将虚拟机分配给用户

RabbitMQ 的基本概念,rabbitmq,分布式

四 RabbitMQ概念

1 架构图

RabbitMQ 的基本概念,rabbitmq,分布式

2 相关概念

Publisher - ⽣产者:发布消息到RabbitMQ中的Exchange

Consumer - 消费者:监听RabbitMQ中的Queue中的消息

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker,也就是我们的RabbitMQ服务器

Virtual host:出于多租户和安全因素设计的,在RabbitMQ中可以创建出多个虚拟消息服务器VirtualHost。

Connection:publisher/consumer 和 broker 之间的 TCP 连接

channel-信道: 网络信道,几乎所有操作都在channel中进行,channel是消息读写的通道。客户端可以建立多个channel,每个channel表示一个会话任务 , 信道有特定的功能,比如创建交换机,创建队列。

Exchange - 交换机:和⽣产者建⽴连接并接收⽣产者的消息 ,并且不能保存消息。

Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进⾏交互 ,队列是可以保存消息的。

Routes - 路由:交换机以什么样的策略将消息发布到Queue。生产者发消息的时候,可以给消息贴一个标签,为了让指定的消费者接收消息。

  • 结构解读:

首先安装好的RabbitMQ就是一个Broker,如果我们想将MQ给多个用户使用并且互不影响,那我们就需要将MQ通过虚拟化的方式分割成多个提供MQ的服务,也就是Virtual host,每个Virtual host都有独立的路径,并且和用户绑定。这样我们就可以在自己的世界里发消息了。

  • 通信解读:一条消息到底是怎么从生产者到了消费者的?

    • 首先生产者通过连接的方式连接到MQ的一个虚拟机,需要知道MQ的ip,端口,虚拟机路径,用户名和密码,准备好了以后就可以建立连接了TCP 连接Connection连接,

    • 但是建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的,所以我们使用信道changel的方式发送和接受消息。

    • 消息进入MQ的第一站是Exchange交换机,交换机的作用:① 接收生产者发送的消息 ②和队列绑定。交换机是不保存信息的。生产者发消息的时候可以指定一个路由键,路由键可以理解为就是给消息贴了一个标签(做标记作用,消费者接收消息的时候有用)

    • 消息进入第二站queue,消费者要接收消息,需要一直监听着queue,那么消费者在监听queue的时候需要先指定队列要和那个交换机绑定,绑定的时候也需要指定路由键,如果发消息时的路由键和接收消息时候路由键一样,那么这个消息就会进入到这个队列。

    • 最后消费者就拿到消息了。需要说明的一点,所有的交换机和队列创建的时候都是需要起名字的。

3 RabbitMQ的通讯

官网介绍:RabbitMQ Tutorials — RabbitMQ

RabbitMQ 的基本概念,rabbitmq,分布式

RabbitMQ 的基本概念,rabbitmq,分布式

主题

五 案例解释

新建maven工程,Spring整合MQ。因为MQ中有很多概念在boot中是体会不到的,boot屏蔽了很多概念。

1 简单队列模式

RabbitMQ 的基本概念,rabbitmq,分布式

1 代码

生产者和消费者都导入maven依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
  • 生产者代码,记得最后需要关闭资源。

    package com.xinzhi.product;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class MyProduct {
        //队列名
        private static final String QUEUE_NAME = "my_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.创建连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2.设置连接地址
            connectionFactory.setHost("192.168.32.11");
            // 3.设置端口号:
            connectionFactory.setPort(5672);
            // 4.设置账号和密码
            connectionFactory.setUsername("laohan123");
            connectionFactory.setPassword("laohan123");
            // 5.设置VirtualHost
            connectionFactory.setVirtualHost("/laohan");
            Connection connection = connectionFactory.newConnection();
            // 6.获取信道
            Channel channel = connection.createChannel();
            // 7.创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
            // 7.1第一个参数:队列名称
            // 7.2第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
            // 7.3第三个参数:该队列是否是私有的
            // 7.4第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
            // 7.5队列的其他参数, 一般都是null
            channel.queueDeclare("my_queue", false, false, false, null);
            String message = "欣知大数据";
            //四个参数
            //exchange 交换机,如果使⽤了"",表示使⽤了默认交换机,默认交换机会隐式绑定到队列,
            //routingKey路由键:如果使⽤了默认交换机,那么路由键就可以用队列名来代替。
            //props header信息,一般设置null
            //最后一个参数是要传递的消息字节数组
            channel.basicPublish("",  //使⽤默认交换机
                    "my_queue",    //因为⽤了默认交换机,于是参数就是队列名称
                    null,
                    message.getBytes()        消息内容
            );
            channel.close();
            connection.close();
            System.out.println("发送成功");
    
        }
    }
    
  • 消费者代码

    package com.xinzhi;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class MyConsumer {
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.创建连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2.设置连接地址
            connectionFactory.setHost("192.168.32.11");
            // 3.设置端口号:
            connectionFactory.setPort(5672);
            // 4.设置账号和密码
            connectionFactory.setUsername("laohan123");
            connectionFactory.setPassword("laohan123");
            // 5.设置VirtualHost
            connectionFactory.setVirtualHost("/laohan");
            Connection connection = connectionFactory.newConnection();
            // 6.获取信道
            Channel channel = connection.createChannel();
            // 7.声明队列
            channel.queueDeclare("my_queue", false, false, false, null);
            // 8.创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                // consumerTag 消息的唯一标识,一般用来确认消息是否被消费
                // envelope  封装了mq的基本方法
                // properties 封装了mq的基本属性
                // body       监听到的消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            };
            // 9.消费者监听某个队列  autoAck自动签收
            channel.basicConsume("my_queue", false, consumer);
        }
    }
    RabbitMQ 的基本概念,rabbitmq,分布式

2 代码解读:

envelope:单词 信封的意思,在这里是封装了MQ的一些基本方法

- getDeliveryTag() 获取此参数信封中包含的交货标签
- isRedeliver()    如果这是在 ack 失败后是否重新投递
- getExchange()
- getRoutingKey()

3 流程解读

这是RabbitMQ最简单的工作方式

  • 生产者声明好队列,然后把信息给了MQ默认的交换机,交换机将信息发给队列

  • 消费者也声明好队列,然后监听队列获取信息

4 抽出工具类

因为生产者和消费者都是相同的获取信道的方式

public static Connection getConnection(){
        // 1.创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2.设置连接地址
        connectionFactory.setHost("192.168.56.140");
        // 3.设置端口号:
        connectionFactory.setPort(5672);
        // 4.设置账号和密码
        connectionFactory.setUsername("laohan123");
        connectionFactory.setPassword("laohan123");
        // 5.设置VirtualHost
        connectionFactory.setVirtualHost("/laohan");
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }

2 work queue

RabbitMQ 的基本概念,rabbitmq,分布式

队列模式: 能者多劳模式

1 代码

  • 生产者

    package com.xinzhi.work.product;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class MyProduct {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明队列
            channel.queueDeclare("work_queue", true, false, false, null);
            //3 发消息(消息先到了默认交换机,交换机和队列绑定了,所以信息也会直接到了queue)
            for (int i = 1; i <101 ; i++) {
                String message = "xinzhi"+i;
                channel.basicPublish("","work_queue",null,message.getBytes());
            }
            //4 提示和释放资源
            System.out.println("发送成功");
            channel.close();
            connection.close();
        }
    }
    
  • 消费者 将下面的代码再复制两份MyConsumer1,MyConsumer2,等待时间设置成100,500

    package com.xinzhi.work.consumer;
    
    import com.rabbitmq.client.*;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    
    public class MyConsumer1 {
    
        public static void main(String[] args) throws IOException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明队列
            channel.queueDeclare("work_queue", true, false, false, null);
            //3 声明消费者一次只接受一条消息
            channel.basicQos(1);
            // 4 声明消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费标签是" + consumerTag + "消息体是" + new String(body));
                    // 消息消费成功以后的唯一标识
                    System.out.println(envelope.getDeliveryTag());
                    // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume("work_queue", consumer);
        }
    }
    

2 代码解读

在简单模式的基础上添加了多个消费者,每个消费者添加了等待时间。

生产者一次往队列里投放多条消息,消费者根据能力来消费这里面的所有消息,性能强的消费的消息多,所以是能者多劳

3 订阅发布

平分秋色

RabbitMQ 的基本概念,rabbitmq,分布式

交换机类型 fanout

发布订阅,这次使用了交换机,之前的两种方式都是没有显式的声明使用交换机,之前其实用的系统默认的交换机。

这次使用了交换机,但是 没有使用路由键。只要和交换机绑定了的对了都可以接受到消息,也就是上图两个队列中可以收到相同的消息。

1 代码

  • 生产者

    package com.xinzhi.fanout;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Product {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明交换机和类型
            channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
            //3 将信息发给交换机
            for (int i = 1; i <101 ; i++) {
                String message = "laohan"+i;
                channel.basicPublish("fanout_exchange","",null,message.getBytes());
            }
            System.out.println("success");
            channel.close();
            connection.createChannel();
        }
    }
    
  • 消费者1

    package com.xinzhi.fanout;
    
    import com.rabbitmq.client.*;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    
    public class MyConsumer1 {
    
        public static void main(String[] args) throws IOException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明队列
            channel.queueDeclare("fanout_queue1", true, false, false, null);
            //3 声明交换机
            channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
            //4 交换机和队列绑定
            channel.queueBind("fanout_queue1", "fanout_exchange", "", null);
            // 5 声明消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                    // 消息消费成功以后的唯一标识
                    System.out.println(envelope.getDeliveryTag());
                    // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume("fanout_queue1", consumer);
        }
    }
    
  • 消费者2

    package com.xinzhi.fanout;
    
    import com.rabbitmq.client.*;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    
    public class MyConsumer2 {
    
        public static void main(String[] args) throws IOException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明队列
            channel.queueDeclare("fanout_queue2", true, false, false, null);
            //3 声明交换机
            channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
            //4 交换机和队列绑定
            channel.queueBind("fanout_queue2", "fanout_exchange", "", null);
            // 5 声明消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                    // 消息消费成功以后的唯一标识
                    System.out.println(envelope.getDeliveryTag());
                    // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume("fanout_queue2", consumer);
        }
    }
    

4 .路由 routing

暗送秋波

1 概念

交换机direct

  • 在⽣产者发送消息时指明routing-key

  • 在消费者声明队列和交换机的绑定关系时,指明routing-key

RabbitMQ 的基本概念,rabbitmq,分布式

  • 解决的问题是:

    • 因为交换机和两个队列都绑定了,但是为了给队列里发送的消息不一样,也就是区分给那个队列发什么样 的消息,就有了routing key的概念。发消息的时候指定一下路由键,接收消息的时候队列要和交换机绑定,这时候也需要指定路由键,如果这两次的路由键一样,那么这个消息就放着这个队列里面

2 代码

  • 生产者

    package com.xinzhi.direct;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Product {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明交换机和类型,并且持久化
            channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,true);
            //3 将信息发给交换机,并且指定路由键
            String message1 = "laohan1";
            String message2 = "laohan2";
            channel.basicPublish("direct_exchange","han",null,message1.getBytes());
            channel.basicPublish("direct_exchange","man",null,message2.getBytes());
            System.out.println("success");
            channel.close();
            connection.close();
        }
    }
    
  • 消费者1

    package com.xinzhi.direct;
    
    import com.rabbitmq.client.*;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    
    public class MyConsumer1 {
    
        public static void main(String[] args) throws IOException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明队列
            channel.queueDeclare("direct_queue1", true, false, false, null);
            //3 声明交换机
            channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,true);
            //4 交换机和队列绑定
            channel.queueBind("direct_queue1", "direct_exchange", "han");
    
            // 5 声明消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                    // 消息消费成功以后的唯一标识
                    System.out.println(envelope.getDeliveryTag());
                    // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume("direct_queue1", consumer);
        }
    }
    
  • 消费者2

    package com.xinzhi.direct;
    
    import com.rabbitmq.client.*;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    
    public class MyConsumer2 {
    
        public static void main(String[] args) throws IOException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明队列
            channel.queueDeclare("direct_queue2", true, false, false, null);
            //3 声明交换机
            channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,true);
            //4 交换机和队列绑定
            channel.queueBind("direct_queue2", "direct_exchange", "man");
    
            // 5 声明消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                    // 消息消费成功以后的唯一标识
                    System.out.println(envelope.getDeliveryTag());
                    // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume("direct_queue2", consumer);
        }
    }

5 通配符模式

你的心思我要

1 概念

交换机是 topic

  • 因为路由模式里是精确匹配,比较局限,使用通配符方式,通配符,提⾼了匹配的范围,扩展业务。

  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert。

2 代码

  • 生产者

    package com.xinzhi.topic;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Product {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明交换机和类型,并且持久化
            channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,true);
            //3 将信息发给交换机,并且指定路由键
            String message1 = "laohanxueit";
            channel.basicPublish("topic_exchange","xinzhi.15",null,message1.getBytes());
            System.out.println("success");
            channel.close();
            connection.close();
        }
    }
    
  • 消费者

    package com.xinzhi.topic;
    
    import com.rabbitmq.client.*;
    import com.xinzhi.utils.RabbitUtil;
    
    import java.io.IOException;
    
    public class MyConsumer {
    
        public static void main(String[] args) throws IOException {
            //1 获取连接和信道
            Connection connection = RabbitUtil.getConnection();
            Channel channel = connection.createChannel();
            //2 声明队列
            channel.queueDeclare("topic_queue", true, false, false, null);
            //3 声明交换机
            channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,true);
            //4 交换机和队列绑定
            channel.queueBind("topic_queue", "topic_exchange", "xinzhi.#");
    
            // 5 声明消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("tag:" + consumerTag + ",message:" + new String(body));
                    // 消息消费成功以后的唯一标识
                    System.out.println(envelope.getDeliveryTag());
                    // 确认签收当前的一条消息,如果是true是签收队列里面所有的消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume("topic_queue", consumer);
        }
    }

六 SpringBoot整合

1 发布订阅

1 新建boot项目

2 导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3 配置文件

server:
  port: 8099
spring:
  rabbitmq:
    host: 192.168.56.140
    port: 5672
    username: laohan123
    password: laohan123
    virtual-host: /laohan

4 配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    public static final String EXCHANGE_NAME = "fanout_exchage";
    public static final String QUEUE_NAME = "fanout_queue";

    @Bean("queue")
    public Queue queue(){
//        return new Queue(QUEUE_NAME, true, false, false);
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean("exchange")
    public Exchange exchange(){
//        return new FanoutExchange(EXCHANGE_NAME, true, false);
        return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }

}

5监听类

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class RabbitListen {

@RabbitListener(queues = {RabbitConfig.QUEUE_NAME})
public void listener(String body,Message message, Channel channel) throws IOException {
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag==>"+msgTag);
        System.out.println("message==>"+message);
        System.out.println("body==>"+body);
    }
}

6 测试类发送消息

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
void contextLoads() {
    rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"","老韩学it");
}

2 topic

  • 在发布订阅的基础上修改交换机名称和路由绑定就可以了

    package com.xinzhi.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
    //    public static final String EXCHANGE_NAME = "fanout_exchage";
        public static final String EXCHANGE_NAME = "topic_exchange";
    //    public static final String QUEUE_NAME = "fanout_queue";
        public static final String QUEUE_NAME = "topic_queue";
    
        @Bean("queue")
        public Queue queue(){
    //        return new Queue(QUEUE_NAME, true, false, false);
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
    
        @Bean("exchange")
        public Exchange exchange(){
    //        return new FanoutExchange(EXCHANGE_NAME, true, false);
    //        return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        }
    
        @Bean
        public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("xinzhi.#").noargs();
        }
    
    }
    
  • 发送消息验证

     @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        void contextLoads() {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"xinzhi.15","老韩学it");
        }

七 消息的可靠性投递

1 什么是消息的可靠性投递

  • 保证消息一定能发到消息队列中

  • 细节

    • 保证mq节点成功接受消息

    • 消息发送端需要接受到mq服务端接收到消息的确认应答

    • 完善的消息补偿机制,发送失败的消息可以再感知并二次处理

  • RabbitMQ消息投递路径

    • 生产者-->交换机-->队列-->消费者

    • 通过两个点的控制,保证消息的可靠性投递

      • 生产者到交换机 confirmCallback

      • 交换机到队列 returnCallbakc

  • 建议

    • 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,RabbitMQ的整体效率变低,吞吐量下降严重,不是非常重要的消息不建议用消息确认机制

2 confirmCallback

  • 机制:

    生产者投递消息以后,如果Broker收到消息以后,会给生产者一个ACK,生产者通过ACK可以确认这条消息是否成功发送到Broker。

  • 开启confirmCallback

    spring.rabbitmq.publisher-confirm-type: correlated

    RabbitMQ 的基本概念,rabbitmq,分布式
  • 发送代码

@Test
void confirm(){
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
             * 消息到交换机的确认
             * @param correlationData  配置信息
             * @param ack              交换机确认  true消息接受成功  false消息接受失败
             * @param cause             消息发送失败原因
             */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("ConfirmCallback==========>");
            System.out.println("correlationData==========>"+correlationData);
            System.out.println("ack==========>"+ack);
            System.out.println("cause==========>"+cause);
            if(ack){
                System.out.println("发送成功");
                // 更新数据库  成功
            }else {
                System.out.println("发送失败,日志或数据库纪录");
                // 更新数据库  失败
            }
        }
    });
    rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"xinzhi.15","老韩学it");
}

RabbitMQ 的基本概念,rabbitmq,分布式

  • 模拟失败场景,修改发送时候交换机名称

RabbitMQ 的基本概念,rabbitmq,分布式

2 returnCallback

  • return机制保证消息在rabbitmq中能够成功的投递到队列⾥

  • 两种模式:

    • 交换机到队列不成功,则丢弃消息(默认)

    • 交换机到队列不成功,返回生产者,触发returnCallback

  • 开启returnCallback,交换机到队列的可靠性投递

    spring.rabbitmq.publisher-returns=true

  • 修改投递到队列失败的策略

    spring.rabbitmq.template.mandatory=true

  • 发送消息验证.

     @Test
    void returnCallback(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                int code = returned.getReplyCode();
                System.out.println("code==>"+code);
                System.out.println("returned==>"+returned);
            }
        });
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"xinzhi.15","老韩学it");
    }
  • 发送消息以后,没有任何提示,我们修改路由键

RabbitMQ 的基本概念,rabbitmq,分布式

八 消息确认

1 背景

保证消息从队列到消费者的过程。

2 ACK介绍

  • 消费者从RabbitMQ中获取消息并且处理完成以后,反馈给RabbitMQ,RabbitMQ收到确认消息以后才能把消息从队列中删除

  • 消费者在处理消息的时候出现了网络不稳定、服务器异常等情况,那么就不会有ACK反馈,RabbitMQ认为这个消息没有正常消费,就将这个消息放回队列里面

  • 只有当消费者正确发送ack以后,RabbitMQ才会把消息从队列中删除

  • 消息的ack确认机制默认是打开的,消息如果未被进行ack的消息确认机制,这条消息将被锁定

3 确认方式

  • 自动

  • 手动manual

    spring.rabbitmq.listener.simple.acknowledge-mode=manual

RabbitMQ 的基本概念,rabbitmq,分布式

  • 发送消息,并且开启监听模式,虽然消息被消费了,但是因为开启了手动确认模式配置,但是代码里没有手动确认所以队列里的消息不会删除

RabbitMQ 的基本概念,rabbitmq,分布式

  • 代码中开启确认机制

 channel.basicAck(msgTag,false);
  • 消息拒绝

// false 一次拒绝一条   true 重新回到队列
channel.basicNack(msgTag,false,true);

结果就会看到控制台一直接受消息,因为对列有消息就会被监听到,监听以后拒绝了又放到队列里面,然后 又监听...

  • DeliveryTag

    表示消息投递的序号,每次消费消息或者消息重新投递以后,DeliveryTag都会+1

  • basicReject

    也是消息拒绝的,一次只能拒绝一条消息,也可以设置是否重新回如队列

九 死信队列

1 TTL

  • ttl time to live 消息存活时间

  • 如果消息在存活时间内未被消费,则会清除

  • RabbitMQ支持两种TTL设置

    • 单独消息进行TTL设置

    • 整个队列进行配置TTL

2 死信队列

  • 没有被及时消费的消息存放的队列

3 死信交换机

  • DLX(Dead Letter Exchange)当消息成为死信后,会重新发送到另一个交换机中给,这个交换机就是死信交换机

RabbitMQ 的基本概念,rabbitmq,分布式

4 消息成为死信的条件

  • 消息被拒签,并且没有重回队列,消息将成为死信,requeue=false

  • 消息过期了,消息将成为死信。

  • 队列⻓度有限,存不下消息了,存不下的消息将会成为死信。

5 死信队列代码

  • 在生产者和消费者都配置RabbitMQ的配置文件

package com.xinzhi.config;
​
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
import java.util.HashMap;
import java.util.Map;
​
@Configuration
public class RabbitConfig {
​
​
    //声明交换机和队列
    public static final String EXCHANGE_NAME = "topic_exchange";
    public static final String QUEUE_NAME = "topic_queue";
​
​
    public static final String EXCHANGE_DEAD = "dead_exchange";
    public static final String QUEUE_DEAD = "dead_queue";
​
    @Bean("queue")
    public Queue queue(){
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", EXCHANGE_DEAD);
        // x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", "dead.#");
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
    }
​
    @Bean("exchange")
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
​
    @Bean
    public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("xinzhi.#").noargs();
    }
​
    //声明死信交换机
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_DEAD).durable(true).build();
    }
​
    // 声明死信队列
    @Bean("deadLetterQueue")
    public Queue deadLetterQueueA(){
        return new Queue(QUEUE_DEAD);
    }
​
    // 声明死信队列绑定关系
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                      @Qualifier("deadLetterExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dead.#").noargs();
    }
​
​
}
​
  • 生产者编写controller和service

@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQMsgController {
​
    @Autowired
    private IMessageSender sender;
​
    @RequestMapping("/sendmsg")
    public String  sendMsg(String msg){
        sender.sendMsg(msg);
        return "OK";
    }
}
import com.xinzhi.config.RabbitConfig;
import com.xinzhi.service.IMessageSender;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
​
@Service
public class MessageSenderImpl implements IMessageSender {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(RabbitConfig.EXCHANGE_NAME, "xinzhi.15", msg);
    }
}
  • 消费者的监听配置中添加对死信队列的监听

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
​
@Component
public class RabbitListen {
    
    @RabbitListener(queues = {RabbitConfig.QUEUE_NAME})
    public void listener(String body,Message message, Channel channel) throws IOException {
        System.out.println("收到业务消息:"+ body);
        boolean ack = true;
        Exception exception = null;
        try {
            if (body.contains("deadletter")){
                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }
        if (!ack){
            System.out.println("消息消费发生异常,error msg:"+ exception.getMessage());
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
​
    @RabbitListener(queues = RabbitConfig.QUEUE_DEAD)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

如果代码报错 406,将队列和交换机删除以后,重新启动。

注意:

关于报错:Only one ConfirmCallback is supported by each RabbitTemplate

因为一个RabbitTemplate实例只能支持一次ConfirmCallback 回调,再次调用就会报错

解决方案:

将生产者springboot版本降低:2.6.9

RabbitMQ 的基本概念,rabbitmq,分布式

RabbitConfig配置文件中添加

@Bean
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    return template;
}

将rabbitTemplate改成prototype

并且在controller和service中添加

@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

RabbitMQ 的基本概念,rabbitmq,分布式

RabbitMQ 的基本概念,rabbitmq,分布式文章来源地址https://www.toymoban.com/news/detail-796187.html

到了这里,关于RabbitMQ 的基本概念的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微服务学习:SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    目录 一、高级篇 二、面试篇 ==============实用篇============== day05-Elasticsearch01 1.初识elasticsearch 1.4.安装es、kibana 1.4.1.部署单点es 1.4.2.部署kibana 1.4.3.安装IK分词器 1.4.4.总结 2.索引库操作 2.1.mapping映射属性 2.2.索引库的CRUD 2.2.1.创建索引库和映射 2.2.2.查询索引库 2.2.3.修改索引库 2.

    2024年02月02日
    浏览(55)
  • 分布式消息队列:Kafka vs RabbitMQ vs ActiveMQ

    在现代分布式系统中,消息队列是一种常见的异步通信模式,它可以帮助系统处理高并发、高可用性以及容错等问题。在这篇文章中,我们将深入探讨三种流行的分布式消息队列:Apache Kafka、RabbitMQ和ActiveMQ。我们将讨论它们的核心概念、算法原理、特点以及使用场景。 随着

    2024年02月02日
    浏览(60)
  • Python爬虫分布式架构 - Redis/RabbitMQ工作流程介绍

    在大规模数据采集和处理任务中,使用分布式架构可以提高效率和可扩展性。本文将介绍Python爬虫分布式架构中常用的消息队列工具Redis和RabbitMQ的工作流程,帮助你理解分布式爬虫的原理和应用。 为什么需要分布式架构? 在数据采集任务中,单机爬虫可能面临性能瓶颈和资

    2024年02月11日
    浏览(40)
  • (黑马出品_07)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 聚合 可以让我们极其方便

    2024年03月12日
    浏览(53)
  • 分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于实现全文搜索功能,例如商城中对商品搜索、搜索、分类搜索、订单搜索、客户搜索等。它支持复杂的查询语句、中文分词、近似搜索等功能,可以快速地搜索并返回匹配的结果。 2、日志分析 Elastic search可以用于实现实时日志分析,例

    2024年02月04日
    浏览(48)
  • 分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步

    本文参考黑马 分布式Elastic search Elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容 同步调用 方案一:同步调用 基本步骤如下: hotel-demo对外提供接口,用来修改elasticsearch中的数据 酒店管理服务在完成数据库操

    2024年04月11日
    浏览(45)
  • 【103期】RabbitMQ 实现多系统间的分布式事务,保证数据一致性

    org.springframework.boot spring-boot-starter-amqp mysql mysql-connector-java runtime org.projectlombok lombok true org.springframework.boot spring-boot-starter-jdbc com.alibaba fastjson 1.2.17 3.2.1.2配置文件内容: server: port: 8080 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useUnicode=tru

    2024年04月14日
    浏览(64)
  • (黑马出品_高级篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 消息队列在使用过程中,面

    2024年03月19日
    浏览(46)
  • 【分布式和微服务1】一篇文章详细了解分布式和微服务的基本概念

    🍀 通俗一点说,高可用的意思是:在 高 并发的情况下,系统仍然是 可用 的 🍀 高可用的目的:保障业务的连续性( 实现在用户眼里,业务永远是正常对外提供服务的 ) 🍀 🍬 【上图】一个 SpringBoot 项目( apple.jar )被部署到服务器上运行,可向其发送 网络请求 对 数据

    2024年02月02日
    浏览(66)
  • SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈

    我们发现在微服务中有一个令人头疼的问题——部署,用Docker去解决这个部署难题 1、项目部署的问题 2、Docker 扔到一台机器上,它们的依赖难道没有干扰吗?不会,docker将打包好的程序放到一个隔离容器去运行,使用沙箱机制,避免互相干扰,之间不可见,这样就解决了混

    2023年04月24日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包