Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码

这篇具有很好参考价值的文章主要介绍了Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、main方式入口

路径:org.elasticsearch.bootstrap.Elasticsearch

  /**
     * 启动 elasticsearch 的主入口点。
     */
    public static void main(final String[] args) {

        Bootstrap bootstrap = initPhase1();
        assert bootstrap != null;

        try {
            initPhase2(bootstrap);
            initPhase3(bootstrap);
        } catch (NodeValidationException e) {
            bootstrap.exitWithNodeValidationException(e);
        } catch (Throwable t) {
            bootstrap.exitWithUnknownException(t);
        }
    }

这里初始化会有三个初始化阶段。可以直接看initPhase3

二、Elasticsearch初始化第三阶段

 /**
     *初始化的第三阶段
     *阶段 3 包含初始化安全管理器后的所有内容。到目前为止,该系统一直是单线程的。此阶段可以生成线程、写入日志,并受安全管理器策略的约束。
     * 在第 3 阶段结束时,系统已准备好接受请求,主线程已准备好终止。这意味着:
     *    节点组件已构建并启动
     *    清理已完成(例如安全设置已关闭)
     *    除主线程外,至少有一个线程处于活动状态,并且在主线程终止后将保持活动状态
     *    已通知父 CLI 进程系统已准备就绪
     */
    private static void initPhase3(Bootstrap bootstrap) throws IOException, NodeValidationException {
        //调用checkLucene()函数进行Lucene的检查
        checkLucene();
        //创建一个Node对象,并重写validateNodeBeforeAcceptingRequests方法,用于在接受请求之前进行节点验证。
        Node node = new Node(bootstrap.environment()) {
            @Override
            protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress,
                List<BootstrapCheck> checks
            ) throws NodeValidationException {
                BootstrapChecks.check(context, boundTransportAddress, checks);
            }
        };
        //使用bootstrap.spawner()和之前创建的node对象实例化一个Elasticsearch对象,并将其赋值给INSTANCE变量。
        INSTANCE = new Elasticsearch(bootstrap.spawner(), node);
        //关闭安全设置
        IOUtils.close(bootstrap.secureSettings());
        //启动INSTANCE对象,node会启动,并保持一个存活线程
        INSTANCE.start();
        //如果命令行参数指定了daemonize,则移除控制台输出的日志配置。
        if (bootstrap.args().daemonize()) {
            LogConfigurator.removeConsoleAppender();
        }
        //发送CLI标记,表示服务器已经准备好接受请求。
        bootstrap.sendCliMarker(BootstrapInfo.SERVER_READY_MARKER);
        //如果命令行参数指定了daemonize,则关闭流;否则,启动CLI监视线程。
        if (bootstrap.args().daemonize()) {
            bootstrap.closeStreams();
        } else {
            startCliMonitorThread(System.in);
        }
    }

其中INSTANCE.start();如下,代表node启动,并且存活线程运行

private void start() throws NodeValidationException {
        node.start();
        keepAliveThread.start();
    }

1、构造node节点对象时构造restController

public Node(Environment environment) {
        this(environment, PluginsService.getPluginsServiceCtor(environment), true);
    }
/**
     * Constructs a node
     * 节点的初始化
     */
    protected Node(
        final Environment initialEnvironment,
        final Function<Settings, PluginsService> pluginServiceCtor,
        boolean forbidPrivateIndexSettings
    ) {
   		 //省略代码。。。。
   			//里面会初始化restController
            ActionModule actionModule = new ActionModule(
                settings,
                clusterModule.getIndexNameExpressionResolver(),
                settingsModule.getIndexScopedSettings(),
                settingsModule.getClusterSettings(),
                settingsModule.getSettingsFilter(),
                threadPool,
                pluginsService.filterPlugins(ActionPlugin.class),
                client,
                circuitBreakerService,
                usageService,
                systemIndices,
                tracer,
                clusterService,
                reservedStateHandlers
            );
            modules.add(actionModule);
            //restController存入到networkModule,而NetworkModule是用于处理注册和绑定所有网络相关类的模块
            //末尾有 actionModule.initRestHandlers初始化hander
            final RestController restController = actionModule.getRestController();
            final NetworkModule networkModule = new NetworkModule(
                settings,
                pluginsService.filterPlugins(NetworkPlugin.class),
                threadPool,
                bigArrays,
                pageCacheRecycler,
                circuitBreakerService,
                namedWriteableRegistry,
                xContentRegistry,
                networkService,
                restController,
                actionModule::copyRequestHeadersToThreadContext,
                clusterService.getClusterSettings(),
                tracer
            );
            //省略代码。。。。
 			//初始化Rest的Handler
            actionModule.initRestHandlers(() -> clusterService.state().nodesIfRecovered());
	}

