迭代器模式 实现ES大量数据查询

这篇具有很好参考价值的文章主要介绍了迭代器模式 实现ES大量数据查询。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

项目需求 

要求

普通策略

升级策略:使用迭代器模式

迭代器模式组成

代码实现

查询实体

返回实体

实现类

代码测试

mock的ES返回结果json数据

第一次返回结果

第二次返回结果

第三次返回结果

postMan请求, 控制台打印结果


项目需求 

数据从Mysql 迁移到 Es,  Es查询数据默认fetch Size最大为10000条,如果查询超过1万条,需要通过scroll形式进行查询

要求

  1. 安全问题考虑,查询需要连接ES-ip:9200,不可使用第三方j a r
  2. 由于目前项目的查询方式是基于Mysql,为了减少改动,暂时使用sql进行查询
  3. 需要将结果以stream的形式进行返回,避免内存占用过大,以及瞬时的网络带宽问题

普通策略

  1. 进行第一次访问,然后取columns, rows 和 cursor
  2. 转化我们的第一次结果为map形式的json
  3. 拿第一次的corsor id 进行第二次访问
  4. 用第一次记录的columns 组装第二次的rows结果为map形式的json
  5. 以此类推

升级策略:使用迭代器模式

  1. 将我们访问封装到我们的迭代器中
  2. 第一次访问以迭代器的构造函数访问,初始化我们的columns, 供后续使用, 将第一次的结果转化为map 形式的json供迭代器使用 hasNext里面进行后续的多次访问

迭代器模式组成

  • iterator 抽象迭代器:负责定义访问和遍历元素的接口
  • concreteIterator 具体迭代器:实现迭代器接口,完成容器元素的遍历
  • aggreate抽象容器:容器角色负责提供创建决堤迭代器的角色接口,提供一个类似createiterator()这样的方法, 在Java中一般是 iterator()方法
  • concreteaggreate 具体容器:实现容器接口定义的方法

代码实现

查询实体


/**
 * 第一次查询用query和fetchSize, 后续用返回结果中的游标cursor
 */
@JsonIgnoreProperties
public class EsSqlQuery {
    /**
     * 调用ES的查询的sql
     */
    private String query;
    /**
     * 取出的条数
     */
    private Long fetchSize;
    /**
     * ES返回的游标
     */
    private String cursor;
    
    public EsSqlQuery(String cursor) {
        this.cursor = cursor;
    }

    public EsSqlQuery(String query, Long fetchSize) {
        this.query = query;
        this.fetchSize = fetchSize;
    }

    public String getQuery() {
        return query;
    }

    public void setQuery(String query) {
        this.query = query;
    }

    public Long getFetchSize() {
        return fetchSize;
    }

    public void setFetchSize(Long fetchSize) {
        this.fetchSize = fetchSize;
    }

    public String getCursor() {
        return cursor;
    }

    public void setCursor(String cursor) {
        this.cursor = cursor;
    }
}

返回实体


public class EsSqlResult {
    private List<Map<String,String>> columns;
    private List<List<Object>> rows;
    private String cursor;

    public List<Map<String, String>> getColumns() {
        return columns;
    }

    public void setColumns(List<Map<String, String>> columns) {
        this.columns = columns;
    }

    public List<List<Object>> getRows() {
        return rows;
    }

    public void setRows(List<List<Object>> rows) {
        this.rows = rows;
    }

    public String getCursor() {
        return cursor;
    }

    public void setCursor(String cursor) {
        this.cursor = cursor;
    }
}

实现类

@Component
public class EsQueryProcessor {

    // 1.用stream返回,节省内存
    public Stream<Map<String, Object>> scrollEsStream(String query, Long fetchSize) {
        return StreamSupport.stream(Spliterators
                .spliteratorUnknownSize(new ScrollIterator(query, fetchSize), 0), false);
    }

    // 2.用迭代器模式
    private class ScrollIterator implements Iterator<Map<String, Object>> {

