Nacos Source Code (6): gRPC Overview and Nacos Integration

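Nacos 2.x adds gRPC service interfaces and a gRPC client, which greatly improves Nacos performance. This article briefly introduces how to use grpc-java and how Nacos integrates gRPC.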

grpc-java

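gRPC is a high-performance RPC framework open-sourced by Google. It uses protobuf for serialization and HTTP/2 as its communication protocol.

grpc-java is the Java implementation of gRPC and uses Netty or OkHttp as its communication component.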

Usage

Adding dependencies

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty-shaded</artifactId>
  <version>1.56.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-protobuf</artifactId>
  <version>1.56.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-stub</artifactId>
  <version>1.56.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
  <groupId>org.apache.tomcat</groupId>
  <artifactId>annotations-api</artifactId>
  <version>6.0.53</version>
  <scope>provided</scope>
</dependency>

Generating code

Place the .proto files in the src/main/proto or src/test/proto directory.

Then add the plugin used for code generation:

For protobuf-based codegen integrated with the Maven build system, you can use protobuf-maven-plugin (Eclipse and NetBeans users should also look at os-maven-plugin's IDE documentation):

<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.7.1</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}</pluginArtifact>
        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
        <clearOutputDirectory>false</clearOutputDirectory>
        <skip>false</skip>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Generate the code:

mvn clean compile

High-level components

At a high level there are three distinct layers to the library: Stub, Channel, and Transport.

Stub

The Stub layer is what is exposed to most developers and provides type-safe bindings to whatever datamodel/IDL/interface you are adapting. gRPC comes with a plugin to the protocol-buffers compiler that generates Stub interfaces out of .proto files, but bindings to other datamodel/IDL are easy and encouraged.

Channel

The Channel layer is an abstraction over Transport handling that is suitable for interception/decoration and exposes more behavior to the application than the Stub layer. It is intended to be easy for application frameworks to use this layer to address cross-cutting concerns such as logging, monitoring, auth, etc.
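The decoration idea behind the Channel layer can be sketched without any gRPC dependency. The snippet below is only a model: `SimpleChannel`, `EchoTransportChannel`, and `RecordingChannel` are hypothetical names invented for illustration, not grpc-java types. In grpc-java itself this role is played by interceptors such as `ClientInterceptor` and `ServerInterceptor`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a channel: sends a request to a named method. */
interface SimpleChannel {
    String call(String method, String request);
}

/** Plays the role of the transport: it produces the actual result. */
class EchoTransportChannel implements SimpleChannel {
    @Override
    public String call(String method, String request) {
        return method + ":" + request;
    }
}

/** Decorator that records every call before delegating, like a logging interceptor. */
class RecordingChannel implements SimpleChannel {
    private final SimpleChannel delegate;
    final List<String> seenMethods = new ArrayList<>();

    RecordingChannel(SimpleChannel delegate) {
        this.delegate = delegate;
    }

    @Override
    public String call(String method, String request) {
        seenMethods.add(method);               // cross-cutting concern lives here
        return delegate.call(method, request); // the wrapped channel does the real work
    }
}

class ChannelDecorationDemo {
    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel(new EchoTransportChannel());
        System.out.println(channel.call("getRealNameByUsername", "admin"));
        System.out.println(channel.seenMethods);
    }
}
```

Because the decorator implements the same interface as the thing it wraps, callers such as stubs do not need to know whether logging, monitoring, or auth has been layered in.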

Transport

The Transport layer does the heavy lifting of putting and taking bytes off the wire. The interfaces to it are abstract just enough to allow plugging in of different implementations. Note the transport layer API is considered internal to gRPC and has weaker API guarantees than the core API under package io.grpc.

gRPC comes with multiple Transport implementations:

  • The Netty-based HTTP/2 transport is the main transport implementation based on Netty. It is not officially supported on Android.
  • The OkHttp-based HTTP/2 transport is a lightweight transport based on Okio and forked low-level parts of OkHttp. It is mainly for use on Android.
  • The in-process transport is for when a server is in the same process as the client. It is used frequently for testing, while also being safe for production use.
  • The Binder transport is for Android cross-process communication on a single device.

Four communication modes

  • Unary RPC - one request, one response

    rpc getRealNameByUsername (StudentUsernameRequest) returns (StudentResponse) {}
    
  • Server-streaming RPC - the server responds with a stream

    rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
    
  • Client-streaming RPC - the client sends a stream of requests

    rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
    
  • Bidirectional-streaming RPC - both sides send streams

    rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
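The four shapes differ only in who may call `onNext` more than once. The sketch below models that with a locally declared `MiniObserver` interface, a hypothetical stand-in for `io.grpc.stub.StreamObserver`, so that it runs without the gRPC jar. The method names in `CallShapes` are illustrative and are not generated code.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal stand-in for io.grpc.stub.StreamObserver (an assumption made for this sketch). */
interface MiniObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

/** Collects received values so they can be inspected afterwards. */
class CollectingObserver<T> implements MiniObserver<T> {
    final List<T> values = new ArrayList<>();
    boolean completed;

    @Override public void onNext(T value) { values.add(value); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onCompleted() { completed = true; }
}

class CallShapes {
    /** Unary: one request in, exactly one response out. */
    static void unary(String request, MiniObserver<String> response) {
        response.onNext("name-of-" + request);
        response.onCompleted();
    }

    /** Server streaming: one request in, several responses out. */
    static void serverStreaming(String request, MiniObserver<String> responses) {
        for (int i = 1; i <= 3; i++) {
            responses.onNext(request + i);
        }
        responses.onCompleted();
    }

    /** Client streaming: the single response is sent only when the client completes. */
    static MiniObserver<String> clientStreaming(MiniObserver<Integer> response) {
        return new MiniObserver<String>() {
            private int received;
            @Override public void onNext(String request) { received++; }
            @Override public void onError(Throwable t) { response.onError(t); }
            @Override public void onCompleted() {
                response.onNext(received);
                response.onCompleted();
            }
        };
    }

    /** Bidirectional streaming: every request is answered as soon as it arrives. */
    static MiniObserver<String> bidiStreaming(MiniObserver<String> responses) {
        return new MiniObserver<String>() {
            @Override public void onNext(String request) { responses.onNext("echo-" + request); }
            @Override public void onError(Throwable t) { responses.onError(t); }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }
}

class CallShapesDemo {
    public static void main(String[] args) {
        CollectingObserver<String> out = new CollectingObserver<>();
        CallShapes.serverStreaming("user", out);
        System.out.println(out.values);
    }
}
```

Note that the two client-streaming shapes return an observer to the caller: the caller pushes requests into it, which is the same inversion visible in the generated service base class.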
    

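Advanced topics

  1. Interceptors
  2. Stream Tracer - stream-level interception
  3. Retry Policy - client-side retries
  4. NameResolver - service discovery
  5. Load balancing
  6. gRPC and microservices: integration with dubbo, gateway, jwt, nacos2.x, openfeign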

Basic example

This section uses a simple example to show how to use grpc-java.

.proto file

The .proto file must be placed under the src/main/proto directory:

syntax = "proto3";

package org.net5ijy.grpc.auto;

option java_package = "org.net5ijy.grpc.auto";
option java_outer_classname = "StudentRpc";
option java_multiple_files = true;

service StudentService {
  rpc getRealNameByUsername (StudentUsernameRequest) returns (StudentResponse) {}
  rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
  rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
  rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
}

message StudentUsernameRequest {
  string username = 1;
}

message StudentResponse {
  string realName = 1;
}

message StudentResponseList {
  repeated StudentResponse studentResponse = 1;
}

pom dependencies

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.56.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.56.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.56.0</version>
</dependency>

pom plugins

<build>
    <finalName>${project.artifactId}</finalName>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.7.1</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <!-- mvn protobuf:compile -->
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>
                    com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}
                </protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>
                    io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}
                </pluginArtifact>
                <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                <clearOutputDirectory>false</clearOutputDirectory>
                <skip>false</skip>
            </configuration>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Compiling and generating code

mvn clean compile

Writing the service implementation class

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {

  @Override
  public void getRealNameByUsername(StudentUsernameRequest request,
      StreamObserver<StudentResponse> responseObserver) {

    String username = request.getUsername();
    System.out.printf("username=%s\n", username);

    StudentResponse response = StudentResponse.newBuilder().setRealName("徐国峰").build();

    responseObserver.onNext(response);
    responseObserver.onCompleted();
  }

  @Override
  public void getRealNameByUsernameLike(StudentUsernameRequest request,
      StreamObserver<StudentResponse> responseObserver) {

    String username = request.getUsername();
    System.out.printf("username=%s\n", username);

    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐国峰1").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐国峰2").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐国峰3").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐国峰4").build());
    responseObserver.onCompleted();
  }

  @Override
  public StreamObserver<StudentUsernameRequest> getRealNameByUsernames(
      StreamObserver<StudentResponseList> responseObserver) {
    return new StreamObserver<StudentUsernameRequest>() {
      @Override
      public void onNext(StudentUsernameRequest request) {
        System.out.printf("username=%s\n", request.getUsername());
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }

      @Override
      public void onCompleted() {
        StudentResponse response1 = StudentResponse.newBuilder().setRealName("徐国峰5").build();
        StudentResponse response2 = StudentResponse.newBuilder().setRealName("徐国峰6").build();
        StudentResponse response3 = StudentResponse.newBuilder().setRealName("徐国峰7").build();
        StudentResponse response4 = StudentResponse.newBuilder().setRealName("徐国峰8").build();

        StudentResponseList responseList = StudentResponseList.newBuilder()
            .addStudentResponse(response1)
            .addStudentResponse(response2)
            .addStudentResponse(response3)
            .addStudentResponse(response4)
            .build();

        responseObserver.onNext(responseList);
        responseObserver.onCompleted();
      }
    };
  }

  @Override
  public StreamObserver<StudentUsernameRequest> getRealNamesByUsernames(
      StreamObserver<StudentResponse> responseObserver) {
    return new StreamObserver<StudentUsernameRequest>() {
      @Override
      public void onNext(StudentUsernameRequest request) {
        System.out.printf("username=%s\n", request.getUsername());
        StudentResponse response = StudentResponse.newBuilder()
            .setRealName("徐国峰" + new Random().nextInt(10)).build();
        responseObserver.onNext(response);
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }

      @Override
      public void onCompleted() {
        responseObserver.onCompleted();
      }
    };
  }
}

Server code

public class StudentGrpcServer {

  private static final AtomicInteger COUNT = new AtomicInteger(0);

  static final int GRPC_SERVER_PORT = 50051;

  private Server server;

  private void start() throws IOException {

    // grpc server executor
    Executor executor = new ThreadPoolExecutor(8, 16, 120, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        r -> {
          Thread t = new Thread(r);
          t.setName("stu-grpc-server-" + COUNT.incrementAndGet());
          return t;
        });

    /* The port on which the server should run */
    this.server = ServerBuilder.forPort(GRPC_SERVER_PORT).executor(executor)
        .compressorRegistry(CompressorRegistry.getDefaultInstance())
        .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
        .addService(new StudentServiceImpl())
        .intercept(serverInterceptor())
        .addTransportFilter(serverTransportFilter())
        .build();

    this.server.start();

    System.out.println("Server started, listening on " + GRPC_SERVER_PORT);

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down");
      try {
        StudentGrpcServer.this.stop();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.err.println("*** server shut down");
    }));
  }

  private void stop() throws InterruptedException {
    if (this.server != null && !this.server.isShutdown()) {
      this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }

  private void blockUntilShutdown() throws InterruptedException {
    if (this.server != null) {
      this.server.awaitTermination();
    }
  }

  private ServerInterceptor serverInterceptor() {
    return new ServerInterceptor() {
      @Override
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
          Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        Context ctx = Context.current();
        return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);
      }
    };
  }

  private ServerTransportFilter serverTransportFilter() {
    return new ServerTransportFilter() {
      @Override
      public Attributes transportReady(Attributes transportAttrs) {
        return super.transportReady(transportAttrs);
      }

      @Override
      public void transportTerminated(Attributes transportAttrs) {
        super.transportTerminated(transportAttrs);
      }
    };
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    final StudentGrpcServer server = new StudentGrpcServer();
    server.start();
    server.blockUntilShutdown();
  }
}

Client code

public class StudentGrpcClient {

  private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub;
  private final StudentServiceGrpc.StudentServiceStub stub;

  public StudentGrpcClient(Channel channel) {
    // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
    // shut it down.

    // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
    this.blockingStub = StudentServiceGrpc.newBlockingStub(channel);
    this.stub = StudentServiceGrpc.newStub(channel);
  }

  public void getRealNameByUsername(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {
      StudentResponse response = this.blockingStub.getRealNameByUsername(request);
      System.out.printf("Real name=%s\n", response.getRealName());
    } catch (StatusRuntimeException e) {
      System.err.println(e.getMessage());
    }
  }

  public void getRealNameByUsernameLike(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {
      Iterator<StudentResponse> iterator = this.blockingStub.getRealNameByUsernameLike(request);
      iterator.forEachRemaining(r -> System.out.printf("Real name=%s\n", r.getRealName()));
    } catch (StatusRuntimeException e) {
      System.err.println(e.getMessage());
    }
  }

  public void getRealNameByUsernames(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {

      StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
          .getRealNameByUsernames(new StreamObserver<StudentResponseList>() {
            @Override
            public void onNext(StudentResponseList responseList) {
              responseList.getStudentResponseList()
                  .forEach(r -> System.out.printf("Real name=%s\n", r.getRealName()));
            }

            @Override
            public void onError(Throwable t) {
              t.printStackTrace();
            }

            @Override
            public void onCompleted() {
              System.out.println("getRealNameByUsernames completed");
            }
          });

      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);

      requestStreamObserver.onCompleted();

    } catch (StatusRuntimeException e) {
      e.printStackTrace();
    }
  }

  public void getRealNamesByUsernames(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {

      StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
          .getRealNamesByUsernames(new StreamObserver<StudentResponse>() {
            @Override
            public void onNext(StudentResponse response) {
              System.out.printf("Real name=%s\n", response.getRealName());
            }

            @Override
            public void onError(Throwable t) {
              t.printStackTrace();
            }

            @Override
            public void onCompleted() {
              System.out.println("getRealNamesByUsernames completed");
            }
          });

      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);

      requestStreamObserver.onCompleted();

    } catch (StatusRuntimeException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) throws InterruptedException {

    ManagedChannel channel = ManagedChannelBuilder
        .forAddress("localhost", StudentGrpcServer.GRPC_SERVER_PORT).usePlaintext().build();

    try {

      StudentGrpcClient client = new StudentGrpcClient(channel);

      int count = 1;

      for (int i = 0; i < count; i++) {
        client.getRealNameByUsername("admin2018");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNameByUsernameLike("admin2019");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNameByUsernames("admin2020");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNamesByUsernames("admin2021");
      }

      Thread.sleep(10000);

    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}

Client code (FutureStub)

The FutureStub applies only to unary RPC calls, with a single request and a single response:

try {

  ListenableFuture<StudentResponse> future = futureStub.getRealNameByUsername(
      StudentUsernameRequest.newBuilder().setUsername(username).build());

  // block and wait
  // StudentResponse studentResponse = future.get();

  CountDownLatch latch = new CountDownLatch(1);

  Futures.addCallback(future, new FutureCallback<StudentResponse>() {
    @Override
    public void onSuccess(StudentResponse response) {
      System.out.printf("Real name=%s\n", response.getRealName());
      latch.countDown();
    }

    @Override
    public void onFailure(Throwable t) {
      System.err.println(t.getMessage());
      latch.countDown();
    }
  }, MoreExecutors.directExecutor()); // runs the callback on the completing thread, so no executor is left to shut down

  latch.await();

} catch (StatusRuntimeException | InterruptedException e) {
  System.err.println(e.getMessage());
}
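The two ways of consuming the future, blocking versus callback, can be tried without a gRPC server. The sketch below uses the JDK's `CompletableFuture` as a stand-in for Guava's `ListenableFuture`, and `lookupAsync` is a hypothetical replacement for `futureStub.getRealNameByUsername(...)`; it shows the control flow only, not the gRPC API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class FutureStyles {
    /** Hypothetical async lookup standing in for the future stub call. */
    static CompletableFuture<String> lookupAsync(String username) {
        return CompletableFuture.supplyAsync(() -> "real-name-of-" + username);
    }

    /** Style 1: block the calling thread until the result arrives. */
    static String blocking(String username) {
        return lookupAsync(username).join();
    }

    /** Style 2: register a callback; the latch only keeps this demo from returning early. */
    static String callback(String username) {
        AtomicReference<String> result = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        lookupAsync(username).whenComplete((value, error) -> {
            result.set(error == null ? value : "failed: " + error.getMessage());
            latch.countDown();
        });
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for callback");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }
}

class FutureStylesDemo {
    public static void main(String[] args) {
        System.out.println(FutureStyles.blocking("admin"));
        System.out.println(FutureStyles.callback("admin"));
    }
}
```

In real code the callback style would normally not wait on a latch at all; the latch exists here, as in the snippet above, only so that a short-lived program sees the result before exiting.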

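How Nacos uses gRPC

In Nacos, the proto file does not define every interface. It defines only the basic forwarding interfaces and a generic request/response Payload structure.

The concrete request and response structures are written in business code, and business interfaces are routed through the forwarding interface, much as DispatcherServlet in Spring MVC forwards requests to controllers.

This section briefly describes how Nacos integrates gRPC.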
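The forwarding idea described above, one generic endpoint plus a lookup by request type, can be sketched in a few lines. This is a simplified model and not Nacos code: `SimplePayload`, `SimpleHandler`, `SimpleDispatcher`, and the type names used in `main` are hypothetical, and the body is a plain string where the real Payload carries serialized bytes and metadata.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simplified payload: a type name plus a serialized body. */
class SimplePayload {
    final String type;
    final String body;

    SimplePayload(String type, String body) {
        this.type = type;
        this.body = body;
    }
}

/** Hypothetical handler contract, one implementation per request type. */
interface SimpleHandler {
    String handle(String body);
}

/** Routes payloads to handlers by type, much like a servlet dispatching to controllers. */
class SimpleDispatcher {
    private final Map<String, SimpleHandler> handlers = new HashMap<>();

    void register(String type, SimpleHandler handler) {
        handlers.put(type, handler);
    }

    SimplePayload dispatch(SimplePayload request) {
        SimpleHandler handler = handlers.get(request.type);
        if (handler == null) {
            // no handler registered for this type
            return new SimplePayload("ErrorResponse", "RequestHandler Not Found");
        }
        try {
            return new SimplePayload("Response", handler.handle(request.body));
        } catch (RuntimeException e) {
            // a failing handler becomes an error response instead of a broken stream
            return new SimplePayload("ErrorResponse", String.valueOf(e.getMessage()));
        }
    }
}

class SimpleDispatcherDemo {
    public static void main(String[] args) {
        SimpleDispatcher dispatcher = new SimpleDispatcher();
        dispatcher.register("EchoRequest", body -> "echo:" + body);
        SimplePayload response = dispatcher.dispatch(new SimplePayload("EchoRequest", "hi"));
        System.out.println(response.type + " " + response.body);
    }
}
```

The benefit of this design is that adding a business interface means registering a new handler, with no change to the proto file or regenerated stubs.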

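Server side

BaseGrpcServer

The abstract class BaseRpcServer defines the framework logic of the RPC server. Subclasses must implement the template method startServer(), which holds the core logic for starting the RPC server.

The abstract class BaseGrpcServer extends BaseRpcServer and wraps the gRPC components:

  • Server - the gRPC server object
  • GrpcRequestAcceptor - receives and dispatches business requests
  • GrpcBiStreamRequestAcceptor - handles connection requests and obtains the StreamObserver used to send on the bidirectional stream
  • ConnectionManager - the connection manager

The startServer() method wraps the logic for starting the gRPC server: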

public void startServer() throws Exception {
    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();

    // server interceptor to set connection id.
    ServerInterceptor serverInterceptor = new ServerInterceptor() {
        @Override
        public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                ServerCallHandler<T, S> next) {
            // save connectionId, ip, port, etc. in the Context
            Context ctx = Context.current()
                    .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_PORT,
                               call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                    .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
            if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                Channel internalChannel = getInternalChannel(call);
                // save the channel
                ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
            }
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    };
    // register the forwarding services
    addServices(handlerRegistry, serverInterceptor);

    // create the Server
    server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
            .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            .addTransportFilter(new ServerTransportFilter() {
                @Override
                public Attributes transportReady(Attributes transportAttrs) {
                    // when the connection is established, read ip, port, etc. and generate the connectionId
                    InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                    InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                    int remotePort = remoteAddress.getPort();
                    int localPort = localAddress.getPort();
                    String remoteIp = remoteAddress.getAddress().getHostAddress();
                    Attributes attrWrapper = transportAttrs.toBuilder()
                            .set(TRANS_KEY_CONN_ID,
                                 System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                            .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                            .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                    return attrWrapper;
                    
                }

                @Override
                public void transportTerminated(Attributes transportAttrs) {
                    String connectionId = null;
                    try {
                        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                    } catch (Exception e) {
                        // Ignore
                    }
                    if (StringUtils.isNotBlank(connectionId)) {
                        // when the connection closes, remove it from connectionManager
                        connectionManager.unregister(connectionId);
                    }
                }
            }).build();

    server.start();
}
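The connection lifecycle in startServer() comes down to three steps: derive an id when the transport becomes ready, register a connection under that id later, and drop it when the transport terminates. The sketch below models only that bookkeeping. `SimpleConnectionRegistry` and its methods are hypothetical names; the id format is the one built in the transport filter above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SimpleConnectionRegistry {
    // connectionId -> remote ip; the real manager stores full connection objects
    private final Map<String, String> connections = new ConcurrentHashMap<>();

    /** Same format as the transport filter: timestamp_remoteIp_remotePort. */
    static String buildConnectionId(long nowMillis, String remoteIp, int remotePort) {
        return nowMillis + "_" + remoteIp + "_" + remotePort;
    }

    /** Registration happens after the id exists, when the client sets up its connection. */
    void register(String connectionId, String remoteIp) {
        connections.put(connectionId, remoteIp);
    }

    /** Called when the transport terminates; blank or unknown ids are ignored. */
    void onTransportTerminated(String connectionId) {
        if (connectionId != null && !connectionId.trim().isEmpty()) {
            connections.remove(connectionId);
        }
    }

    int size() {
        return connections.size();
    }
}

class SimpleConnectionRegistryDemo {
    public static void main(String[] args) {
        SimpleConnectionRegistry registry = new SimpleConnectionRegistry();
        String id = SimpleConnectionRegistry.buildConnectionId(
                System.currentTimeMillis(), "10.0.0.8", 51234);
        registry.register(id, "10.0.0.8");
        System.out.println(id + " registered, size=" + registry.size());
        registry.onTransportTerminated(id);
        System.out.println("after termination, size=" + registry.size());
    }
}
```

Because the id is attached to the transport attributes, every call on the same connection sees the same id, which is what lets the interceptor copy it into the per-call Context.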

GrpcBiStreamRequestAcceptor

grpc bi stream request handler.

Its main job is to build the server-side Connection object:

if (parseObj instanceof ConnectionSetupRequest) {
    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
    Map<String, String> labels = setUpRequest.getLabels();
    String appName = "-";
    if (labels != null && labels.containsKey(Constants.APPNAME)) {
        appName = labels.get(Constants.APPNAME);
    }

    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
    metaInfo.setTenant(setUpRequest.getTenant());
    // wrap the basic connection info together with responseObserver and channel
    // responseObserver is used to push messages to the client
    Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
    connection.setAbilities(setUpRequest.getAbilities());
    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
    // register with connectionManager
    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
        try {
            connection.request(new ConnectResetRequest(), 3000L);
            connection.close();
        } catch (Exception e) {
            
        }
    }
}
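Keeping the responseObserver inside the connection is what makes server-initiated push possible. The following sketch models that, together with the reject path in which a connection that cannot be registered is told to reset and is then closed. All names are hypothetical, a `Consumer<String>` stands in for the response observer, and the fixed connection limit is an assumed reason for `register` failing, chosen only to make the reject path reachable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical connection that remembers how to reach its client. */
class PushConnection {
    final String id;
    private final Consumer<String> toClient; // stands in for the stored responseObserver
    boolean closed;

    PushConnection(String id, Consumer<String> toClient) {
        this.id = id;
        this.toClient = toClient;
    }

    void push(String message) {
        if (!closed) {
            toClient.accept(message);
        }
    }

    void close() {
        closed = true;
    }
}

class PushConnectionManager {
    private final Map<String, PushConnection> connections = new ConcurrentHashMap<>();
    private final int limit; // assumed rejection rule for this sketch

    PushConnectionManager(int limit) {
        this.limit = limit;
    }

    synchronized boolean register(PushConnection connection) {
        if (connections.size() >= limit) {
            return false;
        }
        connections.put(connection.id, connection);
        return true;
    }

    /** Register, or ask the client to reset its connection and close it. */
    void accept(PushConnection connection) {
        if (!register(connection)) {
            connection.push("ConnectResetRequest");
            connection.close();
        }
    }

    void pushTo(String connectionId, String message) {
        PushConnection connection = connections.get(connectionId);
        if (connection != null) {
            connection.push(message);
        }
    }
}

class PushConnectionDemo {
    public static void main(String[] args) {
        List<String> inbox = new ArrayList<>();
        PushConnectionManager manager = new PushConnectionManager(1);
        manager.accept(new PushConnection("c1", inbox::add));
        manager.pushTo("c1", "config-changed");
        System.out.println(inbox);
    }
}
```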

GrpcRequestAcceptor

Handles business requests by forwarding them to a RequestHandler:

public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {

    String type = grpcRequest.getMetadata().getType();

    // look up the RequestHandler
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    // no handler found
    if (requestHandler == null) {
        Payload payloadResponse = GrpcUtils
                .convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // deserialize the request body
    Object parseObj = null;
    try {
        parseObj = GrpcUtils.parse(grpcRequest);
    } catch (Exception e) {
        Payload payloadResponse = GrpcUtils.convert(
            buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // handle the business request
    Request request = (Request) parseObj;
    try {
        Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        // refresh the client's last-active time
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        // let the RequestHandler process the request
        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        // send the response
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    } catch (Throwable e) {
        Payload payloadResponse = GrpcUtils.convert(
            buildErrorResponse((e instanceof NacosException) ?
                               ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
                               e.getMessage()));
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    }
    
}

Client side

The client uses the GrpcSdkClient class. The logic for connecting to the server is in the connectToServer method of its parent class GrpcClient:

public Connection connectToServer(ServerInfo serverInfo) {
    try {
        if (grpcExecutor == null) {
            int threadNumber = ThreadUtils.getSuitableThreadCount(8);
            grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(10000),
                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
                        "nacos-grpc-client-executor-%d").build());
            grpcExecutor.allowCoreThreadTimeOut(true);
        }
        int port = serverInfo.getServerPort() + rpcPortOffset();
        // create the gRPC client stub
        RequestGrpc.RequestFutureStub newChannelStubTemp =
            createNewChannelStub(serverInfo.getServerIp(), port);
        if (newChannelStubTemp != null) {
            // check that the server is available
            Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
            if (response == null || !(response instanceof ServerCheckResponse)) {
                shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
                return null;
            }
            // create the biStreamStub
            BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
                    .newStub(newChannelStubTemp.getChannel());
            // create the connection
            GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
            grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

            // create stream request and bind connection event to this connection.
            // used to send requests to the server
            StreamObserver<Payload> payloadStreamObserver =
                bindRequestStream(biRequestStreamStub, grpcConn);

            // stream observer to send response to server
            grpcConn.setPayloadStreamObserver(payloadStreamObserver);
            grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
            grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
            // send a ConnectionSetupRequest so that the server creates its Connection
            ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
            conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
            conSetupRequest.setLabels(super.getLabels());
            conSetupRequest.setAbilities(super.clientAbilities);
            conSetupRequest.setTenant(super.getTenant());
            grpcConn.sendRequest(conSetupRequest);
            return grpcConn;
        }
        return null;
    } catch (Exception e) {}
    return null;
}

private StreamObserver<Payload> bindRequestStream(
        final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConnection grpcConn) {
    return streamStub.requestBiStream(new StreamObserver<Payload>() {
        @Override
        public void onNext(Payload payload) {
            try {
                Object parseBody = GrpcUtils.parse(payload);
                final Request request = (Request) parseBody;
                if (request != null) {
                    try {
                        // handle data sent by the server with the client-side ServerRequestHandler
                        Response response = handleServerRequest(request);
                        if (response != null) {
                            response.setRequestId(request.getRequestId());
                            // send the response
                            sendResponse(response);
                        }
                    } catch (Exception e) {
                        sendResponse(request.getRequestId(), false);
                    }
                }
            } catch (Exception e) {
                // ...
            }
        }

        @Override
        public void onError(Throwable throwable) {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            } else {
                // ...
            }
        }

        @Override
        public void onCompleted() {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            }
        }
    });
}
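Both onError and onCompleted guard the server switch with a compare-and-set, so a broken stream triggers at most one switch no matter how many callbacks fire. A minimal sketch of that guard follows. `ReconnectGuard` and `ClientStatus` are hypothetical names, and a counter stands in for `switchServerAsync()`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

enum ClientStatus { RUNNING, UNHEALTHY }

class ReconnectGuard {
    private final AtomicReference<ClientStatus> status =
            new AtomicReference<>(ClientStatus.RUNNING);
    final AtomicInteger switchCount = new AtomicInteger();

    /** Called from both onError and onCompleted of the stream observer. */
    void onStreamBroken(boolean abandoned) {
        if (abandoned) {
            return; // the connection was given up deliberately, nothing to repair
        }
        // only the caller that wins the RUNNING -> UNHEALTHY transition triggers the switch
        if (status.compareAndSet(ClientStatus.RUNNING, ClientStatus.UNHEALTHY)) {
            switchCount.incrementAndGet(); // stands in for switchServerAsync()
        }
    }

    /** Called once a new connection is established. */
    void markRunning() {
        status.set(ClientStatus.RUNNING);
    }

    ClientStatus status() {
        return status.get();
    }
}

class ReconnectGuardDemo {
    public static void main(String[] args) {
        ReconnectGuard guard = new ReconnectGuard();
        guard.onStreamBroken(false); // e.g. onError
        guard.onStreamBroken(false); // e.g. onCompleted right after
        System.out.println("switches triggered: " + guard.switchCount.get());
    }
}
```

The compare-and-set matters because the two callbacks run on gRPC threads; a plain check followed by a write could let both of them start a switch.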
