使用Elasticsearch进行分组聚合统计

这篇具有很好参考价值的文章主要介绍了使用Elasticsearch进行分组聚合统计。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

要使用Elasticsearch进行分组聚合统计,可以使用聚合(aggregation)功能。聚合操作允许您根据指定的条件对文档进行分组,并计算每个分组的聚合结果。

针对普通类型的字段,DSL构建语法:

{
  "aggs": {
    "agg_name": {
      "agg_type": {
        "agg_parameters"
      }
    },
    "agg_name2": {
      "agg_type": {
        "agg_parameters"
      }
    },
    ...
  }
}

aggs: aggregations关键字的别名,代表着分组

agg_name: 这个是自定义的名字,可以针对你自己的字段命名一个,最好加上_agg后缀

agg_type: 聚合类型

agg_parameters:聚合参数

聚合类型(agg_type)

Elasticsearch中支持多种聚合类型(agg_type)用于不同的聚合操作。以下是一些常用的聚合类型及其功能:

  1. Terms(词条聚合):按照字段值进行分组,统计每个分组的文档数量。
  2. Sum(求和聚合):计算指定字段的总和。
  3. Avg(平均值聚合):计算指定字段的平均值。
  4. Min(最小值聚合):找出指定字段的最小值。
  5. Max(最大值聚合):找出指定字段的最大值。
  6. Stats(统计聚合):计算指定字段的统计信息,包括最小值、最大值、总和、平均值和文档数量。
  7. Extended Stats(扩展统计聚合):计算指定字段的扩展统计信息,包括最小值、最大值、总和、平均值、标准差和文档数量。
  8. Cardinality(基数聚合):计算指定字段的唯一值数量。
  9. Date Histogram(日期直方图聚合):按照时间间隔对日期字段进行分组。
  10. Range(范围聚合):将文档按照指定范围进行分组,例如按照价格范围、年龄范围等。
  11. Nested(嵌套聚合):在嵌套字段上执行子聚合操作。

除了上述示例外,Elasticsearch还提供了更多聚合类型,如Geo Distance(地理距离聚合)、Date Range(日期范围聚合)、Filter(过滤聚合)等。

聚合参数(agg_parameters)

在Elasticsearch中,聚合(aggregation)可以使用不同的参数来控制其行为和结果。以下是一些常用的聚合参数:

1. field(字段):指定要聚合的字段。
2. size(大小):限制返回的聚合桶的数量。
3. script(脚本):使用脚本定义聚合逻辑。
4. min_doc_count(最小文档数量):指定聚合桶中文档的最小数量要求。
5. order(排序):按照指定字段对聚合桶进行排序。
6. include/exclude(包含/排除):根据指定的条件包含或排除聚合桶。
7. format(格式):对聚合结果进行格式化。
8. precision_threshold(精度阈值):用于基数聚合的精度控制。
9. interval(间隔):用于日期直方图聚合的时间间隔设置。
10. range(范围):用于范围聚合的范围定义。

具体可用的参数取决于聚合类型和使用的Elasticsearch版本。

DSL查询实践

准备工具: Kibana或者Elasticvue

在这里,我使用Elasticvue

网址:Elasticvue - Elasticsearch gui for the browser

这个工具我是装在火狐上的,连接上后能看到节点信息、集群健康、索引信息等等,也支持REST查询,类似在Kibana使用Devtools差不多。

elasticsearch聚合统计,Elasticsearch,elasticsearch,大数据,搜索引擎

单个分组DSL查询, 求分组后的平均值

{
  "size": 0,
  "aggs": {
    "id_agg": {
      "terms": {
        "field": "id", 
        "size": 3 #在有的情况下,如果你的文档数量太多,会导致查询超时、返回数据过多的问题
      },
      "aggs": {
        "sub_id_agg": { 
          "terms": { #匹配搜索
            "field": "id"
          }
        }
      }
    }
  }
}

 

 这张图上面有几个关键信息

`/orderv4/order/_search` 是一个 Elasticsearch 的 REST API 端点,用于执行针对名为 `order` 的索引的搜索操作。

  1. /orderv4/order: 表示索引的名称是 `orderv4`,类型(Type)的名称是 `order`。
  2. 在较新的 Elasticsearch 版本中,类型的概念已经逐渐被弃用,因此索引名称后面的 `/order` 可以省略。
  3. _search: 表示执行搜索操作。