        /**
         * 游标
         */
        private String scrollId;
        /**
         * 字段名集合
         */
        private List<String> columns;
        /**
         * 迭代元素
         */
        Iterator<Map<String, Object>> iterator;
        /**
         * 模拟访问次数
         */
        private int i = 1;

        // 2.1构造函数进行第一次查询, 初始化后续需要使用的columns和iterator,scrollId
        public ScrollIterator(String query, Long fetchSize) {
            // 模拟根据query和fetchSize从ES第一次获取数据
            String jsonName = "es1.json";
            EsSqlResult result = this.getEsSqlResult(jsonName);
            columns = CollStreamUtil.toList(result.getColumns(), u -> u.get("name"));
            this.scrollId = result.getCursor();
            this.iterator = convert(columns, result).iterator();

        }

        // 2.2根据 scrollId 是否为null进行后续访问,直到scrollId为null
        @Override
        public boolean hasNext() {
            return iterator.hasNext() || scrollNext();
        }

        private boolean scrollNext() {
            if (iterator == null || this.scrollId == null) {
                return false;
            }
            i++;
            // 模拟根据query和fetchSize从ES第i次获取数据
            String jsonName = "es" + i + ".json";
            EsSqlResult result = this.getEsSqlResult(jsonName);
            this.scrollId = result.getCursor();
            this.iterator = convert(columns, result).iterator();
            return iterator.hasNext();
        }

        @Override
        public Map<String, Object> next() {
            return iterator.next();
        }

        // 模拟从ES获取查询结果
        private EsSqlResult getEsSqlResult(String jsonName) {
            if (StrUtil.isBlank(jsonName)) {
                return null;
            }
            EsSqlResult result = null;
            List<File> fileList = FileUtil.loopFiles(ResourceUtil.getResource("json").getFile());
            for (File file : fileList) {
                if (jsonName.equals(file.getName())) {
                    String json = FileUtil.readString(file, Charset.forName("UTF-8"));
                    result = JSONObject.parseObject(json, EsSqlResult.class);
                }
            }
            return result;
        }

    }

    // 3.返回结果传统一点 List<Map>
    private List<Map<String, Object>> convert(List<String> columns, EsSqlResult result) {
        List<Map<String, Object>> results = new ArrayList<>();
        for (List<Object> row : result.getRows()) {
            Map<String, Object> map = new HashMap<>();
            for (int i = 0; i < columns.size(); i++) {
                map.put(columns.get(i), row.get(i));
            }
            results.add(map);
        }
        return results;
    }


}

代码测试

@RestController
public class EsController {

    @Autowired
    private EsService esService;

    @GetMapping("/es")
    public Boolean suggestRequirement() {
        return esService.query();
    }


}


public class EsService {

    @Autowired
    private EsQueryProcessor processor;

    public Boolean query() {
        Stream<Map<String, Object>> mapStream = processor.scrollEsStream(null, null);
        mapStream.forEach(x -> System.out.println(x));
        return true;
    }
}

mock的ES返回结果json数据

第一次返回结果

{
  "cursor": "safajfkajskjkasg",
  "columns": [
    {
      "name": "username",
      "type": "String"
    },
    {
      "name": "age",
      "type": "String"
    }
  ],
  "rows": [
    [
      "zhao",
      "11"
    ],
    [
      "qian",
      "22"
    ]
  ]
}

第二次返回结果

{
  "cursor": "safajfkajskjkasg",
  "rows": [
    [
      "sun",
      "33"
    ],
    [
      "li",
      "44"
    ]
  ]
}

第三次返回结果

{
  "rows": [
    [
      "zhou",
      "55"
    ],
    [
      "wu",
      "66"
    ]
  ]
}

postMan请求, 控制台打印结果

es查询数据总量,设计模式,迭代器模式文章来源地址https://www.toymoban.com/news/detail-809724.html

