Elasticsearch 8.9 refresh刷Es缓冲区的数据到Lucene,更新segemnt,使数据可见

这篇具有很好参考价值的文章主要介绍了Elasticsearch 8.9 refresh刷Es缓冲区的数据到Lucene,更新segemnt,使数据可见。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

下面的图来自ElasticSearch——刷盘原理流程,这篇文章主要讲的是refresh命令把ES写入索引缓冲区的数据刷进Lucene,使数据可供查询,搜索,否则,在索引缓冲区是不可见的,不涉及到在translog.logLucene的数据结构。
通过这个流程知道ES如何把索引缓冲区的数据刷进Lucene的,主要是下面左中部分refresh部分

Elasticsearch 8.9 refresh刷Es缓冲区的数据到Lucene,更新segemnt,使数据可见,# ElasticSearch,elasticsearch,lucene

其他部分源码梳理
1、主节点同时写入ES缓冲区和translog这一部分,请看Elasticsearch 8.9 Bulk批量给索引增加数据源码
2、下半边fsync的源码逻辑,请看Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

一、相关API的handler

ActionModule.java

 registerHandler.accept(new RestRefreshAction());
 actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
 actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);

1、接受HTTP请求的hander(RestRefreshAction)

public class RestRefreshAction extends BaseRestHandler {

    @Override
    public List<Route> routes() {
        return List.of(
            new Route(GET, "/_refresh"),
            new Route(POST, "/_refresh"),
            new Route(GET, "/{index}/_refresh"),
            new Route(POST, "/{index}/_refresh")
        );
    }

    @Override
    public String getName() {
        return "refresh_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index")));
        refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions()));
        return channel -> client.admin().indices().refresh(refreshRequest, new RestToXContentListener<RefreshResponse>(channel) {
            @Override
            protected RestStatus getStatus(RefreshResponse response) {
                return response.getStatus();
            }
        });
    }
}

client.admin().indices().refresh()会执行到下面的父类TransportBroadcastReplicationActiondoExecute方法

2、往数据节点发送刷新请求的action(TransportRefreshAction)

public class TransportRefreshAction extends TransportBroadcastReplicationAction<
    RefreshRequest,
    RefreshResponse,
    BasicReplicationRequest,
    ReplicationResponse> {

    @Inject
    public TransportRefreshAction(
        ClusterService clusterService,
        TransportService transportService,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        NodeClient client
    ) {
        super(
            RefreshAction.NAME,
            RefreshRequest::new,
            clusterService,
            transportService,
            client,
            actionFilters,
            indexNameExpressionResolver,
            TransportShardRefreshAction.TYPE,
            ThreadPool.Names.REFRESH
        );
    }

   //省略代码
}
public abstract class TransportBroadcastReplicationAction<
    Request extends BroadcastRequest<Request>,
    Response extends BaseBroadcastResponse,
    ShardRequest extends ReplicationRequest<ShardRequest>,
    ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {
 @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));
    }

    private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {
        return new CheckedConsumer<ActionListener<Response>, Exception>() {

            private int totalShardCopyCount;
            private int successShardCopyCount;
            private final List<DefaultShardOperationFailedException> allFailures = new ArrayList<>();

            @Override
            public void accept(ActionListener<Response> listener) {
                assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";

                final ClusterState clusterState = clusterService.state();
                final List<ShardId> shards = shards(request, clusterState);
                final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();

                try (var refs = new RefCountingRunnable(() -> finish(listener))) {
                //遍历所有的分片
                    for (final ShardId shardId : shards) {
                        // NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?
                        shardExecute(
                            task,
                            request,
                            shardId,
                            ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire())
                        );
                    }
                }
            }
        };
    }

    protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
        assert Transports.assertNotTransportThread("may hit all the shards");
        ShardRequest shardRequest = newShardRequest(request, shardId);
        shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
        client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);
    }

}    

3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)

