RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

这篇具有很好参考价值的文章主要介绍了RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、RabbitMQ消息可靠性概述

1、引出问题

我们先看一串代码,并思考一下为什么要先入库然后发MQ:

public int add(Merchant merchant) {
    int k = merchantMapper.add(merchant);
    System.out.println("aaa : "+merchant.getId());
    JSONObject title = new JSONObject();
    String jsonBody = JSONObject.toJSONString(merchant);
    title.put("type","add");
    title.put("desc","新增商户");
    title.put("content",jsonBody);
    rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey, title.toJSONString());
    return k;
}

如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。

2、RabbitMQ消息可靠性保证的四个环节

RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递
① 消息从生产者发送到Broker
生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接收?如果Broker不给应答,生产者发送的消息也无法知道成功还是失败。

② 消息从Exchange路由到Queue
Exchange交换机维护一个与Queue的绑定列表,它的职责是分发消息。如果交换机处理消息的分发出现问题怎么办?

③ 消息存储在Queue
RabbitMQ的队列有自己的数据库(Mnesia),它是真正用来存储消息的。如果还没有消费者来消费,消息要一直存储在队列中。队列中的消息有没有丢失的可能?怎么保证消息在队列中稳定的存储呢?

④ 消费者订阅Queue并消费消息
队列是先进先出的,消息投递给消费者是一条一条投递的,如何能保证消费者正确地消费了消息?

二、保证生产者消息发送到RabbitMQ服务器

生产者发送RabbitMQ消息时,如果遇到了网络问题或者Broker的问题(硬盘故障、磁盘写满等),就会导致消息发送失败,生产者无法确定Broker是否正确地接收到消息。

在RabbitMQ中提供了两种生产者确认机制,也就是说生产者发送消息给RabbitMQ时,服务端会通过某种方式返回一个应答,只要生产者收到了这个应答,就知道消息发送成功了。

1、服务端确认:Transaction模式

在事务模式里面,只有收到了服务端的Commit-OK指令,才能提交成功。所以可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,它是阻塞的,一条消息没有发送完毕,不能发送下一条消息,它会榨干RabbitMQ服务器的性能。所以不建议在生产环境使用。
RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

(1)JavaAPI

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生产者,测试事务模式。发送消息的效率比较低,建议使用Confirm模式
 * 参考文章:https://www.cnblogs.com/vipstone/p/9350075.html
 */
public class TransactionProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        try {
        	// 开启事务
            channel.txSelect();
            // 发送消息,发布了4条,但只确认了3条
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit(); // 提交
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            int i =1/0;
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            System.out.println("消息发送成功");
        } catch (Exception e) {
            channel.txRollback(); // 回滚
            System.out.println("消息已经回滚");
        }

        channel.close();
        conn.close();
    }
}

(2)springbootAPI

rabbitTemplate.setChannelTransacted(true);

2、服务端确认:Confirm模式

Confirm模式有三种,单个确认、批量确认、异步确认。

RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

(1)JavaAPI

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 普通确认模式
 */
public class NormalConfirmProducer {

    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ ,Normal Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 开启发送方确认模式
        channel.confirmSelect();

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        // 普通Confirm,发送一条,确认一条
        if (channel.waitForConfirms()) {
            System.out.println("消息发送成功" );
        }else{
            System.out.println("消息发送失败");
        }

        channel.close();
        conn.close();
    }
}

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生产者,测试批量Confirm模式
 */
public class BatchConfirmProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ ,Batch Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        try {
            // 开启confirm模式
            channel.confirmSelect();
            for (int i = 0; i < 5; i++) {
                // 发送消息
                // String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            }
            // 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了
            // 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到)
            // 直到所有信息都发布,只要有一个未被Broker确认就会IOException
            channel.waitForConfirmsOrDie();
            System.out.println("消息发送完毕,批量确认成功");
        } catch (Exception e) {
            // 发生异常,可能需要对所有消息进行重发
            e.printStackTrace();
        }

