一个通用的es聚合查询方法

这篇具有很好参考价值的文章主要介绍了一个通用的es聚合查询方法。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 /**
     * 一个通用的es聚合查询方法
     * @param restHighLevelClient
     * @param start
     * @param end
     * @param index
     * @param aggregationName
     * @param painlessScript
     * @param existsQueryParams
     * @param notExistsQueryParams
     * @return
     * @throws IOException
     */
    private List<String> queryFromEs(RestHighLevelClient restHighLevelClient, String start, String end, String[] index, String aggregationName, String painlessScript, List<String> existsQueryParams, List<String> notExistsQueryParams) throws IOException {
        List<String> resultList = new ArrayList<>();
        //创建一个搜索源
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        //SearchRequest按一个或多个索引查询,需要一个SearchSourceBuilder,搜索源提供了搜索选项
        SearchRequest searchRequest = new SearchRequest();
        //聚合的条件
        TermsAggregationBuilder aggregation = AggregationBuilders.terms(aggregationName).script(new Script(painlessScript)).size(Integer.MAX_VALUE).minDocCount(1).shardMinDocCount(0).showTermDocCountError(false);
        //添加bool过滤器,进行条件查询
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        //must --时间必须满足
        boolQueryBuilder.must(QueryBuilders.rangeQuery("TIME").gte(start).lte(end));
        //存在的条件
        for (String existsQueryParam : existsQueryParams) {
            boolQueryBuilder.must(QueryBuilders.existsQuery(existsQueryParam));
        }
        //不存在的条件
        for (String notExistsQueryParam : notExistsQueryParams) {
            boolQueryBuilder.mustNot(QueryBuilders.existsQuery(notExistsQueryParam));
        }
        //定义sourceBuilder按时间排序,正序,再传入之前的查询条件,from 0 size 0 不查原始数据
        sourceBuilder.sort("TIME", SortOrder.ASC).from(0).size(0).query(boolQueryBuilder).aggregation(aggregation);
        //定义查询的索引,定义搜索源,即sourceBuilder对象
        searchRequest.indices(index);
        searchRequest.source(sourceBuilder);
        log.info("-query from es index:{}, sql:{}-->", index, sourceBuilder);
        //开始搜索,拿到结果
        SearchResponse response = null;
        response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        //拿到聚合数据
        Aggregations aggregations = response.getAggregations();
        if (aggregations != null) {
            ParsedStringTerms empAccountAggregationsTerms = aggregations.get(aggregationName);
            List<? extends Terms.Bucket> buckets = empAccountAggregationsTerms.getBuckets();
            for (Terms.Bucket bucket : buckets) {
                log.info("--query from es index results:-->" + bucket.getKey().toString());
                resultList.add(bucket.getKey().toString());
            }
        }
        return resultList;
    }
2023-05-23更新 上面的条件是存在数据里,有定时任务触发

调整方法如下:

private void queryFromEs2Kafka(List<String> index, DmssScheduleJobPO dmssScheduleJobPO, EsDiscoverConfig esDiscoverConfig, String startTime, String endTime, RestHighLevelClient restHighLevelClient) throws IOException, ExecutionException, InterruptedException {
  JSONObject jsonObject = JSON.parseObject(esDiscoverConfig.getSearchCondition());
  JSONObject painless_aggregations = jsonObject.getJSONObject("aggregations").getJSONObject("painless_aggregations");
  log.debug("类型:" + dmssScheduleJobPO.getJobType() + "==jsonObject:==" + jsonObject);
  log.debug("==queryStr:==" + jsonObject.getString("query"));
  SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  searchSourceBuilder.query(QueryBuilders.wrapperQuery(jsonObject.getString("query")));
  // 添加时间范围限制
  RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("time")
          .from(startTime)
          .to(endTime)
          .includeLower(true)
          .includeUpper(true);
  searchSourceBuilder.postFilter(rangeQueryBuilder);
  // 创建一个脚本对象
  Script script = new Script(ScriptType.INLINE, "painless", painless_aggregations.getJSONObject("terms").getJSONObject("script").getString("source"), Collections.emptyMap());
  // 创建一个 TermsAggregationBuilder 对象,并将脚本对象传递给 script 方法
  TermsAggregationBuilder termsAggBuilder = AggregationBuilders.terms("painless_aggregations")
          .script(script)
          .size(2147483647)
          .minDocCount(1)
          .shardMinDocCount(0)
          .showTermDocCountError(false);
  // 将聚合条件添加到 searchSourceBuilder 中
  searchSourceBuilder.aggregation(termsAggBuilder);
  searchSourceBuilder.from(0);
  searchSourceBuilder.size(0);
  log.info("--自动发现配置化请求参数:--" + searchSourceBuilder);
  //创建一个请求
  SearchRequest searchRequest = new SearchRequest().indices(index.toArray(new String[]{}))
          .source(searchSourceBuilder);
  log.info("--自动发现2kafka配置化请求参数:--" + searchSourceBuilder);
  //发送请求
  SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  //拿到聚合数据
  Aggregations aggregations = response.getAggregations();
  if (aggregations != null) {
    send2KafkaUseEsResult(response, aggregations, dmssScheduleJobPO);
  }
}

getSearchCondition为数据库中配置的条件:

{"query":{"bool":{"must":[{"exists":{"field":"db_type","boost":1}},{"exists":{"field":"dst_ip","boost":1}},{"exists":{"field":"dst_port","boost":1}},{"exists":{"field":"dst_db_name","boost":1}},{"exists":{"field":"dst_db_table","boost":1}},{"exists":{"field":"dst_asset_name","boost":1}},{"exists":{"field":"dst_infosys_name","boost":1}}],"adjust_pure_negative":true,"boost":1}},"sort":[{"time":{"order":"asc"}}],"aggregations":{"painless_aggregations":{"terms":{"script":{"source":"if (doc.containsKey('dst_db_column') && doc['dst_db_column'].size() > 0) {return  doc['_index'].value + '@_@'+doc['db_type'].values+'@_@'+doc['dst_ip'].values +'@_@'+ doc['dst_port'].values +'@_@'+ doc['dst_db_name'].values+'@_@'+ doc['dst_db_table'].values+'@_@'+ doc['dst_db_column'].values+'@_@'+doc['dst_asset_name.keyword'].values+'@_@'+doc['dst_infosys_name.keyword'].values; } else {return  doc['_index'].value + '@_@'+doc['db_type'].values+'@_@'+doc['dst_ip'].values +'@_@'+ doc['dst_port'].values +'@_@'+ doc['dst_db_name'].values+'@_@'+ doc['dst_db_table'].values+'@_@'+doc['dst_asset_name.keyword'].values+'@_@'+doc['dst_infosys_name.keyword'].values;}","lang":"painless"},"size":2147483647,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false}}}}文章来源地址https://www.toymoban.com/news/detail-475456.html

到了这里,关于一个通用的es聚合查询方法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • es使用和常用查询(包含多字段聚合查询,实体类方式保存es)

    1.导入es相关jar包 2.增加es配置 3.读取es相关配置   4.创建es实体类,与es mapping设计一致  5.创建es结构  6.创建类继承 ElasticsearchRepository 实现通过api保存实体类ESData到es  7.保存实体类ESData到es  8.es查询 多字段匹配查询,分组查询,分组后聚合

    2024年02月03日
    浏览(49)
  • 商城项目-es的海量查询/聚合/数据同步

    1.sql表 用户数据库: tb_user:用户表,其中包含用户的详细信息 tb_address:用户地址表 商品数据库 tb_item:商品表 订单数据库 tb_order:用户订单表 tb_order_detail:订单详情表,主要是订单中包含的商品信息 tb_order_logistics:订单物流表,订单的收货人信息 2.模块搭建 feign-api:是

    2024年01月21日
    浏览(43)
  • Elasticsearch专栏-6.es基本用法-聚合查询

    在es中,所有的聚合查询都是放在aggs中进行的。平均值、总和、最大、最小、数量对应的分别是:avg、sum、max、min、value_count 分组用到的是terms 上面语句中的size:3,指的是分组后,只展示前三个分组内容。size:0,指的是所有query查询结果,也就是原始数据,不需

    2024年02月09日
    浏览(39)
  • ES聚合查询 基于RestHighLevelClient依赖 Java操作

    一、介绍 (偏自我理解)         1.ES聚合查询通用流程                 1.分组 ( 好比Mysql --- group by )                 2.组内聚合 也叫 组内指标( 好比Mysql --- SUM()、COUNT()、AVG()、MAX()、MIN() )         2.桶(我要是es开发者,我起名叫啥都行)                 1.满足特

    2024年02月06日
    浏览(48)
  • kibana es创建模板,索引,导入数据,简单聚合查询

    1.创建模板 2.获取模板

    2024年02月13日
    浏览(54)
  • ES设置最大查询条数限制,打破限制,聚合分组数量限制打破

    ​ 今天在做 ElasticSearch 进行查询的时候发现,在进行分页的时候,数据超出10000以后得页数,查询的时候会报错。后查询了 es 官方文档发现,查询数量的默认值是 10000 ; 官网链接:https://www.elastic.co/guide/en/elasticsearch/reference/7.8/index-modules.html#dynamic-index-settings 报错内容: 在

    2024年02月03日
    浏览(39)
  • 一个神奇的SQL聚合查询案例

    今天给大家分享一个 SQL 案例,假如存在以下两个表: 每个表包含 3 条数据。请问,以下查询返回结果是什么? 不同数据库对于上面的查询实现并不一致,大体可以分为两种。 对于 MySQL、SQL Server、PostgreSQL 以及 SQLite 而言,查询结果如下: 它们的实现逻辑如下: 也就是说,

    2024年02月02日
    浏览(37)
  • 当es使用script脚本查询聚合等操作遇到空字段报错问题解决方案

            在使用ES的脚本时,如果脚本中引用了不存在或者空的字段,则会导致脚本执行失败并抛出错误。这是因为ES会在脚本执行之前尝试检索引用的字段,如果该字段不存在则会抛出异常。         因此,在使用ES脚本时,需要确保所引用的字段都存在且不为空。可

    2024年02月11日
    浏览(54)
  • ES中使用 Top Hits 查询分桶聚合结果的每个桶的详细数据

    Top hits(顶部命中)是一个聚合功能,用于在查询结果中返回每个桶(bucket)中的顶部 N 个文档。这对于需要在聚合结果中查看每个桶中的最相关或最高评分文档的情况非常有用。 简单来说,Top Hits 就是对聚合结果中相关文档的详细展示,它不同于 Post Filter,Post Filter 是基于

    2024年02月07日
    浏览(35)
  • 原生语言操作和spring data中RestHighLevelClient操作Elasticsearch,索引,文档的基本操作,es的高级查询.查询结果处理. 数据聚合.相关性系数打分

    ​ Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。充分利用Elasticsearch的水平伸缩性,能使数据在生产环境变得更有价值。Elasticsearch 的实现原理主要分为以下几个步骤,首先用户将数据提交到Elasti

    2024年02月05日
    浏览(87)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包