深入探讨Seata RPC模块的设计与实现

这篇具有很好参考价值的文章主要介绍了深入探讨Seata RPC模块的设计与实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在Seata中,TM,RM与TC都需要进行跨进程的网络调用,通常来说就会需要RPC来支持远程调用,而Seata内部就有自身基于Netty的RPC实现,这里我们就来看下Seata是如何进行RPC设计与实现的

 RPC整体设计

深入探讨Seata RPC模块的设计与实现

 抽象基类AbstractNettyRemoting

 该类是整个RPC设计的一个顶层抽象类,其主要实现了同步发送与异步发送的功能,此时在这里还没区分客户端与服务端,因为无论是客户端还是服务端,双方都应该有发送数据的功能

(1)同步请求sendSync 
/**
 * 发送一个同步的rpc请求
 *
 * @param channel       netty的channel对象
 * @param rpcMessage    rpc的消息载体对象
 * @param timeoutMillis rpc超时时间
 * @return 响应的数据对象
 * @throws TimeoutException 请求超时异常
 */
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    if (timeoutMillis <= 0) {
        throw new FrameworkException("timeout should more than 0ms");
    }
    if (channel == null) {
        LOGGER.warn("sendSync nothing, caused by null channel.");
        return null;
    }

    // 创建一个MessageFuture同步发送辅助对象,并放到futures集合中
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);

    // 检查该channel是否可写(channel中有写缓冲区,如果写缓冲区达到了一定的水位此时的状态就是不可写)
    channelWritableCheck(channel, rpcMessage.getBody());
    // 获取要发送的目标ip地址
    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
    // 执行rpc发送前的钩子方法
    doBeforeRpcHooks(remoteAddr, rpcMessage);

    // 发送数据
    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        // 条件成立:说明发送失败
        if (!future.isSuccess()) {
            // 把这个MessageFuture从futures集合中清除
            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {
                // 设置发送异常类型
                messageFuture1.setResultMessage(future.cause());
            }

            // 销毁channel
            destroyChannel(future.channel());
        }
    });

    try {
        // 由于netty是异步发送的,所以为了达到同步发送的效果,在这里会进行阻塞,当接收端发送响应之后才会解除阻塞,这是一种常用的异步转同步的方式
        Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        // 执行rpc发送后的钩子方法
        doAfterRpcHooks(remoteAddr, rpcMessage, result);
        // 返回响应的数据对象
        return result;
    } catch (Exception exx) {
        LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
                     rpcMessage.getBody());
        // 等待响应超时,请求已经发出去了
        if (exx instanceof TimeoutException) {
            throw (TimeoutException) exx;
        }
            // 发送请求异常,请求还没发出去
        else {
            throw new RuntimeException(exx);
        }
    }
}
  1. 构建MessageFuture对象,RpcMessage的id作为key,MessageFuture作为value放到futures这个map中
  2. 检查下发送的目标channel是否是可写的
  3. 执行钩子的发送前置方法
  4. 发送数据,如果发送失败就把RpcMessage从futures中进行移除,并且销毁这个channel
  5. 通过MessageFuture的get方法进行阻塞,同步获取响应结果

这里着重关注一下MessageFuture,通常来说如果我们基于Netty去实现RPC的话,由于Netty是异步发送请求的,所以单纯通过Netty我们是不能做到同步请求的效果的,而一般异步转同步的解决方案就是使用Java的CountdownLatch(RocketMQ是以CountdownLatch实现的)或者CompleteFuture来进行同步阻塞。比如使用CompleteFuture的话具体的实现流程就是创建一个CompleteFuture对象,然后把这个对象先缓存到map中(key为请求id,value为这个CompleteFuture对象),接着调用netty的发送数据,调用完之后通过CompleteFuture的get方法进行同步阻塞,阻塞到什么时候呢?当接收到对方的响应请求的时候,会根据响应请求的请求id(响应请求id与发送请求id是相同的)去从map中找到对应的CompleteFuture对象,然后把响应结果作为参数去调用CompleteFuture对象的complete方法,根据CompleteFuture的特性,此时同步阻塞的get方法就会解除阻塞并获取到上面传入的响应结果。正好Seata实现异步转同步就是使用CompleteFuture的方式,每一个MessageFuture对象中就包装了一个CompleteFuture,具体代码如下: 