        channel.close();
        conn.close();
    }
}

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * 消息生产者,测试异步Confirm模式
 *  参考文章:https://www.cnblogs.com/vipstone/p/9350075.html
 */
public class AsyncConfirmProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ, Async Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 用来维护未确认消息的deliveryTag
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // 这里不会打印所有响应的ACK;ACK可能有多个,有可能一次确认多条,也有可能一次确认一条
        // 异步监听确认和未确认的消息
        // 如果要重复运行,先停掉之前的生产者,清空队列
        channel.addConfirmListener(new ConfirmListener() {
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Broker未确认消息,标识:" + deliveryTag);
                if (multiple) {
                    // headSet表示后面参数之前的所有元素,全部删除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
                // 这里添加重发的方法
            }
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认
                System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                if (multiple) {
                    // headSet表示后面参数之前的所有元素,全部删除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    // 只移除一个元素
                    confirmSet.remove(deliveryTag);
                }
                System.out.println("未确认的消息:"+confirmSet);
            }
        });

        // 开启发送方确认模式
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            // 发送消息
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            confirmSet.add(nextSeqNo);
        }
        System.out.println("所有消息:"+confirmSet);

        // 这里注释掉的原因是如果先关闭了,可能收不到后面的ACK
        //channel.close();
        //conn.close();
    }
}

(2)springbootAPI

Confirm模式是在Channel上开启的,RabbitTemplate对Channel进行了封装。

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("--------收到服务端异步确认--------");
        System.out.println("received ack: "+ack);
        System.out.println("cause: "+cause);
        System.out.println("correlationId: "+correlationData.getId());
        if (!ack) {
            System.out.println("发送消息失败:" + cause);
            throw new RuntimeException("发送异常:" + cause);
            // 做一些回滚、重试、记录处理
        } else {
			System.out.println("消息确认成功");
		}
    }
});

三、保证消息从交换机路由到队列

如果routingkey错误,或者队列不存在(但是生产环境基本不会出现这么低级的错误),就会导致消息从交换机无法路由到队列。

我们有两种方式处理无法路由的消息,一种是服务端回发给生产者,一种是让交换机路由到另一个备份的交换机。

1、消息回发

(1)JavaAPI

channel.addReturnListener(new ReturnListener() {
	   public void handleReturn(int replyCode,
	                            String replyText,
	                            String exchange,
	                            String routingKey,
	                            AMQP.BasicProperties properties,
	                            byte[] body)
	           throws IOException {
	       System.out.println("=========监听器收到了无法路由,被返回的消息============");
	       System.out.println("replyText:"+replyText);
	       System.out.println("exchange:"+exchange);
	       System.out.println("routingKey:"+routingKey);
	       System.out.println("message:"+new String(body));
	   }
});

(2)SpringbootAPI

Springboot消息回发是使用mandatory参数和ReturnListener(在SPringAMQP中是ReturnCallback)

// 为RabbitTemplate设置ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        try {
            System.out.println("--------收到无法路由回发的消息--------");
            System.out.println("replyCode:" + replyCode);
            System.out.println("replyText:" + replyText);
            System.out.println("exchange:" + exchange);
            System.out.println("routingKey:" + routingKey);
            System.out.println("properties:" + message.getMessageProperties());
            System.out.println("body:" + new String(message.getBody(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
});

2、消息路由到备份交换机

在创建交换机时,从属性中指定备份交换机。

(1)JavaAPI

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
        contentEncoding("UTF-8").build();

// 备份交换机
channel.exchangeDeclare("ALTERNATE_EXCHANGE","topic", false, false, false, null);
channel.queueDeclare("ALTERNATE_QUEUE", false, false, false, null);
channel.queueBind("ALTERNATE_QUEUE","ALTERNATE_EXCHANGE","#");

// 在声明交换机的时候指定备份交换机
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);