左侧是DSL请求体,右边是返回结果

took: 执行搜索的时间,单位是毫秒

timed_out:搜索是否超时

_shards:分片执行情况,这里的total代表参与搜索的总分片数

hits:和搜索文档匹配的文档信息,total代表和搜索条件匹配的总文档数

aggregations:里面是聚合结果,id_agg是刚才在dsl查询的时候设置的聚合名称,sum_other_doc_count代表除了bucket里面的文档数量,还有多少条没有展示。buckets里面的key就是文档里面的id的值是多少,doc_count 表示文档数量,换句话来说就是,id = 0 的数量为 1

使用Java构建分组查询

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

// 添加聚合操作
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("id_agg").field("id");
aggregationBuilder.subAggregation(AggregationBuilders.terms("sub_id_agg").field("id"));

searchSourceBuilder.aggregation(aggregationBuilder);

基于nested嵌套类型分组查询

nested(嵌套)是一种特殊的数据类型和查询方式,用于处理嵌套文档结构。它允许在文档中嵌套其他文档,并以一种有层次结构的方式进行索引和查询。

在使用nested查询的时候,先要对你的索引设置Mapping配置。把字段类型设置为nested。

一种是在建索引的时候,就配好Mapping,另外一种方式是直接对索引文档更新。

POST youer_index/_mapping/your_type
{
    "properties":{
        "item_list":{ # 在Java的ESDO模型里,就代表了一个List<Item>, Item是你自己定义的业务对象
            "type":"nested", #给item_list设置嵌套类型
            "properties":{
                "id":{
                    "type":"long"
                },
                "name":{
                    "type":"string"
                },
                "price":{
                    "type":"long"
                }
            }
        }
    }
}

nested字段DSL查询案例

{
    "aggregations":{
        "item_list_agg":{
            "nested":{
                "path":"item_list" # 字段路径必须,不然查不出结果
            },
            "aggs":{
                "sub_item_list_agg":{
                    "terms":{
                        "field":"item_list.id"
                    }
                }
            }
        }
    }
}

使用Java构建nested分组查询

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 添加聚合操作
AggregationBuilder nested = AggregationBuilders.nested("id_nested_agg", "item_list");
        
// 构建一个terms
TermsAggregationBuilder terms = AggregationBuilders.terms("id_nested_sub_agg").field("id");
        
// 将terms加到nested中
nested.subAggregation(terms);
        
// 添加到最终的查询中
searchSourceBuilder.aggregation(nested);

更多的案例,如果有兴趣的朋友可以自己摸索。下面我就分享一个实战中,如何用Java针对普通字段类型和nested字段类型构建查询语句,同时支持返回多个字段值。

import com.github.houbb.heaven.util.lang.StringUtil;
import org.apache.commons.collections.CollectionUtils;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @author : kenny
 * @since : 2023/5/18
 **/
public class AggregationBuilderExample {
    /**
     * 构造一个单桶分组查询,支持普通字段类型和nested字段类型
     * @param aggregationFields
     * @return
     */
    public static AggregationBuilder buildSingeBucketAggregationBuilder(List<String> aggregationFields) {
        if (CollectionUtils.isEmpty(aggregationFields)) {
            throw new RuntimeException("Aggregate search requires aggregate fields!");
        }

        aggregationFields = aggregationFields.stream().filter(StringUtil::isNotEmpty).collect(Collectors.toList());
        String aggregationField = aggregationFields.get(0);
        int dotIndex = aggregationField.indexOf(".");
        AggregationBuilder aggregationBuilder;
        if (dotIndex != -1) {
            String path = aggregationField.substring(0, dotIndex);
            aggregationBuilder = AggregationBuilders.nested(aggregationField + "nested_agg", path);
            AggregationBuilder nestedTerms = AggregationBuilders.terms(aggregationField).field(aggregationField).size(1000);
            aggregationBuilder = aggregationBuilder.subAggregation(nestedTerms);
        }else {
            aggregationBuilder = AggregationBuilders.terms(aggregationField + "_agg").field(aggregationField).size(1000);
        }

        return aggregationBuilder;
    }