/**
 * 通过该类的辅助去实现Netty发送数据的同步效果
 * 具体方案:在每次往接收端发送数据之前,创建一个新的MessageFuture对象然后放到一个集合中,该集合的key是该数据的唯一id,value是这个MessageFuture对象,
 * 然后当调用完netty的发送数据api之后就可以利用MessageFuture中的CompletableFuture进行阻塞,阻塞到什么时候呢?
 * 当接收端那边接收到发送过来的数据之后,就发送一个对应的响应请求过来,这个响应请求会带着发送的那个数据的唯一id,只要收到这个响应请求之后,根据唯一id去集合中
 * 寻找到对应的MessageFuture对象,然后就可以让里面的CompletableFuture对象解除阻塞,这样整个过程就实现了同步发送的效果了。该方案也是异步转同步的典型方案
 *
 * @author slievrly
 */
public class MessageFuture {

    /**
     * 同步发送的数据载体对象
     */
    private RpcMessage requestMessage;

    /**
     * 同步发送的超时时间
     */
    private long timeout;

    private long start = System.currentTimeMillis();

    private transient CompletableFuture<Object> origin = new CompletableFuture<>();

    /**
     * Is timeout boolean.
     *
     * @return the boolean
     */
    public boolean isTimeout() {
        return System.currentTimeMillis() - start > timeout;
    }

    /**
     * 同步等待获取结果值
     *
     * @param timeout 等待超时时间
     * @param unit    时间单位
     * @return 获取到的结果值
     * @throws TimeoutException 超时异常
     * @throws InterruptedException 中断异常
     */
    public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
        Object result = null;
        try {
            result = origin.get(timeout, unit);
            if (result instanceof TimeoutException) {
                throw (TimeoutException)result;
            }
        } catch (ExecutionException e) {
            throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
        } catch (TimeoutException e) {
            throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));
        }

        if (result instanceof RuntimeException) {
            throw (RuntimeException)result;
        } else if (result instanceof Throwable) {
            throw new RuntimeException((Throwable)result);
        }

        return result;
    }

    /**
     * 设置结果值,有可能是个超时异常 {@link TimeoutException}
     *
     * @param obj 结果值
     */
    public void setResultMessage(Object obj) {
        origin.complete(obj);
    }
}

MessageFuture的get方法和setResultMessage方法就是分别对CompleteFuture的get方法和complete方法的封装

(2)异步请求sendAsync 
/**
 * 发送一个异步RPC请求
 *
 * @param channel    channel
 * @param rpcMessage rpc请求载体对象
 */
protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
    channelWritableCheck(channel, rpcMessage.getBody());
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
                     + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
    }

    doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);

    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
            destroyChannel(future.channel());
        }
    });
}

异步发送就比较简单了,因为Netty本身就是异步发送的,所以这里就不用再做额外的处理了 

(3)删除过期的MessageFuture 
public void init() {
    // 开启一个定时任务,每3s去检查一下所有正在同步发送的MessageFuture,如果发送有超时的,则及时从futures集合中清除
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                MessageFuture future = entry.getValue();
                if (future.isTimeout()) {
                    futures.remove(entry.getKey());
                    RpcMessage rpcMessage = future.getRequestMessage();
                    future.setResultMessage(new TimeoutException(String
                                                                 .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                    }
                }
            }
            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}

上面我们知道了同步请求的实现原理,对于超时的同步请求的MessageFuture如果不进行清理,那么就会一直存在于futures这个map中,在AbstractNettyRemoting中提供了一个init方法,在init方法中会去通过一个定时器每隔3s去map中进行移除掉过期的MessageFuture 

(4)处理数据接收 

无论是客户端还是服务端,都需要对接收到的数据进行处理,所以在这一层中也对接收数据进行了统一的定义处理,代码如下: 

