1.基本概况
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不 用依赖其他服务。
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
2.使用场景
学习或者使用一项技术,首先要先知道他能够解决那些痛点,而不要过分关注如何使用它,先理解后使用
-
异步通信:
- 将耗时的任务异步执行,以提高系统的响应性。生产者将任务放入消息队列,而消费者从队列中获取任务并执行。
-
系统解耦:
- 在分布式系统中,通过消息队列实现各个组件之间的解耦,使得系统更加灵活、可扩展,并降低各个组件之间的依赖性。
-
削峰填谷:
- 处理流量峰值,通过消息队列缓冲和平滑大量的请求,以防止系统过载。生产者将请求发送到队列,而消费者按照其处理能力逐渐处理这些请求。
-
日志处理:
- 在分布式系统中,将日志异步发送到消息队列,然后由日志处理服务消费,以便进行日志聚合、监控和分析。
-
事件驱动架构:
- 构建事件驱动的体系结构,通过消息队列进行组件之间的通信。当一个组件触发事件时,它将消息发布到队列,而其他组件则订阅并处理这些事件。
-
任务分发和调度:
- 将任务分发到多个工作者节点,以实现任务的并行处理。生产者将任务放入队列,而多个消费者从队列中获取任务并执行。
-
订单处理:
- 在电子商务系统中,通过消息队列处理订单和支付流程。当用户下订单时,将订单信息发送到队列中,而订单处理服务异步处理这些订单。
-
微服务通信:
- 在微服务架构中,通过消息队列实现微服务之间的通信。微服务可以通过发布和订阅消息进行协作,而不直接调用彼此的 API。
-
实时数据处理:
- 将实时生成的数据通过消息队列传递给分析引擎或仪表板,以便实时监控和分析系统的状态。
-
发布/订阅模式:
- 实现发布/订阅模式,使得多个消费者可以订阅并处理同一类事件。这在通知系统和广播消息时非常有用。
3.大体介绍
3.1四大核心概念
生产者(publisher)->交换机(exchange)->队列(queues)->消费者(consumer)
3.2核心部分
取自官网
3.2.1 生产者
RabbitMQ 中的生产者负责将消息发送到队列。
创建一个简单的生产者通常需要以下步骤:
-
创建连接工厂: 使用
ConnectionFactory
创建连接到 RabbitMQ 服务器的连接。 -
创建连接: 调用
newConnection
方法创建一个连接对象。 -
创建通道: 调用
createChannel
方法创建一个通道,所有的操作都在通道上进行。 -
声明交换器和队列: 使用
exchangeDeclare
方法声明交换器,使用queueDeclare
方法声明队列。 -
发布消息: 使用
basicPublish
方法发布消息到交换器。
import com.rabbitmq.client.*;
public class SimpleProducer {
private static final String EXCHANGE_NAME = "my_direct_exchange";
private static final String ROUTING_KEY = "my_routing_key";
private static final String MESSAGE = "Hello, RabbitMQ!";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明直连交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
// 发布消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
System.out.println("Message sent: " + MESSAGE);
}
}
}
消息的基本属性
使用 basicPublish
方法发布消息时,可以设置消息的基本属性,例如:
- 交换器名称(Exchange Name): 指定消息发送的交换器。
- Routing Key(路由键): 对于 Direct、Topic 类型的交换器,消息的路由键与绑定键进行匹配,以决定将消息路由到哪个队列。
- BasicProperties(基本属性): 可以设置消息的基本属性,如消息的 ID、内容类型、优先级等。
发布确认机制
RabbitMQ 提供了发布确认机制,用于确保消息成功发送到交换器。可以通过调用 confirmSelect
方法开启确认模式,并通过 waitForConfirms
或 waitForConfirmsOrDie
方法等待确认。
// 开启发布确认模式
channel.confirmSelect();
// 发布消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
// 等待确认
if (channel.waitForConfirms()) {
System.out.println("Message sent successfully!");
} else {
System.err.println("Failed to send message!");
}
3.2.2 exchange(交换器)
//声明交换器的方法
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException {
return this.exchangeDeclare(exchange, type, durable, autoDelete, false, arguments);
}
-
exchange(交换器名称):
- 类型:
String
- 描述:要声明的交换器的名称。
- 类型:
-
type(交换器类型,下方有对交换器类型的详解):
- 类型:
String
- 描述:交换器的类型,有四种类型可以选择:
- direct: 将消息路由到与消息的 routing key 完全匹配的队列。
- fanout: 将消息广播到所有与交换器绑定的队列。
- topic: 将消息路由到与消息的 routing key 匹配的队列,支持通配符。
- headers: 使用消息的 headers 属性进行匹配。
- 类型:
-
durable(持久性):
- 类型:
boolean
- 描述:如果设置为
true
,交换器将在服务器重启后仍然存在(持久性交换器)。如果设置为false
,交换器将在服务器重启后被删除(非持久性交换器)。
- 类型:
-
autoDelete(自动删除):
- 类型:
boolean
- 描述:如果设置为
true
,交换器将在至少有一个队列与之绑定时存在。当所有与交换器绑定的队列都解绑时,交换器将被删除。
- 类型:
-
arguments(其他参数):
- 类型:
Map<String, Object>
- 描述:其他的一些交换器参数,以键值对的形式传递。例如,在使用 Headers Exchange 类型时,可以通过设置
x-match
参数来定义匹配规则。
- 类型:
生产者推送到具体交换器,交换器推送到零个或多个队列中,具体推送规则取决于交换器类型以及相应的路由规则
Exchange type(交换器类型) | Default pre-declared names |
---|---|
Direct exchange(直接) | (Empty string) and amq.direct |
Fanout exchange(扇出) | amq.fanout |
Topic exchange(主题) | amq.topic |
Headers exchange(标头) | amq.match (and amq.headers in RabbitMQ) |
交换器还有许多其他属性,例如名字,持久性(rabbitMQ重启之后是否仍然存在该交换器),自动删除(最后一个队列与交换器解除绑定之后删除),参数(可选,由插件或其他特定功能使用)
- 默认交换器
他是一个没有名称的直接交换器(名字为空字符串),它会使默认没有绑定的队列自动绑定到该交换器上,默认交换器不支持绑定/解绑操作,如果进行该操作会报错
2. 直接交换器
他会根据路由key直接将消息推送到响应名称队列中
如上图,a队列绑定了aa,b队列绑定了bb,cc这两个键值,exchange就会根据消息所包含的路由来进行消息的推送选择
3.fanout扇出交换器
该交换器会将消息发送到所有与他绑定的队列中,从而忽视路由键的关联
4.topic主题交换器
该交换器可以更加方便的进行自定义队列推送,适用于比较复杂的业务场景
注意:1.当一个队列绑定键是#,那么他将接收所有交换器推送的消息
2.如果队列中没有#与*好出现,那么该队列就如同direct一样的效果
5.标头交换器
他与直接交换器功能个差不多一致只是传参不一样,从下面源码可以看出,标头交换器是根据arguments进行路由选择的
声明标头交换器的时候可以设置是全部匹配还是部分匹配(下述为示例代码)
// 声明 Header Exchange,并设置匹配模式为 "any"(部分匹配即可),"all"(需要全部匹配才可)
Map<String, Object> exchangeArgs = new HashMap<>();
exchangeArgs.put("x-match", "any");
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, false, false, exchangeArgs);
3.2.3 queue(队列)
相关属性name(队列名称),Durable(队列在服务端重启之后是否仍然存在),Exclusive(独占,只由一个队列连接使用,当该链接关闭时,删除连接),Auto-delete(自动删除,如果没有消费者连接到这个队列,那么队列将被自动删除),Arguments(可选;由插件和代理特定功能(如消息 TTL、队列长度限制等)使用)
//该方法为声明队列的方法,关于其中的参数下述进行详解
public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
validateQueueNameLength(queue);
return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
}
-
queue(队列名称):
- 类型:
String
- 描述:要声明的队列的名称。如果不指定队列名称,RabbitMQ 将会为队列生成一个唯一的名称。
- 类型:
-
durable(持久性):
- 类型:
boolean
- 描述:如果设置为
true
,队列将在服务器重启后仍然存在(持久性队列)。如果设置为false
,队列将在服务器重启后被删除(非持久性队列)。
- 类型:
-
exclusive(排他性):
- 类型:
boolean
- 描述:如果设置为
true
,队列将只允许当前连接创建的消费者使用。一旦连接关闭,队列将被删除。
- 类型:
-
autoDelete(自动删除):
- 类型:
boolean
- 描述:如果设置为
true
,队列将在至少有一个消费者连接到它时存在。当所有消费者断开连接时,队列将被删除。
- 类型:
-
arguments(其他参数):
- 类型:
Map<String, Object>
- 描述:其他的一些队列参数,以键值对的形式传递。例如,可以通过设置
x-message-ttl
参数来定义消息的过期时间。
- 类型:
返回值为 AMQP.Queue.DeclareOk
,其中包含有关已声明队列的一些信息。
3.2.4Bindings (绑定)
用来绑定交换机与队列的路由关系,也可以进行交换器与交换器关系绑定(一般用不上)
//队列绑定
public com.rabbitmq.client.impl.AMQImpl.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {
validateQueueNameLength(queue);
return (com.rabbitmq.client.impl.AMQImpl.Queue.BindOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Bind.Builder()).queue(queue).exchange(exchange).routingKey(routingKey).arguments(arguments).build()).getMethod();
}
//交换器绑定
public BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException {
return (BindOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Exchange.Bind.Builder()).destination(destination).source(source).routingKey(routingKey).arguments(arguments).build()).getMethod();
}
3.2.5 消费者
rabbitMQ是进行消息转发,不涉及业务处理,需要创建消费者来处理队列中的数据
主要使用步骤为:
-
创建连接工厂: 使用
ConnectionFactory
创建连接到 RabbitMQ 服务器的连接。 -
创建连接: 调用
newConnection
方法创建一个连接对象。 -
创建通道: 调用
createChannel
方法创建一个通道,所有的操作都在通道上进行。 -
声明队列: 使用
queueDeclare
方法声明要消费的队列。 -
创建消费者: 创建
Consumer
对象,实现DefaultConsumer
抽象类的handleDelivery
方法,该方法在接收到消息时被调用。 -
订阅队列: 调用
basicConsume
方法订阅队列,传入队列名称和消费者对象。
import com.rabbitmq.client.*;
public class SimpleConsumer {
private static final String QUEUE_NAME = "my_queue";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 处理接收到的消息
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
// 订阅队列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 等待接收消息,此处可以添加逻辑来保持程序运行
Thread.sleep(10000);
}
}
}
消费者的基本属性
-
Consumer Tag(消费者标签): 在调用
basicConsume
时,可以传入一个消费者标签。消费者标签用于标识消费者,可以在取消订阅时使用。如果不提供标签,RabbitMQ 将生成一个唯一的标签。 -
Auto Acknowledgment(自动应答): 在调用
basicConsume
时,可以传入一个布尔值表示是否自动应答。如果设置为true
,一旦消息被接收,就会自动向 RabbitMQ 发送应答。如果设置为false
,需要在处理完消息后手动调用basicAck
方法发送应答。 -
Delivery(传送信息):
handleDelivery
方法的参数Delivery
包含有关消息的信息,如交付标签、路由键、交换器等。 -
BasicProperties(基本属性):
handleDelivery
方法的参数BasicProperties
包含有关消息的基本属性,如消息的 ID、内容类型、优先级等。
消费者的应答机制
RabbitMQ 提供了两种消费者应答机制:
-
自动应答: 设置
autoAck
参数为true
,一旦消息被接收,就会自动向 RabbitMQ 发送应答。 -
手动应答: 设置
autoAck
参数为false
,在处理完消息后,需要手动调用basicAck
方法发送应答。
自动应答简化了应用程序的逻辑,但在某些情况下可能导致消息的丢失。手动应答允许应用程序更细粒度地控制消息的确认。选择使用哪种应答机制取决于应用程序的需求和消息处理的语义。
消费者的错误处理文章来源:https://www.toymoban.com/news/detail-790345.html
在实际应用中,消费者应该具备良好的错误处理机制。可以捕获异常并根据需要执行适当的操作,例如记录日志、重试、拒绝消息等。文章来源地址https://www.toymoban.com/news/detail-790345.html
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
// 处理接收到的消息
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
// 模拟处理中的异常
if (message.contains("error")) {
throw new RuntimeException("Simulated error during message processing");
}
// 手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 异常处理逻辑
System.err.println("Error processing message: " + e.getMessage());
// 可以选择拒绝消息并重新入队,或者进行其他错误处理
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
到了这里,关于rabbitMQ大致讲解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!