rabbitmq | rabbitTemplate的convertAndSend部分源码解析

这篇具有很好参考价值的文章主要介绍了rabbitmq | rabbitTemplate的convertAndSend部分源码解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在RabbitMQ中,事务是一种确保消息发送的可靠性的机制。Spring AMQP提供了对RabbitMQ事务的抽象,而RabbitTemplate作为Spring AMQP的核心组件,提供了许多简化消息发送的方法。在这篇博客中,我们将深入探讨RabbitMQ事务机制的源码实现,以及Spring封装的RabbitTemplate的使用。

RabbitMQ事务机制

RabbitMQ的事务机制基于AMQP协议提供的事务操作,主要涉及到三个关键方法(可看Channel提供的源码):txSelect(启动事务)、basicPublish(消息发送)、txCommit(事务提交)和 txRollback(事务回滚)。下面我们将分别解释它们在队列事务中的作用。

txSelect - 启动事务

public interface Channel extends ShutdownNotifier, AutoCloseable {
  
 txSelect();

 ..........
   
}

txSelect 方法用于启动一个新的事务。在事务开始后,所有后续的消息发送操作都将在这个事务中进行。这意味着,直到事务被提交或回滚之前,这些消息都不会真正地到达消息队列。

basicPublish - 消息发送

channel.basicPublish(exchange, routingKey, properties, body);

basicPublish 方法用于发送消息到消息队列。在事务中,所有的消息发送都会被暂时缓存起来,直到事务被提交。这样可以确保只有在事务提交后,所有的消息才会真正地被发送到消息队列。

txCommit - 提交事务

channel.txCommit();

txCommit 方法用于提交当前事务。一旦调用了 txCommit,之前通过 basicPublish 发送的所有消息将被一起提交到消息队列中。

Spring RabbitTemplate事务源码解析

在Spring中,RabbitTemplate封装了RabbitMQ的事务操作,使得事务的管理更加方便。下面我们将重点关注convertAndSend 方法和 MessageProperties 的源码层面,解释事务如何开启、生效和回滚。

	@Override
	public void convertAndSend(String exchange, String routingKey, final Object object,
			@Nullable CorrelationData correlationData) throws AmqpException {

		send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
	}
-----------------------------------------------------------------------------------

	@Override
	public void send(final String exchange, final String routingKey,
			final Message message, @Nullable final CorrelationData correlationData)
			throws AmqpException {
		execute(channel -> {
			doSend(channel, exchange, routingKey, message,
					(RabbitTemplate.this.returnsCallback != null
							|| (correlationData != null && StringUtils.hasText(correlationData.getId())))
							&& isMandatoryFor(message),
					correlationData);
			return null;
		}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
	}

convertAndSend 方法是 RabbitTemplate 提供的一个高级发送消息的方法,它会自动进行消息的序列化和转换。

这个方法的主要作用是通过 execute 方法执行一个 ChannelCallback,而 doSend 方法则是真正执行消息发送的逻辑。

doSend 方法:

public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
        boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
    // ...(部分代码省略)
    
    // 设置 confirm 机制
    setupConfirm(channel, messageToUse, correlationData);
    
    // 如果有 userIdExpression,则设置 userId
    if (this.userIdExpression != null && messageProperties.getUserId() == null) {
        String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
        if (userId != null) {
            messageProperties.setUserId(userId);
        }
    }
    
    // 打印 debug 日志
    if (logger.isDebugEnabled()) {
        logger.debug("Publishing message [" + messageToUse
                + "] on exchange [" + exch + "], routingKey = [" + rKey + "]");
    }
    
    // 发送消息到 RabbitMQ 服务器
    sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
    
    // 如果 channel 是本地事务,则提交事务
    if (isChannelLocallyTransacted(channel)) {
        // Transacted channel created by this template -> commit.
        RabbitUtils.commitIfNecessary(channel);
    }
}

事务相关的解释:

  1. setupConfirm 方法: 该方法用于设置消息的 Confirm 机制,确保消息被正确地投递到 Exchange。如果开启了 Confirm 机制,当消息成功到达 Exchange 时,会触发 ConfirmCallback。在这里,它主要是设置 PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY 以及注册 ConfirmCallback。

  2. isChannelLocallyTransacted 方法: 判断当前的 Channel 是否是本地事务。如果是本地事务,会调用 RabbitUtils.commitIfNecessary(channel) 来提交事务。

  3. sendToRabbit 方法: 实际上将消息发送到 RabbitMQ 服务器。这一步之后,如果开启了 Confirm 机制,就会等待服务器的 Confirm。

  4. RabbitUtils.commitIfNecessary(channel) 方法: 这是 Spring AMQP 提供的工具方法,用于提交 Channel 上的本地事务。

为什么先发送消息再执行事务提交?

这是因为 RabbitMQ 的 Confirm 机制是异步的。当调用 sendToRabbit 发送消息后,消息会先被异步地发送到 RabbitMQ 服务器。如果开启了 Confirm 机制,你可以注册一个 ConfirmCallback,在消息成功到达 Exchange 时,RabbitMQ 会异步地调用你的 ConfirmCallback。这样就能确保消息被正确地发送到了 Exchange,然后再执行事务的提交。

