es 读流程源码解析

这篇具有很好参考价值的文章主要介绍了es 读流程源码解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 本文源码基于es6.8.0版本

es 读流程源码解析

search 分为两部分,query + fetch

节点角色划分

协调节点负责接收请求,然后构造查询分发给其他的数据节点,然后从各个分片上获取数据。数据最终汇聚到协调节点,然后再讲结果做合并。然后返回查询结果。

而数据节点,则只负责将自己的分片上的数据做一次查询。然后把数据发给协调节点。

1.请求到协调节点,解析请求

Rest层用于解析Http请求参数,RestRequest解析并转化为SearchRequest,然后再对SearchRequest做处理,这块的逻辑在RestSearchAction.prepareRequest(final RestRequest request, final NodeClient client)

2.转换请求,从Rest层到Transport层

NodeClient在处理SearchRequest请求时,会将请求的action转化为对应Transport层的action,然后再由Transport层的action来处理SearchRequest

3.协调节点在TransportAction

TransportAction#execute(Request request, ActionListener listener) -> TransportAction#execute(Task task, Request request, ActionListener listener) -> TransportAction#proceed(Task task, String actionName, Request request, ActionListener listener)。TransportAction会调用一个请求过滤链来处理请求,如果相关的插件定义了对该action的过滤处理,则先会执行插件的处理逻辑,然后再进入TransportAction的处理逻辑

4.构造查询的分片列表

TransportSearchAction.doExecute()方法里边,会获取到远程集群和本地集群对应的索引列表

TransportSearchAction.executeSearch()构造查询的分片列表,设置默认的查询策略QUERY_THEN_FETCH,设置默认的缓存策略,不开启缓存。setMaxConcurrentShardRequests() 方法设置最大的并发数最大并发分片数,最大是256:Math.min(256, Math.max(nodeCount, 1)* IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getDefault(Settings.EMPTY))。