2、在node构建对象最后执行初始化RestHanders的操作

  public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
       //省略代码。。。这里只选几个经常用到的
        registerHandler.accept(new RestGetIndicesAction());
        registerHandler.accept(new RestIndicesStatsAction());
        registerHandler.accept(new RestCreateIndexAction());    
        registerHandler.accept(new RestDeleteIndexAction());
        registerHandler.accept(new RestGetIndexTemplateAction());
        registerHandler.accept(new RestPutIndexTemplateAction());
        registerHandler.accept(new RestDeleteIndexTemplateAction());
        registerHandler.accept(new RestPutMappingAction());
        registerHandler.accept(new RestGetMappingAction());
        registerHandler.accept(new RestGetFieldMappingAction());
        registerHandler.accept(new RestIndexAction());
        registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder()));
        //省略代码
    }

三、以注册在hander中的RestGetIndicesAction对象为例介绍


/**
 * The REST handler for get index and head index APIs.
 * 用于获取索引和头索引 API 的 REST 处理程序。
 */
@ServerlessScope(Scope.PUBLIC)
public class RestGetIndicesAction extends BaseRestHandler {
	//代表路由匹配规则,通过这个规则知道要调用这个实例,每一个实例路由规则都是不一样的
    @Override
    public List<Route> routes() {
        return List.of(new Route(GET, "/{index}"), new Route(HEAD, "/{index}"));
    }
    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        // starting with 7.0 we don't include types by default in the response to GET requests
        if (request.getRestApiVersion() == RestApiVersion.V_7
            && request.hasParam(INCLUDE_TYPE_NAME_PARAMETER)
            && request.method().equals(GET)) {
            deprecationLogger.compatibleCritical("get_indices_with_types", TYPES_DEPRECATION_MESSAGE);
        }

        String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
        final GetIndexRequest getIndexRequest = new GetIndexRequest();
        getIndexRequest.indices(indices);
        getIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, getIndexRequest.indicesOptions()));
        getIndexRequest.local(request.paramAsBoolean("local", getIndexRequest.local()));
        getIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexRequest.masterNodeTimeout()));
        getIndexRequest.humanReadable(request.paramAsBoolean("human", false));
        getIndexRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
        getIndexRequest.features(GetIndexRequest.Feature.fromRequest(request));
        final var httpChannel = request.getHttpChannel();
        return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
            .indices()
            .getIndex(getIndexRequest, new RestChunkedToXContentListener<>(channel));
    }

   
}

1、继承了BaseRestHandler,routes方法做路由规则,父类调用子类的prepareRequest实现

public abstract class BaseRestHandler implements RestHandler {
    /**
     * {@inheritDoc}
     */
    @Override
    public abstract List<Route> routes();

    @Override
    public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        //调用prepareRequest方法,准备请求以供执行,并且会对请求参数进行处理。
        final RestChannelConsumer action = prepareRequest(request, client);
        //过滤未使用的参数,将未使用的参数收集到一个有序集合中。
        final SortedSet<String> unconsumedParams = request.unconsumedParams()
            .stream()
            .filter(p -> responseParams(request.getRestApiVersion()).contains(p) == false)
            .collect(Collectors.toCollection(TreeSet::new));
        //验证未使用的参数是否有效,如果存在无效参数,则抛出IllegalArgumentException异常。
        if (unconsumedParams.isEmpty() == false) {
            final Set<String> candidateParams = new HashSet<>();
            candidateParams.addAll(request.consumedParams());
            candidateParams.addAll(responseParams(request.getRestApiVersion()));
            throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
        }
        //验证请求是否包含请求体,并且请求体是否已被消耗,如果不满足条件,则抛出IllegalArgumentException异常。
        if (request.hasContent() && request.isContentConsumed() == false) {
            throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
        }
        //增加使用计数
        usageCount.increment();
        //执行action,将结果传递给channel。
        action.accept(channel);
    }
     /**
     *准备要执行的请求。
     * 实现应在返回可运行对象以进行实际执行之前使用所有请求参数。
     * 未使用的参数将立即终止请求的执行。
     * 但是,某些参数仅用于处理响应;实现可以覆盖 {@link BaseRestHandlerresponseParams()} 来指示此类参数。
     */
    protected abstract RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException;
}    

2、BaseRestHandler实现的是RestHandler接口

/**
 * Handler for REST requests
 */
@FunctionalInterface
public interface RestHandler {
    /**
     * 处理rest请求
     */
    void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception;
    /**
     * 此 RestHandler 负责处理的 {@link 路由} 的列表。
     */
    default List<Route> routes() {
        return Collections.emptyList();
    }
 }

其中调用RestHandler接口的handerRequest的上游是

 @Override
    public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    //使用 OPTIONS 方法的请求应在其他地方处理,而不是通过调用 {@code RestHandlerhandleRequest} 使用 OPTIONS 方法的 HTTP 请求绕过 authn,因此此健全性检查可防止调度未经身份验证的请求
        if (request.method() == Method.OPTIONS) {
            handleException(
                request,
                channel,
                new ElasticsearchSecurityException("Cannot dispatch OPTIONS request, as they are not authenticated")
            );
            return;
        }
        if (enabled == false) {
            doHandleRequest(request, channel, client);
            return;
        }
    }

    private void doHandleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        threadContext.sanitizeHeaders();
        // operator privileges can short circuit to return a non-successful response
        if (operatorPrivilegesService.checkRest(restHandler, request, channel, threadContext)) {
            try {
                restHandler.handleRequest(request, channel, client);
            } catch (Exception e) {
                logger.debug(() -> format("Request handling failed for REST request [%s]", request.uri()), e);
                throw e;
            }
        }
    }

