Seata AT模式源码解析一(Seata Server端启动流程)

这篇具有很好参考价值的文章主要介绍了Seata AT模式源码解析一(Seata Server端启动流程)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

启动类 Server

seata-server的入口类在Server类中,源码如下:

public class Server {
    /**
    * The entry point of application.
    *
    * @param args the input arguments
    * @throws IOException the io exception
    */
    public static void main(String[] args) throws IOException {
        // 获取端口,默认是8091
        int port = PortHelper.getPort(args);
        System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));
        
        // create logger
        final Logger logger = LoggerFactory.getLogger(Server.class);
        if (ContainerHelper.isRunningInContainer()) {
            logger.info("The server is running in container.");
        }
        
        //参数解析器,用来解析启动的配置,包括file.conf和registry.conf
        //Note that the parameter parser should always be the first line to execute.
        //Because, here we need to parse the parameters needed for startup.
        ParameterParser parameterParser = new ParameterParser(args);
        
        //initialize the metrics
        MetricsManager.get().init();
        
        // 把从配置文件中读取到的storeMode写入SystemProperty中,方便其他类使用
        System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
        
        // netty的线程池
        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                                                                   NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                                                                   new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                                                                   new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
        
        // 创建NettyRemotingServer实例,主要就是创建一个NettyServerBootstrap,负责与TM,RM进行通信
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
        // 监听的端口,8091
        nettyRemotingServer.setListenPort(parameterParser.getPort());
        // 初始化UUIDGenerator,UUID生成器,基于雪花算法,用于生成全局事务id,分支事务id
        // 多个Server实例配置不同的ServerNode,保证id的唯一性
        UUIDGenerator.init(parameterParser.getServerNode());
        // SessionHodler负责事务日志(状态)的持久化存储,
        // 根据不同的存储模式来创建
        SessionHolder.init(parameterParser.getStoreMode());
        
        // 创建DefaultCoordinator实例并初始化,DefaultCoordinator是TC的核心事务逻辑处理类
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        coordinator.init();
        // 将coordinator设置为事务消息处理器,处理netty接收到的事务请求
        nettyRemotingServer.setHandler(coordinator);
        // register ShutdownHook
        ShutdownHook.getInstance().addDisposable(coordinator);
        ShutdownHook.getInstance().addDisposable(nettyRemotingServer);
        
        //127.0.0.1 and 0.0.0.0 are not valid here.
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            XID.setIpAddress(NetUtil.getLocalIp());
        }
        XID.setPort(nettyRemotingServer.getListenPort());
        
        try {
            // 初始化netty,开始监听端口并阻塞在这里
            nettyRemotingServer.init();
        } catch (Throwable e) {
            logger.error("nettyServer init error:{}", e.getMessage(), e);
            System.exit(-1);
        }
        
        System.exit(0);
    }
}

在阅读源码的时候,有些源码是要细看的,但是有些源码可以大致猜测一下它的作用,就直接略过去了,抓住真正的重点去看。

SessionHolder初始化

SessionHolder负责Session的持久化,一个session对象代表一个事务。SessionHolder包含了4个session管理器,用来操作session。

// 用于获取所有的session,以及session的创建,更新和删除
private static SessionManager ROOT_SESSION_MANAGER;
// 用于获取,更新所有异步commit的session
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于获取,更新所有需要重试commit的session
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于获取,更新所有需要重试rollback的session
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

初始化方法init

public static void init(String mode) {
    if (StringUtils.isBlank(mode)) {
        mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
    }
    StoreMode storeMode = StoreMode.get(mode);
    // 数据库存储模式,一般也是推荐用数据库
    if (StoreMode.DB.equals(storeMode)) {p
        // SPI方式加载SessionManager
        // 这里4个SessionManager都是DataBaseSessionManager类的4个不同实例
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                                                                      new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME}); // async.commit.data 表示是用来处理异步提交
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                                                                      new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME}); // retry.commit.data 表示是用来处理重试提交的
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                                                                       new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME}); // retry.rollback.data 表示是用来处理重试回滚的
    } else if (StoreMode.FILE.equals(storeMode)) {
        String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
                                                   DEFAULT_SESSION_STORE_FILE_DIR);
        if (StringUtils.isBlank(sessionStorePath)) {
            throw new StoreException("the {store.file.dir} is empty.");
        }
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                                                          new Object[] {ROOT_SESSION_MANAGER_NAME, sessionStorePath});
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                                                                      new Class[] {String.class, String.class}, new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME, null});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                                                                      new Class[] {String.class, String.class}, new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME, null});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                                                                       new Class[] {String.class, String.class}, new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME, null});
    } else if (StoreMode.REDIS.equals(storeMode)) {
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                                                                      StoreMode.REDIS.getName(), new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                                                                      StoreMode.REDIS.getName(), new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                                                                       StoreMode.REDIS.getName(), new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    } else {
        // unknown store
        throw new IllegalArgumentException("unknown store mode:" + mode);
    }
    reload(storeMode);
}