// 发送到了默认的交换机上,由于没有任何队列使用这个关键字跟交换机绑定,所以会被退回
// 第三个参数是设置的mandatory,如果mandatory是false,消息也会被直接丢弃
channel.basicPublish("TEST_EXCHANGE","error.routingKey",true, properties,"只为更好的你".getBytes());

四、保证消息在队列中可靠存储

如果消息一直没有被消费者消费,消息会一直存储在队列中,并且队列中的数据存储在数据库中。

如果RabbitMQ服务或者硬件发生了故障,比如系统宕机、重启、关闭等等,可能会导致内存中的消息丢失,所以我们要把消息本身和元数据(队列、交换机、绑定)都保存到磁盘中。

1、队列持久化

声明队列时可以设置几个重要的参数:

// 声明队列
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// springboot中
@Bean("reliableQueue")
public Queue queue() {
	// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    return new Queue("RELIABLE_QUEUE", true, false, false, new HashMap<>());
}

参数解析:
durable:没有持久化的队列,保存在内存中,服务重启后队列和消息都会丢失。设置为true可以保证消息持久化
exclusive:排他性队列的特点是:只对首次声明它的连接(Connection)可见;会在其连接断开的时候自动删除。
autoDelete:没有消费者连接的时候,自动删除。

2、交换机持久化

声明交换机时可以设置这几个重要的参数,和队列类似:

// 声明交换机
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);

// Springboot中
@Bean("directExchange")
public DirectExchange exchange() {
	// public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
    return new DirectExchange("RELIABLE_EXCHANGE", true, false, new HashMap<>());
}

3、消息持久化

在生产者发送消息时,可以指定消息的配置:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("name", "gupao");
headers.put("level", "top");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)   // 2代表持久化
        .contentEncoding("UTF-8")  // 编码
        .expiration("10000")  // TTL,过期时间
        .headers(headers) // 自定义属性
        .priority(5) // 优先级,默认为5,配合队列的 x-max-priority 属性使用
        .messageId(String.valueOf(UUID.randomUUID()))
        .build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());

// springboot中
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
Message message = new Message("一条正常的消息".getBytes(), messageProperties);

4、搭建集群

如果只有一个RabbitMQ节点,即使交换机、队列、消息做了持久化,如果服务崩溃或者硬件故障,RabbitMQ的服务一样是不可以的。

所以为了提高MQ服务的可用性,保证消息的传输,我们需要有多个RabbitMQ的节点。

RabbitMQ集群搭建与高可用实现

五、保证消费者成功消费消息

如果消费者收到消息后没来得及处理或者发生了一场,就会导致消费失败。RabbitMQ提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送ACK给服务端。

如果消费者拿到消息没有ACK会怎么样?
没有收到ACK的消息,消费者断开连接之后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。

1、自动ACK

默认就是自动ACK。消费者收到消息的时候就会自动发送ACK,而不是方法执行成功的时候发送ACK。这种情况RabbitMQ只会关心消费者是否接收到了消息,对消息的处理结果是不关心的,所以通常来说我们会选择手动ACK。

// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("Received message : '" + msg + "'");
        System.out.println("consumerTag : " + consumerTag );
        System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
    }
};
// 开始获取消息 autoAck参数为true代表自动ACK
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, true, consumer);

同样的,在Springboot中如果没有特殊配置,默认的就是自动ACK。

2、手动ACK

// 创建消费者,并接收消息
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("Received message : '" + msg + "'");

        if (msg.contains("拒收")){
            // 拒绝消息
            // requeue:是否重新入队列,true:是,会再发给其他消费者;false:直接丢弃,相当于告诉队列可以直接删除掉
            // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
            // basicReject(long deliveryTag, boolean requeue)
            channel.basicReject(envelope.getDeliveryTag(), false);
        } else if (msg.contains("异常")){
            // 批量拒绝
            // requeue:是否重新入队列
            // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
            // basicNack(long deliveryTag, boolean multiple, boolean requeue)
            channel.basicNack(envelope.getDeliveryTag(), true, false);
        } else {
            // 手工应答
            // 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费
            // basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(envelope.getDeliveryTag(), true);
        }
    }
};

