【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理

这篇具有很好参考价值的文章主要介绍了【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

分布式事务概念

讨论分布式事务之前我们分清两个概念:本地事务分布式事务

本地事务是解决单个数据源上的数据操作的一致性问题的话,而分布式事务则是为了解决跨越多个数据源上数据操作的一致性问题。

百度官方对分布式事务的定义是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

也就是说我们在操作一个业务逻辑过程中,涉及两个数据源(A、B),且很多时候A、B这两个数据源属于两个不同的物理环境。当我们操作A数据源过程中出现异常情况,那么必须让针对B数据源的操作回滚,同时A数据源的操作也回滚。

在Java开发过程中事务一般使用Spring为我们提供了方便的声明式事务方法@transactional。但是默认的Spring事务只支持单数据源,而实际上一个系统往往需要写多个数据源,这个时候我们就需要考虑如何通过Spring实现对分布式事务的支持。
SpringBoot官方提供推荐了Atomikos和 Bitronix两种无需服务器支持的分布式事务组件

JAVA领域中针对分布式事务的解决方案就是JTA(即Java Transaction API);

XA和JTA概述

XA 是由 X/Open 组织提出的分布式事务的一种协议(或者称之为分布式架构)。它主要定义了两部分的管理器,全局事务管理器及资源管理器。在 XA 的设计理念中,把不同资源纳入到一个事务管理器进行统一管理,例如数据库资源,消息中间件资源等,从而进行全部资源的事务提交或者取消,目前主流的数据库,消息中间件都支持 XA 协议。

JTA 叫做 Java Transaction API,它是 XA 协议的 JAVA 实现。目前在 JAVA 里面,关于 JTA 的定义主要是两部分

  • 事务管理器接口-----javax.transaction.TransactionManager
  • 资源管理器接口-----javax.transaction.xa.XAResource

在一般应用采用 JTA 接口实现事务,需要一个外置的 JTA 容器来存储这些事务,像 Tomcat。今天我们要讲的是 Atomikos,它是一个独立实现了 JTA 的框架,能够在我们的应用服务器中运行 JTA 事务。

SpringBoot集成atomikos

【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理,# SpringBoot,spring boot,分布式,后端

数据库结构

【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理,# SpringBoot,spring boot,分布式,后端

CREATE TABLE `tb_order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单id',
  `user_id` bigint DEFAULT NULL COMMENT '用户id',
  `name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '商品名称',
  `price` bigint DEFAULT NULL COMMENT '商品价格',
  `num` int DEFAULT '0' COMMENT '商品数量',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=137 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=COMPACT;

添加了username为唯一索引,方便后面测试多数据插库异常事务回滚

