【MQ 系列】SpringBoot + RabbitMq 消息确认/事务机制的使用姿势

这篇具有很好参考价值的文章主要介绍了【MQ 系列】SpringBoot + RabbitMq 消息确认/事务机制的使用姿势。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【MQ 系列】SpringBoot + RabbitMq 消息确认/事务机制的使用姿势,spring boot,java-rabbitmq,rabbitmq

我们知道 RabbitMq 提供了两种机制,来确保发送端的消息被 brocke 正确接收,本文将主要介绍,在消息确认和事物两种机制的场景下,发送消息的使用姿势

I. 配置

首先创建一个 SpringBoot 项目,用于后续的演示

  • springboot 版本为2.2.1.RELEASE

  • rabbitmq 版本为 3.7.5 

依赖配置文件 pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
    <relativePath/><!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/libs-snapshot-local</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-releases</id>
        <name>Spring Releases</name>
        <url>https://repo.spring.io/libs-release-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

application.yml配置文件中,添加 rabbitmq 的相关属性

spring:
  rabbitmq:
    virtual-host:/
    username:admin
    password:admin
    port:5672
    host:127.0.0.1

II. 消息确认机制

本节来看一下消息确认机制的使用姿势,首先有必要了解一下什么是消息确认机制

1. 定义

简单来讲就是消息发送之后,需要接收到 RabbitMq 的正确反馈,然后才能判断消息是否正确发送成功;

一般来说,RabbitMq 的业务逻辑包括以下几点

  • 生产者将信道设置成 Confirm 模式,一旦信道进入 Confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(以 confirm.select 为基础从 1 开始计数)

  • 一旦消息被投递到所有匹配的队列之后,Broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了

  • 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出

  • Broker 回传给生产者的确认消息中 deliver-tag 域包含了确认消息的序列号(此外 Broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理)

2. 基本使用 case

从上面的解释,可以知道发送消息端,需要先将信道设置为 Confirm 模式,RabbitProperties配置类中,有个属性,正好是用来设置的这个参数的,所以我们可以直接在配置文件application.yml中,添加下面的配置

spring:
  rabbitmq:
    # 在2.2.1.release版本中,下面这个配置属于删除状态,推荐使用后一种配置方式
    # publisher-confirms: true
    publisher-confirm-type:correlated
    # 下面这个配置,表示接收mq返回的确认消息
    publisher-returns:true

上面配置完毕之后,直接使用 RabbitTemplate 发送消息,表示已经支持 Confirm 模式了,但实际的使用,会有一点点区别,我们需要接收 mq 返回的消息,发送失败的回调(以实现重试逻辑等),所以一个典型的发送端代码可以如下

@Service
publicclass AckPublisher implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 接收发送后确认信息
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("ack send succeed: " + correlationData);
        } else {
            System.out.println("ack send failed: " + correlationData + "|" + cause);
        }
    }

    /**
     * 发送失败的回调
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("ack " + message + " 发送失败");
    }


    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    public String publish(String ans) {
        String msg = "ack msg = " + ans;
        System.out.println("publish: " + msg);

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData);
        return msg;
    }
}

请注意上面的实现,首先需要给 RabbitTemplate 设置回调,这两个不可或缺

rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);

 

3. 手动配置方式

上面利用的是标准的 SpringBoot 配置,一般来说是适用于绝大多数的场景的;当不能覆盖的时候,还可以通过手动的方式来定义一个特定的 RabbitTemplate(比如一个项目中,只有某一个场景的消息发送需要确认机制,其他的默认即可,所以需要区分 RabbitTemplate)

在自动配置类中,可以手动的注册一个 RabbitTemplate 的 bean,来专职消息确认模式的发送

@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private Integer port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;

@Bean
public RabbitTemplate ackRabbitTemplate() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setPort(port);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(virtualHost);
    // 设置ack为true
    connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    connectionFactory.setPublisherReturns(true);
    returnnew RabbitTemplate(connectionFactory);
}

至于使用姿势,和前面完全一致,只是将rabbitTemplate换成ackRabbitTemplate

III. 事务机制

消息确认机制属于异步模式,也就是说一个消息发送完毕之后,不待返回,就可以发送另外一条消息;这里就会有一个问题,publisher 先后发送 msg1, msg2,但是对 RabbitMq 而言,接收的顺序可能是 msg2, msg1;所以消息的顺序可能会不一致

所以有了更加严格的事务机制,它属于同步模式,发送消息之后,等到接收到确认返回之后,才能发送下一条消息

1. 事务使用方式

首先我们定义一个事务管理器

/**
 * 配置rabbitmq事务
 *
 * @param connectionFactory
 * @return
 */
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    returnnew RabbitTransactionManager(connectionFactory);
}