/**
 * 接收rpc请求
 *
 * @param ctx        ChannelHandlerContext对象
 * @param rpcMessage 接收的消息载体对象
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }

    // 获取消息内容
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;

        // 根据消息类型去获取到对应的处理器进行处理
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            if (pair.getSecond() != null) {
                try {
                    // 交给线程池去处理
                    pair.getSecond().execute(() -> {
                        try {
                            // 处理器处理接收到的消息
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                                 "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

当接收到数据之后,会根据数据的类型去从processorTable这个map中获取到对应的pair,processorTable又是个什么东西呢,里面存的pair又是干什么的?processorTable中的key是数据的类型,key是pair,pair里面存了两个对象,一个是RemotingProcessor,另一个是ExecutorService(也就是线程池),因为接收到的消息是需要去走不同的业务处理逻辑的,比如RM接收到TC发送过来的commit消息就走commit的处理逻辑,接收到rollback消息就走rollback的处理逻辑,所以在Seata中为了达到这样的效果,就需要对接收的数据进行类型区分,然后每一种类型的消息都会对应一个处理器RemotingProcessor,而处理器则交由线程池来执行。那么processorTable的key-value又是哪里来的呢?答案就是交由客户端与服务端根据自身的业务需求来具体往processorTable中注册处理器

客户端(TM/RM) 

AbstractNettyRemotingClient 

该类继承于AbstractNettyRemoting,是Netty客户端的抽象基类,基于AbstractNettyRemoting的发送数据能力之上,实现了通过不同的负载均衡策略去选择服务端地址以及批量发送的功能 

(1)服务端负载均衡 
protected String loadBalance(String transactionServiceGroup, Object msg) {
    InetSocketAddress address = null;
    try {
        // 获取到transactionServiceGroup对应的TC集群的地址集合
        @SuppressWarnings("unchecked")
        List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);
        // 通过负载均衡算法(默认是XIDLoadBalance)选择出一个TC的地址
        address = this.doSelect(inetSocketAddressList, msg);
    } catch (Exception ex) {
        LOGGER.error(ex.getMessage());
    }
    if (address == null) {
        throw new FrameworkException(NoAvailableService);
    }
    return NetUtil.toStringAddress(address);
}

loadBalance方法主要做的就是通过事务服务组名从注册工厂中去获取到所有的服务端地址,然后再根据具体的负载均衡算法去选择出一个服务端地址并返回,那么这里就会涉及到有两个点了,一个是注册工厂,一个是负载均衡算法,这两个都是有具体不同实现的。对于注册工厂来说,在Seata中提供了多种TC(在Seata中TC就是服务端)注册的方式,比如支持Nacos,Etcd,Eureka,Redis,Zookeeper等等,而负载均衡算法也一样,有轮询,随机等方式,既然有多种实现方式,Seata是怎么进行选择与适配的呢?这里很自然就会想到通过SPI机制,我们通过配置某一种方式然后Seata就可以根据我们的配置去进行选择具体的策略,而之后如果还要扩展其它方式的时候,对于Seata来说就很方便了,只需要对接SPI接口来实现即可。另外还有个问题就是loadBalance方法是每次在发送消息的时候就会去调用,是不是每次都需要去从注册工厂中实时获取到TC的地址呢?其实并没这个必要,因为TC的地址一般都不会频繁变动(TC的上下线也不会很多),所以Seata对于这一点做了些优化,就是通过一个定时任务每隔10s从注册工厂中获取到TC的地址然后建立channel连接并缓存起来,之后每次调用loadBalance去获取TC地址的时候从这个缓存中进行获取即可,定时任务的代码如下: 

public void init() {
    // 开启一个定时任务,每10s去从注册中心获取到tc集群最新的地址,然后分别与之建立channel连接
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();

    // 启动netty客户端
    clientBootstrap.start();
}

io.seata.core.rpc.netty.NettyClientChannelManager#reconnect 

void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {
    // 从注册服务上获取tc的地址
    availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
    LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
    return;
}
if (CollectionUtils.isEmpty(availList)) {
    RegistryService registryService = RegistryFactory.getInstance();
    String clusterName = registryService.getServiceGroup(transactionServiceGroup);

    if (StringUtils.isBlank(clusterName)) {
        LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                     ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                     transactionServiceGroup);
        return;
    }

    if (!(registryService instanceof FileRegistryServiceImpl)) {
        LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
    }
    return;
}

Set<String> channelAddress = new HashSet<>(availList.size());
try {
    // 遍历所有的tc地址
    for (String serverAddress : availList) {
        try {
            // 根据tc的地址获取对应已连接的channel
            acquireChannel(serverAddress);
            channelAddress.add(serverAddress);
        } catch (Exception e) {
            LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
                         serverAddress, e.getMessage(), e);
        }
    }
} finally {
    if (CollectionUtils.isNotEmpty(channelAddress)) {
        List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
        for (String address : channelAddress) {
            String[] array = address.split(":");
            aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
        }
        // 把已经建立好连接的channel缓存起来
        RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
    } else {
        RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
    }
}
}
public interface RegistryService<T> {

    // ......

    default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
        return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>());
    }

    default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
        List<InetSocketAddress> aliveAddress) {
        return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);
    }
    
}

定时器会每隔10s调用channel管理器NettyClientChannelManager的reconnect方法,reconnect方法就会去通过注册工厂获取TC的地址并缓存到CURRENT_ADDRESS_MAP这个map中,之后loadBalance方法就会从CURRENT_ADDRESS_MAP获取到TC地址 

(2)定义客户端handler 
/**
 * Netty客户端处理接收消息的handler
 */