这样的设计是为了提高消息发送的吞吐量。如果等待每条消息都同步等待确认,会降低发送消息的速度。异步确认机制能够更好地适应高并发的场景,同时在 ConfirmCallback 中处理确认逻辑。文章来源地址https://www.toymoban.com/news/detail-804533.html

到了这里,关于rabbitmq | rabbitTemplate的convertAndSend部分源码解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ初级的部分面试题

            RabbitMQ是一个消息代理:它接受和转发消息。能够实现异步消息处理,达到业务解耦,错峰流控的功能。它的实现是基于消息队列(Queue)。         消息队列(Message Queue MQ)是实现应用之间数据通信的一种机制,采用先进先出的数据结构和生产者消费者设计

    2024年02月19日
    浏览(33)
  • RabbitTemplate的创建与配置

    RabbitTemplate是我们在与SpringAMQP整合的时候进行发送消息的关键类 该类提供了丰富的发送消息的方法,包括可靠性消息投递、回调监听消息接口ConfirmCallback、返回值确认接口 ReturnCallback等等同样我们需要注入到Spring容器中,然后直接使用。 在与spring整合时需要实例化,但是在

    2024年02月09日
    浏览(38)
  • 【消息通过rabbitTemplate.converAndSend发送后请求头丢失】

    目录 问题: 解决: 原因 扩展 版本:springboot版本:2.3.4.RELEASE  amqp-client:5.9.0 问题:封装的消息通过rabbitTemplate.converAndSend发送后没有请求头 排查:通过debug构建消息时数据是否完整 请求头有数据 思路:换成 send发送进行对比 代码如下: 1. send 发送 org.springframework.amqp.core

    2024年02月04日
    浏览(30)
  • java | RabbitTemplate消息模板发送消息收不到、队列不存在

    RabbitTemplate的配置和使用就不介绍了,就说一下遇到的问题 问题:RabbitTemplate发送消息时候,如果队列不存在,它收不到消息,也不报错,对于新手或者调试的时候会有干扰 解决办法:简单粗暴,发送消息前先查一下要发送的队列存不存在,不存在就创建再发送

    2024年02月14日
    浏览(55)
  • 王力面试(未完全解析)(部分)

    Java对象的创建过程?参考答案 1 , 2 , 3 : 检查类是否已经被加载; 为对象分配内存空间; 为实例变量赋默认值(0或者null);(附1这段代码可证明此步早于实例变量初始化 ) 设置对象头 执行内部生成的 init 方法进行初始化 其中第5步的具体步骤为:递归调用超类构造器----实

    2024年02月13日
    浏览(25)
  • 华为DATACOM认证HCIE-891-部分试题解析-179题部分重复题目

    1.【单选题】1分 如图所示是某位网络工程师在排查OSPF故障时的输出信息。据此判断,以下哪种原因可能导致邻接关系无法正常建立?   A Hello报文发送间不一致 B认证密码不一致 C接口的IP地址掩码不一致 D 区域类型不一致 正确答案: C 【答案解析】 直接看图中的非0数字就能

    2024年02月03日
    浏览(48)
  • 上海联影面试(部分)(未完全解析)

    Spring Boot为什么可以自启动,且变成一个web项目? 本地连不上网,Maven缺一个jar包,怎么解决? linux用什么命令找到占用指定端口的进程,并杀掉?Answer by new bing: 查找被占用端口的PID: sudo netstat -nlp | grep :端口号 杀掉这个进程: sudo kill -9 PID redis的线程模型是怎样的?为什么

    2024年02月02日
    浏览(28)
  • 部分地区解析速度慢的原因和解决方法

    大家都知道,域名解析跟网站是密切相关的。但是在实际上网中,大家会发现有些网站上网速度很快,但是有些网页打开速度非常慢。为什么会出现部分地区解析速度慢,网站打不开? 网页打开快慢问题 特别是在海外上网的朋友,经常会碰到这种情况,大部分网站都能打开,

    2024年02月10日
    浏览(35)
  • LinkedList部分底层源码分析

    JDK版本为1.8.0_271,以插入和删除元素为例,LinkedList部分源码如下: 插入删除结点的过程如图所示: 只有1个元素的LinkedList 包含4个元素的LinkedList add(E e)方法 add(int index,E e)方法 remove(Object obj)方法 remove(int index)方法

    2024年04月13日
    浏览(37)
  • JavaScript的三大组成部分是什么?JavaScript的核心组成部分解析:语法、BOM和DOM

    🌷🍁 博主猫头虎 带您 Go to New World.✨🍁 🦄 博客首页——猫头虎的博客🎐 🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺 🌊 《IDEA开发秘籍专栏》学会IDEA常用操作,工作效率翻倍~💐 🌊 《100天精通Golang(基础入门篇)》学会Golang语言

    2024年02月10日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包