CREATE TABLE `tb_user` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `username` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '收件人',
  `address` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '地址',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_uername` (`username`)
) ENGINE=InnoDB AUTO_INCREMENT=34 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=COMPACT;

pom

技术栈 版本号
springboot 2.3.2.RELEASE
druid 1.1.10
mysql驱动 8.0.33
mybatis-plus 3.1.1
<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.3.2.RELEASE</version>
</parent>

<dependencies>
    <!-- druid-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.10</version>
    </dependency>
    <!-- druid-->
    <!-- mysql-connector-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
       <version>8.0.33</version>
    </dependency>
    <!-- mysql-connector-->
    <!-- mybatis-plus-->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.1.1</version>
    </dependency>
    <!-- mybatis-plus-->

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.25</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.0.0-jre</version>
    </dependency>
</dependencies>


<build>
    <resources>
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.xml</include>
            </includes>
            <filtering>false</filtering>
        </resource>
    </resources>
</build>

通用工具类

R

/**
 * @ClassName: R
 * @Description: 统一返回实体
 */
@Getter
@Setter
@SuppressWarnings({"AlibabaClassNamingShouldBeCamel"})
@Accessors(chain = true)
public class R<T> {
    public static final String DEF_ERROR_MESSAGE = "系统繁忙,请稍候再试";
    public static final String HYSTRIX_ERROR_MESSAGE = "请求超时,请稍候再试";
    public static final int SUCCESS_CODE = 0;
    public static final int FAIL_CODE = -1;
    public static final int TIMEOUT_CODE = -2;
    /**
     * 统一参数验证异常
     */
    public static final int VALID_EX_CODE = -9;
    public static final int OPERATION_EX_CODE = -10;
    /**
     * 调用是否成功标识,0:成功,-1:系统繁忙,此时请开发者稍候再试 详情见[ExceptionCode]
     */
    private int code;

    /**
     * 调用结果
     */
    private T data;

    /**
     * 结果消息,如果调用成功,消息通常为空T
     */
    private String msg = "ok";


    private String path;
    /**
     * 附加数据
     */
    private Map<String, Object> extra;

    /**
     * 响应时间
     */
    private long timestamp = System.currentTimeMillis();

    private R() {
        super();
    }

    public R(int code, T data, String msg) {
        this.code = code;
        this.data = data;
        this.msg = msg;
    }

    public static <E> R<E> result(int code, E data, String msg) {
        return new R<>(code, data, msg);
    }

    /**
     * 请求成功消息
     *
     * @param data 结果
     * @return RPC调用结果
     */
    public static <E> R<E> success(E data) {
        return new R<>(SUCCESS_CODE, data, "ok");
    }

    public static R<Boolean> success() {
        return new R<>(SUCCESS_CODE, true, "ok");
    }

    /**
     * 请求成功方法 ,data返回值,msg提示信息
     *
     * @param data 结果
     * @param msg  消息
     * @return RPC调用结果
     */
    public static <E> R<E> success(E data, String msg) {
        return new R<>(SUCCESS_CODE, data, msg);
    }

    /**
     * 请求失败消息
     *
     * @param msg
     * @return
     */
    public static <E> R<E> fail(int code, String msg) {
        return new R<>(code, null, (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg);
    }

    public static <E> R<E> fail(String msg) {
        return fail(OPERATION_EX_CODE, msg);
    }

    public static <E> R<E> fail(String msg, Object... args) {
        String message = (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg;
        return new R<>(OPERATION_EX_CODE, null, String.format(message, args));
    }

    public static <E> R<E> fail(BaseExceptionCode exceptionCode) {
        return validFail(exceptionCode);
    }

    public static <E> R<E> fail(BizException exception) {
        if (exception == null) {
            return fail(DEF_ERROR_MESSAGE);
        }
        return new R<>(exception.getCode(), null, exception.getMessage());
    }

    /**
     * 请求失败消息,根据异常类型,获取不同的提供消息
     *
     * @param throwable 异常
     * @return RPC调用结果
     */
    public static <E> R<E> fail(Throwable throwable) {
        return fail(FAIL_CODE, throwable != null ? throwable.getMessage() : DEF_ERROR_MESSAGE);
    }

    public static <E> R<E> validFail(String msg) {
        return new R<>(VALID_EX_CODE, null, (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg);
    }

    public static <E> R<E> validFail(String msg, Object... args) {
        String message = (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg;
        return new R<>(VALID_EX_CODE, null, String.format(message, args));
    }

    public static <E> R<E> validFail(BaseExceptionCode exceptionCode) {
        return new R<>(exceptionCode.getCode(), null,
                (exceptionCode.getMsg() == null || exceptionCode.getMsg().isEmpty()) ? DEF_ERROR_MESSAGE : exceptionCode.getMsg());
    }

    public static <E> R<E> timeout() {
        return fail(TIMEOUT_CODE, HYSTRIX_ERROR_MESSAGE);
    }


    public R<T> put(String key, Object value) {
        if (this.extra == null) {
            this.extra = Maps.newHashMap();
        }
        this.extra.put(key, value);
        return this;
    }

    /**
     * 逻辑处理是否成功
     *
     * @return 是否成功
     */
    public Boolean getIsSuccess() {
        return this.code == SUCCESS_CODE || this.code == 200;
    }

    /**
     * 逻辑处理是否失败
     *
     * @return
     */
    public Boolean getIsError() {
        return !getIsSuccess();
    }

    @Override
    public String toString() {
        return JSONObject.toJSONString(this);
    }
}

BaseController

/**
 * @ClassName: BaseController
 * @Description: controller 抽象基类
 */
public abstract class BaseController {
    /**
     * 成功返回
     *
     * @param data
     * @return
     */
    public <T> R<T> success(T data) {
        return R.success(data);
    }

    public R<Boolean> success() {
        return R.success();
    }

    /**
     * 失败返回
     *
     * @param msg
     * @return
     */
    public <T> R<T> fail(String msg) {
        return R.fail(msg);
    }

    public <T> R<T> fail(String msg, Object... args) {
        return R.fail(msg, args);
    }

    /**
     * 失败返回
     *
     * @param code
     * @param msg
     * @return
     */
    public <T> R<T> fail(int code, String msg) {
        return R.fail(code, msg);
    }

    public <T> R<T> fail(BaseExceptionCode exceptionCode) {
        return R.fail(exceptionCode);
    }

    public <T> R<T> fail(BizException exception) {
        return R.fail(exception);
    }

    public <T> R<T> fail(Throwable throwable) {
        return R.fail(throwable);
    }

    public <T> R<T> validFail(String msg) {
        return R.validFail(msg);
    }

    public <T> R<T> validFail(String msg, Object... args) {
        return R.validFail(msg, args);
    }

    public <T> R<T> validFail(BaseExceptionCode exceptionCode) {
        return R.validFail(exceptionCode);
    }
}

BaseExceptionCode


/**
 * @ClassName: BaseExceptionCode
 * @Description: 公共异常编码类
 */
public interface BaseExceptionCode {
    /**
     * 异常编码
     *
     * @return
     */
    int getCode();

    /**
     * 异常消息
     * @return
     */
    String getMsg();
}

ExceptionCode

/**
 * 全局错误码 10000-15000
 * <p>
 * 预警异常编码    范围: 30000~34999
 * 标准服务异常编码 范围:35000~39999
 * 邮件服务异常编码 范围:40000~44999
 * 短信服务异常编码 范围:45000~49999
 * 权限服务异常编码 范围:50000-59999
 * 文件服务异常编码 范围:60000~64999
 * 日志服务异常编码 范围:65000~69999
 * 消息服务异常编码 范围:70000~74999
 * 开发者平台异常编码 范围:75000~79999
 * 搜索服务异常编码 范围:80000-84999
 * 共享交换异常编码 范围:85000-89999
 * 移动终端平台 异常码 范围:90000-94999
 * <p>
 * 安全保障平台    范围:        95000-99999
 * 软硬件平台 异常编码 范围:    100000-104999
 * 运维服务平台 异常编码 范围:  105000-109999
 * 统一监管平台异常 编码 范围:  110000-114999
 * 认证方面的异常编码  范围:115000-115999
 *
 */
public enum ExceptionCode implements BaseExceptionCode {

    //系统相关 start
    SUCCESS(0, "成功"),
    SYSTEM_BUSY(-1, "系统繁忙~请稍后再试~"),
    SYSTEM_TIMEOUT(-2, "系统维护中~请稍后再试~"),
    PARAM_EX(-3, "参数类型解析异常"),
    SQL_EX(-4, "运行SQL出现异常"),
    NULL_POINT_EX(-5, "空指针异常"),
    ILLEGALA_ARGUMENT_EX(-6, "无效参数异常"),
    MEDIA_TYPE_EX(-7, "请求类型异常"),
    LOAD_RESOURCES_ERROR(-8, "加载资源出错"),
    BASE_VALID_PARAM(-9, "统一验证参数异常"),
    OPERATION_EX(-10, "操作异常"),


    OK(200, "OK"),
    BAD_REQUEST(400, "错误的请求"),
    /**
     * {@code 401 Unauthorized}.
     *
     * @see <a href="http://tools.ietf.org/html/rfc7235#section-3.1">HTTP/1.1: Authentication, section 3.1</a>
     */
    UNAUTHORIZED(401, "未经授权"),
    /**
     * {@code 404 Not Found}.
     *
     * @see <a href="http://tools.ietf.org/html/rfc7231#section-6.5.4">HTTP/1.1: Semantics and Content, section 6.5.4</a>
     */
    NOT_FOUND(404, "没有找到资源"),
    METHOD_NOT_ALLOWED(405, "不支持当前请求类型"),

    TOO_MANY_REQUESTS(429, "请求超过次数限制"),
    INTERNAL_SERVER_ERROR(500, "内部服务错误"),
    BAD_GATEWAY(502, "网关错误"),
    GATEWAY_TIMEOUT(504, "网关超时"),
    //系统相关 end

    REQUIRED_FILE_PARAM_EX(1001, "请求中必须至少包含一个有效文件"),
    //jwt token 相关 start

    JWT_TOKEN_EXPIRED(40001, "会话超时,请重新登录"),
    JWT_SIGNATURE(40002, "不合法的token,请认真比对 token 的签名"),
    JWT_ILLEGAL_ARGUMENT(40003, "缺少token参数"),
    JWT_GEN_TOKEN_FAIL(40004, "生成token失败"),
    JWT_PARSER_TOKEN_FAIL(40005, "解析token失败"),
    JWT_USER_INVALID(40006, "用户名或密码错误"),
    JWT_USER_ENABLED(40007, "用户已经被禁用!"),
    //jwt token 相关 end

    ;

    private int code;
    private String msg;

    ExceptionCode(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    @Override
    public int getCode() {
        return code;
    }

    @Override
    public String getMsg() {
        return msg;
    }


    public ExceptionCode build(String msg, Object... param) {
        this.msg = String.format(msg, param);
        return this;
    }

    public ExceptionCode param(Object... param) {
        msg = String.format(msg, param);
        return this;
    }
}


BaseException

/**
 * @ClassName: BaseException
 * @Description: 异常接口类
 */
public interface BaseException {

    /**
     * 统一参数验证异常码
     */
    int BASE_VALID_PARAM = -9;

    /**
     * 返回异常信息
     *
     * @return
     */
    String getMessage();

    /**
     * 返回异常编码
     *
     * @return
     */
    int getCode();
}

BaseUncheckedException

/**
 * @ClassName: BaseUncheckedException
 * @Description: 非运行期异常基类,所有自定义非运行时异常继承该类
 */
public class BaseUncheckedException extends RuntimeException implements BaseException {

    private static final long serialVersionUID = -778887391066124051L;

    /**
     * 异常信息
     */
    protected String message;

    /**
     * 具体异常码
     */
    protected int code;

    public BaseUncheckedException(int code, String message) {
        super(message);
        this.code = code;
        this.message = message;
    }

    public BaseUncheckedException(int code, String format, Object... args) {
        super(String.format(format, args));
        this.code = code;
        this.message = String.format(format, args);
    }


    @Override
    public String getMessage() {
        return message;
    }
    @Override
    public int getCode() {
        return code;
    }
}

BizException

/**
 * @ClassName: BizException
 * @Description: 业务异常 用于在处理业务逻辑时,进行抛出的异常。
 */
public class BizException extends BaseUncheckedException {

    private static final long serialVersionUID = -3843907364558373817L;

    public BizException(String message) {
        super(-1, message);
    }

    public BizException(int code, String message) {
        super(code, message);
    }

    public BizException(int code, String message, Object... args) {
        super(code, message, args);
    }

    /**
     * 实例化异常
     *
     * @param code    自定义异常编码
     * @param message 自定义异常消息
     * @param args    已定义异常参数
     * @return
     */
    public static BizException wrap(int code, String message, Object... args) {
        return new BizException(code, message, args);
    }

    public static BizException wrap(String message, Object... args) {
        return new BizException(-1, message, args);
    }

    public static BizException validFail(String message, Object... args) {
        return new BizException(-9, message, args);
    }

    public static BizException wrap(BaseExceptionCode ex) {
        return new BizException(ex.getCode(), ex.getMsg());
    }

    @Override
    public String toString() {
        return "BizException [message=" + message + ", code=" + code + "]";
    }
}

application.yml

spring:
  datasource:
    druid:
      order:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/cloud_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true
        user: root
        password: root
      user:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/cloud_user?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true
        user: root
        password: root



mybatis-plus:
  #mybatis日志
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

logging:
  level:
    cn.zysheep.dao: debug

数据源配置类

OrderXADataSourceConfig

/**
 * @ClassName: OrderXADataSourceConfig
 * @Description: mybatis配置类 Order
 */
@Configuration
@MapperScan(basePackages = OrderXADataSourceConfig.BASE_PACKAGES, sqlSessionTemplateRef = "orderSqlSessionTemplate")
public class OrderXADataSourceConfig {
    /**
     * 扫描mapper接口包
     */
    static final String BASE_PACKAGES = "cn.zysheep.dao.order";
    /**
     * 扫描的mapper配置文件路径
     */
    private static final String MAPPER_LOCATION = "classpath:/mapper/order/*Mapper.xml";

    /**
     * 将这个对象放入spring容器中(交给Spring管理)
     * @ConfigurationProperties 自动配置属性
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.order")
    public XADataSource getDataSourceOrder(){
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    /**
     * 创建Atomikos数据源
     * 注解@DependsOn("druidXADataSourcePre"),在名为druidXADataSourcePre的bean实例化后加载当前bean
     * @param xaDataSource
     * @return
     */
    @Bean
    @DependsOn("getDataSourceOrder")
    @Primary
    public DataSource dataSourceOrder(@Qualifier("getDataSourceOrder") XADataSource xaDataSource){
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        // 必须为数据源指定唯一标识
        atomikosDataSourceBean.setUniqueResourceName("dataSourceOrder");
        atomikosDataSourceBean.setPoolSize(5);
        atomikosDataSourceBean.setTestQuery("select 1");
        atomikosDataSourceBean.setBorrowConnectionTimeout(3);
        atomikosDataSourceBean.setXaDataSource(xaDataSource);

        return atomikosDataSourceBean;
    }

    /**
     * 创建 SqlSessionFactory
     * @return
     * @throws Exception
     */
    @Bean
    @Primary
    public SqlSessionFactory orderSqlSessionFactory(@Qualifier("dataSourceOrder") DataSource dataSource) throws Exception{
        // 用来创建 SqlSessionFactory 等同于下面配置
//        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
//            <property name="dataSource" ref="dataSource" />
//            <property name="mapperLocations" value="classpath:mybatis-mapper/*.xml"/>
//        </bean>

        // 在配置sqlSession工厂类的时候,创建的是MybatisSqlSessionFactoryBean,是为了能够正常使用Mybatis-Plus组件的基本功能,比如通用的crud语句绑定
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        // 设置mybatis的xml所在位置(扫描mybatis的相关xml文件,装配到容器中)
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
        //手动设置session工厂时,需要手动添加分页插件
        Interceptor[] plugins = new Interceptor[1];
        plugins[0] = new PaginationInterceptor();
        sqlSessionFactoryBean.setPlugins(plugins);

        return sqlSessionFactoryBean.getObject();
    }

    /**
     * 通过 SqlSessionFactory 来创建 SqlSessionTemplate
     * @param sqlSessionFactory
     * @return
     */
    @Bean
    @Primary
    public SqlSessionTemplate orderSqlSessionTemplate(@Qualifier("orderSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

UserXADataSourceConfig

/**
 * @ClassName: UserXADataSourceConfig
 * @Description: mybatis配置类 User
 */
@Configuration
@MapperScan(basePackages = UserXADataSourceConfig.BASE_PACKAGES, sqlSessionTemplateRef = "userSqlSessionTemplate")
public class UserXADataSourceConfig {
    /**
     * 扫描mapper接口包
     */
    static final String BASE_PACKAGES = "cn.zysheep.dao.user";
    /**
     * 扫描的mapper配置文件路径
     */
    private static final String MAPPER_LOCATION = "classpath:/mapper/user/*Mapper.xml";


    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.user")
    public XADataSource getDataSourceUser(){
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    @Bean
    @DependsOn("getDataSourceUser")
    public DataSource dataSourceUser(@Qualifier("getDataSourceUser") XADataSource xaDataSource){
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName("dataSourceUser");
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        atomikosDataSourceBean.setTestQuery("select 1");
        atomikosDataSourceBean.setBorrowConnectionTimeout(3);
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        return atomikosDataSourceBean;
    }

    /**
     * 创建 SqlSessionFactory
     * @return
     * @throws Exception
     */
    @Bean
    @Primary
    public SqlSessionFactory userSqlSessionFactory(@Qualifier("dataSourceUser") DataSource dataSource) throws Exception{
        // 用来创建 SqlSessionFactory 等同于下面配置
//        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
//            <property name="dataSource" ref="dataSource" />
//            <property name="mapperLocations" value="classpath:mybatis-mapper/*.xml"/>
//        </bean>

        // 在配置sqlSession工厂类的时候,创建的是MybatisSqlSessionFactoryBean,是为了能够正常使用Mybatis-Plus组件的基本功能,比如通用的crud语句绑定
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        // 设置mybatis的xml所在位置(扫描mybatis的相关xml文件,装配到容器中)
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
        //手动设置session工厂时,需要手动添加分页插件
        Interceptor[] plugins = new Interceptor[1];
        plugins[0] = new PaginationInterceptor();
        sqlSessionFactoryBean.setPlugins(plugins);

        return sqlSessionFactoryBean.getObject();
    }

    /**
     * 通过 SqlSessionFactory 来创建 SqlSessionTemplate
     * @param sqlSessionFactory
     * @return
     */
    @Bean
    @Primary
    public SqlSessionTemplate userSqlSessionTemplate(@Qualifier("userSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

1、每个数据源对应一个配置类
2、每个配置类的@MapperScan注解不一样,各自对应自己mapper接口文件夹(这就是为什么要将不同数据源的mapper接口写在不同文件夹的原因了)
3、在配置sqlSession工厂类的时候,创建的是MybatisSqlSessionFactoryBean,是为了能够正常使用Mybatis-Plus组件的基本功能,比如通用的crud语句绑定。
4、配置工厂类的时候,需要指定各自mapper.xml存放的路径(这就是为什么要将不同数据源的mapper.xml写在不同文件夹的原因了)
5、配置工厂类的时候,需要手动将分页插件加进去。因为数据源相关的自动配置被我们关闭了,创建传统PaginationInterceptor类的方法已经不好使了

实体类

Order

@Builder
@Data
@TableName("tb_order")
public class Order {
    @TableId(type = IdType.AUTO)
    private Long id;
    @TableField("user_id")
    private Long userId;
    @TableField("name")
    private String name;
    @TableField("price")
    private Long price;
    @TableField("num")
    private Integer num;
}

User

@Builder
@Data
@TableName("tb_user")
public class User {
    @TableId(type = IdType.AUTO)
    private Long id;
    @TableField("username")
    private String username;
    @TableField("address")
    private String address;
}

Mapper

OrderMapper

【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理,# SpringBoot,spring boot,分布式,后端

public interface OrderMapper extends BaseMapper<Order> {
}

配置文件 OrderMapper.xml,Mapper类路径、和配置路径必须数据源配置类的路径一致否则会报错
【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理,# SpringBoot,spring boot,分布式,后端

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.zysheep.dao.order.OrderMapper">

</mapper>

UserMapper

public interface UserMapper extends BaseMapper<User> {
}

配置文件UserMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.zysheep.dao.user.UserMapper">

</mapper>

Service

OrderService

public interface OrderService extends IService<Order> {
    /**
     * 保存订单
     */
    void saveOrder() throws Exception;
}
@Service
@AllArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {

    private final OrderMapper orderMapper;

    private final UserMapper userMapper;

    /**
     * 实现多数据库操作
     * @return
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        // order数据源
        Order order = Order.builder()
                .userId(1000L)
                .name("Apple 苹果 iPhone 12 ")
                .price(699900L)
                .num(1).build();
        orderMapper.insert(order);

        // user数据源
        User user = User.builder()
                .id(1001L)
                .address("长沙")
                .username("封于修").build();

        userMapper.insert(user);

        // throw new Exception("12312");
    }
}
测试分布式事务

atomikos多数据源分布式事务和Spring声明式事务使用方法一样,类或方法加@Transactional注解。

1、正常保存

2、order数据源保存成功,user数据源保存成功,方法其他地方抛出异常,方法事务回滚

3、order数据源保存成功,user数据源保存失败,方法事务回滚

4、order数据源保存失败,user数据源保存不执行,方法事务回滚

UserService

public interface UserService extends IService<User> {
    /**
     * 保存用户
     * @throws Exception
     */
    void saveUser() throws Exception;
}
@Service
@AllArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {

    private final OrderMapper orderMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveUser() throws Exception {
        // order数据源  抛出异常,方法事务回滚
        Order order = Order.builder()
                .userId(1000L)
                .name("Apple 苹果 iPhone 12 ")
                .price(699900L)
                .num(1).build();
        orderMapper.insert(order);


        // 2、user数据源 抛出异常,方法事务回滚
        User user = User.builder()
                .id(1001L)
                .address("长沙")
                .username("封于修").build();

        saveBatch(Collections.singletonList(user));

        // 1、主方法抛出异常,方法事务回滚
        // throw new Exception("12312");
    }
}
测试分布式事务

atomikos多数据源分布式事务和Spring声明式事务使用方法一样,类或方法加@Transactional注解。

这里主要测试Mybaits-Plus提供的批量新增是否支持 atomikos多数据源分布式事务,测试是方法内部其他数据源发生异常事务是可以回滚的;文章来源地址https://www.toymoban.com/news/detail-568122.html

Controller

OrderController

@RestController
@RequestMapping("/order")
@AllArgsConstructor
public class OrderController extends BaseController {

    private final OrderService orderService;

    @PostMapping("/save")
    public R save() throws Exception {
        orderService.saveOrder();
        return success();
    }
}

UserController

@RestController
@RequestMapping("/user")
@AllArgsConstructor
public class UserController extends BaseController {
    private final UserService userService;

    @PostMapping("/batchSave")
    public R batchSave() throws Exception {
        userService.saveUser();
        return success();
    }
}

启动类

@SpringBootApplication
public class AtomikosApplication {
    public static void main(String[] args) {
        SpringApplication.run(AtomikosApplication.class, args);
    }
}

到了这里,关于【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot集成Druid实现多数据源的两种方式

    目录 项目场景: 一、集成com.baomidou的方式 1、maven依赖: 2、 配置文件: 3、 使用注解切换数据源:  二、基于AOP手动实现多数据源原生的方式 1、maven依赖: 2、 配置文件: 3、 多数据源名称类 4、自定义注解  5、配置类 6、动态数据源配置  7、AOP切面拦截注解 8、Dao层,

    2024年02月09日
    浏览(44)
  • springboot+mybatis实现多数据源

    最近做项目碰到了一个需要连4个不同数据库的需求,其中db1、db2表结构都不相同;另外两个数据库same_db_private、same_db_public表结构完全相同,一个对内一个对外,只是从物理上隔离了数据而已。 所以打算通过静态配置包路径的方式来实现db1、db2的操作,并且通过扩展Spring的

    2024年02月09日
    浏览(43)
  • springboot 多数据源的2种实现

    这里介绍2种多数据源: 固定数据源 动态数据源 业务场景:项目涉及多个数据库,比如本项目数据库、财务系统数据库、物资系统数据库。系统需要操作多个数据库。 1.1 分别定义各个数据源 第1个数据源: dataSourceMySql 第2个数据源: dataSource2 第3个数据源: dataSource3 1.2 动态

    2024年02月07日
    浏览(41)
  • MyBatis整合Springboot多数据源实现

    数据源,实际就是数据库连接池,负责管理数据库连接,在 Springboot 中,数据源通常以一个 bean 的形式存在于 IOC 容器中,也就是我们可以通过依赖注入的方式拿到数据源,然后再从数据源中获取数据库连接。 那么什么是多数据源呢,其实就是 IOC 容器中有多个数据源的 bea

    2023年04月22日
    浏览(65)
  • SpringBoot通过自定义注解实现多数据源

    ✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: Java从入门到精通 ✨特色专栏: MySQL学习 🥭本文内容:SpringBoot通过自定义注解实现多数据源 📚个人知识库: Leo知识库,欢迎大家访问 大家好

    2024年02月03日
    浏览(60)
  • SpringBoot结合MyBatis实现多数据源配置

    SpringBoot框架实现多数据源操作,首先需要搭建Mybatis的运行环境。 由于是多数据源,也就是要有多个数据库,所以,我们创建两个测试数据库,分别是:【sp-demo01】和【sp-demo02】,如下图所示: 具体SQL代码: 创建【sp-demo01】数据库。 创建【sp-demo02】数据库。 MyBatis框架中,

    2024年02月09日
    浏览(38)
  • Spring | 基于SpringBoot的多数据源实战 - 使用seata实现多数据源的全局事务管理

    在软件开发中, 多数据源 的应用越来越普遍,特别是在 微服务架构 和 业务模块化 的场景下。多数据源能够让不同的业务模块和微服务拥有各自独立的数据存储,大大提高了系统的灵活性和可维护性。本文将深入探讨多数据源的配置和实施,以及在 Spring Boot 环境下,如何通

    2024年02月07日
    浏览(60)
  • 【微服务】springboot 适配多数据源设计与实现

    目录 一、问题背景 1.1 mysql读写分离 1.2 适配多种类型数据库 1.3 多数据源

    2024年02月16日
    浏览(33)
  • springboot实现多数据源配置(Druid/Hikari)

    使用springboot+mybatis-plus+(Druid/Hikari)实现多数据源配置 操作步骤: 引入相应的maven坐标 编写mybatis配置,集成mybatis或mybatis-plus(如果已集成可跳过) 编写数据源配置类 编写注解,并通过aop进行增强(编写数据源切换代码) 类或方法中使用注解,对数据源进行切换 第一步:

    2024年02月13日
    浏览(52)
  • springboot+mybatis实现mysql和oracle多数据源

    在实际项目中很多时候会涉及到多个数据库的访问,或者数据库读写分离的形式。 下面通过使用 Aspect+注解来实现mysql+oracle的多数据源配置(注意:事务一致性未提供) 首先要去oracle官网下载ojdbc的jar包,根据oracle的版本去下载,或者在下载的oracle的jdbc包下的lib里面有,然后

    2024年02月07日
    浏览(60)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包