RocketMQ事务消息机制

这篇具有很好参考价值的文章主要介绍了RocketMQ事务消息机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。

一、事务消息的实现步骤

@rocketmqtransactionlistener,java,分布式,架构

事务消息发送步骤:
1. 发送方将半事务消息发送至RocketMQ服务端。
2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成“暂不能投递”状态。
3. 发送方开始执行本地事务逻辑。
4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤:
1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

二、程序实现

       事务消息处理类需要继承RocketMQLocalTransactionListener类。该类的executeLocalTransaction方法负责在接到RocketMQ服务端的Ack确认消息后执行本地方法,也就是事务消息发送步骤中的步骤3。该类的checkLocalTransaction方法负责,在断网或者是应用重启的特殊情况下,执行RocketMQ服务端的消息回查,也就是事务消息回查步骤中的步骤2。

       此外,要使该类生效,还需要加@RocketMQTransactionListener注解。这里有个要特别注意的地方。在2.1.0版本前,这个注解有一个属性txProducerGroup,可以用多个@RocketMQTransactionListener来监听不同的txProducerGroup来发送不同类型的事务消息到topic。但是现在在一个项目中,如果你在一个project中写了多个@RocketMQTransactionListener,项目将不能启动,启动会报错。产生这个问题的原因据说是,当使用RocketMQTemplate并发的执行事务时,非常容易出现"illegal state"的异常,原因是一个TransactionProducer在执行事务时不能被共享。所以,必须使用同一个TransactionMQProducer来发送所有类型的事务消息。当然同理也就必须使用一个侦听器处理所有的消息了。

       既然必须使用同一个TransactionMQProducer,对于比较大的应用,业务场景很多,就会造成混乱。这里我给出一个方案抛砖引玉。TransactionMQProducer在发送消息时,是可以传递参数对象和指定消息头的。可以把要执行的本地方法的bean名和方法名放进去。

//发送半事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
		topicAndTag,
		MessageBuilder.withPayload(msg)
			.setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId())
			.setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId())
			.setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId())
			.build(),
		def
);

其中def就是参数对象,可以自定义对象,这里是我自定义的TransactionMsgDefinationDto类,可以把想传递的信息放进去,最重要的是要执行的本地方法的bean名和方法名和方法执行参数:executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法执行参数)。该对象可以传给RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通过反射执行。

@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		try {
			//保存消息记录
			String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
			JSONObject jsonBody = JSONObject.parseObject(body);
			BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload();
			TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg;
			ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new);
			String[] tags = def.getMsgTags();
			if(tags !=null && tags.length > 0) {
				StringBuilder tag = new StringBuilder();
				for(int i = 0; i<tags.length; i++) {
					tag.append(tags[0]);
					if(i != tags.length-1) {
						tag.append("||");
					}
				}
				producerLog.setMsgTag(tag.toString());
			}
			producerLog.setBizId(dto.getBizId());
			producerLog.setTxId(dto.getTxId());
			producerLog.setBizType(dto.getBizType());
			producerLog.setGroupName(dto.getProducerGroup());
			producerLog.setMsgBody(body);
			producerLogService.save(producerLog);
			//执行事务方法
			SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams());
			return RocketMQLocalTransactionState.COMMIT;
		} catch (Exception e) {
			logger.error("发生错误:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

 放在消息头header中的数据可以传递给RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同样通过反射执行。文章来源地址https://www.toymoban.com/news/detail-723016.html

@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		try {
			String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME); 
			String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME);
			Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME));
			//执行检查方法
			Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId});
			if(ret.booleanValue())
				return RocketMQLocalTransactionState.COMMIT;
			else
				return RocketMQLocalTransactionState.ROLLBACK;
		} catch (Exception e) {
			logger.error("发生错误:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

到了这里,关于RocketMQ事务消息机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式事务,zookeeper,dubbo,rocketmq

    CAP理论是分布式领域中非常重要的一个指导理论,C(Consistency)表示强一致性,A(Availability)表示可用性,P(Partition Tolerance)表示分区容错性,CAP理论指出在目前的硬件条件下,一个分布式系统是必须要保证分区容错性的,而在这个前提下,分布式系统要么保证CP,要么保

    2024年04月12日
    浏览(46)
  • RocketMQ分布式事务 -> 最终一致性实现

    · 分布式事务的问题常在业务与面试中被提及, 近日摸鱼看到这篇文章, 阐述的非常通俗易懂, 固持久化下来我博客中, 也以便于我二刷 转载源 : 基于RocketMQ分布式事务 - 完整示例 本文代码不只是简单的demo,考虑到一些异常情况、幂等性消费和死信队列等情况,尽量向可靠业务

    2024年02月15日
    浏览(56)
  • RocketMQ教程-(5)-功能特性-事务消息

    事务消息为 Apache RocketMQ 中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。 事务消息为 Apache RocketMQ 中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。 以电商交易场景为例

    2024年02月15日
    浏览(47)
  • SpringCloudAlibaba集成RocketMQ实现分布式事务事例(一)

    业务需求 用户请求订单微服务 order-service 接口删除订单(退货),删除订单时需要调用 account-service的方法给账户增加余额,一个典型的分布式事务问题。 代码实现 事务消息有三种状态: TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息 TransactionStatus.Roll

    2024年02月13日
    浏览(107)
  • RocketMQ 事务消息 原理及使用方法解析

    🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年3月24日 🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只

    2024年01月25日
    浏览(38)
  • 分布式消息队列RocketMQ概念详解

    目录 1.MQ概述 1.1 RocketMQ简介 1.2 MQ用途 1.3 常见MQ产品 2.RocketMQ 基本概念 2.1 消息 2.2 主题 2.3 标签 2.4 队列  2.5 Producer 2.6 Consumer 2.7 NameServer 2.8 Broker 2.9 RocketMQ 工作流程   RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息

    2024年02月03日
    浏览(65)
  • 分布式消息中间件RocketMQ的应用

    所有代码同步至GitCode:https://gitcode.net/ruozhuliufeng/test-rocketmq.git 普通消息 消息发送分类 ​ Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。 同步发送消息 ​ 同步发送消息是指,Producer发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。

    2024年02月05日
    浏览(87)
  • RocketMQ 消息消费 轮询机制 PullRequestHoldService

    先来看看 RocketMQ 消费过程中的轮询机制是啥。首先需要补充一点消费相关的前置知识。 RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式 Pull 模式:用户自己进行消息的拉取和消费进度的更新 Push 模式:Broker 将新的消息自动发送给用户进行消费 我们一般使用 RocketMQ 时用

    2024年02月13日
    浏览(45)
  • RocketMQ消息ACK机制及消费进度管理

    consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的(RocketMQ有保证消息肯定消费成功的特性(失败则重试)? 本文将详细解析消息具体是如何ack的,又是如何保证消费肯定成功的。 由于以上工作所有的机制

    2023年04月08日
    浏览(71)
  • 【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

    痛点背景 业务场景 假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做? 之前方案 最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)30分钟,则关闭订单。 方案评估 优点:是实

    2024年02月13日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包