翻译一下上边的公式(Math.min(256, Math.max(数据节点的个数, 1)* 索引设置的分片数 )

最后再调用searchAsyncAction(),去执行真正的查询操作。执行查询用到的线程池是:search

接着又调用了AbstractSearchAsyncAction的start方法。

5.执行query的操作,分发请求到各个分片上

TransportSearchAction.executeSearch() 方法里边调用了searchAsyncAction()方法! 接着再看searchAsyncAction(),它是真正执行查询操作的方法。这个方法里边,先判断是否执行过滤,如果不过滤,去判断是用那种方式来执行查询(默认QUERY_THEN_FETCH)。 searchAsyncAction()方法会根据查询类型,来返回AbstractSearchAsyncAction的实现类,如果是QUERY_THEN_FETCH就返回SearchQueryThenFetchAsyncAction对象,如果是DFS_QUERY_THEN_FETCH则返回SearchDfsQueryThenFetchAsyncAction对象。这个对象还在TransportSearchAction.executeSearch() 方法里边,并且最终调用了该对象的start()方法。注意这里边的调用关系:AbstractSearchAsyncAction类的start()方法调用了executePhase()方法,在executePhase()方法中又调用了SearchPhase类的 run()方法。而SearchPhase是一个抽象类。

接着再看继承了SearchPhase的类,只有以下截图中的四个。我们重点关注的是 InitialSearchPhase类,

es 读流程源码解析

InitialSearchPhase.run() 在该方法中,并发去获取每个分片上的数据(并发度为 Math.min(设置的最大并发数maxConcurrentShardRequests,分片数))。调用 performPhaseOnShard()方法,去每个分片上取数据。等待所有的并发的线程把所有的分片上的数据都取到,才结束。执行过程中,如果某个分片失败了,也会记录下来。 shardsIts是本次查询涉及的所有分片,shardRoutings.nextOrNull()从某个分片中主或者所有副本中选一个 。在该方法中,shardsIts是本次查询涉及的所有分片,shardRoutings.nextOrNull()从某个分片中主或者所有副本中选一个。InitialSearchPhase.run() 中最后去调用了performPhaseOnShard()方法,在performPhaseOnShard()方法里边调用了executePhaseOnShard()方法。还是在InitialSearchPhase类中的performPhaseOnShard()方法里边,executePhaseOnShard()方法是一个抽象方法,定义在InitialSearchPhase类中,它的实现类有以下截图上的几个

es 读流程源码解析

其中红色框是我们重点关注的,这个是和上边的TransportAction.executeSearch() 方法里边的searchAsyncAction()方法里边的searchAsyncAction()方法就照应起来了。如果是QUETY_THEN_FETCH搜索模式,则继续走的是SearchQueryThenFetchAsyncAction类的executePhaseOnShard()方法,在该方法中,调用了SearchTransportService类的sendExecuteQuery()方法,在sendExecuteQuery()方法中,继续调用了transportService的sendChildRequest()方法 向具体的分片发送Query阶段的子任务进行异步处理(如果想看是如何发送请求给分片,可以接着看!)。

到这里协调及节点的查询算是结束了,剩下的就是等各个分片将执行的结果返回了。( TODO )然后剩下的查询流程是数据节点上的分片具体的处理协调协调节点分配的查询任务了。

这里再说一下InitialSearchPhase.run(),它对一次一次的分发请求给所有的分片,即使是同一个节点的,也是分发的!

6.关于协调节点分发查询请求后结果的汇聚的调用链路

在上边提到的InitialSearchPhase.run()中调用performPhaseOnShard()方法,performPhaseOnShard()方法调用了executePhaseOnShard()方法,在这个方法中,创建了一个SearchActionListener监听器,用来回调分片的查询结果。然后调用InitialSearchPhase.onShardResult(),将结果合并到一起。前边还提到了,一个查询任务,可能是有多个线程并发执行的,所以在创建的SearchActionListener的时候,调用了InitialSearchPhase.maybeFork()方法,来合并多个线程的任务。InitialSearchPhase.onShardResult(),做的操作是把全部线程执行的结果都合并在协调节点。InitialSearchPhase.onShardResult()方法做了两件事,第一件事是合并结果,调用了onShardSuccess()方法,第二件事是调用successfulShardExecution()方法,来检查是否所有的分片都结束了。如果结束则触发进行下一个阶段的,在successfulShardExecution()方法中,如果都结束了则调用onPhaseDone()方法中的executeNextPhase()来进入 fetch阶段。

7.在协调节点,query阶段,数据汇聚的细节(此时汇聚的只有文档id,真正的数据的汇聚是在fetch阶段进行的)

针对上述的结果合并,onShardSuccess()方法中调用results.consumeResult()方法,多做了一部检查,检查分片的数据是否重复,然后再调用consumeInternal()方法,关于在协调节点,各个分片结果合并的操作都在consumeInternal()方法里边。其中包括了agg聚类结果的合并,和search出来的TopDocs的合并。agg聚类结果的合并调用consumeInternal()中的InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext),search出来的topDoc是通过mergeTopDocs(Arrays.asList(topDocsBuffer), querySearchResult.from() + querySearchResult.size(), 0)来合并的。

8.进入 fetch阶段的操作(包含了结果的合并)

实际上是根据上个过程合并的结果。

FetchSearchPhase.run() -> innerRun() -> resultConsumer.reduce()

然后找到reduce的实现类:SearchPhaseController.reduce()

然后再进入controller.reducedQueryPhase() 最终结果的合并,都是在这里边完成的。

controller.reducedQueryPhase()里边包含了:sortDocs() 把所有分片的的数据的排序后再取topN

controller.reducedQueryPhase()里边包含了:InternalAggregations.reduce() 把所有分片的的数据做聚类操作。

确定两件事情,query阶段和fetch,query只返回文档id吗,会不会返回分数值。

各个分片返回的结果再做和合并:SearchPhaseController.reducedQueryPhase()

9.结果返回

TransportService.sendResponse()的processResponse()

ActionListenerResponseHandler.handleResponse()

参看文章:

【Elasticsearch源码】查询源码分析(一)_少加点香菜的博客-CSDN博客

【Elasticsearch源码】查询源码分析(二)_少加点香菜的博客-CSDN博客文章来源地址https://www.toymoban.com/news/detail-403602.html