@Sharable
class ClientHandler extends ChannelDuplexHandler {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof RpcMessage)) {
            return;
        }
        processMessage(ctx, (RpcMessage) msg);
    }

    // ......
   
}

在AbstractNettyRemotingClient中会去把ClientHandler作为客户端的handler,当客户端在接收服务端发送过来的数据的时候,会回调channelRead方法,而channelRead方法会调用父类processMessage方法 

(3)注册处理器 
public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
    Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
    this.processorTable.put(requestCode, pair);
}

定义了registerProcessor方法,子类可以通过该方法往processorTable中进行注册客户端的处理器 

(4)批量发送 

Seata并没有开放批量请求的api给用户去使用,也就是说我们并不能去把多个RPC请求作为参数去进行发送,整个批量发送的机制是在Seata底层去进行的,代码如下: 

io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object) 

// 条件成立:说明允许发送批量请求
if (this.isEnableClientBatchSendRequest()) {

    // send batch message is sync request, needs to create messageFuture and put it in futures.
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);

    // 把RPC请求放到队列中
    BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                                                                       key -> new LinkedBlockingQueue<>());
    if (!basket.offer(rpcMessage)) {
        LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                     serverAddress, rpcMessage);
        return null;
    }
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("offer message: {}", rpcMessage.getBody());
    }

    // 条件成立:说明此时异步线程处于空闲状态,此时唤醒异步线程取进行批量请求发送
    if (!isSending) {
        synchronized (mergeLock) {
            mergeLock.notifyAll();
        }
    }

    try {
        return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (Exception exx) {
        LOGGER.error("wait response error:{},ip:{},request:{}",
                     exx.getMessage(), serverAddress, rpcMessage.getBody());
        if (exx instanceof TimeoutException) {
            throw (TimeoutException) exx;
        } else {
            throw new RuntimeException(exx);
        }
    }

}

在客户端发送同步请求的时候,Seata还支持批量请求的发送,可以看到,上面的代码中在执行同步发送的时候会判断是否开启了批量请求,如果开启了就把RPC请求放到一个队列中(每一个发送目标地址对应一个队列),然后再通过MessageFuture进行同步阻塞。那么放到队列中有什么用呢?通常看到把某个东西放到队列中的,大多数都是通过异步线程从队列中去取然后执行的,果然,在AbstractNettyRemotingClient中确实开启了一个异步线程 

public void init() {

    // ......
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }

    // ......
}
private class MergedSendRunnable implements Runnable {

