/**
 * Runs a generic Elasticsearch terms aggregation and returns the bucket keys.
 *
 * <p>The query requires {@code TIME} to lie within {@code [start, end]} (both bounds inclusive),
 * every field in {@code existsQueryParams} to exist and every field in
 * {@code notExistsQueryParams} to be absent. Matching documents are grouped by the value computed
 * by {@code painlessScript}. No raw hits are requested (from 0, size 0); only the aggregation is
 * read from the response.
 *
 * <p>NOTE(review): the aggregation asks for {@code Integer.MAX_VALUE} buckets; the cluster may
 * reject very large bucket counts depending on its bucket limit settings - confirm for your
 * deployment.
 *
 * @param restHighLevelClient client used to execute the search
 * @param start lower bound (inclusive) of the {@code TIME} range
 * @param end upper bound (inclusive) of the {@code TIME} range
 * @param index indices to search
 * @param aggregationName name under which the terms aggregation is registered and looked up
 * @param painlessScript script source whose result is used as the grouping key
 * @param existsQueryParams fields that must exist; {@code null} is treated as empty
 * @param notExistsQueryParams fields that must not exist; {@code null} is treated as empty
 * @return the key of every returned bucket as a string, in response order; empty when the
 *     response carries no aggregation with the given name
 * @throws IOException if the search request fails
 */
private List<String> queryFromEs(RestHighLevelClient restHighLevelClient, String start, String end, String[] index, String aggregationName, String painlessScript, List<String> existsQueryParams, List<String> notExistsQueryParams) throws IOException {
    // Bool query: the time range is mandatory, followed by the exists / not-exists conditions.
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    boolQueryBuilder.must(QueryBuilders.rangeQuery("TIME").gte(start).lte(end));
    if (existsQueryParams != null) {
        for (String existsQueryParam : existsQueryParams) {
            boolQueryBuilder.must(QueryBuilders.existsQuery(existsQueryParam));
        }
    }
    if (notExistsQueryParams != null) {
        for (String notExistsQueryParam : notExistsQueryParams) {
            boolQueryBuilder.mustNot(QueryBuilders.existsQuery(notExistsQueryParam));
        }
    }

    // Script-driven terms aggregation: one bucket per distinct script result.
    TermsAggregationBuilder aggregation = AggregationBuilders.terms(aggregationName)
            .script(new Script(painlessScript))
            .size(Integer.MAX_VALUE)
            .minDocCount(1)
            .shardMinDocCount(0)
            .showTermDocCountError(false);

    // from 0 / size 0: only the aggregation result is needed, not the source documents.
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
            .sort("TIME", SortOrder.ASC)
            .from(0)
            .size(0)
            .query(boolQueryBuilder)
            .aggregation(aggregation);

    SearchRequest searchRequest = new SearchRequest();
    searchRequest.indices(index);
    searchRequest.source(sourceBuilder);
    log.info("-query from es index:{}, sql:{}-->", index, sourceBuilder);

    SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    List<String> resultList = new ArrayList<>();
    Aggregations aggregations = response.getAggregations();
    if (aggregations == null) {
        return resultList;
    }
    // Look the aggregation up through the Terms interface rather than a concrete parsed class,
    // so string, long and double terms results are all accepted.
    Terms terms = aggregations.get(aggregationName);
    if (terms == null) {
        return resultList;
    }
    for (Terms.Bucket bucket : terms.getBuckets()) {
        String key = bucket.getKeyAsString();
        log.info("--query from es index results:-->{}", key);
        resultList.add(key);
    }
    return resultList;
}
2023-05-23 更新:上面的条件是存在数据库里的,由定时任务触发。
调整后的方法如下:
private void queryFromEs2Kafka(List<String> index, DmssScheduleJobPO dmssScheduleJobPO, EsDiscoverConfig esDiscoverConfig, String startTime, String endTime, RestHighLevelClient restHighLevelClient) throws IOException, ExecutionException, InterruptedException { JSONObject jsonObject = JSON.parseObject(esDiscoverConfig.getSearchCondition()); JSONObject painless_aggregations = jsonObject.getJSONObject("aggregations").getJSONObject("painless_aggregations"); log.debug("类型:" + dmssScheduleJobPO.getJobType() + "==jsonObject:==" + jsonObject); log.debug("==queryStr:==" + jsonObject.getString("query")); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.wrapperQuery(jsonObject.getString("query"))); // 添加时间范围限制 RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("time") .from(startTime) .to(endTime) .includeLower(true) .includeUpper(true); searchSourceBuilder.postFilter(rangeQueryBuilder); // 创建一个脚本对象 Script script = new Script(ScriptType.INLINE, "painless", painless_aggregations.getJSONObject("terms").getJSONObject("script").getString("source"), Collections.emptyMap()); // 创建一个 TermsAggregationBuilder 对象,并将脚本对象传递给 script 方法 TermsAggregationBuilder termsAggBuilder = AggregationBuilders.terms("painless_aggregations") .script(script) .size(2147483647) .minDocCount(1) .shardMinDocCount(0) .showTermDocCountError(false); // 将聚合条件添加到 searchSourceBuilder 中 searchSourceBuilder.aggregation(termsAggBuilder); searchSourceBuilder.from(0); searchSourceBuilder.size(0); log.info("--自动发现配置化请求参数:--" + searchSourceBuilder); //创建一个请求 SearchRequest searchRequest = new SearchRequest().indices(index.toArray(new String[]{})) .source(searchSourceBuilder); log.info("--自动发现2kafka配置化请求参数:--" + searchSourceBuilder); //发送请求 SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); //拿到聚合数据 Aggregations aggregations = response.getAggregations(); if (aggregations != null) { 
send2KafkaUseEsResult(response, aggregations, dmssScheduleJobPO); } }
getSearchCondition 返回的是数据库中配置的条件,示例如下:
{"query":{"bool":{"must":[{"exists":{"field":"db_type","boost":1}},{"exists":{"field":"dst_ip","boost":1}},{"exists":{"field":"dst_port","boost":1}},{"exists":{"field":"dst_db_name","boost":1}},{"exists":{"field":"dst_db_table","boost":1}},{"exists":{"field":"dst_asset_name","boost":1}},{"exists":{"field":"dst_infosys_name","boost":1}}],"adjust_pure_negative":true,"boost":1}},"sort":[{"time":{"order":"asc"}}],"aggregations":{"painless_aggregations":{"terms":{"script":{"source":"if (doc.containsKey('dst_db_column') && doc['dst_db_column'].size() > 0) {return doc['_index'].value + '@_@'+doc['db_type'].values+'@_@'+doc['dst_ip'].values +'@_@'+ doc['dst_port'].values +'@_@'+ doc['dst_db_name'].values+'@_@'+ doc['dst_db_table'].values+'@_@'+ doc['dst_db_column'].values+'@_@'+doc['dst_asset_name.keyword'].values+'@_@'+doc['dst_infosys_name.keyword'].values; } else {return doc['_index'].value + '@_@'+doc['db_type'].values+'@_@'+doc['dst_ip'].values +'@_@'+ doc['dst_port'].values +'@_@'+ doc['dst_db_name'].values+'@_@'+ doc['dst_db_table'].values+'@_@'+doc['dst_asset_name.keyword'].values+'@_@'+doc['dst_infosys_name.keyword'].values;}","lang":"painless"},"size":2147483647,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false}}}}
文章来源地址:https://www.toymoban.com/news/detail-475456.html
到了这里,关于一个通用的es聚合查询方法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!