Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

这篇具有很好参考价值的文章主要介绍了Rabbitmq入门与应用(六)-rabbitmq的消息确认机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

rabbitmq的消息确认机制

确认消息是否发送给交换机

配置
server:
  port: 11111
spring:
  rabbitmq:
    port: 5672
    host: 192.168.201.81
    username: admin
    password: 123
    publisher-confirm-type: correlated
编码RabbitTemplate.ConfirmCallback

ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true);

配置类

rabbitTemplate.setConfirmCallback(ConfirmCallback confirmCallback);

CorrelationData:

1、消息ID需要封装到CorrelationData
2、correlationData.getFuture().addCallback(…)是一个回调函数:决定了每个业务处理confirm成功或失败的逻辑。

@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){

    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    log.debug("Rabbitmq配置启动成功,RabbitTemplate:{}设置完成",rabbitTemplate);
    rabbitTemplate.setMessageConverter(messageConverter());
    rabbitTemplate.setConfirmCallback(new RabbitConfirmCallbackImpl());
    return rabbitTemplate;
}

/**
 * 确保消息是否发送到交换机
 */
class RabbitConfirmCallbackImpl implements RabbitTemplate.ConfirmCallback{
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
         log.warn("****Exchange callback-检验是否发送成功********");
         log.warn("correlationData->相关数据:{}",correlationData);
         log.warn("ack->Exchange响应:{}",ack);
         log.warn("cause->错误原因:{}",cause);
 }
}
测试发送

测试向交换机发送数据,测试交换机是否成功收到。

假设给一个错误的Exchange
@Service
public class MqServiceImpl implements IMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String msg) {
         //错误的Exchange名称,实际名称为:ssc_sc_routing_exchange
        final String EXCHANGE = "ssc_sc_routing_exchangex";
        final String ROUTING_KEY = "ssc_sc_routing_key";

        rabbitTemplate.convertAndSend(
                EXCHANGE,
                ROUTING_KEY,
                msg
        );
    }
}

Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java

如果Exchange正确
@Override
public void sendMessage(String msg) {
    final String EXCHANGE = "ssc_sc_routing_exchange";
    final String ROUTING_KEY = "ssc_sc_routing_key";
    rabbitTemplate.convertAndSend(
            EXCHANGE,
            ROUTING_KEY,
            msg
    );
}

Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java

确认消息是否从交换机发送到队列RabbitTemplate.ReturnsCallback

设置ResturnsCallback

通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调。

配置文件
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true     #检查是否绑定到队列中
配置
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitConfirmReturnCallbackImpl());

class RabbitConfirmReturnCallbackImpl implements RabbitTemplate.ReturnsCallback{
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
       log.warn("message:{}",returnedMessage.getMessage());
       log.warn("exchange:{}",returnedMessage.getExchange());
       log.warn("replyCode:{}",returnedMessage.getReplyCode());
       log.warn("replyText:{}",returnedMessage.getReplyText());
       log.warn("routingKey:{}",returnedMessage.getRoutingKey());
    }
}
测试

修改routingkey的值,让交换机不能路由到指定Queue。

package com.wnhz.ssc.cloud.mq.service.impl;

import com.wnhz.ssc.cloud.mq.service.IMqService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MqServiceImpl implements IMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String msg) {
        final String EXCHANGE = "ssc_sc_routing_exchange";
        //修改routingkey,给一个错误的值,正确值为: ssc_sc_routing_key
        final String ROUTING_KEY = "ssc_sc_routing_keyx";
        rabbitTemplate.convertAndSend(
                EXCHANGE,
                ROUTING_KEY,
                msg
        );
    }
}

Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java

返回message:

message:(
Body:'"hello confirm call back"'
    MessageProperties
    [
        headers={
                __TypeId__=java.lang.String
                 },
        contentType=application/json,
        contentEncoding=UTF-8,
        contentLength=0,
        receivedDeliveryMode=PERSISTENT,
        priority=0,
        deliveryTag=0
   ]
)

消费确认信息

消费监听模式
  • Simple模式

    Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java

    Simple模式即SMLC。simple模式每个消费者都有其私有的线程,可以增加消费者,也会自动增加消费线程,不管消费者是不是在处理消息,可能会造成资源线程的浪费。 对每个消费者使用一个内部队列和一个专用线程。如果容器配置为侦听多个队列,则使用同一个消费者线程来处理所有队列。并发控制由concurrentConsumers和其他属性。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给消费者线程。

  • Direct模式

    Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java

    压力集中在Connection线程池上,线程可以复用与多个消费者,但是如果采用这种模式,需要设置Connection线程池合适的参数。

Message对象结构

Message对象的结构,

消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java

Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java

消息确认方式
  1. AcknowledgeMode.AUTO:自动确认。
  2. AcknowledgeMode.NONE:根据情况确认。
  3. AcknowledgeMode.MANUAL:手动确认。

direct模式:

Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java

simple模式:

Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java
消费端监听发送
@RabbitListener(queues = "data_confirm_queue")
@Override
public void receiveBookFromMq(Message message, Channel channel, Book book) {

    log.debug("message:{}", message);
    log.debug("message.getMessageProperties().getHeaders()===>{}",
            message.getMessageProperties().getHeaders());
    log.debug("[order消费者:]接收到消息: {}", book);

    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("消息队列确认: {},{}",
                message.getMessageProperties().getConsumerQueue(), "接收到回调方法");
    } catch (IOException e) {
        e.printStackTrace();
    }
}
手动确认方式
  1. Basic.Ack 命令:用于确认当前消息。
  2. Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。
  3. Basic.Reject 命令:用于拒绝当前消息
