Nacos源码 (5) Grpc服务端和客户端

这篇具有很好参考价值的文章主要介绍了Nacos源码 (5) Grpc服务端和客户端。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Nacos 2.x在服务端与客户端直接增加了GRPC通信方式,本文通过2.0.2版本源码,简单分析GRPC通信方式:

  • 服务器启动
  • 客户端连接
  • 客户端心跳
  • 服务器监控检查

服务器

proto文件

api/src/main/proto/nacos_grpc_service.proto文件:

syntax = "proto3";

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "com.alibaba.nacos.api.grpc.auto";

message Metadata {
  string type = 3; // 请求/响应的真实类型
  string clientIp = 8;
  map<string, string> headers = 7;
}

// GRPC通信层请求/响应体
message Payload {
  Metadata metadata = 2;
  // 业务层的请求/响应体,需要使用type做反序列化
  google.protobuf.Any body = 3;
}

service RequestStream {
  // build a streamRequest
  rpc requestStream (Payload) returns (stream Payload) {
  }
}

service Request {
  // Sends a commonRequest
  rpc request (Payload) returns (Payload) {
  }
}

service BiRequestStream {
  // Sends a commonRequest
  rpc requestBiStream (stream Payload) returns (stream Payload) {
  }
}

文件定义了通信层的service和message结构,业务层请求响应的序列化和反序列化是Nacos在RequestAcceptor/Connection中使用工具类实现的,业务层请求处理是在RequestAcceptor中进行的转发。

服务器启动

Server类继承关系

BaseRpcServer
  |-- BaseGrpcServer
     |-- GrpcSdkServer
     |-- GrpcClusterServer

此处介绍一下GrpcSdkServer实现。

GrpcSdkServer类

@Service
public class GrpcSdkServer extends BaseGrpcServer {

    // 所以SDK服务器的监听端口是9848
    private static final int PORT_OFFSET = 1000;

    @Override
    public int rpcPortOffset() {
        return PORT_OFFSET;
    }

    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        return GlobalExecutor.sdkRpcExecutor;
    }
}

大部分的启动逻辑在BaseGrpcServer中。

BaseGrpcServer类

GRPC服务器的启动逻辑大部分都在这个类的startServer方法。

  1. 将处理请求的RequestAcceptor注册到HandlerRegistry
    • GrpcRequestAcceptor用于处理普通业务请求
    • GrpcBiStreamRequestAcceptor用于处理连接建立请求,获取Channel创建GrpcConnection并注册到ConnectionManager中,后续向客户端发送消息都是使用GrpcConnection做的
  2. 创建GRPC的Server对象
    • 设置port和executor
    • 设置HandlerRegistry
    • 添加ServerTransportFilter在连接建立和断开时做一些业务操作
  3. 启动Server

GrpcRequestAcceptor类

这个类对GRPC做了扩展,重写了request方法:

  1. 解析Payload获取请求体的数据类型
  2. 从RequestHandlerRegistry获取适配的RequestHandler处理器
  3. 将请求体反序列化成请求体类型对象
  4. 调用handleRequest方法处理请求返回响应

处理请求代码:

Request request = (Request) parseObj;
try {
    // 获取Connection
    Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
    RequestMeta requestMeta = new RequestMeta();
    requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
    requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
    requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
    requestMeta.setLabels(connection.getMetaInfo().getLabels());
    // 刷新活跃时间,后续的健康检查会使用到这个时间戳
    connectionManager.refreshActiveTime(requestMeta.getConnectionId());
    // 使用RequestHandler处理请求
    Response response = requestHandler.handleRequest(request, requestMeta);
    Payload payloadResponse = GrpcUtils.convert(response);
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
} catch (Throwable e) {
    Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
            (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
            e.getMessage()));
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
}

RequestHandler处理器

RequestHandler抽象类是Nacos在业务层处理GRPC请求的抽象类:

public abstract class RequestHandler<T extends Request, S extends Response> {

    @Autowired
    private RequestFilters requestFilters;

    /**
     * Handler request.
     */
    public Response handleRequest(T request, RequestMeta meta) throws NacosException {
        for (AbstractRequestFilter filter : requestFilters.filters) {
            try {
                Response filterResult = filter.filter(request, meta, this.getClass());
                if (filterResult != null && !filterResult.isSuccess()) {
                    return filterResult;
                }
            } catch (Throwable throwable) {
                Loggers.REMOTE.error("filter error", throwable);
            }
        }
        return handle(request, meta);
    }

    /**
     * Handler request.
     */
    public abstract S handle(T request, RequestMeta meta) throws NacosException;
}

实现类:

Nacos使用RequestHandlerRegistry管理所有的RequestHandler,是一个Map结构:

// key是Request类型的简单名
// value是RequestHandler实现类对象
Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();

RequestHandlerRegistry会扫描Spring容器里面所有的RequestHandler对象,解析RequestHandler实现类处理的Request类型的简单名,将其注册到registryHandlers中。

GrpcRequestAcceptor类获取适配的RequestHandler处理器使用的就是RequestHandlerRegistry类的getByRequestType方法:

public RequestHandler getByRequestType(String requestType) {
    return registryHandlers.get(requestType);
}

建立连接

在Server初始化的时候,Nacos注册了ServerInterceptor和ServerTransportFilter组件,这些组件会在连接建立时将conn_id、remote_ip、remote_port、local_port、ctx_channel等绑定到Context上。

创建GrpcConnection

客户端在连接建立之后会发送一个ConnectionSetupRequest请求,服务器使用GrpcBiStreamRequestAcceptor处理该请求:

  1. 获取到conn_id、remote_ip、remote_port、local_port等
  2. 解析请求获取clienIp
  3. 封装GrpcConnection对象,包括:conn_id、remote_ip、remote_port、local_port、clientIp、客户端版本等基础信息,以及StreamObserver和Channel
  4. 将GrpcConnection注册到ConnectionManager上

创建Client

ConnectionManager的注册操作会触发ConnectionBasedClientManager的clientConnected方法来创建Client对象:

public void clientConnected(Connection connect) {
    // grpc类型
    String type = connect.getMetaInfo().getConnectType();
    // 此处获取到的是ConnectionBasedClientFactory对象
    ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
    // 此处创建的是ConnectionBasedClient对象
    clientConnected(clientFactory.newClient(connect.getMetaInfo().getConnectionId()));
}

public boolean clientConnected(Client client) {
    if (!clients.containsKey(client.getClientId())) {
        // 注册到client集
        // 使用Map维护clientId->client对象关系
        clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
    }
    return true;
}

健康检查

ConnectionManager连接管理器

这个类管理客户端连接,提供注册连接、移除连接等功能:

// 管理IP -> 连接数,用于实现ConnectionLimitRule
private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<String, AtomicInteger>(16);
// 管理connectionId -> Connection
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

Connection抽象类实现了Requester接口,能够向客户端发送请求、管理连接状态。

GrpcConnection实现了Connection抽象类。

在连接建立后,客户端会发送一个ConnectionSetupRequest请求,服务端收到该请求后,会解析出connectionId、客户端IP、客户端端口、客户端版本、Channel等封装成GrpcConnection对象,然后注册到ConnectionManager中。

健康检查周期任务

ConnectionManager在启动阶段会启动一个周期任务来检查IP连接数和连接的活跃状态,每3秒执行一次:

  1. 遍历连接集,使用connectionLimitRule查找需要重置的连接,向这些客户端发reset请求重置连接
  2. 获取连接的最后活跃时间(客户端每次请求都会更新这个时间),如果超过20秒不活跃,则向客户端发送一个探测请求,如果请求失败则断开连接

断开连接

业务处理流程

GRPC连接层检测到连接断开之后,会触发GrpcServer的transportTerminated事件:

public void transportTerminated(Attributes transportAttrs) {
    String connectionId = null;
    try {
        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
    } catch (Exception e) {
        // Ignore
    }
    if (StringUtils.isNotBlank(connectionId)) {
        // 使用ConnectionManager移除连接
        connectionManager.unregister(connectionId);
    }
}

ConnectionManager移除连接:

public synchronized void unregister(String connectionId) {
    // 从Connection集移除连接
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        // IP连接数--
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        // 通知ClientManager层移除client对象
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

ConnectionBasedClientManager的clientDisconnected方法:

public boolean clientDisconnected(String clientId) {
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    // 推送一个ClientDisconnectEvent事件
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

事件处理流程

ClientDisconnectEvent事件:Client disconnect event. Happened when Client disconnect with server.

  • ClientServiceIndexesManager - 维护注册和订阅关系
  • DistroClientDataProcessor - 同步客户端数据到所有服务节点
  • NamingMetadataManager - 维护客户端注册的服务和实例元数据信息

客户端

建立连接

ServerListFactory接口

Server list factory. Use to inner client to connected and switch servers.

管理Server服务器地址集合,RpcClient使用这个接口选择可用的服务器地址。

public interface ServerListFactory {

    // 选择一个可用的服务器地址 ip:port格式
    String genNextServer();

    // 返回当前使用的服务器地址 ip:port格式
    String getCurrentServer();

    // 返回服务器集合
    List<String> getServerList();
}

ServerListManager类

解析Properties参数封装服务器地址集合。

创建RpcClient

RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);

createClient方法:

public static RpcClient createClient(String clientName,
                                     ConnectionType connectionType,
                                     Map<String, String> labels) {
    return CLIENT_MAP.compute(clientName, (clientNameInner, client) -> {
        if (client == null) {
            if (ConnectionType.GRPC.equals(connectionType)) {
                // 创建的是GrpcSdkClient对象
                client = new GrpcSdkClient(clientNameInner);
            }
            if (client == null) {
                throw new UnsupportedOperationException(
                    "unsupported connection type :" + connectionType.getType());
            }
            client.labels(labels);
        }
        return client;
    });
}

之后需要为Client进行初始化:

  1. 设置ServerListFactory,用于选择服务器地址

  2. 注册ServerRequestHandler处理器,用于处理服务端发送的请求,比如服务订阅的回调、配置文件变化通知

  3. 注册ConnectionEventListener监听器

    rpcClient.serverListFactory(serverListFactory);
    rpcClient.start();
    rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
    rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
    
  4. 启动Client

    • 启动ConnectionEvent处理线程
    • 启动健康检查(心跳)线程
    • 创建GrpcConnection

创建GrpcConnection

  1. 创建GRPC的RequestFutureStub和BiRequestStreamStub
  2. 发一个ServerCheckRequest请求验证服务端的可用性
  3. 创建GrpcConnection对象,封装serverInfo和executor、connectionId、channel等
  4. 为BiRequestStreamStub绑定请求处理逻辑:使用ServerRequestHandler处理器处理服务端发送过来的请求
  5. 发送ConnectionSetupRequest请求,让服务端创建并注册GrpcConnection
if (grpcExecutor == null) {
    int threadNumber = ThreadUtils.getSuitableThreadCount(8);
    grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10000),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d")
                    .build());
    grpcExecutor.allowCoreThreadTimeOut(true);
}
// 8848+1000
int port = serverInfo.getServerPort() + rpcPortOffset();
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
// 发一个ServerCheckRequest请求验证服务端的可用性
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
    shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
    return null;
}

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
        .newStub(newChannelStubTemp.getChannel());
// 创建GrpcConnection对象,封装serverInfo和executor、connectionId、channel等
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

// create stream request and bind connection event to this connection
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
// send a setup request
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities);
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);

发送请求

Requester接口

这个接口定义了发送请求的方法:

public interface Requester {

    /**
     * send request.
     *
     * @param request      request.
     * @param timeoutMills mills of timeouts.
     * @return response  response returned.
     * @throws NacosException exception throw.
     */
    Response request(Request request, long timeoutMills) throws NacosException;

    /**
     * send request.
     *
     * @param request request.
     * @return request future.
     * @throws NacosException exception throw.
     */
    RequestFuture requestFuture(Request request) throws NacosException;

    /**
     * send async request.
     *
     * @param request         request.
     * @param requestCallBack callback of request.
     * @throws NacosException exception throw.
     */
    void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException;

    /**
     * close connection.
     */
    void close();
}

GrpcConnection实现

GrpcConnection类实现了Requester接口的三个request方法,使用的是GRPC的Stub发送请求,以request方法为例:

public Response request(Request request, long timeouts) throws NacosException {
    Payload grpcRequest = GrpcUtils.convert(request);
    ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
    Payload grpcResponse;
    try {
        // 由于request方法是同步的,所以此处阻塞等待响应
        grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }

    return (Response) GrpcUtils.parse(grpcResponse);
}

对于另外两个方法:

  • requestFuture方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,创建一个RequestFuture返回
  • asyncRequest方法:在grpcFutureServiceStub.request(grpcRequest)发送请求之后,为requestFuture添加监听回调

心跳healthCheck

前文介绍过,在启动RpcClient阶段,会启动健康检查任务,该任务每5秒执行一次,对当前客户端封装的connection做健康检查:

// keepAliveTime默认5000L
ReconnectContext reconnectContext = reconnectionSignal
        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
    // check alive time.
    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
        // 健康检查
        boolean isHealthy = healthCheck();
        if (!isHealthy) {
            if (currentConnection == null) {
                continue;
            }

            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                break;
            }

            // 准备重连
            boolean success = RpcClient.this.rpcClientStatus
                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
            if (success) {
                reconnectContext = new ReconnectContext(null, false);
            } else {
                continue;
            }
        } else {
            lastActiveTimeStamp = System.currentTimeMillis();
            continue;
        }
    } else {
        continue;
    }
}