到了这里,关于迭代器模式 实现ES大量数据查询的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 项目实战:ES的增加数据和查询数据

    最近需要做一个有关查询聊天记录的功能,通过资料了解到使用ES可以方便我们快速查询内容。自己进行ES框架的搭建,感兴趣的可以看博客进行学习:https://blog.csdn.net/weixin_45309155/article/details/132686375?spm=1001.2014.3001.5501 ES搭建好之后就是应用了,下面就先总结一下关于最近在项

    2024年02月07日
    浏览(36)
  • ES查询时只能查询10000条数据解决方案

    方法1: 在restful请求时,解除索引最大查询数的限制 _all表示所有索引,如果针对单个索引的话修改成索引名称即可!!! 此时变可以查询300万条数据了,数据量太大可能存在超时问题,查询数据时加上超时参数。  方法2: 在创建索引的时候加上 方法3:         在查询时

    2024年02月11日
    浏览(47)
  • ES分词字典更新查询不到数据

    存储于es的文档数据将会被分词存储 例如: 当我们通过ik远程扩展词库增加自定义字典 : “ 词的 ”; 已经存在的数据将不会重新分词,例如上面案例中, 已经存在的数据,就不能通过新增的字典 “词的” 查询到 “分词的句子” 这条数据 利用如下命令刷新索引即可 Java

    2024年02月14日
    浏览(32)
  • Elasticsearch ES操作:查询数据(全部、分页、单条)

    查询 条件查询 指定条数 返回结果

    2024年02月16日
    浏览(39)
  • 解决ES只能查询10000条数据的问题

    这篇文章是翻译过来的,原文在此,需要科学上网。 当查询页很深或者查询的数据量很大时,深查询就会出现。es 的自我保护机制允许的一次最大查询量是 10000 条数据。在请求中加入 trackTotalHits(true) 可以解除10000条的上限。 from size 这种实现方式有点类似于 MySQL 中的 limit。

    2024年02月12日
    浏览(32)
  • es解决只能默认查询10000条数据方案

    在使用es进行数据查询时,由于es官方默认限制了索引一次性最多只能查询10000条数据,这其实是es的一种保护机制,那么很多时候我们需要突破这种限制,例如需要进入数据同步的场景,则需要查询全部的数据,如何处理呢? 方案1:在设置索引属性时解除索引最大查询数的限

    2024年02月11日
    浏览(44)
  • ES如何查询索引的全量数据

    问题描述 查询全表数据也是日常工作中常见的一种查询场景。 在ES如果我们使用match_all查询索引的全量数据时,默认只会返回10条数据。 那么在ES如何查询索引的全量数据呢? 小实验 1、索引和数据准备 PUT book {   \\\"mappings\\\": {     \\\"properties\\\": {       \\\"name\\\": {         \\\"type\\\": \\\"te

    2023年04月10日
    浏览(40)
  • es的数据存储结构;近实时查询原因

    es 存储中大体可以看成 index(表) + document(行记录) 组成 es 支持分布式存储,一个 index 会产生多个分片,保存在不同的实例上。其中分为 若干个主分片 和 副分片 。当主分片挂了,会切换到副分片,主分片和副分片的数据是一致的(写的时候先找主分片,读的时候是2者都可

    2024年02月09日
    浏览(37)
  • 商城项目-es的海量查询/聚合/数据同步

    1.sql表 用户数据库: tb_user:用户表,其中包含用户的详细信息 tb_address:用户地址表 商品数据库 tb_item:商品表 订单数据库 tb_order:用户订单表 tb_order_detail:订单详情表,主要是订单中包含的商品信息 tb_order_logistics:订单物流表,订单的收货人信息 2.模块搭建 feign-api:是

    2024年01月21日
    浏览(43)
  • es 在数据量很大的情况下(数十亿级别)如何提高查询效率?_es能存多少数据

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新软件测试全套学习资料》

    2024年04月26日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包