channel.basicAck(long deliveryTag,boolean multiple)

basicAck 方法用于确认当前消息。

public void basicAck(long deliveryTag, boolean multiple) throws IOException {
    this.delegate.basicAck(deliveryTag, multiple);
}
  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

  • multiple:为了减少网络流量,手动确认可以被批处理。

    • true: 代表批量应答 channel 上未应答的消息,比当前tag小的未应答的也一并应答(如5,6,7未应答)。

      Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java
    • false: 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

      Rabbitmq入门与应用(六)-rabbitmq的消息确认机制,rabbitmq,rabbitmq,ruby,分布式,java

basicNack

basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现

public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
    this.delegate.basicNack(deliveryTag, multiple, requeue);
}
basicReject(long deliveryTag, boolean requeue)

basicNack 方法用于否定当前消息。basicReject 方法用于明确拒绝当前的消息而不是确认。

public void basicReject(long deliveryTag, boolean requeue) throws IOException {
    this.delegate.basicReject(deliveryTag, requeue);
}

消息遗弃或入队,一般建议消息丢弃重新发。文章来源地址https://www.toymoban.com/news/detail-830583.html

  • requeue: true :重回队列,false :丢弃,我们在nack方法中必须设置 false,否则重发没有意义。
出现异常的解决方案
package com.wnhz.mq.order.service.impl;

import com.rabbitmq.client.Channel;
import com.wnhz.domain.Book;
import com.wnhz.mq.order.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Slf4j
@Service
public class OrderServiceImpl implements IOrderService {

    private void buildException(){
        throw  new RuntimeException("[消费者:] 消费出现异常......");
    }

    @RabbitListener(queues = "data_confirm_queue")
    @Override
    public void receiveBookFromMq(Message message, Channel channel, Book book) {
        try {
            //制造异常测试
            buildException();
            log.debug("message:{}", message);
            log.debug("message.getMessageProperties().getHeaders()===>{}",
                    message.getMessageProperties().getHeaders());
            log.debug("[order消费者:]接收到消息: {}", book);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

            log.debug("消息队列确认: {},{}",
                    message.getMessageProperties().getConsumerQueue(), "接收到回调方法");
        } catch (Exception e) {
          log.debug("消费异常: {}",e.getMessage());
            try {
                log.debug("尝试丢弃:{}消息.....................",book);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

到了这里,关于Rabbitmq入门与应用(六)-rabbitmq的消息确认机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

    代码位置 消费者收到Queue中的消息,但没有处理完成就宕机的情况,这种情况下就可能会导致消息丢失。 为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。 如果RabbitMQ没有收

    2024年02月10日
    浏览(39)
  • 8. springboot + rabbitmq 消息发布确认机制

    在 RabbitMQ之生产者发布确认原理章节已经介绍了rabbitmq生产者是如何对消息进行发布确认保证消息不丢失的。本章节继续看下springboot整合rabbitmq后是如何保证消息不丢失的。 消息正常是通过生产者生产消息传递到交换机,然后经过交换机路由到消息队列中,最后消费者消费,

    2023年04月25日
    浏览(51)
  • RabbitMQ的几种消息确认机制详细介绍

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的几种消息确认机制。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小威要向诸佬学习呀 🧑个人简介:大家好,我是

    2023年04月25日
    浏览(40)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(74)
  • RabbitMQ消息可靠性投递与ACK确认机制

    什么是消息的可靠性投递 保证消息百分百发送到消息队列中去 保证MQ节点成功接收消息 消息发送端需要接收到MQ服务端接收到消息的确认应答 完善的消息补偿机制,发送失败的消息可以再感知并二次处理 RabbitMQ消息投递路径 生产者–交换机–队列–消费者 通过两个节点控制

    2024年02月20日
    浏览(33)
  • RabbitMQ:第一章:6 种工作模式以及消息确认机制

    } System.out.println(“发送数据成功”); channel.close(); connection.close(); } } 消费者一: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws Exception { Con

    2024年04月12日
    浏览(29)
  • 「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

    目录 介绍 方案 配置手动确认 使用 「Bean 」 配置RabbitMQ的属性 确定消费、拒绝消费、拒绝消费进入死信队列 模拟生产者发送消息①         RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电

    2024年02月08日
    浏览(28)
  • 【MQ 系列】SpringBoot + RabbitMq 消息确认/事务机制的使用姿势

    我们知道 RabbitMq 提供了两种机制,来确保发送端的消息被 brocke 正确接收,本文将主要介绍,在消息确认和事物两种机制的场景下,发送消息的使用姿势 首先创建一个 SpringBoot 项目,用于后续的演示 springboot 版本为 2.2.1.RELEASE rabbitmq 版本为  3.7.5   依赖配置文件 pom.xml 在 a

    2024年01月18日
    浏览(34)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

    在现代分布式应用程序中,消息队列扮演了至关重要的角色,允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性,但也引入了一系列的挑战,其中最重要的之一是消息的可靠性。 首先让我们来了解一下,在消息队列中,消息从生产者发送

    2024年02月05日
    浏览(40)
  • RabbitMQ初级篇:生产者与消费者关系、消息确认机制(ACK)、交换器与队列进行消息路由和存储

    在RabbitMQ中,生产者(Producer) 负责发送消息 ,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则 负责处理接收到的这些消息 。在RabbitMQ中,生产者和消费者之间使用 交换器(Exchange)和队列(Queue)进行消息路由和存储 。生产者将消息发送到

    2024年02月01日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包