    /**
     * 构造一个多桶分组查询,支持普通字段类型和nested字段类型
     * @param aggregationFields
     * @return
     */
    public static List<AggregationBuilder> buildMultiplexBucketAggregationBuilder(List<String> aggregationFields){
        if (CollectionUtils.isEmpty(aggregationFields)) {
            throw new RuntimeException("Aggregate search requires aggregate fields!");
        }

        aggregationFields = aggregationFields.stream().filter(StringUtil::isNotEmpty).collect(Collectors.toList());
        List<AggregationBuilder> aggregations = new ArrayList<>();
        for (String field : aggregationFields){
            int dotIndex = field.indexOf(".");
            AggregationBuilder aggregationBuilder;
            if (dotIndex != -1) {
                String path = field.substring(0, dotIndex);
                aggregationBuilder = AggregationBuilders.nested(field + "_nested_agg", path);
                AggregationBuilder nestedTerms = AggregationBuilders.terms(field).field(field).size(1000);
                aggregationBuilder = aggregationBuilder.subAggregation(nestedTerms);
            }else {
                aggregationBuilder = AggregationBuilders.terms(field).field(field).size(1000);
            }

            aggregations.add(aggregationBuilder);
        }

        return aggregations;
    }
}

针对于结果的解析我们同样也构造一个解析方法文章来源地址https://www.toymoban.com/news/detail-553373.html


import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author : kenny
 * @since : 2023/5/18
 **/
public class AggregationResultParserExample {

    /**
     * 针对单桶聚合统计
     * @param json ES执行搜索之后返回的MetricAggregation信息
     * @return
     */
    public static Map<String, Integer> parseSingleBucketAggregations(String json) {
        JSONObject jsonObject = JSONObject.parseObject(json);
        if (jsonObject == null){
            return null;
        }

        Map<String, Integer> resultMap = new HashMap<>();
        try {
            internal_ParseSingBucketAggregations(jsonObject, resultMap);
        }catch (Exception ex){
            // 处理你自己的异常
        }
        return resultMap;
    }


    private static void internal_ParseSingBucketAggregations(JSONObject jsonObject, Map<String, Integer> map) {
        for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();

            if (value instanceof JSONObject) {
                JSONObject childObject = (JSONObject) value;
                if (childObject.containsKey("key") && childObject.containsKey("doc_count")) {
                    String childKey = childObject.getJSONObject("key").getString("value");
                    int docCount = childObject.getJSONObject("doc_count").getIntValue("value");
                    map.put(childKey, docCount);
                }
                internal_ParseSingBucketAggregations(childObject, map);
            } else if (value instanceof JSONArray) {
                JSONArray childArray = (JSONArray) value;
                for (Object element : childArray) {
                    if (element instanceof JSONObject) {
                        JSONObject childObject = (JSONObject) element;
                        internal_ParseSingBucketAggregations(childObject, map);
                    }
                }
            }
        }
    }

    /**
     * 解析多桶分组统计
     * @param json ES执行搜索之后返回的MetricAggregation信息
     * @return
     */
    public static Map<String, List<Map<String, Object>>> parseMultiplexBucketAggregations(String json) {
        JSONObject jsonRoot = JSONObject.parseObject(json);
        if (jsonRoot == null){
            return Collections.emptyMap();
        }

        Map<String, List<Map<String, Object>>> resultMap = new HashMap<>();
        try {
            internal_ParseMultiplexBucketAggregations(jsonRoot, "", resultMap);
        }catch (Exception ex){
            // 处理你自己的异常
        }

        return resultMap;
    }

    private static void internal_ParseMultiplexBucketAggregations(JSONObject jsonObject, String prefix, Map<String, List<Map<String, Object>>> resultMap) {
        for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();

            if (value instanceof JSONObject) {
                JSONObject childObject = (JSONObject) value;
                if (childObject.containsKey("buckets")) {
                    List<Map<String, Object>> bucketList = new ArrayList<>();
                    JSONArray buckets = childObject.getJSONObject("buckets").getJSONArray("elements");
                    for (int i = 0; i < buckets.size(); i++) {
                        JSONObject bucket = buckets.getJSONObject(i);
                        Map<String, Object> bucketMap = new HashMap<>();
                        JSONObject bucketMembers = bucket.getJSONObject("members");
                        for (Map.Entry<String, Object> bucketEntry : bucketMembers.entrySet()) {
                            String bucketKey = bucketEntry.getKey();
                            Object bucketValue = bucketEntry.getValue();
                            if (bucketValue instanceof JSONObject) {
                                JSONObject valueObject = (JSONObject) bucketValue;
                                if (valueObject.containsKey("value")) {
                                    bucketMap.put(bucketKey, valueObject.get("value"));
                                }
                            }
                        }
                        bucketList.add(bucketMap);
                    }

                    resultMap.put(prefix + key, bucketList);
                } else {
                    internal_ParseMultiplexBucketAggregations(childObject, prefix + key + "_", resultMap);
                }
            }
        }
    }
}