// 开始获取消息,注意这里开启了手工应答
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);

在Springboot中,可以这样配置应答方式:

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

或者在SimpleMessageListenerContainer或者SimpleRabbitListenerContainerFactory中这样设置:

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

消费者中获取到Channel对象,就可以进行应答了:

@Component
@PropertySource("classpath:mq.properties")
@RabbitListener(queues = "${com.queue}", containerFactory="rabbitListenerContainerFactory")
public class SecondConsumer {
    @RabbitHandler
    public void process(String msgContent,Channel channel, Message message) throws IOException {
        System.out.println("Second Queue received msg : " + msgContent );
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

注意这个枚举值:
NONE:自动ACK
MANUAL:手动ACK
AUTO:如果方法未抛出异常,则发送ACK。如果方法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新入队。如果抛出异常是AmqpRejectAndDontRequeueException则发送nack并不会重新入队。

注意!拒绝消息时如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者处理,如果只有一个消费者时,这种方式可能会出现无限循环重复消费的情况。

六、保证消息一致之消费者回调

消费者成功消费了消息之后,如何告知生产者该消息已经被成功消费了?

虽然说使用MQ的目的之一是解耦,但是某些一致性要求很高的场景,比如说金融业务,还是很有必要通知生产者的。

1、调用生产者API

例如,提单系统给其他系统发送了保险消息后,其他系统必须在处理完消息之后调用提单系统提供的API,来修改提单系统中这笔数据的状态。只要API没有被调用,数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。

但是这种方式又从解耦的状态变成了耦合状态,还是需要根据实际情况来判断是否要采用这种方式。

2、发送响应消息给生产者

例如商业银行与人民银行二代支付系统(使用IBM MQ),无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。

整个通信的流程设计非常复杂,但是对于金融场景下的消息可靠性保证,是很有用的。
RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

七、保证消息一致性之补偿机制

如果消费者消费了消息之后,一直迟迟没有通知给生产者,我们如何知道消费者已经成功消费了消息?
就像是银行A给银行B发起了一笔转账,银行B一直没有给银行A回调,此时我们如何确定银行B确定是已经处理了该消息?

此时生产者与消费者之间应该约定一个超时时间,对于超出这个时间没有得到响应的消息,才确定为消费失败,比如说5分钟。

1、消息结果查证

假如说生产者5分钟内没有收到消费者消费成功的回调,生产者可以主动发起一次结果查证,通过业务要素或者唯一流水号,查证该业务在消费者是否正常消费。

2、消息重发

假如说消息一直没有结果,就需要考虑消息重发了。

可以启动一个定时任务,比如30秒跑一次,查询业务表业务状态是中间状态的记录,查询出来,构建MQ消息,重新发送。也可以单独设计一张消息表,把本系统所有发送出去的消息全部异步登记,状态是未回复的消息,进行重发(这种方式会对数据库性能造成一定损耗)。

重发机制也不可能一直重复发,如果消费者确实是有bug或者其他问题,如果一直重复发送会导致死循环了。我们可以设置一个衰减机制,第一次间隔一分钟,第二次间隔2分钟,最终发送三次,三次过后如果还没有收到回复,就需要将消息设置为特殊状态,进行人工干预。

3、消息幂等性

消息重发就意味着要处理消息的幂等。

什么是服务的幂等?为什么要实现幂等?
接口的幂等性——详细谈谈接口的幂等即解决方案

4、最终一致性

在一些一致性很强的接口调用中,比如说转账操作,通常会在一天的业务结束之后,第二天营业之前,生产者和消费者之间会进行一次消息对账。生成一个对账文件,两者分别解析对方的文件进行对账,如果确实有消息不一致的情况,会通过短信或者邮件的方式通知业务人员进行手动处理,要么把钱退回,要么把钱补上。

八、消息的顺序性

在RabbitMQ中,一个队列有多个消费者时,由于不同的消费者消费消息的速度是不一样的,顺序无法保证。只有一个队列仅有一个消费者的情况才能保证顺序消费(不同的业务消息发送到不同的专用队列)。

除非负载的场景,不要用多个消费者消费消息,可以保证消息的顺序性。文章来源地址https://www.toymoban.com/news/detail-504697.html

到了这里,关于RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ-保证消息可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月07日
    浏览(37)
  • RabbitMQ保证消息的可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月19日
    浏览(39)
  • RabbitMQ如何保证消息可靠性

    目录 1、RabbitMQ消息丢失的可能性 1.1 生产者消息丢失场景 1.2 MQ导致消息丢失 1.3 消费者丢失 2、如何保证生产者消息的可靠性 2.1 生产者重试机制 2.2 生产者确认机制 2.3 实现生产者确认 2.3.1 配置yml开启生产者确认 2.3.2 定义ReturnCallback 2.3.3 定义ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    浏览(41)
  • RabbitMQ 能保证消息可靠性吗

    手把手教你,本地RabbitMQ服务搭建(windows) 消息队列选型——为什么选择RabbitMQ RabbitMQ灵活运用,怎么理解五种消息模型 推或拉? RabbitMQ 消费模式该如何选择 死信是什么,如何运用RabbitMQ的死信机制? 前面我们在做MQ组件选型时,提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    浏览(34)
  • 如何保证 RabbitMQ 的消息可靠性?

    项目开发中经常会使用消息队列来 完成异步处理、应用解耦、流量控制等功能 。虽然消息队列的出现解决了一些场景下的问题,但是同时也引出了一些问题,其中使用消息队列时如何保证消息的可靠性就是一个常见的问题。 如果在项目中遇到需要保证消息一定被消费的场景

    2024年02月07日
    浏览(38)
  • Rabbitmq怎么保证消息的可靠性?

    一、消费端消息可靠性保证 : 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费者在处理过程中发生异常或者未完成

    2024年04月14日
    浏览(45)
  • rabbitmq如何保证消息的可靠性

    RabbitMQ可以通过以下方式来保证消息的可靠性: 在发布消息时,可以设置消息的delivery mode为2,这样消息会被持久化存储在磁盘上,即使RabbitMQ服务器重启,消息也不会丢失。 可以创建持久化的队列,这样即使RabbitMQ服务器重启,队列也不会丢失。 在消费者端,可以 设置手动

    2024年01月23日
    浏览(38)
  • rabbitmq如何保证消息的可靠性传输(简述版本)?

    我需要从三点去考虑, 生产者弄丢了数据,生产者将消息发送的Exchange并且路由到队列 队列需要将消息给它持久化 消费者要成功消费队列中的消息 RabbitMQ提供了confirm机制,保证了消息消息发送的Exchange交换机,那么还提供了return机制,可以保证消息从exchange路由到队列中,如

    2024年02月13日
    浏览(30)
  • RabbitMQ如何保证消息的可靠性6000字详解

    RabbitMQ通过生产者、消费者以及MQ Broker达到了解耦的特点,实现了异步通讯等一些优点,但是在消息的传递中引入了MQ Broker必然会带来一些其他问题,比如如何保证消息在传输过程中可靠性(即不让数据丢失,发送一次消息就会被消费一次)?这篇博客将详细从生产者,MQ B

    2024年02月16日
    浏览(30)
  • RabbitMQ可靠性消息发送(java实现)

    本博客属于 《RabbitMQ基础组件封装—整体结构》的子博客 step1:消息落库,业务数据存库的同时,也要将消息记录存入数据库,二者要保证原子性; step2:Producer发送消息到MQ Broker; step3:Producer收到 broker 返回的确认消息; step4:更改消息记录库的状态(定义三种状态:0待确

    2024年02月04日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包