基于RocketMQ实现分布式事务

这篇具有很好参考价值的文章主要介绍了基于RocketMQ实现分布式事务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基于RocketMQ实现分布式事务

背景

在一个微服务架构的项目中,一个业务操作可能涉及到多个服务,这些服务往往是独立部署,构成一个个独立的系统。这种分布式的系统架构往往面临着分布式事务的问题。为了保证系统数据的一致性,我们需要确保这些服务中的操作要么全部成功,要么全部失败。通过使用RocketMQ实现分布式事务,我们可以协调这些服务的操作,保证数据的一致性。

功能原理

RocketMQ的分布式事务消息功能,在普通消息基础上,支持二阶段的提交。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

整个事务消息的详细交互流程如下图所示:

基于RocketMQ实现分布式事务

1、生产者将消息发送至RocketMQ服务端。

2、RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

3、生产者开始执行本地事务逻辑。

4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到生产者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者集群中任一生产者实例发起消息回查。

6、生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

7、生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

注意问题

消息类型
事务消息仅支持在MessageType为Transaction的主题使用,即事务消息只能发送至类型为事务消息的主题中。

消息消费
RocketMQ事务消息保证生产者本地事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务自行保证消息正确处理,建议消费端做好消费重试。

中间状态
RocketMQ事务消息一致性为最终一致性,即在消息提交到下游消费端处理完成之前,下游和上游事务之间的状态会不一致。因此,事务消息仅适合能接受异步执行的场景。

事务超时
RocketMQ事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

示例代码

以下为RocketMQ 4.x版本事务消息示例代码,

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.*;

public class RocketMqTransactionDemo {
	public static void main(String[] args) throws Exception {
		// 创建事务消息生产者
		TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
		producer.setNamesrvAddr("127.0.0.1:9876");

		// 设置事务监听器
		TransactionListener transactionListener = new MyTransactionListener();
		producer.setTransactionListener(transactionListener);

		// 设置事务回查的线程池,可以不必设置,如果不设置也会默认生成一个
		ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue <Runnable> (2000), new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				Thread thread = new Thread(r);
				thread.setName("client-transaction-msg-check-thread");
				return thread;
			}
		});
		producer.setExecutorService(executorService);

		// 启动生产者
		producer.start();

		// 发送事务消息
		Message message = new Message("transaction_topic", "test_tag", "test_key", "Hello RocketMQ".getBytes());
		producer.sendMessageInTransaction(message, null);

		// 关闭生产者
		producer.shutdown();
	}
}

/**
 * 事务监听器
 */
class MyTransactionListener implements TransactionListener {
	@Override
	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		// 执行本地事务操作
		System.out.println("执行本地事务操作,消息内容:" + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE; // 提交事务,允许消费者消费该消息
		// return LocalTransactionState.ROLLBACK_MESSAGE;// 回滚事务,消息将被丢弃不允许消费。
		// return LocalTransactionState.UNKNOW;// 暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。
	}

	@Override
	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
		// 检查本地事务状态
		System.out.println("检查本地事务状态,消息内容:" + new String(msg.getBody()));
		return LocalTransactionState.COMMIT_MESSAGE;
	}
}

代码解释:
1、事务消息的生产者使用TransactionMQProducer创建。
2、MyTransactionListener作为事务监听器,实现了接口TransactionListener,该接口有两个方法,分别是:

  • executeLocalTransaction
    半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:
    LocalTransactionState.COMMIT_MESSAGE: 提交事务,允许消费者消费该消息。
    LocalTransactionState.ROLLBACK_MESSAGE: 回滚事务,消息将被丢弃不允许消费。
    LocalTransactionState.UNKNOW: 暂时无法判断状态,等待固定时间以后RocketMQ服务端根据回查规则向生产者进行消息回查。

  • checkLocalTransaction
    二次确认消息没有收到,RocketMQ服务端回查生产者端事务结果的方法。回查规则:本地事务执行完成后,若RocketMQ服务端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则RocketMQ服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。文章来源地址https://www.toymoban.com/news/detail-838864.html