    @Override
    public void run() {
        @Override
        public void run() {
            while (true) {
                // 等待唤醒,避免一直死循环占用cpu资源
                synchronized (mergeLock) {
                    try {
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {
                    }
                }

                // 设置为正在批量发送中
                isSending = true;
                // 遍历所有的批量请求
                basketMap.forEach((address, basket) -> {
                    if (basket.isEmpty()) {
                        return;
                    }

                    // 把要发送到同一个目标地址的请求都放到MergedWarpMessage请求中
                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = null;
                    try {
                        // send batch message is sync request, but there is no need to get the return value.
                        // Since the messageFuture has been created before the message is placed in basketMap,
                        // the return value will be obtained in ClientOnResponseProcessor.
                        // 获取与目标地址的channel连接
                        sendChannel = clientChannelManager.acquireChannel(address);
                        // 把这个批量请求异步发送给服务端
                        AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                            destroyChannel(address, sendChannel);
                        }
                        // fast fail
                        for (Integer msgId : mergeMessage.msgIds) {
                            MessageFuture messageFuture = futures.remove(msgId);
                            if (messageFuture != null) {
                                messageFuture.setResultMessage(
                                    new RuntimeException(String.format("%s is unreachable", address), e));
                            }
                        }
                        LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                    }
                });

                // 发送完之后恢复标志位
                isSending = false;
            }
    }

    // ......
}

可以看到,异步线程会去从队列中获取到刚才我们放进去的RPC请求,然后把发送到同一个地址的RPC请求都汇集到一个MergedWarpMessage请求中,再把MergedWarpMessage请求异步发送到服务端。那么服务端拿到这个批量请求之后会怎样呢?接下来我们去看下服务端的代码: 

io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage 

    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        // 获取到请求对象
        Object message = rpcMessage.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());

        if (!(message instanceof AbstractMessage)) {
            return;
        }

        // 条件成立:说明是批量发送
        if (message instanceof MergedWarpMessage) {
            // 条件成立:服务端开启了批量响应
            if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
                && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
                // 获取批量发送的请求
                List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
                // 获取批量发送的请求id
                List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
                // 遍历所有的批量请求并进行处理
                for (int i = 0; i < msgs.size(); i++) {
                    AbstractMessage msg = msgs.get(i);
                    int msgId = msgIds.get(i);
                    if (PARALLEL_REQUEST_HANDLE) {
                        CompletableFuture.runAsync(
                            () -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                    } else {
                        handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                    }
                }
            } else {
                // 批量请求都处理完得到的结果集合
                List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();
                List<CompletableFuture<Void>> completableFutures = null;
                for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
                    if (PARALLEL_REQUEST_HANDLE) {
                        if (completableFutures == null) {
                            completableFutures = new ArrayList<>();
                        }
                        int finalI = i;
                        completableFutures.add(CompletableFuture.runAsync(() -> {
                            results.add(finalI, handleRequestsByMergedWarpMessage(
                                ((MergedWarpMessage)message).msgs.get(finalI), rpcContext));
                        }));
                    }
                    // 条件成立:说明没有开启并行处理请求,此时就对每一个请求进行串行处理
                    else {
                        results.add(i,
                            handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
                    }
                }
                if (CollectionUtils.isNotEmpty(completableFutures)) {
                    try {
                        // 多线程并行处理请求
                        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
                    } catch (InterruptedException | ExecutionException e) {
                        LOGGER.error("handle request error: {}", e.getMessage(), e);
                    }
                }

                // 创建一个MergeResultMessage请求,把结果集合都放进去该请求中然后异步发送给客户端
                MergeResultMessage resultMessage = new MergeResultMessage();
                resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
                remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
            }
        }
        // 条件成立:说明发送过来的是单个请求
        else {
            final AbstractMessage msg = (AbstractMessage) message;
            // 处理这个请求,得到处理结果
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            // 响应这个处理结果
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }

当服务端接收到MergedWarpMessage请求的时候,会有两种处理方式:

  • 服务端开启了批量响应并且客户端版本大于等于1.5:
/**
 * 发送批量响应请求的定时任务
 *
 * @since 1.5.0
 */
private class BatchResponseRunnable implements Runnable {
    @Override
    public void run() {
        while (true) {
            synchronized (batchResponseLock) {
                try {
                    batchResponseLock.wait(MAX_BATCH_RESPONSE_MILLS);
                } catch (InterruptedException e) {
                    LOGGER.error("BatchResponseRunnable Interrupted error", e);
                }
            }
            isResponding = true;
            basketMap.forEach((channel, msgQueue) -> {
                if (msgQueue.isEmpty()) {
                    return;
                }
                // Because the [serialization,compressor,rpcMessageId,headMap] of the response
                // needs to be the same as the [serialization,compressor,rpcMessageId,headMap] of the request.
                // Assemble by grouping according to the [serialization,compressor,rpcMessageId,headMap] dimensions.
                Map<ClientRequestRpcInfo, BatchResultMessage> batchResultMessageMap = new HashMap<>();
                while (!msgQueue.isEmpty()) {
                    QueueItem item = msgQueue.poll();
                    BatchResultMessage batchResultMessage = CollectionUtils.computeIfAbsent(batchResultMessageMap,
                                                                                            new ClientRequestRpcInfo(item.getRpcMessage()),
                                                                                            key -> new BatchResultMessage());
                    batchResultMessage.getResultMessages().add(item.getResultMessage());
                    batchResultMessage.getMsgIds().add(item.getMsgId());
                }
                batchResultMessageMap.forEach((clientRequestRpcInfo, batchResultMessage) ->
                                              remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo),
                                                                               channel, batchResultMessage));
            });
            isResponding = false;
        }
    }
}