DefaultCoordinator初始化

DefaultCoordinator是TC的核心事务逻辑处理类,如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是由DefaultCoordinator负责协调处理的。DefaultCoordinato通过RpcServer与远程的TM、RM通信来实现分支事务的提交、回滚等。

public DefaultCoordinator(RemotingServer remotingServer) {
    this.remotingServer = remotingServer;
    this.core = new DefaultCore(remotingServer);
}

在DefaultCoordinator里还创建了一个DefaultCore,该类是默认的 TC 事务操作实现,DefaultCoordinator的开启、提交、回滚全局事务,注册、提交、回滚分支事务都是委托给这个类。

public DefaultCore(RemotingServer remotingServer) {
    List<AbstractCore> allCore = EnhancedServiceLoader.loadAll(AbstractCore.class,
        new Class[]{RemotingServer.class}, new Object[]{remotingServer});
    if (CollectionUtils.isNotEmpty(allCore)) {
        for (AbstractCore core : allCore) {
            coreMap.put(core.getHandleBranchType(), core);
        }
    }
}

在DefaultCore构造方法里又会去通过SPI方式加载AbstractCore的实现类,类名在META-INF.services/io.seata.server.coordinator.AbstractCore文件里。
Seata AT模式源码解析一(Seata Server端启动流程)
将这4个实例缓存在DefaultCore中的coreMap里,分别是AT,TCC,SAGA和XA模式下的事务处理类。

然后调用DefaultCoordinator的初始化方法init

