rabbitmq消息可靠性之消息回调机制

这篇具有很好参考价值的文章主要介绍了rabbitmq消息可靠性之消息回调机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

rabbitmq消息可靠性之消息回调机制

rabbitmq回调机制,java-rabbitmq,rabbitmq,分布式

rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制

  • 生产者确认机制

  • 消息持久化存储

  • 消费者确认机制

  • 失败重试机制

本文主要讲解生产者确认机制,也是rabbitmq提供的消息回调机制,这个机制可以解决生产者发送消息到交换机和交换机路由到队列过程中的消息丢失问题

这种机制必须给每个消息指定一个唯一ID,消息发送到rabbitmq之后会返回结果给生产者,表示消息是否发送成功,返回结果有以下两种

  • publisher-confirm:发送者确认:消息成功投递到交换机,返回 ack;消息未投递到交换机,返回 nack

  • publisher-return:发送者回执:消息成功投递到交换机,但是没有路由到队列。返回 ack,及路由失败原因

spring:
  rabbitmq:
    # rabbitMQ的ip地址
    host: 127.0.0.1
    # 端口
    port: 5672
    # 集群模式配置
    # addresses: 127.0.0.1:8071, 127.0.0.1:8072, 127.0.0.1:8073
    username: admin
    password: 123456
    virtual-host: /
    # 消费者确认机制相关配置 
    # 开启publisher-confirm,
    # 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    publisher-returns: true
    # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
    template:
      mandatory: true

然后定义 ReturnCallback 回调,每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置

package com.gitee.small.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMQConfig implements ApplicationContextAware {

    //绑定键
    public final static String DOG = "topic.dog";
    public final static String CAT = "topic.cat";


    /**
     * Queue构造函数参数说明
     * new Queue(SMS_QUEUE, true);
     * 1. 队列名
     * 2. 是否持久化 true:持久化 false:不持久化
     */


    @Bean
    public Queue firstQueue() {
        return new Queue(DOG);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(CAT);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }


    /**
     * 将firstQueue和topicExchange绑定,而且绑定的键值为topic.dog
     * 这样只要是消息携带的路由键是topic.dog,才会分发到该队列
     */
    @Bean(name = "binding.dog")
    public Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(DOG);
    }

    /**
     * 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
     * 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
     */
    @Bean(name = "binding.cat")
    public Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}

接着定义 ConfirmCallback,ConfirmCallback 可以在发送消息时指定,因为每个业务处理 confirm 成功或失败的逻辑不一定相同,上面已经定义好exchange 和 queue,新建RabbitMqTest测试类

package smallJ;

import com.gitee.small.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class RabbitMqTest {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() throws InterruptedException {
        // 1.准备CorrelationData
        // 1.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 1.2.准备ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判断结果
            if (result.isAck()) {
                // ACK
                log.info("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投递到交换机失败!消息ID:{},原因:{}", correlationData.getId(), result.getReason());
                // 重发消息
            }
        }, ex -> {
            // 记录日志
            log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());
            // 可以重发消息
        });
        rabbitTemplate.convertAndSend("topicExchange", "topic.dog", "路由模式测试-dog", correlationData);
        // 程序休眠两秒等待回调
        Thread.sleep(2000);
    }
}

加两个监听器进行测试

package com.gitee.small.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TopicRabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.dog"),
            exchange = @Exchange(value = "bindingExchangeMessage", type = ExchangeTypes.TOPIC)
    ))
    public void process(String msg) {
        log.info("dog-收到消息:{}", msg);
    }


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.cat"),
            exchange = @Exchange(value = "bindingExchangeMessage2", type = ExchangeTypes.TOPIC)
    ))
    public void  process2(String msg){
        log.info("cat-收到消息:{}", msg);
    }
}

测试结果如下文章来源地址https://www.toymoban.com/news/detail-717511.html

smallJ.RabbitMqTest   : 消息成功投递到交换机!消息ID: 83f057fa-042d-4f56-872d-9d31a0444b82
c.g.small.rabbitmq.TopicRabbitReceiver   : dog-收到消息:路由模式测试-dog
c.g.small.rabbitmq.TopicRabbitReceiver   : cat-收到消息:路由模式测试-dog

到了这里,关于rabbitmq消息可靠性之消息回调机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ --- 消息可靠性

    消息队列在使用过程中,面临着很多实际问题需要思考:      消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 co

    2024年02月14日
    浏览(46)
  • RabbitMQ-保证消息可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月07日
    浏览(48)
  • RabbitMQ消息的可靠性

    面试题: Rabbitmq怎么保证消息的可靠性? 1.消费端消息可靠性保证: 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费

    2024年04月14日
    浏览(69)
  • RabbitMQ如何保证消息可靠性

    目录 1、RabbitMQ消息丢失的可能性 1.1 生产者消息丢失场景 1.2 MQ导致消息丢失 1.3 消费者丢失 2、如何保证生产者消息的可靠性 2.1 生产者重试机制 2.2 生产者确认机制 2.3 实现生产者确认 2.3.1 配置yml开启生产者确认 2.3.2 定义ReturnCallback 2.3.3 定义ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    浏览(51)
  • RabbitMQ高级篇---消息可靠性

    1、消息可靠性: 消息从发送到消费者接受,会经历多个过程,每个消息传递的过程都可能导致消息的丢失: 常见的丢失原因: 发送时消息丢失原因: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 Rab

    2024年01月20日
    浏览(47)
  • RabbitMQ保证消息的可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月19日
    浏览(46)
  • RabbitMQ消息可靠性问题及解决

    说明:在RabbitMQ消息传递过程中,有以下问题: 消息没发到交换机 消息没发到队列 MQ宕机,消息在队列中丢失 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除 针对以上问题,提供以下解决方案: 消息确认:确认消息是否发送到交换机、队列;

    2024年02月16日
    浏览(43)
  • 【RabbitMQ】之消息的可靠性方案

    一、数据丢失场景 二、数据可靠性方案 1、生产者丢失消息解决方案 2、MQ 队列丢失消息解决方案 3、消费者丢失消息解决方案 MQ 消息数据完整的链路为 :从 Producer 发送消息到 RabbitMQ 服务器中,再由 Broker 服务的 Exchange 根据 Routing_Key 路由到指定的 Queue 队列中,最后投送到消

    2024年02月14日
    浏览(40)
  • RabbitMQ之消息的可靠性传递

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 RabbitMQ之消息的可靠性传递 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 在当今的信息化时代,消息传递在企业级应用和分布式

    2024年01月19日
    浏览(41)
  • rabbitmq如何保证消息的可靠性

    RabbitMQ可以通过以下方式来保证消息的可靠性: 在发布消息时,可以设置消息的delivery mode为2,这样消息会被持久化存储在磁盘上,即使RabbitMQ服务器重启,消息也不会丢失。 可以创建持久化的队列,这样即使RabbitMQ服务器重启,队列也不会丢失。 在消费者端,可以 设置手动

    2024年01月23日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包