初始消息队列
消息队列初识
- 消息队列∶接收并转发消息。类似于“快递公司”
- producer :消息的发送者、生产者
- consumer :消息的消费者,从队列获取消息,并且使用
- queue∶先进先出的消息队列,一个queue可以对应多个 consumer
为什么要使用消息队列?
- 代码解耦,提高系统稳定性。
- 应对流量高峰,降低流量冲击。
- 异步执行,提高系统响应速度。
消息队列的特性
- 性能好。
- 基础组件。(类似于mysql,是通用的系统)
- 支持消息确认。(当断电了重启之后,可以对消息进行重新处理)保持了一致性。
- 削峰(请求峰值)
RabbitMQ介绍
- 官网:https://rabbitmq.com/
特点
- 路由能力灵活强大
- 开源免费
- 支持编程语言多
- 应用广泛,社区活跃
- 有开箱即用的监控和管理后台
核心概念
Linux(CentOs7) 下安装:
官方安装指南:https://www.rabbitmq.com/install-rpm.html
我们将要安装的RabbitMQ的版本是3.8.2
https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.2-1.el7.noarch.rpm
不需要单独安装Erlang环境。
安装前配置:
- 前提:在一个新建的阿里云的Cent OS 7.6上安装,不要对yum换源,否则可能会安装失败。
- echo “export LC_ALL=en_US.UTF-8” >> /etc/profile
- source /etc/profile
- 将编码格式设置为 UTF-8
Erlang下载安装
-
wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm
- 先下载安装Erlang环境
-
安装已下载的rpm包(可根据刚才自己选择的版本修改下面的版本号)
- yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm
-
sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
RabbitMQ下载安装
- wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm
- rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
- 需要运行命令来将 Key 导入
- 最后,使用 yum 进行本地安装(可根据自己选择的版本修改下面的版本号)
-
yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm
-
若报错:rabbitmq Unregistered Authentication Agent for unix- process:6485:746263
-
执行以下命令:rpm -ivh --nodeps rabbitmq-server-3.8.13-1.el7.noarch.rpm
-
再重试
-
- journalctl -xe:可以查看 rabbitmq-server 日志
- systemctl start rabbitmq-server:启动 rabbitmq 服务器
- rabbitmqctl status:查看 状态信息。
- systemctl enable rabbitmq-server:设置开机自启动
linux下安装rabbitmq可能会遇到的问题
相关博客:https://blog.csdn.net/m0_67402914/article/details/123972575
Linux 常用命令
- 开启 web 管理界面:rabbitmq-plugins enable rabbitmq_management
- 启动RabbitMQ:systemctl start rabbitmq-server
- 设置开机启动:systemctl enable rabbitmq-server
- 停止RabbitMQ:rabbitmqctl stop
- 查看状态信息:rabbitmqctl status
- 检查 RabbitMQ 服务器的状态:systemctl status rabbitmq-server
RabbitMQ Web 界面管理
- 默认情况下,是没有安装web端的客户端插件,需要安装才可以生效。执行命令:
- rabbitmq-plugins enable rabbitmq_management
- 安装完毕以后,重启服务即可,执行命令:
- systemctl restart rabbitmq-server
- 在服务器上开放 15672 端口。
- rabbitmq 有一个默认账号和密码是: guest 。
- 默认情况只能在 localhost 本机下访问,因此想要远程访问就需要新增一个远程登录的用户。
新增用户
-
将账号密码都设置为 admin:
- rabbitmqctl add_user 账号 密码
-
设置用户分配操作权限:
- rabbitmqctl set_user_tags admin administrator:设置为管理员
-
进入虚拟主机,添加允许访问的用户。
-
访问 http://IP地址:15672 ,输入新增的用户
- 如果访问不了,看是否开放了防火墙,或阿里云服务器是否开启了安全组。
管理界面
-
amqp:5673:用于客户端连接
-
clustering:25672:用于集群
-
http:15672:用于http协议 登录管理后台
-
exchanges:交换机,默认会有 7 个
- 每个交换机点进去,可以绑定队列、消息发送 等操作
-
admin:用户管理,可以用户进行 CURD 操作等。
-
可以在右侧进入虚拟主机的管理页面
- 进入虚拟主机,可以进行 CURD 操作,添加允许访问的用户等
Java 基础应用
Maven:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.17.0</version>
</dependency>
<!-- rabbitmq要求的内部用于记录日志的 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>2.0.7</version>
</dependency>
创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
设置 RabbitMQ 地址
- factory.setHost(String ip):设置发送对象 RabbitMQ服务器的 ip
- factory.setUsername(String username):设置登录的用户
- factory.setPassword(String password):密码
- 需要在云服务器设置安全组,开启 5672 端口,授权对象为 0.0.0.0/0
建立连接
- Connection connection = factory.newConnection();
- 会抛出 IOException, TimeoutException
获得信道
- Channel channel = connection.createChannel();
- 会抛出 IOException
声明队列
- channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException- queue:队列名
- durable:是否声明持久队列(该队列将在服务器重启后继续存在)。
- exclusive:是否声明一个独占队列(仅限于此连接)。
- autoDelete:是否声明一个自动删除队列(服务器将在不再使用时删除它)。
- Map<String, Object> arguments:队列的其他属性(构造参数),没有则为 null。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
发布 或 消费(接收)消息
- 消息发送之后会存储在队列里,当有其他消费者进行接收时,就会出队。
发布消息:
- channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body):发布消息。
- 发布到不存在的交换将导致通道级协议异常,从而关闭通道。如果资源驱动的警报生效,Channel#basicPublish 的调用将最终阻塞。
-
exchange:要将消息发布到的交换机。
- 可置为空串 “”,代表默认的交换机,此时会使用 routingKey 进行 查找
- routingKey:路由键,可以是队列名
- props:消息的其他属性配置-路由头等,没有则置为 null
- body:消息主体,需要声明编码方式;例如: msg.getBytes(StandardCharsets.UTF_8)
String msg = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
消费(接收)消息:
- channel.basicConsume(String queue, boolean autoAck, Consumer callback):启动一个非本地的、非独占的消费者,一个服务器生成的消费者标签。
- queue:队列名
-
autoAck:设置服务器是否 自动确认消息处理完毕。
- 若为false,则需要在 callback 函数里,当消息处理完毕时使用 basicAck() 函数来确认。
- channel.basicAck(long deliveryTag, boolean multiple):确认一个或多个接收到的消息。
- deliveryTag:envelope.getDeliveryTag()。接收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签。
- multiple:true确认所有到并包括所提供的交付标记的消息;false表示只确认所提供的交付标签。
- callback:使用者处理对象的接口
- 一般就是一个 DefaultConsumer 的匿名内部类,重写 handleDelivery() 方法
-
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body):当接收到此使用者的 basic.deliver 时调用。- consumerTag:与消费者相关联的消费者标签。
- envelope:打包消息的数据。
- properties:消息的内容头数据
- body:消息体(不透明的、特定于客户端的字节数组),转换为字符串时需要指定编码
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("收到消息:" + msg);
}
});
关闭连接
- 接收消息时可以不用关闭,它就能一直处于待接收状态。
- 秉承 先开后关 原则。
channel.close();
connection.close();
交换机工作模式
不会接收到 消费端启动前的消息。
- fanout:广播模式,将消息广播到所有与之绑定的队列,不需要设置路由键。
- direct:直接模式,根据 RoutingKey 匹配消息路由到指定的队列。
- topic:主题模式,生产者指定 RoutingKey 消息,根据消费端指定的队列通过模糊匹配的方式进行相应转发。
- headers:根据发送消息内容中的 headers 属性来匹配(基本不用)。
fanout 广播模式
- 将消息广播到所有与之绑定的队列,不需要设置路由键。
direct 直接模式
- 根据 RoutingKey 匹配消息路由到指定的队列。
- 允许多个队列绑定相同的 RoutingKey。
- 允许多个队列绑定相同的 RoutingKey。
topic 主题模式:
- direct 的升级。生产者指定 RoutingKey 消息,根据消费端指定的队列- 通过模糊匹配的方式进行相应转发。
- * 可以代替一个单词。
- # 可以替代零个或多个单词。
- 注意:是单词,不是字符。
应用:
定义交换机:
- 发送和接收时都需要定义。
- channel.exchangeDeclare(String exchange, BuiltinExchangeType type):主动声明一个不带额外参数的非自动删除、非持久的交换机。
- exchange:交换机名字
- type:交换机类型。
- 其值是 BuiltinExchangeType 枚举类:DIRECT, FANOUT, TOPIC, HEADERS
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
- 注意:当已存在相同交换机名且类型不一致时,会报出 IOException。描述:received 'direct' but current is 'fanout'
接收时需要绑定队列:
- channel.queueBind(String queue, String exchange, String routingKey):将队列绑定到交换机,不带额外参数。
- routingKey:路由键。
- 一个交换机可以绑定多个queue,甚至queueName也可以一至(同一个queue绑定了多个RoutingKey)
// 获取临时队列名
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
广播模式完整案例:
public class EmitLog {
private static String EXCHANGE_NAME = "direct-logs";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建工厂,建立连接,获取信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String msg1 = "info: Hello World";
String msg2 = "warning: Hello World";
String msg3 = "error: Hello World";
channel.basicPublish(EXCHANGE_NAME, "info", null, msg1.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "warning", null, msg2.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "error", null, msg3.getBytes(StandardCharsets.UTF_8));
System.out.println("发送了消息");
channel.close();
connection.close();
}
}
public class ReceiveLogs {
private static String EXCHANGE_NAME = "direct-logs";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建工厂,建立连接,获取信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.128");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 获取临时队列名
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
channel.queueBind(queueName, EXCHANGE_NAME, "error");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("收到消息:" + new String(body, "UTF-8"));
}
});
}
}
其他知识点
消息分配
- 默认 Rabbitmq 是按照消息的 数量来进行一次性平均分配 的。
- 比如,有10个消息,2个消费者,则奇数位的消息都分配给第一个消费者,偶数位的消息分配给第二个消费者,而不考虑每个消息所需耗费的执行时间,(尽管可能某一个消费者的消息都是很费时的消息)
按压力进行平均分配(公平派遣):
-
干完了手头上的工作,再分配得到第二个工作
-
channel.basicQos(int prefetchCount):设置此通道最希望处理的消息数量。
- 注意预取计数必须在 0到65535 之间(AMQP 0-9-1中的unsigned short)。
- 在数量 < prefetchCount 之前,不会再接收下一个任务。
-
关闭自动确认:
- 置 channel.basicConsume(String queue, boolean autoAck, Consumer callback) 方法的 autoAck 参数为 false。
-
在 basicConsume() 的 callback 函数里,当消息处理完毕时使用 basicAck() 函数来确认。文章来源:https://www.toymoban.com/news/detail-437742.html
-
channel.basicAck(long deliveryTag, boolean multiple):确认一个或多个接收到的消息。文章来源地址https://www.toymoban.com/news/detail-437742.html
- deliveryTag:envelope.getDeliveryTag()。接收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签。
- multiple:true确认所有到并包括所提供的交付标记的消息;false表示只确认所提供的交付标签。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 接收消息并消费
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("收到消息:" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
临时队列
- 当接收者不存在时,临时队列就会自动删除,以节省空间。
- channel.queueDeclare().getQueue();
String queueName = channel.queueDeclare().getQueue();
到了这里,关于RabbitMQ入门(详细)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!