public class TransportShardRefreshAction extends TransportReplicationAction<
    BasicReplicationRequest,
    ShardRefreshReplicaRequest,
    ReplicationResponse> {

    private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);

    public static final String NAME = RefreshAction.NAME + "[s]";
    public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);
    public static final String SOURCE_API = "api";

    @Inject
    public TransportShardRefreshAction(
        Settings settings,
        TransportService transportService,
        ClusterService clusterService,
        IndicesService indicesService,
        ThreadPool threadPool,
        ShardStateAction shardStateAction,
        ActionFilters actionFilters
    ) {
        super(
            settings,
            NAME,
            transportService,
            clusterService,
            indicesService,
            threadPool,
            shardStateAction,
            actionFilters,
            BasicReplicationRequest::new,
            ShardRefreshReplicaRequest::new,
            ThreadPool.Names.REFRESH
        );
        // registers the unpromotable version of shard refresh action
        new TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);
    }

  
    @Override
    protected void shardOperationOnPrimary(
        BasicReplicationRequest shardRequest,
        IndexShard primary,
        ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener
    ) {
        primary.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {
            ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);
            replicaRequest.setParentTask(shardRequest.getParentTask());
            logger.trace("{} refresh request executed on primary", primary.shardId());
            l.onResponse(new PrimaryResult<>(replicaRequest, new ReplicationResponse()));
        }));
    }
}    

primary.externalRefresh执行分片的刷新

二、在IndexShard执行refresh操作

 public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
        verifyNotClosed();
        getEngine().externalRefresh(source, listener);
    }
 public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
        ActionListener.completeWith(listener, () -> {
            logger.trace("external refresh with source [{}]", source);
            return refresh(source);
        });
    }   

getEngine()的实现是InternalEngine

  @Override
    public RefreshResult refresh(String source) throws EngineException {
        return refresh(source, SearcherScope.EXTERNAL, true);
    }

1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据

   protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
        //这两种刷新类型都会导致内部刷新,但只有外部刷新类型也会将新的读取器引用传递给外部读取器管理器。
        //获取当前的本地检查点。
        final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
        boolean refreshed;
        long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;
        try {
            //refresh 不需要按住 readLock,因为如果引擎在中途关闭,ReferenceManager 可以正确处理。
            if (store.tryIncRef()) {
                try {
                    //尽管我们保留了 2 managers,但我们实际上只做过一次繁重的工作。第二次刷新只会做我们必须做的额外工作,以预热缓存等。
                    ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
                    long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();
                    //根据参数决定是进行阻塞刷新还是非阻塞刷新
                    if (block) { 
                        //刷新可能会导致阻塞
                        referenceManager.maybeRefreshBlocking();
                        refreshed = true;
                    } else {
                    	//刷新不会导致阻塞
                        refreshed = referenceManager.maybeRefresh();
                    }
                    //如果刷新成功,获取当前的读取器,并更新段的生成号
                    if (refreshed) {
                    	//获取当前的目录
                        final ElasticsearchDirectoryReader current = referenceManager.acquire();
                        try {
                            //更新segment信息
                            segmentGeneration = Math.max(current.getIndexCommit().getGeneration(), generationBeforeRefresh);
                        } finally {
                            referenceManager.release(current);
                        }
                    }
                } finally {
                    store.decRef();
                }
                if (refreshed) {
                    lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);
                }
            } else {
                refreshed = false;
            }
        } catch (AlreadyClosedException e) {
            failOnTragicEvent(e);
            throw e;
        } catch (Exception e) {
            try {
                failEngine("refresh failed source[" + source + "]", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new RefreshFailedEngineException(shardId, e);
        }
        assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh
            : "refresh checkpoint was not advanced; "
                + "local_checkpoint="
                + localCheckpointBeforeRefresh
                + " refresh_checkpoint="
                + lastRefreshedCheckpoint();
        // TODO: maybe we should just put a scheduled job in threadPool?
        // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
        // for a long time:
        maybePruneDeletes();
        mergeScheduler.refreshConfig();
        return new RefreshResult(refreshed, segmentGeneration);
    }

其中referenceManager 根据入参是 SearcherScope.EXTERNAL 获得的实现是ExternalReaderManager

    private final ExternalReaderManager externalReaderManager;
  @Override
    protected final ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope) {
        return switch (scope) {
            case INTERNAL -> internalReaderManager;
            case EXTERNAL -> externalReaderManager;
        };
    }