public void init() {
    // 处理处于回滚状态可重试的事务的定时任务
    retryRollbacking.scheduleAtFixedRate(() -> {
        boolean lock = SessionHolder.retryRollbackingLock();
        if (lock) {
            try {
                handleRetryRollbacking();
            } catch (Exception e) {
                LOGGER.info("Exception retry rollbacking ... ", e);
            } finally {
                SessionHolder.unRetryRollbackingLock();
            }
        }
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // 处理二阶段可以重试提交的状态可重试的事务的定时任务
    retryCommitting.scheduleAtFixedRate(() -> {
        boolean lock = SessionHolder.retryCommittingLock();
        if (lock) {
            try {
                handleRetryCommitting();
            } catch (Exception e) {
                LOGGER.info("Exception retry committing ... ", e);
            } finally {
                SessionHolder.unRetryCommittingLock();
            }
        }
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // 处理二阶段异步提交的事务的定时任务
    asyncCommitting.scheduleAtFixedRate(() -> {
        // 默认都是true
        boolean lock = SessionHolder.asyncCommittingLock();
        if (lock) {
            try {
                // 处理异步提交
                handleAsyncCommitting();
            } catch (Exception e) {
                LOGGER.info("Exception async committing ... ", e);
            } finally {
                SessionHolder.unAsyncCommittingLock();
            }
        }
    }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // 检查事务的第一阶段已经超时的事务,设置事务状态为TimeoutRollbacking,
    // 该事务会由其他定时任务执行回滚操作
    timeoutCheck.scheduleAtFixedRate(() -> {
        boolean lock = SessionHolder.txTimeoutCheckLock();
        if (lock) {
            try {
                timeoutCheck();
            } catch (Exception e) {
                LOGGER.info("Exception timeout checking ... ", e);
            } finally {
                SessionHolder.unTxTimeoutCheckLock();
            }
        }
    }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // 根据unlog的保存天数调用RM删除unlog
    undoLogDelete.scheduleAtFixedRate(() -> {
        boolean lock = SessionHolder.undoLogDeleteLock();
        if (lock) {
            try {
                undoLogDelete();
            } catch (Exception e) {
                LOGGER.info("Exception undoLog deleting ... ", e);
            } finally {
                SessionHolder.unUndoLogDeleteLock();
            }
        }
    }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

主要就是创建了5个定时任务,主要用于事务的重试机制,因为分布式环境的不稳定性会造成事务处于中间状态,所以要通过不断的重试机制来实现事务的最终一致性。这里面还有一个处理二阶段异步提交的事务的定时任务。

初始化NettyRemotingServer

在上面创建了NettyRemotingServer,所以在最后需要进行初始化,开始监听端口并阻塞在这里。

@Override
public void init() {
    // 注册与Client通信的Processor
    registerProcessor();
    // 再调用父类的init
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}

NettyRemotingServer初始化时主要做了两件事:
1、注册与Client通信的Processor,每个事务请求类型都对应一个Processor。当NettyRemotingServer接收到请求后,从注册的Processor列表中选出一个适合的Processor进行处理。

private void registerProcessor() {
    // 1. 注册核心的ServerOnRequestProcessor,即与事务处理相关的Processor,
    // 如:全局事务开始、提交,分支事务注册、反馈当前状态等。
    // getHandler就是DefaultCoordinator
    ServerOnRequestProcessor onRequestProcessor =
        new ServerOnRequestProcessor(this, getHandler());
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2. 注册ResponseProcessor,ResponseProcessor用于处理当Server端主动发起请求时,Client端回复的消息
    ServerOnResponseProcessor onResponseProcessor =
        new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
    // 3. Client端发起RM注册请求时对应的Processor
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4. Client端发起TM注册请求时对应的Processor
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5. Client端发送心跳请求时对应的Processor
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

2、调用父类AbstractNettyRemotingServer去启动Netty服务端

@Override
public void init() {
    super.init();
    // 启动Netty
    serverBootstrap.start();
}

继续调用父类AbstractNettyRemoting方法,创建一个定时任务。文章来源地址https://www.toymoban.com/news/detail-465698.html

public void init() {
    // 用于定时清除超时的请求,3s执行一次
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                if (entry.getValue().isTimeout()) {
                    futures.remove(entry.getKey());
                    entry.getValue().setResultMessage(null);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                    }
                }
            }

            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

到了这里,关于Seata AT模式源码解析一(Seata Server端启动流程)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 30.Netty源码服务端启动主要流程

    •创建 selector •创建 server socket channel •初始化 server socket channel •给 server socket channel 从 boss group 中选择一个 NioEventLoop •将 server socket channel 注册到选择的 NioEventLoop 的 selector •绑定地址启动 •注册接受连接事件(OP_ACCEPT)到 selector上 在《引导器作用:客户端和服务端启动都

    2024年02月12日
    浏览(36)
  • Seata XA模式和AT模式的区别

    Seata  XA模式和AT模式都是分布式事务解决方案Seata中的两种实现方式。 XA模式: XA模式是基于数据库的两阶段提交协议实现的一种分布式事务解决方案。 在XA模式下,Seata会将分布式事务分为全局事务和各个分支事务。 全局事务由事务协调器(TC)统一管理,分支事务由不同的

    2024年02月12日
    浏览(31)
  • gRPC源码剖析-Server启动流程

    创建一个gRPC Server代码很简单就这么两行,我们可以运行起来单步调试来学习一下gRPC Server启动流程。 Server server = ServerBuilder.forPort(50051)                .addService(new OrderServiceImpl())                .build()                .start(); server.awaitTermination(); 绑定端口 调用NettyServer

    2024年02月07日
    浏览(41)
  • 29.Netty源码之服务端启动:创建EventLoop&Selector流程

    通过前几章课程的学习,我们已经对 Netty 的技术思想和基本原理有了初步的认识,从今天这节课开始我们将正式进入 Netty 核心源码学习的课程。希望能够通过源码解析的方式让你更加深入理解 Netty 的精髓,如 Netty 的设计思想、工程技巧等,为之后继续深入研究 Netty 打下坚

    2024年02月12日
    浏览(40)
  • springboot启动流程源码解析(带流程图)

    本文自己写的(头条也有这篇文章),若有问题,请指正。 大致流程如下: 1. 初始化SpringApplication,从META-INF下的spring.factories读取 ApplicationListener/ApplicationContextInitializer 2.运行SpringApplication的run方法 3.读取项目中环境变量、jvm配置信息、配置文件信息等 4.创建Spring容器对象(

    2024年02月08日
    浏览(42)
  • Android系统启动流程 源码解析

    本文链接:https://blog.csdn.net/feather_wch/article/details/132518105 有道云脑图:https://note.youdao.com/s/GZ9d8vzO 1、整体流程 Boot Room BootLoader idle kthread init init ServiceManager zygote zygote SystemServer app 1、kernel/common/init/main.c 2、andorid.mk-android.bp编译 3、init是用户空间鼻祖 属于C、C++ Framework 1.1 启动源

    2024年02月11日
    浏览(50)
  • 分布式事务Seata实战-AT模式(注册中心为Eureka)

    大致记录Seata的AT模式下创建项目过程中需要注意的点和可能遇到的问题。 本项目是以官网的给的示例(即下图)进行创建的,以Eureka为注册中心。 官网:Seata AT 模式 | Apache Seata™ 官方代码示例:   快速启动 | Apache Seata™ 此文章涉及的项目代码链接:seata-at: 分布式事务解

    2024年01月19日
    浏览(44)
  • 聊聊Seata分布式解决方案AT模式的实现原理

    Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案。 AT模式目前来看是Seata框架独有的一种模式,其它的分布式框架上并没有此种模式的实现。其是由二阶段提

    2024年02月05日
    浏览(42)
  • Seata分布式事务AT、TCC、SAGA、XA模式

    Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata将为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案。 🍮实现原理 阿里SEATA独有模式,通过生成反向SQL实现数据回滚,需要在数据库额外附加UNDO_LOG表,

    2024年02月07日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包