到了这里,关于使用Elasticsearch进行分组聚合统计的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 五、浅析[ElasticSearch]底层原理与分组聚合查询

    集群节点介绍 es配置文件夹中 客户端节点 当主节点和数据节点配置都设置为false的时候,该节点只能处理路由请求,处理搜索,分发索引操作等,从本质上来说该客户节点表现为智能负载平衡器。独立的客户端节点在一个比较大的集群中是非常有用的,他协调主节点和数据节

    2024年02月16日
    浏览(46)
  • ElasticSearch分组统计查询

    maven依赖: 构建配置类: 根据两个字段进行统计: 实体定义: 创建索引文件:

    2024年02月02日
    浏览(45)
  • ElasticSearch 聚合统计

    度量聚合:求字段的平均值,最小值,最大值,总和等 桶聚合:将文档分成不同的桶,桶的划分可以根据字段的值,范围,日期间隔 管道聚合:在桶聚合的结果上执行进一步计算 进行聚合的语法如下 聚合也可以进行嵌套 平均值聚合 在 ElasticSearch 中进行聚合统计时,默认情

    2024年02月04日
    浏览(32)
  • ElasticSearch 分组统计,每组取最新数据

    ElasticSearch按任务id分组统计 查询方法: 任务ID一个,网站ID若干 求: 按网站ID分组,crawTotal最大,且时间为最新的一条数据。

    2024年02月10日
    浏览(47)
  • Elasticsearch 查询和聚合查询:基本语法和统计数量

    摘要:Elasticsearch是一个强大的分布式搜索和分析引擎,提供了丰富的查询和聚合功能。本文将介绍Elasticsearch的基本查询语法,包括预发查询和聚合查询,以及如何使用聚合功能统计数量。 Elasticsearch是一种开源的分布式搜索和分析引擎,广泛应用于各种场景,包括日志分析、

    2024年02月11日
    浏览(48)
  • Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

    官方提供了两个客户端elasticsearch、elasticsearch-dsl 第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。 封装后依然暴露

    2024年02月14日
    浏览(41)
  • 4.4 使用分组聚合进行组内计算

    该方法提供的是分组聚合步骤中的拆分功能,能根据索引或字段对数据进行分组 DataFrame.groupby(by=None, axis=0, level=None, as_index=True, sort=True, group_keys=True, squeeze=False, **kwargs) by参数的特别说明 如果传入的是一个函数则对索引进行计算并分组。 如果传入的是一个字典或者Series则字典

    2023年04月23日
    浏览(39)
  • 使用 Java 流进行分组和聚合,高效处理大量数据不再是梦!

    了解使用 Java Streams 解决问题的直接途径,Java Streams 是一个允许我们快速有效地处理大量数据的框架。 当我们对列表中的元素进行分组时,我们可以随后聚合分组元素的字段以执行有意义的操作,帮助我们分析数据。一些示例是加法、平均值或最大值/最小值。这些单个字段

    2024年02月07日
    浏览(44)
  • Elasticsearch使用篇 - 管道聚合

    基于前一次聚合的结果,进行二次聚合统计。 从结构上可以分为兄弟级(Sibling)管道聚合和父级(Parent)管道聚合两种方式。 兄弟级管道聚合:在同一聚合级别上可以产生新的聚合。 父级管道聚合:由父聚合提供输出,子聚合能够产生新的桶,然后可以添加到父桶中。 基

    2024年02月06日
    浏览(39)
  • Elasticsearch(三)聚合基本使用

    基础概念 bucket 数据分组,一些数据按照某个字段进行bucket划分,这个字段值相同的数据放到一个bucket中。可以理解成Java中的MapString, List结构,类似于Mysql中的group by后的查询结果。 metric: 对一个数据分组执行的统计,比如计算最大值,最小值,平均值等 类似于Mysql中的max

    2024年02月09日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包