到了这里,关于基于RocketMQ实现分布式事务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

    痛点背景 业务场景 假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做? 之前方案 最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)30分钟,则关闭订单。 方案评估 优点:是实

    2024年02月13日
    浏览(53)
  • (快手一面)分布式系统是什么?为什么要分布式系统?分布式环境下会有哪些问题?分布式系统是如何实现事务的?

    《分布式系统原理与泛型》中这么定义分布式系统: “ 分布式系统是若干独立计算机的集合, 这些计算机对于用户来说就像单个相关系统 ”, 分布式系统(distributed system)是建立在网络之上的软件系统。 就比如:用户在使用京东这个分布式系统的时候,会感觉是在使用一

    2024年02月08日
    浏览(70)
  • Spring Boot实现分布式事务的协调和管理

    在现代的分布式系统中,往往存在多个服务协同完成一个业务操作的情况。而在这种情况下,如何保证所有服务的数据一致性成为了一个重要的问题。Spring Boot作为一个流行的Java开发框架,提供了多种方法来实现分布式事务的协调和管理。本文将介绍一些常用的方式和技术来

    2024年02月08日
    浏览(41)
  • JAVA微服务分布式事务的几种实现方式

    一致性(Consistency) :在分布式系统中所有的数据备份,在同一时刻都保持一致状态,如无法保证状态一致,直接返回错误; 可用性(Availability):在集群中一部分节点故障,也能保证客户端访问系统并得到正确响应,允许一定时间内数据状态不一致; 分区容错性(Partiti

    2024年02月12日
    浏览(51)
  • 分布式系统的多数据库,实现分布式事务回滚(1.7.0 seata整合2.0.4nacos)

    1、解决的应用场景是分布式事务,每个服务有独立的数据库。 2、例如:A服务的数据库是A1,B服务的数据库是B2,A服务通过feign接口调用B服务,B涉及提交数据到B2,业务是在B提交数据之后,在A服务内报错。 所以,希望B能回滚事务。这就是跨库的数据回滚 seata下载地址 注意

    2024年02月11日
    浏览(43)
  • springboot dubbo seata nacos集成 分布式事务seata实现

    官网:http://seata.io/zh-cn/docs/overview/what-is-seata.html Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。 官网;https://cn.dubbo.apache.org/zh-cn/overview/what/

    2024年02月13日
    浏览(47)
  • 实现声明式锁,支持分布式锁自定义锁、SpEL和结合事务

    目录 2.实现 2.1 定义注解 2.2 定义锁接口 2.3 锁的实现 2.3.1 什么是SPI 2.3.2 通过SPI实现锁的多个实现类 2.3.3 通过SPI自定义实现锁 3.定义切面 3.1 切面实现 3.2 SpEL表达式获取动态key 3.3 锁与事务的结合 4.测试 4.1 ReentrantLock测试 4.2 RedissonClient测试 4.3 自定义锁测试 5.尾声 5.1 todo list

    2023年04月19日
    浏览(112)
  • 【103期】RabbitMQ 实现多系统间的分布式事务,保证数据一致性

    org.springframework.boot spring-boot-starter-amqp mysql mysql-connector-java runtime org.projectlombok lombok true org.springframework.boot spring-boot-starter-jdbc com.alibaba fastjson 1.2.17 3.2.1.2配置文件内容: server: port: 8080 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useUnicode=tru

    2024年04月14日
    浏览(65)
  • 分布式:一文吃透分布式事务和seata事务

    什么是事务 事务是并发控制的单位,是用户定义的一个操作序列。 事务特性 原子性(Atomicity): 事务是数据库的逻辑工作单位,事务中包括的诸操作要么全做,要么全不做。 一致性(Consistency): 事务执行的结果必须是使数据库从一个一致性状态变到另一个一致性状态。一致性

    2024年02月07日
    浏览(61)
  • 【万字长文】SpringBoot整合Atomikos实现多数据源分布式事务(提供Gitee源码)

    前言:在最近的实际开发的过程中,遇到了在多数据源的情况下要保证原子性的问题,这个问题当时遇到了也是思考了一段时间,后来通过搜集大量资料与学习,最后是采用了分布式事务来解决这个问题,在讲解之前,在我往期的博客提前搭好了一个SpringBoot整合MyBatis搭建M

    2024年02月14日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包