根据入参中的block=true 实际执行的是referenceManager.maybeRefreshBlocking(); 来刷新,是异步非阻塞的,
并且根据下图ExternalReaderManager继承了ReferenceManager,所以没有重写maybeRefreshBlocking 所以执行的是父类ReferenceManager

import org.apache.lucene.search.ReferenceManager;

 @SuppressForbidden(reason = "reference counting is required here")
    private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
       
        @Override
        protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
           //省略代码
        }

        @Override
        protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
            return reference.tryIncRef();
        }

        @Override
        protected int getRefCount(ElasticsearchDirectoryReader reference) {
            return reference.getRefCount();
        }

        @Override
        protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
            reference.decRef();
        }
    }

(1)、maybeRefresh和maybeRefreshBlocking的简单介绍

下面是lucene源码中关于这两个API的实现,

//这个是会尝试获取刷新锁,如果没有则不执行刷新操作
  public final boolean maybeRefresh() throws IOException {
        this.ensureOpen();
        boolean doTryRefresh = this.refreshLock.tryLock();
        if (doTryRefresh) {
            try {
                this.doMaybeRefresh();
            } finally {
                this.refreshLock.unlock();
            }
        }

        return doTryRefresh;
    }
	//这里会等待获取刷新锁,所以会阻塞
    public final void maybeRefreshBlocking() throws IOException {
        this.ensureOpen();
        this.refreshLock.lock();

        try {
            this.doMaybeRefresh();
        } finally {
            this.refreshLock.unlock();
        }

    }

但是实际上最后执行刷新还是执行的this.doMaybeRefresh() 方法

三、lucene源码中执行逻辑

private void doMaybeRefresh() throws IOException {
        this.refreshLock.lock();
        boolean refreshed = false;

        try {
            Object reference = this.acquire();

            try {
            	//通知刷新监听器。
                this.notifyRefreshListenersBefore();
                //调用 refreshIfNeeded(reference) 返回一个新的引用 (newReference)
                //用来判断是否需要刷新,如果不需要刷新,refreshIfNeeded 应返回 null
                G newReference = this.refreshIfNeeded(reference);
                if (newReference != null) {
                    assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
                    try {
                    //调用 swapReference(newReference) 方法来交换旧的引用为新的引用。
                        this.swapReference(newReference);
                     //设置 refreshed 为 true 表示刷新成功。   
                        refreshed = true;
                    } finally {
                    //如果刷新失败,释放新的引用
                        if (!refreshed) {
                            this.release(newReference);
                        }
                    }
                }
            } finally {
                //释放旧的引用
                this.release(reference);
                //通知刷新监听器刷新完成
                this.notifyRefreshListenersRefreshed(refreshed);
            }

            this.afterMaybeRefresh();
        } finally {
        	//最后释放刷新锁
            this.refreshLock.unlock();
        }

    }

1、判断是否需要刷新

其中refreshIfNeeded用的是子类ExternalReaderManager的实现方法文章来源地址https://www.toymoban.com/news/detail-759504.html

private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
 		@Override
        protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
            internalReaderManager.maybeRefreshBlocking();
             //获取其reader对象。
            final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();
            //isWarmedUp为false或者获取到的新reader对象与传入的referenceToRefresh对象不相等,说明需要刷新
            if (isWarmedUp == false || newReader != referenceToRefresh) {
                boolean success = false;
                try {
                    refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);
                    isWarmedUp = true;
                    success = true;
                } finally {
                    if (success == false) {
                        internalReaderManager.release(newReader);
                    }
                }
            }
            //没有任何变化 - 两个 ref 管理器共享同一个实例,因此我们可以使用引用相等性,不需要执行刷新操作
            if (referenceToRefresh == newReader) {
                internalReaderManager.release(newReader);
                return null;
            } else {
                return newReader; // steal the reference
            }
        }
}        