把MergedWarpMessage里面的单个请求都获取到,分别执行每个请求的业务逻辑(可以串行执行也可以并行执行)获取到每个请求的执行结果,然后把这些结果以同一个channel为维度放在一个队列中,服务端会开启一个线程从每一个channel的队列获取执行结果放到一个BatchResultMessage对象中,最终发送一个异步请求给这个channel对应的客户端 

  • 服务端没有开启批量响应或者客户端版本小于1.5:

把MergedWarpMessage里面的单个请求都获取到,分别执行每个请求的业务逻辑(可以串行执行也可以并行执行)获取到每个请求的执行结果,把这些结果都放到一个MergeResultMessage对象中,最终发送一个异步请求给这个channel对应的客户端 

可以看到,其实这两种方式的差异并不大,主要的区别在于第一种方式响应给客户端的请求是由异步线程去做的,第二种方式响应给客户端是由processor本身的执行线程去做的 

RmNettyRemotingClient 
注册RM端的处理器 
/**
 * 注册RM端的处理器
 */
private void registerProcessor() {
    // 注册处理分支事务commit的handler
    RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);

    // 注册处理分支事务rollback的handler
    RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);

    // 处理处理undoLog的handler
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);

    // 注册处理接收TC端响应的handler
    ClientOnResponseProcessor onResponseProcessor =
    new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);

    // 注册处理心跳请求的handler
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
TmNettyRemotingClient 
注册TM端的处理器 
/**
 * 注册处理器
 */
private void registerProcessor() {
    // 注册TC响应数据过来的处理器
    ClientOnResponseProcessor onResponseProcessor =
    new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);

    // 注册心跳接收的处理器
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

服务端(TC) 

抽象基类AbstractNettyRemotingServer 
Netty服务启动 
public void init() {
    super.init();
    // 启动netty服务
    serverBootstrap.start();
}
public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
    super(messageExecutor);
    serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
    // 设置netty的接收handler
    serverBootstrap.setChannelHandlers(new ServerHandler());
}

重写父类的init方法,在启动的时候会调用init方法来启动一个Netty服务。那么什么时候会去调用init方法呢?答案是在Server类的start方法中 

public class Server {

    public static void start(String[] args) {
        // ......
        nettyRemotingServer.init();
    }
}
@Component
public class ServerRunner implements CommandLineRunner, DisposableBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);

    private boolean started = Boolean.FALSE;

    private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>();

    public static void addDisposable(Disposable disposable) {
        DISPOSABLE_LIST.add(disposable);
    }

    @Override
    public void run(String... args) {
        try {
            long start = System.currentTimeMillis();
            Server.start(args);
            started = true;

            long cost = System.currentTimeMillis() - start;
            LOGGER.info("seata server started in {} millSeconds", cost);
        } catch (Throwable e) {
            started = Boolean.FALSE;
            LOGGER.error("seata server start error: {} ", e.getMessage(), e);
            System.exit(-1);
        }
    }

    // ......
}

而io.seata.server.Server#start方法又是被io.seata.server.ServerRunner#run所调用,而ServerRunner又实现了springboot的CommandLineRunner接口,基于springboot的扩展机制,当容器启动完成之后就会回调所有的CommandLineRunner接口的run方法,从而在服务启动的时候间接地去启动Netty服务 文章来源地址https://www.toymoban.com/news/detail-513359.html

NettyRemotingServer 
注册服务端的处理器 
/**
 * 注册处理器
 */
private void registerProcessor() {
    // 注册处理请求消息的处理器
    ServerOnRequestProcessor onRequestProcessor =
    new ServerOnRequestProcessor(this, getHandler());
    ShutdownHook.getInstance().addDisposable(onRequestProcessor);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);

    // 注册处理响应请求的处理器
    ServerOnResponseProcessor onResponseProcessor =
    new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);

    // 注册处理RM注册请求的处理器
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

    // 注册处理TM注册请求的处理器
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

    // 注册处理心跳消息的处理器
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

