theme: vue-pro
前言
前面章节我们至少知道了rabbitmq
的几个核心组件, 比如 exchange
queue
和 routing key
还有java
编程方面的 channel
和 connection
但是这些还不够运用于生产环境
本章内容
- 消息确认机制(
message confirm
) - 消息
return
机制
消息确认机制(confirm)
开始confirm机制, 生产者消息投递到exchange, exchange就会立即ack给生产者, 如果无法投递到exchange, 那么生产者就是产生nack
事务机制和
publisher confirm
机制确保的是消息能够正确地发送至RabbitMQ
,这里的“发送至RabbitMQ
”的含义是指消息被正确地发往至RabbitMQ
的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。
问: 事务机制为什么不讲?
答: 事务机制效率比较慢, P发送消息需要等待C的响应, 会导致极大的耗费性能
问: return
机制或者confirm
机制收到消息不成功后, 你要怎么处理呢?
答: 一般处理方式是: 重试几次, 如果还不通过, 那么打印日志, 交给人工处理
这里的重试可以考虑使用指数退避(重试频率从快到慢的过程)
异步方式消息确认
详细过程是在发送消息前保存消息到一个队列中, 在拿到遇到消息发送失败后, 我们拿取到队列中的消息, 然后尝试重试补偿
注意
Confirm
机制需要主动开启, 上图缺少了这段代码:channel.confirmSelect();
问: 这里引入
redis
保存这消息不合适吧?答: 看起来这里还可以将消息保存到一个高并发队列中, 开一个定时器定时解决(立马再次发送消息大概率还是会失败, 每次等他个几百毫秒吧), 失败累计次数, 直到超过域值
但是使用高并发队列只不过在内存中, 进程崩溃后改消息队列中的消息也将丢失
问: 那有别的方式吗?
答: 有, 保存到
mysql
或者直接持久化到queue
中, 等到管理员找到原因并恢复后再次发送
同步消息确认
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
channel.confirmSelect();
stopWatch.start();
for (int i = 0; i < 1000; i++) {
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
String.format("hello%d", i).getBytes(StandardCharsets.UTF_8));
// 单个消息确认机制
boolean flag = channel.waitForConfirms();
if (flag) {
System.err.println("消息发送成功");
}
}
但是1000次循环就需要调用 1000次
waitForConfirms
这效率特别低
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
channel.confirmSelect();
int batchSize = 100;
int outstandingMessageCount = 0;
stopWatch.start();
for (int i = 0; i < 1000; i++) {
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
String.format("hello%d", i).getBytes(StandardCharsets.UTF_8));
outstandingMessageCount++;
if (batchSize == outstandingMessageCount) {
// 这是批量消息处理
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
这里就改成100次消息发送, 处理一次同步操作消息等待
消息return机制
当前的消息通过exchange
投递到queue
不成功, 就会触发return
机制的函数
mandatory参数的作用是什么?
文字描述: 在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
immediate参数的作用(已弃用)
RabbitMQ 3.0版本开始去掉了对immediate参数的支持,原因是因为immediate参数会影响镜像队列的性能、增加了代码的复杂度,建议采用TTL和DLX的方法替代。
消息回退机制代码实现
Producer
的思路非常简单, 设置mandatory = true
, 然后channel.addReturnListener
public static final String EXCHANGE = "exchange";
public static final String ROUTING_KEY = "";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true,
true, null);
ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("消息发送成功...");
if (multiple) {
ConcurrentNavigableMap<Long, String> headMap = map.headMap(deliveryTag);
headMap.clear();
} else {
map.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("消息发送失败");
// 意味着这里存在多个消息被否定, 可以使用 head 拿到小于 deliveryTag 的消息, 全部进行重试
if (multiple) {
ConcurrentNavigableMap<Long, String> messageHeadMap = map.headMap(deliveryTag);
// 这里可以将消息保存到数据库或者redis中
messageHeadMap.forEach((id, s) -> {
System.err.println("id: " + id + " message: " + s);
});
messageHeadMap.clear();
} else {
String messageNode = map.remove(deliveryTag);
// 这里可以将消息保存到数据库或者redis中
System.err.println("message: " + messageNode);
}
}
});
channel.addReturnListener(returnMessage -> {
System.err.println("消息被return回来了...");
int replyCode = returnMessage.getReplyCode();
String replyText = returnMessage.getReplyText();
String exchange = returnMessage.getExchange();
String routingKey = returnMessage.getRoutingKey();
// AMQP.BasicProperties properties = returnMessage.getProperties();
String message = new String(returnMessage.getBody(), Charset.defaultCharset());
System.err.println("message: " + message);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("replyText: " + replyText);
System.err.println("replyCode: " + replyCode);
});
String message = "message";
map.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish(EXCHANGE, ROUTING_KEY, true, false, MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(Charset.defaultCharset()));
}
Consumer:
public static final String QUEUE = "queue";
public static final String EXCHANGE = "exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true, true, null);
channel.queueDeclare(QUEUE, true, false, true, null);
channel.queueBind(QUEUE, EXCHANGE, "");
channel.basicConsume(QUEUE, false, (consumerTag, message) -> {
System.err.println("消息消费成功, consumeTag: " + consumerTag + " message: "
+ new String(message.getBody(), Charset.defaultCharset()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> System.err.println("消息接受失败, consumeTag: " + consumerTag));
}
问题思考
有了 mandatory
参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。
但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。
而且设置 mandatory
参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?
另一个return机制: exchange备机
来总结下前面的问题
- 消息回退增加消费者代码复杂度
- 回退回来的消息要怎么处理? 记录日志也不知道怎么处理? 难道要手动处理?
在 RabbitMQ
中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?
备份交换机可以理解为 RabbitMQ
中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,**当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout
,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。**当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
备机架构设计
备机和return机制同时存在默认走哪个机制?
备份exchange模式优先级高于消息return模式
什么情况的消息会进入备机?
消息无法投递到queue的原因, 我想想都有哪些?
- queue不存在
- queue满了
- 根据 routing key 找不到 queue (和第一种很像)
消息进入备机后, 我们该做什么?
答:
- 保存消息
- 报警告, 推消息告知管理员
- 最后在解决问题后将消息转发给正常的
exchange
有哪些方法保存消息?
答:
- 可以保存到数据库, 但有
IO
上限, 不是很好的选择. - 将消息持久化到
queue
中, 但是不创建自动消费的消费者, 我们可以在问题被发现并解决后再手动启动处理备份消息 - 保存到
redis
中, 方案不错, 但是在整个分布式系统中引入了新的变量, 导致项目不太问题
最终我还是选择了
redis
, 因为业务需求, 我们需要对备份的消息进行发送和查询甚至是删除操作, 持久化到queue
不好操作, 当然如果你持久化到queue
那么你需要保证消息另一个原因是
redis
是分布式系统不可或缺的一个组件, 早晚都会用上的
我们虽然选择了
redis
方案, 但不意味着第二个方案(持久化到queue
)不好, 我本人更加推荐使用方案二, 但是第二种方案我们需要及时处理问题, 否则可能导致消息爆满, 当然redis
方案也是
发送消息的目的地是哪里?
正常是 exchange
那么怎么发送消息?
首先我们需要对消息进行包装, 不仅仅有内容还需要有发送地址
大致结构是这样:
消息id, 消息内容, 消息创建时间, 消息优先级
- 消息id方式消息重复消费
- 消息创建时间检查消息超过一定域值, 还没被消费, 后续可以做严重警告
- 消息优先级: 可以当做消息重要性, 数字越大优先级越高
public class ConfirmConsumer {
public static final String BACKUP_EXCHANGE = "backup.exchange";
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE = "confirm.queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
HashMap<String, Object> arguments = new HashMap<>();
// 将exchange无法发送的消息, 转发给备份exchange
arguments.put("alternate-exchange", BACKUP_EXCHANGE);
channel.exchangeDeclare(CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true, true, arguments);
channel.queueDeclare(CONFIRM_QUEUE, true, false, true, null);
channel.queueBind(CONFIRM_QUEUE, CONFIRM_EXCHANGE, "");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.err.println("消息接收时间: "
+ DatePattern.NORM_DATETIME_FORMAT.format(new Date())
+ " consumerTag: " + consumerTag + " message: " + message);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = System.err::println;
channel.basicConsume(CONFIRM_QUEUE, false, deliverCallback, cancelCallback);
}
}
注意上面这段代码:
HashMap<String, Object> arguments = new HashMap<>(); // 将exchange无法发送的消息, 转发给备份exchange arguments.put("alternate-exchange", BACKUP_EXCHANGE); channel.exchangeDeclare(CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true, true, arguments);
在正常的
exchange
上绑定备份exchange
public class BackupConsumer {
public static final String BACKUP_EXCHANGE = "backup.exchange";
public static final String BACKUP_QUEUE = "backup.queue";
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static void main(String[] args) throws Exception {
Scanner scanner = new Scanner(System.in);
ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(BACKUP_EXCHANGE, BuiltinExchangeType.FANOUT, true, true, null);
channel.queueDeclare(BACKUP_QUEUE, true, false, true, null);
channel.queueBind(BACKUP_QUEUE, BACKUP_EXCHANGE, "");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.err.println("消息保存, 然后问题发现, 解决问题, 最后发送消息...");
scanner.next();
System.err.println("发送消息: " + ", messageId: " + message.getProperties().getMessageId()
+ ", 消息内容: " + new String(message.getBody(), Charset.defaultCharset()));
channel.basicPublish(CONFIRM_EXCHANGE, "", message.getProperties(), message.getBody());
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = System.err::println;
channel.basicConsume(BACKUP_QUEUE, false, deliverCallback, cancelCallback);
}
}
public class WarningConsumer {
public static final String BACKUP_EXCHANGE = "backup.exchange";
public static final String WARNING_QUEUE = "warning.queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(BACKUP_EXCHANGE, BuiltinExchangeType.FANOUT, true, true, null);
channel.queueDeclare(WARNING_QUEUE, true, false, true, null);
channel.queueBind(WARNING_QUEUE, BACKUP_EXCHANGE, "");
DeliverCallback deliverCallback = (consumerTag, message) -> {
AMQP.BasicProperties properties = message.getProperties();
Integer priority = properties.getPriority();
String messageId = properties.getMessageId();
Date timestamp = properties.getTimestamp();
System.err.println("日志警告, 消息等级: " + priority + ", 消息id: " + messageId
+ ", 消息创建时间: " + DatePattern.NORM_DATETIME_FORMAT.format(timestamp) + ", 消息内容: "
+ new String(message.getBody(), Charset.defaultCharset()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = System.err::println;
channel.basicConsume(WARNING_QUEUE, false, deliverCallback, cancelCallback);
}
}
public class Producer {
public static final String BACKUP_EXCHANGE = "backup.exchange";
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
HashMap<String, Object> arguments = new HashMap<>();
// 将exchange无法发送的消息, 转发给备份exchange
arguments.put("alternate-exchange", BACKUP_EXCHANGE);
channel.exchangeDeclare(CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true, true, arguments);
channel.confirmSelect();
channel.addConfirmListener((deliveryTag, multiple) -> {
String dateTime = DatePattern.NORM_DATETIME_FORMAT.format(new Date());
System.err.println(deliveryTag + ": 消息应答" + " '发送'时间为: " + dateTime);
}, (deliveryTag, multiple) -> System.err.println(deliveryTag + ": 消息未得到应答"));
// 消费者提供消息超时功能
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentType("text/plain");
builder.deliveryMode(2);
builder.priority(0);
for (int i = 0; i < 10; i++) {
builder.timestamp(new Date());
builder.messageId(UUID.fastUUID().toString(true));
String message = "content: " + UUID.fastUUID().toString(false) + i;
channel.basicPublish(CONFIRM_EXCHANGE, "",
builder.build(), message.getBytes(Charset.defaultCharset()));
}
}
}
记住, 这里我们的备份消费者并没有做真实的操作, 而是使用文章来源:https://www.toymoban.com/news/detail-790756.html
System.err.println("消息保存, 然后问题发现, 解决问题, 最后发送消息...");
scanner.next();
channel.basicPublish(CONFIRM_EXCHANGE, "", message.getProperties(), message.getBody());
模拟了管理员找问题, 解决问题最后将消息转发给正常的exchange
的过程文章来源地址https://www.toymoban.com/news/detail-790756.html
到了这里,关于rabbitmq的消息确认和消息回退的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!