到了这里,关于Elasticsearch 8.9 refresh刷Es缓冲区的数据到Lucene,更新segemnt,使数据可见的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用环形缓冲区ringbuffer实现串口数据接收

    环形缓冲区(ringbuffer),实际上就是一种队列数据结构,只不过它不是线性队列,而是环形队列。 关于环形缓冲区(ringbuffer)的详细介绍,网上一搜一大把,这里不重复介绍了,我这里直接上代码。 详细介绍可以参考下面链接里面的介绍: https://en.wikipedia.org/wiki/Circular_b

    2023年04月19日
    浏览(42)
  • 【STM32 CubeMX】学STM必会的数据结构——环形缓冲区

    在嵌入式系统开发中,经常需要处理数据的缓存和传输,而环形缓冲区是一种常见且有效的数据结构,特别适用于处理实时数据流或者在有限的内存资源下高效地管理数据。在STM32微控制器的开发中,使用CubeMX工具可以方便地配置和生成环形缓冲区的代码,从而加速开发过程

    2024年04月12日
    浏览(29)
  • 【Linux】文件缓冲区

    提到文件缓冲区这个概念我们好像并不陌生,但是我们对于这个概念好像又是模糊的存在脑海中,之间我们在介绍c语言文件操作已经简单的提过这个概念,今天我们不妨深入理解什么是文件缓冲区 通过自己实现库中的一些文件操作函数更加深入的理解文件缓冲区 自定义实现

    2024年02月10日
    浏览(56)
  • 8.缓冲区管理

    双缓冲区:TC+M 假设初始状态缓冲区1满,缓冲区2空,工作区为空。 刚开始缓冲区2为空,所以设备可以向缓冲区2中冲入数据耗时T,另一方面刚开始缓冲区1中是满的,所以刚开始就可以把缓冲区1中的数据传送到工作区中,M时刻工作区被充满,CPU就开始处理数据耗时C,处理完

    2024年02月11日
    浏览(40)
  • Redis 缓冲区

    缓冲区的应用场景 : 客户端与服务器端的通信时,暂存客户端发送的命令数据,或暂存服务器端返给客户端的数据结果 主从节点间进行数据同步时,暂存主节点接收的写命令和数据 缓冲区 : 避免客户端和服务器端的请求发送和处理速度不匹配 服务器给每个连接的客户端都准

    2024年02月07日
    浏览(68)
  • 理解缓冲区

    对于这样的代码,首先可以肯定的是 printf 语句先于 sleep 执行,既然如此那么就应该是先打印语句然后进行休眠,下面看看结果: 但这里却是先休眠以后再打印语句,这是因为存在一个叫缓冲区的东西,当我们要向外设写入数据(让显示器显示就是向显示器写入数据)时会将

    2023年04月25日
    浏览(71)
  • 【Linux】理解缓冲区

    我们发现 printf 和 fwrite (库函数)都输出了2次,而 write 只输出了一次(系统调用)。为什么呢?肯定和fork有关! C接口的函数被打印了两次系统接口前后只是打印了一次:和fork函数有关,fork会创建子进程。在创建子进程的时候,数据会被处理成两份,父子进程发生写时拷

    2024年01月23日
    浏览(52)
  • 【Linux】深入理解缓冲区

    目录 什么是缓冲区 为什么要有缓冲区 缓冲区刷新策略 缓冲区在哪里  手动设计一个用户层缓冲区 缓冲区本质上一块内存区域,用来保存临时数据。 缓冲区在各种计算任务中都广泛应用,包括输入/输出操作、网络通信、图像处理、音频处理等。 这块内存区域是由 谁提供的

    2024年02月15日
    浏览(62)
  • 【linux】重定向+缓冲区

    自我名言 : 只有努力,才能追逐梦想,只有努力,才不会欺骗自己。 喜欢的点赞,收藏,关注一下把! close(1),为什么没有打印新建文件fd呢? printf(“%dn”,fd); printf会把内容打印到stdout文件中。 但是close(1)关闭标准输出stdout—显示器,int fd=open();新打开的文件fd是1。 st

    2024年02月08日
    浏览(54)
  • SEED-缓冲区溢出攻击

    实验环境:SEED-Ubuntu20.04虚拟机 a) 缓冲区溢出原理 **缓冲区溢出攻击原理:**利用溢出的数据改变源程序的控制流,如覆盖返回地址 b) 分析生成badfile文件的exploit.py程序 Shellcode部分 字节数组末尾处填入shellcode c) 编译目标服务器上具有缓冲区溢出漏洞的stack.c程序,并将其缓冲

    2024年02月07日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包