到了这里,关于深入探讨Seata RPC模块的设计与实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入探讨安全验证:OAuth2.0、Cookie与Session、JWT令牌、SSO与开放授权平台设计

    认证和授权是安全验证中的两个重要概念。认证是确认身份的过程,用于建立双方之间的信任关系。只有在认证成功的情况下,双方才可以进行后续的授权操作。授权则是在认证的基础上,确定用户或系统对资源的访问权限。 在设计一个权限认证框架时,可以考虑以下原则:

    2024年02月12日
    浏览(26)
  • Go微服务: 基于net/rpc/jsonrpc模块实现微服务跨语言调用

    概述 Golang 提供 net/rpc/jsonrpc 库来实现rpc方法 采用 json 方式进行数据编解码,支持跨语言调用 这里实现跨语言示例 1 ) go 服务端 2 ) nodejs 客户端1 3 ) nodejs 客户端2 4 ) go 客户端3 总结 这里演示了,基于go语言为服务端,nodejs 和 golang 为客户端的3种示范 注意,上面 nodejs版本

    2024年03月17日
    浏览(32)
  • 探讨PLC:一个需要一定专业性的领域?

    虽然有些人认为PLC是一个具有较高专业性的领域,但它并非完全专业化的内容。以下是一些关于PLC的观点,帮助优化表述:我这里刚好有plc、嵌入式、单片机的资料需要的可以私我或在评论区扣6 学习曲线:对于没有编程或电气知识背景的人来说,学习PLC可能会有一定的学习

    2024年02月11日
    浏览(45)
  • 网络编程——RPC与HTTP基本介绍、历史追溯、主流应用场景、对比分析、为什么还需要使用RPC

    HTTP协议(Hyper Text Transfer Protocol) 超文本传输协议 : 一个用于在网络上交换信息的标准协议,它定义了客户端(例如浏览器)和服务器之间的通信方式。如平时上网在浏览器上敲个网址url就能访问网页,这里用到的就是HTTP协议。 明确 HTTP 是一个协议,是一个超文本传输协议,

    2024年02月16日
    浏览(31)
  • 深入探讨 Presto 中的缓存

    【squids.cn】 全网zui低价RDS,免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等 Presto是一种流行的开源分布式SQL引擎,使组织能够在多个数据源上大规模运行交互式分析查询。缓存是一种典型的提高 Presto 查询性能的优化技术。它为 Presto 平台提供了显着的性能和效

    2024年02月07日
    浏览(45)
  • 深入探讨 NFT 的金融化问题

    NFT必须寻求突破, 随着NFT开始通过这些新颖的应用将更多的主流用户带入DeFi的世界,我们注意到一些主要的协议和持续的挑战。 目前,在DeFi生态系统中采用NFT的最大障碍是准确的定价和流动性。DeFi正试图解决这两个关键障碍,并取得不同程度的成功。推荐阅读本文。你别

    2024年02月02日
    浏览(27)
  • 深入探讨YOLOv8 网络架构

    YOLOv8 尚未发表论文,因此我们无法直接了解其创建过程中进行的直接研究方法和消融研究。话虽如此,我们分析了有关模型的存储库和可用信息,以开始记录 YOLOv8 中的新功能。 如果您想自己查看代码,请查看YOLOv8 存储库并查看此代码差异以了解一些研究是如何完成的。 在

    2024年02月02日
    浏览(23)
  • 深入探讨软件测试的质量度量指标

    本文的目的是介绍项目中使用到主要质量指标,这些质量指标可以分为以下三类: 质量保证过程指标 生产事故管理指标 度量质量文化指标 质量保证过程指标 质量保证指标可以通过测试覆盖率来度量功能和非功能测试的覆盖率,同时也可以根据测试发现的缺陷的状态、优先

    2024年02月09日
    浏览(31)
  • 图卷积网络:GNN 深入探讨【02/4】

     在各种类型的GNN中, 图卷积网络 (GCN)已成为最普遍和应用最广泛的模型。GCN具有创新性,因为它们能够利用节点的特征及其局部性进行预测,从而提供了一种处理图形结构数据的有效方法。 在本文中,我们将在推荐系统的背景下概述图论和图神经网络(GNN)。      

    2024年02月12日
    浏览(20)
  • 《Linux详解:深入探讨计算机基础》

    《Linux详解:深入探讨计算机基础》 引言: 在计算机科学领域,操作系统是一个至关重要的概念,而Linux作为一种开源的Unix-like操作系统,不仅在服务器领域广泛应用,也在嵌入式系统、超级计算机等多个领域发挥着巨大作用。本文将深入探讨Linux操作系统,从基础概念到核

    2024年02月03日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包