Java 实现对ES的Scroll读以及分片Scroll读--全量数据读取

这篇具有很好参考价值的文章主要介绍了Java 实现对ES的Scroll读以及分片Scroll读--全量数据读取。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Java 实现对ES的Scroll读以及分片Scroll读

本文实现的是使用Java对ES索引全量数据的读取操作。ES版本是7.14。采用两种方式,一种是不分片读,一种是分片读。
对ES实现全量读取需要依赖到ES所提供的API,这里需要添加两个依赖

    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>7.14</version>
    </dependency>

    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>7.14</version>
      <scope>compile</scope>
    </dependency>

方式一不分片Scroll读取

public RestHighLevelClient initClient(String ip,String port,String scheme,String username,String password)
{
    int intPort = Integer.parseInt(port);
    HttpHost host=new HttpHost( ip, intPort, scheme);
    RestClientBuilder builder=RestClient.builder(host);
    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username,password));
    builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
    RestHighLevelClient client = new RestHighLevelClient(builder);
    return client;
}
    
public void testReadSpeed() throws IOException {
		//与es节点建立连接
        RestHighLevelClient esClient = initClient("192.168.11.137","9200","http","elastic","123456");
        
        int scrollSize = 1000;//一次读取的doc数量
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());//读取全量数据
        searchSourceBuilder.size(scrollSize);
        Scroll scroll1 = new Scroll(TimeValue.timeValueMinutes(3));//设置一次读取的最大连接时长

        String index1 = "alltype_test";//读取的索引名称
        SearchRequest searchRequest1 = new SearchRequest(index1);
        searchRequest1.types("_doc");
        searchRequest1.source(searchSourceBuilder);
        searchRequest1.scroll(scroll1);
        SearchResponse searchResponse1 = null;
        searchResponse1 = esClient.search(searchRequest1,RequestOptions.DEFAULT);
        String scrollId1 = searchResponse1.getScrollId();
        SearchHit[] hits1 = searchResponse1.getHits().getHits();

        long st = System.currentTimeMillis();

        while (hits1.length > 0) {
            try {
                SearchScrollRequest searchScrollRequest1 = new SearchScrollRequest(scrollId1);
                searchScrollRequest1.scroll(scroll1);
                SearchResponse searchScrollResponse1 = null;

                searchScrollResponse1 = esClient.searchScroll(searchScrollRequest1,RequestOptions.DEFAULT);
                scrollId1 = searchScrollResponse1.getScrollId();
                hits1 = searchScrollResponse1.getHits().getHits();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId1);
        try {
            esClient.clearScroll(clearScrollRequest,RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("读取用时:" + (System.currentTimeMillis() - st));
    }

方式二分片读取

分片就是将一个索引的全量数据分成几块,对每个块分别生成一个Scroll对象来读取。分片的数量最佳是根据索引的shards数来定。

/**
*读线程
*/
class ReadTread extends Thread{
        private RestHighLevelClient esClient;
        private SliceBuilder slice;
        private int scrollSize;
        private String index;

        public ReadTread(RestHighLevelClient esClient, SliceBuilder slice, int scrollSize, String index){
            this.esClient = esClient;
            this.slice = slice;
            this.scrollSize = scrollSize;
            this.index = index;
        }

        @Override
        public void run() {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.matchAllQuery());
            searchSourceBuilder.size(scrollSize);
            Scroll scroll1 = new Scroll(TimeValue.timeValueMinutes(3));

            SearchRequest searchRequest1 = new SearchRequest(index);
            searchRequest1.types("_doc");
            searchRequest1.source(searchSourceBuilder.slice(slice).sort("_doc"));
            searchRequest1.scroll(scroll1);
            SearchResponse searchResponse1 = null;
            try {
                searchResponse1 = esClient.search(searchRequest1,RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
            }
            String scrollId1 = searchResponse1.getScrollId();
            SearchHit[] hits1 = searchResponse1.getHits().getHits();

            while (hits1.length > 0) {
                try {
                    SearchScrollRequest searchScrollRequest1 = new SearchScrollRequest(scrollId1);
                    searchScrollRequest1.scroll(scroll1);
                    SearchResponse searchScrollResponse1 = null;

                    searchScrollResponse1 = esClient.searchScroll(searchScrollRequest1,RequestOptions.DEFAULT);
                    scrollId1 = searchScrollResponse1.getScrollId();
                    hits1 = searchScrollResponse1.getHits().getHits();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId1);
            try {
                esClient.clearScroll(clearScrollRequest,RequestOptions.DEFAULT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

public RestHighLevelClient initClient(String ip,String port,String scheme,String username,String password)
{
    int intPort = Integer.parseInt(port);
    HttpHost host=new HttpHost( ip, intPort, scheme);
    RestClientBuilder builder=RestClient.builder(host);
    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username,password));
    builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
    RestHighLevelClient client = new RestHighLevelClient(builder);
    return client;
}

public void testSearchScrollSlice() throws IOException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);//创建一个大小为4的线程池
        RestHighLevelClient esClient = initClient("192.168.11.137","9200","http","elastic","123456");
        
        int scrollSize = 1000;
        String index1 = "alltype";
        GetSettingsRequest settingsRequest = new GetSettingsRequest().indices(index1);
		//获取索引的shards数量
        settingsRequest.names("index.number_of_shards");
        GetSettingsResponse getSettingsResponse = esClient.indices().getSettings(settingsRequest,RequestOptions.DEFAULT);
        String num = getSettingsResponse.getSetting(index1, "index.number_of_shards");
        int shardnum = Integer.parseInt(num);

        long st = System.currentTimeMillis();

        for(int i=0; i<shardnum; i++) {
            SliceBuilder slice = new SliceBuilder(i,shardnum);
            ReadTread readTread = new ReadTread(esClient, slice, scrollSize, index1, count);
            executor.schedule(readTread, 0, TimeUnit.MILLISECONDS);
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
        }

        System.out.println("读取用时:" + (System.currentTimeMillis() - st));

    }

创作不易,点赞关注予我动力,有问题欢迎留言和私信!文章来源地址https://www.toymoban.com/news/detail-504460.html

到了这里,关于Java 实现对ES的Scroll读以及分片Scroll读--全量数据读取的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Elasticsearch集群搭建、数据分片以及位置坐标实现附近的人搜索

    es使用两种不同的方式来发现对方: 广播 单播 也可以同时使用两者,但默认的广播,单播需要已知节点列表来完成 当es实例启动的时候,它发送了广播的ping请求到地址 224.2.2.4:54328 。而其他的es实例使用同样的集群名称响应了这个请求。 一般这个默认的集群名称就是上面的

    2024年02月06日
    浏览(50)
  • ElasticSearch学习笔记-第四章 ES分片原理以及读写流程详解

    在学习ES分片原理以及读写流程之前,需要先学习一些ES的核心概念以及ES集群环境的相关知识 4.1 ES核心概念 4.1.1 索引 索引(Index)相当于MySQL中的数据库,一个索引就是一个拥有几分相似特征的文档的集合。 4.1.2 类型 类型(Type)相当于MySQL中的表,一个类型就是索引的一个逻辑上

    2024年02月06日
    浏览(59)
  • Java实现读取转码写入ES构建检索PDF等文档全栈流程

    之前已简单使用ES及Kibana和在线转Base64工具实现了检索文档的demo,并已实现WebHook的搭建和触发流程接口。 传送门: 基于GitBucket的Hook构建ES检索PDF等文档全栈方案 使用ES检索PDF、word等文档快速开始 总体思路:基于前面已经搭建的WebHook触发流程,接收到push更新消息之后,使用

    2024年03月10日
    浏览(56)
  • java使用ElasticSearch的scroll查询,高效的解决es查询数量的限制。

    (1)首先我们要明白es的查询机制:ES的搜索是分2个阶段进行的,即 Query阶段和Fetch阶段 。 Query阶段 比较轻量级,通过查询倒排索引,获取满足查询结果的文档ID列表。 Fetch阶段 比较重,需要将每个分片的查询结果取回,在协调结点进行 全局 排序。 通过From+size这种方式分批

    2024年02月03日
    浏览(82)
  • 使用kettle同步全量数据到Elasticsearch(es)--elasticsearch-bulk-insert-plugin应用

    为了前端更快地进行数据检索,需要将数据存储到es中是一个很不错的选择。由于公司etl主要工具是kettle,这里介绍如何基于kettle的elasticsearch-bulk-insert-plugin插件将数据导入es。在实施过程中会遇到一些坑,这里记录解决方案。 可能会遇到的报错: 1、No elasticSearch nodes found 2、

    2024年02月01日
    浏览(69)
  • ES检索结果高亮显示JAVA以及Kibana实现

    对比做了高亮前后的结果返回: 高亮前: 高亮后: 可以看到加入高亮的代码之后返回的json串命中的被套了一层em style=‘color: red’xxx/em标签,也就是我们前置设置的preTags与postTags; 当然hightlight本身支持多个字段高亮,java代码实现只要设置多个     后续查询出结果之

    2024年02月11日
    浏览(40)
  • ElasticSearch进阶:多种查询操作,各种ES查询以及在Java中的实现

    目录 前言 1 词条查询 1.1 等值查询-term 1.2 多值查询-terms 1.3 范围查询-range 1.4 前缀查询-prefix 1.5 通配符查询-wildcard 2 复合查询 2.1 布尔查询 2.2 Filter查询 3 聚合查询 3.1 最值、平均值、求和 3.2 去重查询 3.3 分组聚合 3.3.1 单条件分组 3.3.2 多条件分组 3.4 过滤聚合 ElasticSearch 第一篇

    2024年02月02日
    浏览(50)
  • E往无前 | 海量数据ES 扩展难?腾讯云大数据ES 扩展百万级分片也“So Easy~”

    《E往无前》系列将着重展现腾讯云ES在持续深入优化客户所关心的「省!快!稳!」诉求,能够在低成本的同时兼顾高可用、高性能、高稳定等特性,可以满足微盟、小红书、微信支付等内外部大客户的核心场景需求。 E往无前 | 海量数据ES扩展难?腾讯云ES 扩展百万级分片

    2024年02月06日
    浏览(86)
  • ElasticSearch系列 - SpringBoot整合ES:实现分页搜索 from+size、search after、scroll

    01. 数据准备 ElasticSearch 向 my_index 索引中索引了 12 条文档: 02. ElasticSearch 如何查询所有文档? ElasticSearch 查询所有文档 根据查询结果可以看出,集群中总共有12个文档,hits.total.value=12, 但是在 hits 数组中只有 10 个文档。如何才能看到其他的文档? 03. ElasticSearch 如何指定搜

    2023年04月08日
    浏览(44)
  • ES是一个分布式全文检索框架,隐藏了复杂的处理机制,核心数据分片机制、集群发现、分片负载均衡请求路由

    ES是一个分布式框架,隐藏了复杂的处理机制,核心数据分片机制、集群发现、分片负载均衡请求路由。 ES的高可用架构,总体如下图: 说明:本文会以pdf格式持续更新,更多最新尼恩3高pdf笔记,请从下面的链接获取:语雀 或者 码云 ES基本概念名词 Cluster 代表一个集群,集

    2024年02月10日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包