到了这里,关于es 读流程源码解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【JavaScript解析】ES6定义变量与箭头函数详解

    箭头函数可以说是ES6的一大亮点,使用箭头函数,可以简化编码过程,使代码更加的简洁 本文由千锋前端老师独家创作,主要给大家介绍了关于ES6中箭头函数的相关资料,文中通过实例代码介绍的非常详细,觉得有帮助的话可以【关注】持续追更~ 我们现在知道定义(声明)一个变

    2024年02月05日
    浏览(48)
  • 解析 angular subscribe中, ES6 Arrow 箭头函数

    箭头函数表达式 的语法比函数表达式更简洁,并且没有自己的this,arguments,super或new.target。箭头函数表达式更适用于那些本来需要匿名函数的地方,并且它不能用作构造函数。 在hero.component中 我们定义了一个函数来获取hero.service的请求 getHeroes(): void {     this.heroService.getH

    2024年02月12日
    浏览(42)
  • JavaScript全解析——ES6函数中参数的默认值和解构赋值

    本文为千锋资深前端教学老师带来的【JavaScript全解析】系列,文章内含丰富的代码案例及配图,从0到1讲解JavaScript相关知识点,致力于教会每一个人学会JS! 文末有本文重点总结,可以收藏慢慢看~ 更多技术类内容,主页关注一波! 给函数的形参设置一个默认值, 当你没有传

    2024年02月05日
    浏览(43)
  • webpack原理之手写babel-loader,将es6以上语法转换为低版本语法

    一、手写babel-loader 一个基于webpack的简单的loader,将es6以上版本的语法转换成低版本语法的步骤如下: 1. 创建一个名为 es6-to-es5-loader 的文件夹,进入该文件夹。 2. 在该文件夹中创建一个名为 index.js 的文件。该文件是我们自定义的loader。再创建一个 schema.json 文件,用于定义配

    2024年02月11日
    浏览(39)
  • 含源码|基于MATLAB的去雾系统(5种去雾算法+1种本文的改进算法)

    去雾系统V2包括作者新加入的 多尺度Retinex去雾算法以及改进去雾算法 ,以及 4种 评价去雾效果的 客观指标 。 引言 去雾系统新增功能 结果分析 源码获取 展望 参考文献 在作者前面写过的文章中,已经介绍过图像去雾算法的应用价值及研究现状,并且也介绍了4种去雾算法的

    2024年01月23日
    浏览(79)
  • 【opencv】【GPU】windows10下opencv4.8.0-cuda C++版本源码编译教程

    提示:博主取舍了很多大佬的博文并亲测有效,分享笔记邀大家共同学习讨论 OpenCV是一个开源的计算机视觉库,包含了核心模块和扩展模块,提供了基础的图像处理和计算机视觉算法,以及一些机器学习工具。而OpenCV Contrib是OpenCV社区贡献的一组扩展模块之一,包含了一些较为

    2024年02月08日
    浏览(82)
  • 【框架源码】Spring源码解析之BeanDefinition加载流程解析

    观看本文之前,我们先思考一个问题,Spring是如何描述Bean对象的? Spring是根据BeanDefinition来创建Bean对象,BeanDefinition就是Spring中表示Bean定义。BeanDefinition用来存储Bean的相关信息,主要包括:Bean的属性、是否单例、延迟加载、Bean的名称、构造方法等。 简言之就是Spring通过解

    2024年02月09日
    浏览(44)
  • springboot启动流程源码解析(带流程图)

    本文自己写的(头条也有这篇文章),若有问题,请指正。 大致流程如下: 1. 初始化SpringApplication,从META-INF下的spring.factories读取 ApplicationListener/ApplicationContextInitializer 2.运行SpringApplication的run方法 3.读取项目中环境变量、jvm配置信息、配置文件信息等 4.创建Spring容器对象(

    2024年02月08日
    浏览(43)
  • sentinel核心流程源码解析

    可以说,sentinel实现的各种功能就是由各处理槽完成的 ,ProcessorSlot定义了四个方法: 当进入该处理槽时触发该方法 处理完 entry方法之后触发该方法 退出该处理槽时触发该方法 exit方法处理完成时触发该方法  其中:FlowSlot是处理流控规则的处理槽,DegradeSlot是处理降级规则的

    2024年02月14日
    浏览(36)
  • 【框架源码】Spring源码解析之Bean创建源码流程

    问题:Spring中是如何初始化单例bean的? 我们都知道Spring解析xml文件描述成BeanDefinition,解析BeanDefinition最后创建Bean将Bean放入单例池中,那么Spring在创建Bean的这个过程都做了什么。 Spring核心方法refresh()中最最重要的一个方法 finishBeanFactoryInitialization() 方法,该方法负责初始化

    2024年02月09日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包