if (reconnectContext.serverInfo != null) {
    // clear recommend server if server is not in server list.
    boolean serverExist = false;
    for (String server : getServerListFactory().getServerList()) {
        ServerInfo serverInfo = resolveServerInfo(server);
        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
            serverExist = true;
            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
            break;
        }
    }
    if (!serverExist) {
        reconnectContext.serverInfo = null;
    }
}
// 重连
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);

healthCheck方法:文章来源地址https://www.toymoban.com/news/detail-693360.html

private boolean healthCheck() {
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    if (this.currentConnection == null) {
        return false;
    }
    try {
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        // not only check server is ok ,also check connection is register.
        return response != null && response.isSuccess();
    } catch (NacosException e) {
        // ignore
    }
    return false;
}

到了这里,关于Nacos源码 (5) Grpc服务端和客户端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • C++实现WebSocket通信(服务端和客户端)

    天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。 这里单纯是个人总结,如需更官方更准确的websocket介绍可百度 websocket是一种即时通讯协

    2024年02月09日
    浏览(48)
  • netty的TCP服务端和客户端实现

    2024年02月21日
    浏览(47)
  • 网络编程——socket服务端和客户端(TCP)

    所谓套接字(Socket),就是对网络中不同主机上的应用进程之间进行双向通信的端点的抽象。一个套接字就是网络上进程通信的一端,提供了应用层进程利用网络协议交换数据的机制。从所处的地位来讲,套接字上联应用进程,下联网络协议栈,是应用程序通过网络协议进行通

    2024年02月07日
    浏览(86)
  • linux搭建http源【服务端和客户端详细说明】

    inet 192.168.122.1/24 brd 192.168.122.255 scope global virbr0 [root@master ~]# 关闭防火墙和selinux firewall的话,直接 systemctl stop firewalld 即可 如果使用的是iptables,则吧firewalld替换成iptables [root@master ~]# systemctl is-active firewalld active [root@master ~]# [root@master ~]# systemctl stop firewalld [root@master ~]# [root@m

    2024年04月13日
    浏览(40)
  • 服务端和客户端通信-TCP(含完整源代码)

    目录 简单TCP通信实验 分析 1、套接字类型 2、socket编程步骤 3、socket编程实现具体思路 实验结果截图 程序代码 实验设备:     目标系统:windows 软件工具:vs2022/VC6/dev 实验要求: 完成TCP服务端和客户端的程序编写; 实现简单字符串的收发功能。 需附上代码及运行结果截图

    2024年02月07日
    浏览(75)
  • 服务端和客户端通信--UDP(含完整源代码)

    实验设备:     目标系统:Windows 软件工具:vs2022/vc6/dev   实验要求: 完成UDP服务端和客户端的程序编写; 分别实现UDP一对一通信和广播通信功能。 实验内容: -static-libgcc 一对一通信 : 1 、加载/释放Winsock库,创建套接字(WSAStartup()/socket())。 加载方法: WSADATA wsa; /*初始化

    2024年02月14日
    浏览(55)
  • Golang实现之TCP长连接-------服务端和客户端

    一、数据包的数据结构 (所有字段采用大端序) 帧头 帧长度(头至尾) 帧类型 帧数据 帧尾 1字节 4字节 2字节 1024字节 1字节 byte int short string byte 0xC8 0xC9 二、Server端 实现代码 1、main.go 2、server.go 3、protocol.go 4、response.go 5、result.go 三、Client端 实现代码

    2024年02月07日
    浏览(55)
  • SpringBoot搭建Netty+Socket+Tcp服务端和客户端

    yml配置:    完成,启动项目即可自动监听对应端口 这里为了测试,写了Main方法,可以参考服务端,配置启动类 ,实现跟随项目启动   ......想写就参考服务端...... 有测试的,,,但是忘记截图了................

    2024年02月15日
    浏览(53)
  • #Centos Centos7配置NTP服务端和客户端

    环境: 服务器:172.16.89.252 客户端:172.16.89.253 NTP公网地址:常见的NTP授时服务器地址 一、配置服务端 1、先安装NTP服务器(服务器是最小安装,不带ntp) 2、配置ntpd服务 配置文件中一般有restrict default语句,注释掉第二种或选择第一种   配置与上级互联网服务端连续性同步

    2024年02月05日
    浏览(36)
  • linux搭建http源【服务端和客户端详细说明(1)

    我下面的HTTP配置使用的包就是iso镜像里面的包【同理,我们只要会这种方式以后,使用什么包都一样,可以自己在网上下载自己需要的包和依赖,然后通过这种方式配置成http源,然后就可以直接使用yum安装了,这样的好处是解决软件在安装的时候会有许多依赖包这个繁琐的

    2024年04月11日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包