@Bean
public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
    returnnew RabbitTemplate(connectionFactory);
}

事务机制的使用姿势,看起来和上面的消息确认差不多,无非是需要添加一个@Transactional注解罢了

@Service
publicclass TransactionPublisher implements RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate transactionRabbitTemplate;

    @PostConstruct
    public void init() {
        // 将信道设置为事务模式
        transactionRabbitTemplate.setChannelTransacted(true);
        transactionRabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("事务 " + message + " 发送失败");
    }

    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
    public String publish(String ans) {
        String msg = "transaction msg = " + ans;
        System.out.println("publish: " + msg);

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        transactionRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData);
        return msg;
    }
}

请注意,核心代码设置信道为事务模式必不可少

// 将信道设置为事务模式
transactionRabbitTemplate.setChannelTransacted(true);

IV. 测试

我们这里主要测试一下事务和消息确认机制的性能对比吧,从定义上来看消息确认机制效率更高,我们简单的对比一下

@RestController
publicclass PubRest {
    @Autowired
    private AckPublisher ackPublisher;
    @Autowired
    private TransactionPublisher transactionPublisher;

    private AtomicInteger atomicInteger = new AtomicInteger(1);

    @GetMapping(path = "judge")
    public boolean judge(String name) {
        for (int i = 0; i < 10; i++) {
            long start = System.currentTimeMillis();
            ackPublisher.publish(name + atomicInteger.getAndIncrement());
            ackPublisher.publish(name + atomicInteger.getAndIncrement());
            ackPublisher.publish(name + atomicInteger.getAndIncrement());
            long mid = System.currentTimeMillis();
            System.out.println("ack cost: " + (mid - start));

            transactionPublisher.publish(name + atomicInteger.getAndIncrement());
            transactionPublisher.publish(name + atomicInteger.getAndIncrement());
            transactionPublisher.publish(name + atomicInteger.getAndIncrement());
            System.out.println("transaction cost: " + (System.currentTimeMillis() - mid));
        }
        returntrue;
    }
}

去掉无关的输出,仅保留耗时,对比如下(差距还是很明显的)文章来源地址https://www.toymoban.com/news/detail-801479.html

ack cost: 5
transaction cost: 111

ack cost: 3
transaction cost: 108

ack cost: 2
transaction cost: 101

ack cost: 3
transaction cost: 107

ack cost: 14
transaction cost: 106

ack cost: 2
transaction cost: 140

ack cost: 4
transaction cost: 124

ack cost: 4
transaction cost: 131

ack cost: 4
transaction cost: 129

ack cost: 2
transaction cost: 99

到了这里,关于【MQ 系列】SpringBoot + RabbitMq 消息确认/事务机制的使用姿势的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ 消息确认机制

    为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之

    2024年02月15日
    浏览(44)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(45)
  • Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

    确认消息是否发送给交换机 配置 编码RabbitTemplate.ConfirmCallback ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器, 也就是只确认是否正确到达 Exchange 中。 在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true); 配置类

    2024年02月20日
    浏览(46)
  • RabbitMQ的几种消息确认机制详细介绍

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的几种消息确认机制。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小威要向诸佬学习呀 🧑个人简介:大家好,我是

    2023年04月25日
    浏览(55)
  • 「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

    目录 介绍 方案 配置手动确认 使用 「Bean 」 配置RabbitMQ的属性 确定消费、拒绝消费、拒绝消费进入死信队列 模拟生产者发送消息①         RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电

    2024年02月08日
    浏览(39)
  • RabbitMQ消息可靠性投递与ACK确认机制

    什么是消息的可靠性投递 保证消息百分百发送到消息队列中去 保证MQ节点成功接收消息 消息发送端需要接收到MQ服务端接收到消息的确认应答 完善的消息补偿机制,发送失败的消息可以再感知并二次处理 RabbitMQ消息投递路径 生产者–交换机–队列–消费者 通过两个节点控制

    2024年02月20日
    浏览(51)
  • RabbitMQ:第一章:6 种工作模式以及消息确认机制

    } System.out.println(“发送数据成功”); channel.close(); connection.close(); } } 消费者一: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws Exception { Con

    2024年04月12日
    浏览(37)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(89)
  • springBoot-rabbitMq手动确认消息

    代码基础怎么写我就不说了,看我的另一篇博客 springBoot整合RabbitMQ(Demo)_我要用代码向我喜欢的女孩表白的博客-CSDN博客 假设你要手动ack,怎么做呢? 通常自动是,mq发给服务端,服务端收到了就当处理过了,但是我们要保证数据不丢失。所以得处理完了,才告诉mq说我做

    2024年02月11日
    浏览(34)
  • 基于springboot实现的rabbitmq消息确认

    RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费

    2024年02月06日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包