其他注册在hander中的API和RestGetIndicesAction类似文章来源地址https://www.toymoban.com/news/detail-725748.html

到了这里,关于Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Rest 优雅的url请求处理风格及注意事项

    😀前言 本篇博文是关于Rest 风格请求的应用和注意事项,希望能够帮助到您😊 🏠个人主页:晨犀主页 🧑个人简介:大家好,我是晨犀,希望我的文章可以帮助到大家,您的满意是我的动力😉😉 💕欢迎大家:这里是CSDN,我总结知识的地方,欢迎来到我的博客,感谢大家

    2024年02月13日
    浏览(40)
  • SpringMVC简介、请求与响应、REST风格、SSM整合、拦截器

    目录 SpringMVC简介 SpringMVC概述 入门案例 入门案例工作流程分析 Controller加载控制 PostMan 请求与响应 设置请求映射路径 五种类型参数传递 JSON数据传输参数  JSON对象数据 JSON对象数组 日期类型参数传递  响应  REST风格 REST风格简介 RESTful入门案例 RESTful快速开发 RESTful案例 SSM整

    2024年02月05日
    浏览(69)
  • 使用Spring Boot和Apache HttpClient构建REST客户端

    介绍: 在本文中,我们将学习如何使用Spring Boot和Apache HttpClient创建一个REST客户端。我们将探讨如何与远程服务器进行通信、处理JSON响应,并为Web应用程序配置跨源资源共享(CORS)。让我们深入代码吧! ClientService 类负责发起HTTP请求并处理响应。它使用 @Service 注解表示它应

    2024年01月16日
    浏览(53)
  • Elasticsearch 常用 REST API 之集群APIs

    集群运行状况API返回关于集群运行状况的简单状态。您还可以使用API仅获取指定数据流和索引的健康状态。对于数据流,API 检索流的支持索引的运行状况。 集群健康状态为:绿色、黄色和红色。在shard级别,红色状态表示集群中没有分配特定的shard,黄色状态表示主shard已分配

    2024年02月02日
    浏览(44)
  • Rust Web 全栈开发之 Actix 尝鲜并构建REST API

    actix-web v4.3.1 actix-rt v2.8.0 目录 Cargo.toml webservice/Cargo.toml webservice/src/bin/server1.rs 运行 客户端浏览器 互联网 Actix HTTP Server Actix的并发(concurrency) Actix支持两类并发: 异步I/O:给定的OS原生线程在等待I/O时执行其他任务(例如侦听网络连接) 多线程并行:默认情况下启动OS原生

    2024年02月06日
    浏览(37)
  • Elasticsearch Java REST Client 批量操作(Bulk API)

    上一篇:Elasticsearch Java REST Client Term Vectors API 下一篇:Elasticsearch Java REST Client Search APIs 查询 BulkRequest可用于使用单个请求执行多个索引、更新和/或删除操作。 它需要至少一个操作添加到 Bulk 请求中: multiGetAPI 在单个 http 请求中并行执行多个请求get 。 MultiGetRequest,添加 `M

    2024年02月11日
    浏览(51)
  • (Rest风格API)Elasticsearch索引操作、映射配置、数据操作、查询操作

    1.请求方式:put 2.请求路径:索引库名 3.请求参数:json格式 number_of_shards 是指索引要做多少个分片,只能在创建索引时指定,后期无法修改。 number_of_replicas 是指每个分片有多少个副本,后期可以动态修改 什么是分片? ES中所存数据的文件块,也是数据的最小单元块。假如有

    2024年04月26日
    浏览(51)
  • Elasticsearch rest-high-level-client 基本操作

    本篇主要讲解一下 rest-high-level-client 去操作 Elasticsearch , 虽然这个客户端在后续版本中会慢慢淘汰,但是目前大部分公司中使用Elasticsearch 版本都是6.x 所以这个客户端还是有一定的了解 前置准备 准备一个SpringBoot环境 2.2.11 版本 准备一个Elasticsearch 环境 我这里是8.x版本 引入依赖

    2024年01月22日
    浏览(54)
  • SpringBoot整合ElasticSearch之Java High Level REST Client

    1 搭建SpringBoot工程 2 引入ElasticSearch相关坐标。 3 编写核心配置类 编写核心配置文件: 这里可以不写在配置,可以直接写在代码中,只是一般都是写在配置文件中 编写核心配置类 4 测试客户端对象 记得把maven的单元测试关了 注意:使用@Autowired注入RestHighLevelClient 如果报红线

    2024年02月05日
    浏览(52)
  • Springboot引入elasticsearch-rest-high-level-client

    ES官方提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES。官方文档地址:https://www.elastic.co/guide/en/elasticsearch/client/index.html 其中的Java Rest Client又包括两种: Java Low Level Rest Client Java High Level Rest Client 我这边以 Java High Level Re

    2024年02月03日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包