概念
MQ,是Message Queue的缩写,遵循先进先出的原则
在项目中用于流量削峰,应用解耦,异步处理
流量削峰:处理大量订单时,将订单分散,部分用户下单后需要进行等待,但却防止了服务宕机
应用解耦:当进行服务之间的通信时,普通方式不能保证当前模块调用其他模块出错后还能正确运行,增加消息队列后,当前模块执行完后可直接返回,等待其他模块执行完毕,如果其他模块执行异常,也不会影响当前流程。
异步处理:在异步调用服务中,部分服务处理时间较长,此时可交给消息队列,当服务执行完毕后,发送一条消息告诉当前服务执行完毕了
核心1-生产者:消息发送者
核心2-交换机:接收生产者发送过来的数据,并将消息发送到一个队列或者多个队列,或者丢弃消息
核心3-队列:接收来自交换机的数据,并保存消息直到消费者消费消息
核心4-消费者:消费从队列中获取到的消息,大多数时等待接收消息
安装
这里我将RabbitMQ安装在docker中,使用版本为3.9.5
docker run -d --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.9.5-management
解释:
- 容器名字为
some-rabbit
- 5672是rabbitmq中的标准端口,将暴露在外部供我们使用
- 15672 是 RabbitMQ 管理插件使用的端口,暴露出来为了使用 Web 应用程序进行管理,在
http://localhost:15672/
中,可以找到一个用于 RabbitMQ 配置的管理页面。来自 Docker 容器的端口 15672 将暴露在宿主机上以供使用 - docker中运行的容器将在后台进行运行
这里我是将docker安装在虚拟机中,所以需要将两个端口暴露出来方便使用
ufw allow 5672
ufw allow 15672
意思就是允许放行5672和15672两个端口
AMQP
AMQP是高级消息队列协议的缩写。它是一个开放的标准协议,允许系统之间的信息传递。不同的服务器/系统可以相互通信,不受技术限制。AMQP通过TCP/IP连接的代理服务实现消息传递。它同时定义了网络层协议和消息代理的高层架构。
模型
1.一条包括生产者指定的路由密钥的信息
2.生产者发送信息
3.绑定在交换机上的队列收到消息
4.消息一直保存在队列中,知道消费者消费消息
引入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
创建连接
class CreateChannel {
public static Channel getChannel() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("");
factory.setPort(5672);
factory.setUsername("");
factory.setPassword("");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
配置文件
spring.rabbitmq.host=id address
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
队列
工作队列
工作队列(任务队列):避免大量消息集中处理,造成消息滞留无法处理,并且一个消息只能被处理一次
引入轮询的工作方式,工作线程之间竞争资源通过轮训的方式处理消息,防止消息被处理多次
在存在两个相同的队列时,生产者发送的数据是由消费者轮询消费
消息应答
自动应答
需要在高吞吐量和数据传输安全性方面进行权衡,这种模式仅试用于消费者可以高效并且在某种速率能够处理这些消息的情况下使用。不推荐使用
手动应答
手动应答分为确认应答和拒绝应答,确认应答是已经知道该消息且成功处理消息后将消息进行丢弃(basicAck)。拒绝应答分为单个消息拒绝和批量消息拒绝,即消费者获取通道中的首条消息或队列中存在的所有消息并拒绝(basicNack/basicReject)
手动应答可进行批量应答且减少网络拥堵
消息重新入队
消费者在消费信息过程中,可能出现丢失连接,导致消息在确认应答发送失败,此时该消息就处于未完全处理,可将消息重新入队,如果有其他消费者可进行处理,会将消息发送给可处理的消费者。保证消息不丢失。
持久化
为了保证消息不丢失,需要同时确保消息队列和交换机不丢失
队列持久化
在创建队列是修改durable参数为true
消息持久化
在生产者发送消息时声明消息是持久化的,在其他参数中添加MessageProperties.PERSISTENT_TEXT_PLAIN
,这里声明的消息持久化并不能完全保证消息不丢失,可能在准备存储到磁盘但是没有完全存储完过程中出现问题
消息不公平分发
如果采用轮询方法,当某个消费者处理消息很慢时,会导致消息的堆积
在消费者消费消息过程中,设置不公平消费消息channel.basicQos
,默认值为0,公平/轮询分发,1为不公平分发
使用不公平分发,绑定的消费者将不再一次消费消息,能者多劳
预取值
指定每个消费者消费多少条消息channel.basicQos()
,当该方法中参数大于1后,表示当前消费者能消费多少条消息,该方法使用的前提是队列非自动应答,并且最好写在DeliverCallback中,我写在里面才生效
DeliverCallback deliverCallback = (consumeTag, message) -> {
// 接收4条消息
channel.basicQos(4);
try {
Thread.sleep(10000);
System.out.println(new String(message.getBody()));
// 增加消息是否批量处理
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
发布确认
能够解决消息丢失的问题
在生产者发送消息时,即使保证队列持久化、队列中的消息也保证了持久化,但是在传送到队列保存磁盘的过程中会导致信息丢失
发布确认即为生产者将消息发送到队列中,队列将消息保存到磁盘后,再给生产者发送一个消息
单个确认
是一种同步确认方法,能够确保每条消息持久化成功,但是发布速度慢,需要等待每条消息完成确认后才能对后续消息进行确认,造成消息堆积
开启确认 channel.confirmSelect()
消息发布后,通过channel.waitForConfirms()
获取当前消息是否确认发布了
批量确认
相比单个确认来说,批量确认极大提高了吞吐量,但是当发生故障时,无法确认是那条消息出现问题,会造成信息丢失。需要将消息预先保存在内存中,为出现故障时数据可重新进行操作。
通过发送计数的方式,当达到某一阈值后再发送请求确认的消息
异步确认
通过异步方式,在不占用主要请求资源的同时,完成发送确认。相对于单个确认和批量确认,处理速度进一步的提升,可以明确的知道处理成功和处理失败的消息
解决发送失败的消息
处理未完成发送的消息时,最好将这些消息放到一个基于内存能够被发布线程访问的队列中,如ConcurrentLinkedQueue
。
在发送消息时,将所有消息以id为key,消息体为value放入ConcurrentSkipMap中,在确认回调时,如果是批量确认,通过headMap(Long deliveryTag)
方法获取到所有发送成功的消息,然后clear()
,单个删除就通过id进行删除。成功回调处理完后,剩下的就为确认失败的消息。
交换机
消息传递模型的核心思想是,生产者生产的消息从来不会直接发送到队列。并且生产者不知道这些消息传递到那些队列中。
生产者只能将消息发送到交换机中。交换机的主要工作是接收生产者发送的消息,再将它们推入队列中。交换机必须准确地知道如何处理这些消息,如放入特定队列还是许多队列或者是丢弃他们
类型
默认交换机
默认交换是一个预先声明的直接交换,其名称被设置为空字符串""。当一个消息被发送到默认交换所时,它将被路由到队列,队列名称等于消息路由键。每个创建的队列都会自动绑定到默认交换所,其路由键与队列名称相同。
Direct-直接
直接交换使用消息的路由密钥,将消息路由到一个队列中。路由键是由生产者设置的,作为消息头。消息中的路由密钥必须与绑定中指定的路由密钥完全匹配–它就像一个地址,告诉交换所消息应该去哪里。
Fanout-广播(发布订阅)
广播/扇出交换复制并将收到的消息发送到与之绑定的所有队列。提供的路由密钥将被简单地忽略。
Topic-主题
订阅交换机根据路由键通配符对消息进行路由。消息被传递到一个或多个队列。尽管支持带有通配符的路由键,但它不是强制性的–你仍然可以隐含地提供路由键,不需要任何通配符。
订阅模式中路由键是一个用.字符分隔的单词列表,例如,taxi.type.small和taxi.type.large。
并且可以使用 * 符号来匹配特定位置的任何单词,如taxi.*.large。# 表示匹配0个或更多的词,如taxi.type.# ,它将匹配所有以taxi.type开始的所有路由键。
绑定
让交换机与队列之间通过routingkey产生联系,根据绑定之将消息发送到对应的队列中
交换机使用
Direct Exchange
当你想进行一次交换时,直接交换很有用,并根据路由密钥过滤,指定信息的目标 “地址”。
一个例子是一个报告系统,它以指定的格式异步生成报告。用户点击 "生成报告 "按钮并选择目标格式–PDF或DOCX文件。系统通过向x.generate-report交换发送消息来安排报告的生成,并指定目标目的地为路由键–pdf或docx。在这种情况下,生产者为信息提供一个地址,它应该被传送到哪里。
Fanout Exchange
当你想把消息发送到一个或多个队列时,Fanout模式很有用,它是广播的理想选择。它类似于发布/订阅模型–作为消费者,你订阅了一些东西,而发布者将消息发送给所有订阅者
。消息可以被不同的消费者以不同的方式处理。路由键将被忽略。
// 声明交换机,队列,RouteKey
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE_A = "fanout.queue.a";
public static final String FANOUT_QUEUE_B = "fanout.queue.b";
public static final String FANOUT_ROUTE_PARENT = "fanout.route.*";
// 创建一个fanout类型的交换机
// 参数为:交换机名,交换机类型,是否持久化
channel.exchangeDeclare(FANOUT_EXCHANGE, “fanout”, true);
// 创建两个队列
// 参数为:队列名,是否持久化,是否排他,是否自动删除,其他参数
channel.queueDeclare(FANOUT_QUEUE_A, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_B, true, false, false, null);
// 绑定交换机和队列
channel.queueBind(FANOUT_QUEUE_A, FANOUT_EXCHANGE, FANOUT_ROUTE_PARENT);
channel.queueBind(FANOUT_QUEUE_B, FANOUT_EXCHANGE, FANOUT_ROUTE_PARENT);
// 发送消息到交换机
channel.basicPublish(FANOUT_EXCHANGE,"",null,message,getBytes(StandardCharsets.UTF_8));
// 获取消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(FANOUT_QUEUE_A, true, deliverCallback, consumerTag -> {});
运行后,虽然使用的空串路由键,但是当前交换机绑定的两个队列都能收到消息
Topic-主题
相对于路由交换机和广播交换机,主题交换机在使用时更加的灵活
主题交换将多个消息消费者与过滤结合在一起,基于消息路由键和通配符匹配。当你想把一个消息发送到一个以上的有限队列,但不是所有的队列时。
该交换机使用的路由键必须为列表,以点号进行隔开,可以为任意单词进行拼接。其中还可以使用替换符,*可以代替一个单词,#可以替代零个或多个单词
如果路由键为**#**,那么这个队列能接收到所有消息;如果路由键中没有出现 ***** 和 # 时,这个队列就相当于直连模式
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE_A = "topic.queue.a";
public static final String TOPIC_QUEUE_B = "topic.queue.b";
public static final String TOPIC_QUEUE_C = "topic.queue.c";
public static final String TOPIC_ROUTE_PARENT_A = "topic.route.*";
public static final String TOPIC_ROUTE_PARENT_B = "*.topic.route";
public static final String TOPIC_ROUTE_PARENT_C = "topic.#.route";
public static final String TOPIC_ROUTE_A = "topic.route.a";
public static final String TOPIC_ROUTE_B = "b.topic.route";
public static final String TOPIC_ROUTE_C = "topic.dd.aa.c";
channel.exchangeDeclare(TOPIC_EXCHANGE, TOPIC, true);
channel.queueDeclare(TOPIC_QUEUE_A, true, false, false, null);
channel.queueDeclare(TOPIC_QUEUE_B, true, false, false, null);
channel.queueDeclare(TOPIC_QUEUE_C, true, false, false, null);
channel.queueBind(TOPIC_QUEUE_A, TOPIC_EXCHANGE, TOPIC_ROUTE_PARENT_A);
channel.queueBind(TOPIC_QUEUE_B, TOPIC_EXCHANGE, TOPIC_ROUTE_PARENT_B);
channel.queueBind(TOPIC_QUEUE_C, TOPIC_EXCHANGE, TOPIC_ROUTE_PARENT_C);
String sendMsg = "队列C能收到的消息";
channel.basicPublish(TOPIC_EXCHANGE,
TOPIC_ROUTE_C,
new AMQP.BasicProperties.Builder()
.headers(headerC)
.build(),
sendMsg.getBytes(StandardCharsets.UTF_8));
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("收到的消息为:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(TOPIC_QUEUE_C,true,deliverCallback,consumerTag -> {});
该模式发送了消息后,能够在管理页面中看到,只有队列c接收到了信息,使用的TOPIC_ROUTE_PARENT_C
路由键,其中的 # 替代了dd.aa,所以只有队列c能收到消息。
死信队列
死信队列表示消费者在某些特定的情况下无法消费队列中的消息,并且这些消息没有后续处理,这些消息就变为死信。当消息成为死信后,可以将它们发送到一个新的队列中,这个队列就称为死信队列。即死信队列信息来源于普通队列
在实际使用过程中,死信队列用来保证业务不丢失。或者处理设置了时间过期的消息(延迟队列)
当对消息设置了过期时间、队列满了无法再添加数据到队列中、消息被拒绝,都是出现死信的来源
一个简单的死信队列结构包括,一个生产者,两个消费者,两个队列。当普通队列中的消息发生过期等情况,将该条消息所在的队列通过绑定的队列发送到新的队列中
设置TTL
class ConsumerA {
static final String NORMALEXCHANGE = "normal_exchange";
static final String DEADEXCHANGE = "dead_exchange";
static final String NORMALQUEUE = "normal_queue";
static final String DEADQUEUE = "dead_queue";
public static void main(String[] arg) throws Exception {
Channel channel = CreateChannel.getCahnnel();
channel.exchangeDeclare(NORMALEXCHANGE,BuiltinExchangeType.Direct,false);
channel.exchangeDeclare(DEADEXCHANGE,BuiltinExchangeType.Direct,false);
Map<String,Objcet> properties = new HashMap<>();
// 普通队列设置死信交换机
properties.put("x-dead-letter-exchange",DEADEXCHANGE);
// 死信路由
properties.put("x-dead-letter-routing-key","dead.key");
// 过期时间
properties.put("x-message-ttl",10000);
channel.queueDeclare(NORMALQUEUE,false,false,false,properties);
channel.queueDeclare(DEADQUEUE,false,false,false,null);
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("接受到的消息为:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(NORMALQUEUE,true,deliverCallback,consumerTag -> {});
}
}
class Producter {
static final String NORMALEXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = CreateChannel.getChannel();
AMQP.BasicProperties properties =
new AMQP.BasicProperties().builder().expiration("4000").build();
// 发送消息
for(int i = 0; i < 10; i++) {
String message = "发送的消息为:"+i;
channel.basicPublish(NORMALEXCHANGE,
"normal.key",
properties,
message.getBytes(StandardCharsets.UTF_8))
}
}
}
先启动ConsumerA,用来创建两个交换机和两个队列,然后启动Producter发送消息,此时ConsumerA能接收到消息
此时ConsumerA断开连接,Producter再次发送消息,此时消息堆积在队列中没有消费者消费消息,当过了设置的消息过期时间后,普通队列中的消息将会转移到死信队列中,这时可以在创建一个消费者来处理这些未完成消费的消息。
队列达到最大长度
class ConsumerA {
static final String DEADEXCHANGE = "dead_exchange";
static final String NORMALQUEUE = "normal_queue";
public static void main(String[] args) throws Exception {
Channel channel = CreateChannel.getChannel();
Map<String, Object> properties = new HashMap<>();
properties.put("x-dead-letter-exchange",DEADEXCHANGE);
properties.put("x-dead-letter-routing-key","dead.key");
// 设置队列最大消息数
properties.put("x-max-length",4);
channel.queueDeclare(NORMALQUEUE, false, false, false, properties);
DeliverCallback deliverCallback = ((consumerTag, message) -> {
System.out.println("ConsumerA收到的消息:"+new String(message.getBody(),"UTF-8"));
});
channel.basicConsume(NORMALQUEUE,true,deliverCallback,consumerTag -> {});
}
}
class Product {
static final String NORMALEXCHANGE = "normal_exchange";
static final String DEADEXCHANGE = "dead_exchange";
static final String NORMALQUEUE = "normal_queue";
static final String DEADQUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = CreateChannel.getChannel();
channel.exchangeDeclare(NORMALEXCHANGE, BuiltinExchangeType.DIRECT, false);
channel.exchangeDeclare(DEADEXCHANGE, BuiltinExchangeType.DIRECT, false);
channel.queueBind(NORMALQUEUE, NORMALEXCHANGE, "normal.key");
channel.queueBind(DEADQUEUE, DEADEXCHANGE, "dead.key");
for (int i = 0; i < 9; i++) {
String message = "消息"+i;
channel.basicPublish(NORMALEXCHANGE,
"normal.key",
null,
message.getBytes(StandardCharsets.UTF_8));
}
}
}
当ConsumerA出现宕机等情况导致无法接收消息时,Product发送的消息将堆积在normal_queue
中,但是normal_queue
中设置了队列的最大容量,所以当队列堆满后,其他的消息将被发送到死信队列中。
消息被拒
class ConsumerA {
static final String DEADEXCHANGE = "dead_exchange";
static final String NORMALQUEUE = "normal_queue";
public static void main(String[] args) throws Exception {
Channel channel = CreateChannel.getChannel();
Map<String, Object> properties = new HashMap<>();
properties.put("x-dead-letter-exchange", DEADEXCHANGE);
properties.put("x-dead-letter-routing-key", "dead.key");
channel.queueDeclare(NORMALQUEUE, false, false, false, properties);
DeliverCallback deliverCallback = ((consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
if ("消息5".equals(msg)) {
System.out.println("拒绝的消息为:" + msg);
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("ConsumerA收到的消息:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
});
// 开启拒绝应答时,这里不能再自动应答,否则不生效
channel.basicConsume(NORMALQUEUE, false, deliverCallback, consumerTag -> {
});
}
}
开启拒绝后,其他消息需要开启手动应答,而channel.basicConsume
中的自动应答因为false,否则所有消息将会被当前消费者消费,不存在消息拒绝了。
延迟队列
延迟队列是死信队列中的一种,死信队列中的TTL就是延迟队列。
延迟队列内部是有序的
延迟队列使用场景:订单未在指定时间内支付就取消;发起申请未审批进行提醒;…
延迟队列在处理这些指定时间的消息时,相当于是定时任务,但是如果在这期间服务宕机,之前的定时任务将不再生效,且定时任务在处理大量任务时是通过轮询的方式进行处理,性能较低
Springboot中TTL延迟队列
模型:交换机X通过XA、XB路由键分别绑定QA、QB队列;QA、QB设置死信交换机Y,和死信路由键QABCY;死信队列Y通过路由键QABCY绑定队列QC
当QA、QB设置了不同的消息过期时间,当收到消息后,没有消费者消费,到达过期时间后,将消息发送给死信队列
声明配置交换机和队列的配置类
@Configuration
class TTLQueueConfig {
// 交换机
private static final String X_EXCHANGE = "X";
private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 队列
private static final String A_QUEUE = "QA";
private static final String B_QUEUE = "QB";
private static final String C_DEAD_LETTER_QUEUE = "QC";
private static final String QAEX = "XA";
private static final String QBEX = "XB";
private static final String QABCEY = "YC";
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
/**
* 队列A并绑定死信交换机
*
* @return {@link Queue}
*/
@Bean("queueA")
public Queue queueA() {
Map<String, Object> properties = new HashMap<>();
// 设置死信交换机和routingKey,过期时间(10s)
properties.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
properties.put("x-dead-letter-routing-key", QABCEY);
properties.put("x-message-ttl", 10000);
return QueueBuilder.durable(A_QUEUE).withArguments(properties).build();
}
@Bean("queueB")
public Queue queueB() {
Map<String, Object> properties = new HashMap<>();
// 设置死信交换机和routingKey,过期时间(40s)
properties.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
properties.put("x-dead-letter-routing-key", QABCEY);
properties.put("x-message-ttl", 40000);
return QueueBuilder.durable(B_QUEUE).withArguments(properties).build();
}
@Bean("queueC")
public Queue queueC() {
return QueueBuilder.durable(C_DEAD_LETTER_QUEUE).build();
}
@Bean
public Binding queueABindX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange exchange) {
return BindingBuilder.bind(queueA).to(exchange).with(QAEX);
}
@Bean
public Binding queueBBindX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange exchange) {
return BindingBuilder.bind(queueB).to(exchange).with(QBEX);
}
@Bean
public Binding queueCBindY(@Qualifier("queueC") Queue queueC,
@Qualifier("yExchange") DirectExchange exchange) {
return BindingBuilder.bind(queueC).to(exchange).with(QABCEY);
}
}
配置类用来监听死信队列QC
@Component
@Slf4j
class QueueCListenner {
@RabbitListener(queues = "QC")
public void consumerC(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody(), "UTF-8");
log.info("当前时间{},死信队列收到消息{}", new Date().toString(), msg);
}
}
通过接口发送消息
@RestController
@RequestMapping("send")
class SendController {
@Autowired
RabbitTemplate template;
@GetMapping("info/{message}")
public void sendMessage(@PathVariable String message) {
template.convertAndSend("X", "XA", "10s队列:" + message);
template.convertAndSend("X", "XB", "40s队列:" + message);
}
}
现象
消耗时间Tue Jan 10 08:55:33 CST 2023,发送的消息为:卡上的纠纷
当前时间Tue Jan 10 08:55:52 CST 2023,死信队列收到消息10s队列:卡上的纠纷
当前时间Tue Jan 10 08:56:13 CST 2023,死信队列收到消息40s队列:卡上的纠纷
注意:这里QA、QB、QC使用的路由键需要一致,我刚开始操作的过程中可能没保持一致,导致发消息无法转发到死信队列中;QA、QB中配置的死信信息注意检查,配置死信交换机为x-dead-letter-exchange
,死信路由x-dead-letter-routing-key
TTL的优化
以上的方式是通过在队列中设置延迟时间完成消息过期,但是在实际生产中,如果再产生一个新的队列,可能需要再添加一个队列的配置,所以接下来通过交换机发送消息时为队列设置过期时间。声明队列时,同样设置死信队列和死信路由,但是不设置过期时间。
template.convertAndSend("交换机","路由键","消息",pro->{pro.getMessageProperties().setExpiration("过期时间")})
TTL中存在的问题
RabbitMQ指挥检查第一个消息是否过期,当第二个消息时间比第一个消息时间短时,也不会优先执行第二个消息
解决-基于插件化实现延迟队列
发布确认
正常情况下,消息生产者发送的消息到交换机后,只要有消费者,消息就会被消费
在未知情况下,可能出现rabbitmq出现宕机,在此期间消息投递失败,进而消息丢失,使用消息确认机制,将消息放入缓存中
交换机确认
交换机通过回调接口,感知消息
回调接口处于RabbitTemplate中的内部接口ConfirmCallback,自定义回调函数时需要实现这个内部接口,重写conrim方法,如下
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交换机消息确认回调函数
*
* @param correlationData 保存回调消息的ID即相关信息,需要在发送消息时设置其中的数据,否则为null
* @param ack 是否接收成功,成功为true,失败为false
* @param cause 原因,成功时为null,失败时为失败原因,想要失败时,可以在发送消息时用为申明的交换机
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String dataId = correlationData.getId() == null ? "" : correlationData.getId();
if (ack) {
log.info("交换机成功接收信息,消息ID为:{}", dataId);
} else {
log.info("交换机接收消息失败,失败原因为:{}",cause);
}
}
}
虽然实现了内部接口且重写了这个方法,但是这个方法实际上并没有生效,此时启动,RabbitTemplate依然会找自己内部的这个接口方法,所以我们需要将自定义的回调方法类注入到RabbitTemplage中
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
RabbitTemplate template;
/**
* 将当前实现的类注入到RabbitTemplate中的内部实现类
* 如果不注入,将继续走原有的内部接口
*/
@PostConstruct
public void init() {
template.setConfirmCallback(this);
}
...
}
此时注入了,但依然没有收到这个回调函数发送的信息,需要添加配置,开启交换机确认,实现回调功能
spring.rabbitmq.publisher-confirm-type=correlated
,当前使用
spring.rabbitmq.publisher-confirm-type=none
,禁用发布模式,默认值
spring.rabbitmq.publisher-confirm-type=simple
,第一种效果于correlated一致,第二种效果是消息发布成功后使用RabbitTemplate中的waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果判断下一步逻辑,如果waitForConfirmsOrDie返回了false,也就将channel关闭,后续不能进行操作。且该模式为同步确认消息(发送一条确认一条),消耗时间较长。
队列确认
生产者发送消息,如果交换机确认收到了,会直接发送给队列,在这过程中出现路由不存在,队列断开等情况,消息将被丢弃,而生产者不知道丢弃这个事件。
解决
开启回退功能spring.rabbitmq.publisher-returns = true
实现RabbitTemplate中的ReturnCallBack内部接口,重写returnedMessage方法,并将当前类注入到RabbitTemplate中,让其调用自定义的returnedMessage方法
class MyCallBack implements RabbitTemplate.ReturnsCallback{
@Autowired
RabbitTemplate template;
/**
* 将当前实现的类注入到RabbitTemplate中的内部实现类
* 如果不注入,将继续走原有的内部接口
*/
@PostConstruct
public void init() {
template.setReturnsCallback(this);
}
@Override
public void returnedMessage(ReturnedMessage returned) {
Message message = returned.getMessage();
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
String exchange = returned.getExchange();
int replyCode = returned.getReplyCode();
String replyText = returned.getReplyText();
String routingKey = returned.getRoutingKey();
log.info("从交换机{}退回的消息为{},失败状态为{},失败原因是{},使用的路由键为{}", exchange, msg, replyCode, replyText, routingKey);
}
}
验证上述功能可在发送消息时修改一个不存在的路由键,以下为测试结果
从交换机confirm_exchange退回的消息为第二条消息,失败状态为312,失败原因是NO_ROUTE,使用的路由键为confirm.key12
备份交换机
备份交换机本质是当前交换机失效,就转给其他交换机继续处理消息,可实现备份和报警功能
模型:直连交换机无法将消息路由到队列时,将消息转发给其他交换机,通过其他交换机进行消息的备份或者预警
此时直连交换机将重新声明,指定备份交换机
public DirectExchange declareExchange() {
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE)
.durable(true)
.withArgument("alternate-exchange",BACKUP_EXCHANGE)
.build();
}
其他
幂等性
同一操作发起的一次或多次请求的结果是一致的,不会产生副作用
想要达成可通过设置全局ID,写入唯一标识。文章来源:https://www.toymoban.com/news/detail-741123.html
消费端的幂等性:唯一ID+指纹码机制、Redis的原子性文章来源地址https://www.toymoban.com/news/detail-741123.html
到了这里,关于RabbitMq介绍和使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!