最近需要从 Elasticsearch 中获取数据给前端展示,自己摸索出了一些查询方法,在此记录一下,以防忘记。
只展示业务层的代码逻辑:
一、一次普通的查询方法:
/**
 * Runs a paged vulnerability search and converts every hit into a {@code PageVulVo}.
 *
 * <p>When the request carries no sort expression, {@code "first_time desc"} is written
 * back into {@code param} and used. Results are collapsed on the {@code code} field.
 *
 * @param param         paging, sorting and search parameters; its sort expression may be
 *                      overwritten with the default
 * @param resTypeValues not read by this method
 * @return the converted rows together with the total element count reported by the repository
 */
public ResultVO<List<PageVO<PageVulVo>>> page(PageParam param, @ResTypeValue String[] resTypeValues) {
    // Apply the default ordering before anything reads the sort expression.
    if (StringUtils.isEmpty(param.getSortParams())) {
        param.setSortParams("first_time desc");
    }
    final String sortSpec = param.getSortParams();
    final List<SearchParam> filters = param.getSearchParams();

    // Assemble the query: filters first, then sort + page, then collapse on "code".
    final NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    this.queryVul(filters, queryBuilder);
    PubUtils.sortAndPage(sortSpec, queryBuilder, param);
    queryBuilder.withCollapseField("code");

    final Page<RealVul> hits = realVulRepository.search(queryBuilder.build());

    // Wrap each entity in its view object.
    final List<PageVulVo> rows = hits.getContent()
            .stream()
            .map(vul -> new PageVulVo(vul))
            .collect(Collectors.toList());

    return ResultUtils.pageOk(rows, hits.getTotalElements());
}
搜索条件方法:
/**
 * Translates the supplied search parameters into a {@code bool}/{@code must} query and
 * installs it on the given builder.
 *
 * <p>Recognised field names: {@code vul_name} (wildcard "contains" match), {@code cve_code},
 * {@code port}, {@code scan_type} (match queries) and {@code create_time} (inclusive range
 * built from the two bounds returned by {@code DateUtils.parseStringToLong}). Unknown field
 * names are ignored.
 *
 * <p>Entries that are {@code null}, or that have a {@code null} name or value, are skipped
 * instead of failing with a {@code NullPointerException} or turning into the literal
 * pattern {@code *null*}. A {@code create_time} value that does not yield two bounds is
 * skipped as well.
 *
 * @param searchParams             the filters to apply; may be {@code null} or empty, in which
 *                                 case an empty bool query is installed
 * @param nativeSearchQueryBuilder the builder that receives the query
 * @return the same builder instance, for chaining
 */
public NativeSearchQueryBuilder queryVul(List<SearchParam> searchParams, NativeSearchQueryBuilder nativeSearchQueryBuilder) {
    BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();
    if (searchParams != null) {
        for (SearchParam searchParam : searchParams) {
            if (searchParam == null) {
                continue;
            }
            String name = searchParam.getFieldName();
            String value = searchParam.getFieldValue();
            // switch on a null String throws, and a null value has no usable meaning in any branch.
            if (name == null || value == null) {
                continue;
            }
            switch (name) {
                case "vul_name":
                    // NOTE(review): '*' and '?' inside value are interpreted as wildcards.
                    mustQuery.must(QueryBuilders.wildcardQuery("vul_name", "*" + value + "*"));
                    break;
                case "cve_code":
                case "port":
                case "scan_type":
                    // The case label equals the index field name for these three.
                    mustQuery.must(QueryBuilders.matchQuery(name, value));
                    break;
                case "create_time":
                    List<Long> createTime = DateUtils.parseStringToLong(value);
                    // Both bounds are required; without them no range can be built.
                    if (createTime != null && createTime.size() >= 2) {
                        mustQuery.must(QueryBuilders.rangeQuery("create_time")
                                .gte(createTime.get(0))
                                .lte(createTime.get(1)));
                    }
                    break;
                default:
                    break;
            }
        }
    }
    nativeSearchQueryBuilder.withQuery(mustQuery);
    return nativeSearchQueryBuilder;
}
分页和排序的方法:
/**
 * Applies paging and a single field sort to the given query builder.
 *
 * <p>{@code sortParams} has the form {@code "<field> [asc|desc]"}. The direction is taken
 * from the second whitespace-separated token only, so a field whose name merely contains
 * "desc" (for example {@code "description asc"}) is no longer sorted descending. A missing
 * direction defaults to ascending, and a {@code null} or blank expression adds no sort
 * instead of throwing.
 *
 * @param sortParams               the sort expression, e.g. {@code "first_time desc"}
 * @param nativeSearchQueryBuilder the builder to configure
 * @param param                    supplies the 1-based current page and the page size
 * @return the same builder instance, for chaining
 */
public static NativeSearchQueryBuilder sortAndPage(String sortParams, NativeSearchQueryBuilder nativeSearchQueryBuilder, PageParam param) {
    // PageRequest is 0-based while the request's current page is 1-based.
    Pageable pageable = PageRequest.of(param.getCurrPage() - 1, param.getPageNums());
    nativeSearchQueryBuilder.withPageable(pageable);

    if (sortParams != null && !sortParams.trim().isEmpty()) {
        String[] tokens = sortParams.trim().split("\\s+");
        String field = tokens[0];
        SortOrder order = tokens.length > 1 && "desc".equalsIgnoreCase(tokens[1])
                ? SortOrder.DESC
                : SortOrder.ASC;
        nativeSearchQueryBuilder.withSort(SortBuilders.fieldSort(field).order(order));
    }
    return nativeSearchQueryBuilder;
}
这就是普通的查询办法。
二、带有统计和求最高级别的字段值
例如,要对整个 Elasticsearch 中的数据进行分组统计和聚合时,上述方法就不能满足要求了。
例如要算出以下这种数据
第一列是求出最高级别,第二列是算出数量,第三和第四列是算出不同级别的数量。
为了应对这种需求,所以新的方法:
因为多线程用的是线程池,所以需要自己额外配置线程池。文章来源:https://www.toymoban.com/news/detail-403030.html
// Elasticsearch template used by the code below for aggregation queries, counts and bulk updates.
@Resource
public ElasticsearchRestTemplate estTemplate;
// Executor to which the per-IP statistic Callables are submitted.
@Autowired
public AsyncTaskExecutor asyncExecutor;
/**
 * Returns one page of per-IP vulnerability statistics.
 *
 * <p>The documents selected by {@code queryPreposition(param)} are grouped by {@code ip}.
 * For every IP on the requested page the method gathers the highest {@code vul_severity},
 * the maximum {@code create_time}, the vulnerability count, the per-state counts and the
 * per-severity counts, and combines them with the asset record returned by
 * {@code assetClient}.
 *
 * <p>Fixes relative to the previous version:
 * <ul>
 *   <li>The bucket_sort {@code from} value is a bucket offset, so it is now
 *       {@code (currPage - 1) * pageNums} rather than the 1-based page number itself.</li>
 *   <li>An IP without a matching asset record gets an empty {@code AssetRestVO} instead of
 *       causing an {@code IndexOutOfBoundsException}.</li>
 *   <li>The IP / create-time pair is kept as a two-element array instead of being joined
 *       with "/" and split again, so the array always has exactly two entries.</li>
 * </ul>
 *
 * @param param        paging and search parameters; {@code currPage} is 1-based
 * @param resTypeValue not read by this method
 * @return the page rows together with the number of distinct IPs
 * @throws ExecutionException   if one of the asynchronous count tasks fails
 * @throws InterruptedException if the calling thread is interrupted while waiting for a task
 */
public ResultVO<List<PageVO<PageVulByIpVo>>> pageVulByIp(PageParam param, @ResTypeValue String[] resTypeValue) throws ExecutionException, InterruptedException {
    // Base query (search conditions).
    NativeSearchQueryBuilder nativeSearchQueryBuilder = queryPreposition(param);

    // Translate the 1-based page number into a bucket offset for bucket_sort.
    int pageSize = param.getPageNums();
    int bucketOffset = Math.max(param.getCurrPage() - 1, 0) * pageSize;

    // Group by ip; each bucket carries its max severity, its ip name and its max create_time.
    nativeSearchQueryBuilder.addAggregation(
            AggregationBuilders.terms("ip_group").field("ip").size(Integer.MAX_VALUE)
                    // highest risk level
                    .subAggregation(AggregationBuilders.max("vul_severity_max").field("vul_severity"))
                    // ip name
                    .subAggregation(AggregationBuilders.terms("ip_name").field("ip"))
                    // discovery time
                    .subAggregation(AggregationBuilders.max("create_time").field("create_time"))
                    // paging over the buckets
                    .subAggregation(new BucketSortPipelineAggregationBuilder("bucket_sort", null)
                            .from(bucketOffset).size(pageSize))
    );
    // A second, unpaged terms aggregation whose bucket count is the total number of IPs.
    nativeSearchQueryBuilder.addAggregation(
            AggregationBuilders.terms("ip_count").field("ip").size(Integer.MAX_VALUE)
    );

    // IPs on the current page, in bucket order.
    List<String> keyList = new ArrayList<>();
    // Highest severity per ip.
    Map<String, Integer> vulSeverityMap = new HashMap<>();
    // Per ip: { ip name, create_time }.
    Map<String, String[]> ipContent = new HashMap<>();

    AggregatedPage<RealVul> pageResult = estTemplate.queryForPage(nativeSearchQueryBuilder.build(), RealVul.class);
    Terms stringTerms = (Terms) Objects.requireNonNull(pageResult.getAggregation("ip_group"));
    Terms ipCount = (Terms) Objects.requireNonNull(pageResult.getAggregation("ip_count"));
    long total = ipCount.getBuckets().size();

    for (Terms.Bucket bucket : stringTerms.getBuckets()) {
        String ip = bucket.getKeyAsString();
        Map<String, Aggregation> aggregationMap = bucket.getAggregations().asMap();

        double vulSeverity = ((ParsedMax) aggregationMap.get("vul_severity_max")).getValue();
        String createTimeString = ((ParsedMax) aggregationMap.get("create_time")).getValueAsString();
        Terms ipName = (Terms) Objects.requireNonNull(aggregationMap.get("ip_name"));
        String ipString = ipName.getBuckets().get(0).getKeyAsString();

        ipContent.put(ip, new String[] {ipString, createTimeString});
        vulSeverityMap.put(ip, (int) vulSeverity);
        keyList.add(ip);
    }

    // Asset details for the IPs on this page.
    BaseVO<List<AssetRestVO>> assetByIPList = assetClient.getAssetByIPList(keyList);
    List<AssetRestVO> data = assetByIPList.getData();

    List<PageVulByIpVo> vulByIpVoList = new ArrayList<>();
    for (String ip : keyList) {
        // The three statistics of one ip are computed concurrently.
        // NOTE(review): the loop waits for each ip's tasks before submitting the next ip's.
        Future<Integer> vulSize = asyncExecutor.submit(this.getVulSize(Arrays.asList(ip), null, null, null));
        Future<Map> stateMap = asyncExecutor.submit(this.getStateMap(null, Arrays.asList(ip), null, null, null));
        Future<Map> vulSeveritysMap = asyncExecutor.submit(this.getVulSeverityMap(Arrays.asList(ip), null, null, null));

        Integer vulSizeOne = vulSize.get();
        Map stateMapOne = stateMap.get();
        Map vulSeverity = vulSeveritysMap.get();
        Integer vulSeverityMax = vulSeverityMap.get(ip);
        String[] content = ipContent.get(ip);

        // Fall back to an empty asset when the asset service knows nothing about this ip.
        AssetRestVO assetVo = CollectionUtils.isEmpty(data)
                ? new AssetRestVO()
                : data.stream()
                        .filter(assetRestVO -> ip.equals(assetRestVO.getIp()))
                        .findFirst()
                        .orElseGet(AssetRestVO::new);

        vulByIpVoList.add(new PageVulByIpVo(content, vulSeverityMax, vulSizeOne, stateMapOne, vulSeverity, assetVo));
    }
    return ResultUtils.pageOk(vulByIpVoList, total);
}
/**
 * Builds a task that counts the documents of the {@code TableEnums.VULCENTER_REAL_VUL_RESULT}
 * index matching the given filters. A filter that is {@code null} (or, for the IP list,
 * empty) is left out of the query.
 *
 * <p>NOTE(review): the query sets a collapse field on {@code code}; whether the count call
 * honours collapsing is not visible here, so confirm before relying on de-duplicated counts.
 *
 * @param ipList IPs to match with a terms query
 * @param bsId   value matched against {@code bs_id}
 * @param areaId value matched against {@code area_id}
 * @param orgId  value matched against {@code org_id}
 * @return a callable yielding the count narrowed to {@code int}
 */
public Callable<Integer> getVulSize(List<String> ipList, Integer bsId, Integer areaId, Integer orgId) {
    return () -> {
        BoolQueryBuilder filter = QueryBuilders.boolQuery();
        if (!CollectionUtils.isEmpty(ipList)) {
            filter.must(QueryBuilders.termsQuery("ip", ipList));
        }
        if (bsId != null) {
            filter.must(QueryBuilders.matchQuery("bs_id", bsId));
        }
        if (areaId != null) {
            filter.must(QueryBuilders.matchQuery("area_id", areaId));
        }
        if (orgId != null) {
            filter.must(QueryBuilders.matchQuery("org_id", orgId));
        }

        SearchQuery countQuery = new NativeSearchQueryBuilder()
                .withIndices(TableEnums.VULCENTER_REAL_VUL_RESULT)
                .withQuery(filter)
                .withCollapseField("code")
                .build();
        return (int) estTemplate.count(countQuery);
    };
}
/**
 * Builds a task that counts documents per remediation state.
 *
 * <p>One count query is issued for every {@code RealVulStateEnum} value, restricted by the
 * non-null / non-empty filters. The counts are stored under these keys:
 * {@code waiting_process} (PENDING), {@code doing} (PROCESSING), {@code finish}
 * (RECTIFIED + CHECKED, summed), {@code wait_submit} (SUBMIT + SUBMIT_REJECT, summed) and
 * {@code ignore} (any other state; the value is overwritten, not summed).
 *
 * @param cveCode value matched against {@code cve_code}; skipped when empty
 * @param ipList  IPs to match with a terms query; skipped when empty
 * @param bsId    value matched against {@code bs_id}; skipped when {@code null}
 * @param areaId  value matched against {@code area_id}; skipped when {@code null}
 * @param orgId   value matched against {@code org_id}; skipped when {@code null}
 * @return a callable yielding the key-to-count map
 */
public Callable<Map> getStateMap(String cveCode, List<String> ipList, Integer bsId, Integer areaId, Integer orgId) {
    return () -> {
        // Every key starts at zero so absent states still appear in the result.
        Map<String, Integer> counts = new HashMap<>();
        counts.put("finish", 0);
        counts.put("doing", 0);
        counts.put("wait_submit", 0);
        counts.put("waiting_process", 0);
        counts.put("ignore", 0);

        for (RealVulStateEnum state : RealVulStateEnum.values()) {
            int stateValue = state.getValue();

            BoolQueryBuilder filter = QueryBuilders.boolQuery();
            if (!StringUtils.isEmpty(cveCode)) {
                filter.must(QueryBuilders.matchQuery("cve_code", cveCode));
            }
            if (!CollectionUtils.isEmpty(ipList)) {
                filter.must(QueryBuilders.termsQuery("ip", ipList));
            }
            if (bsId != null) {
                filter.must(QueryBuilders.matchQuery("bs_id", bsId));
            }
            if (areaId != null) {
                filter.must(QueryBuilders.matchQuery("area_id", areaId));
            }
            if (orgId != null) {
                filter.must(QueryBuilders.matchQuery("org_id", orgId));
            }
            filter.must(QueryBuilders.matchQuery("state", stateValue));

            NativeSearchQueryBuilder countQuery = new NativeSearchQueryBuilder();
            countQuery.withQuery(filter);
            countQuery.withCollapseField("code");
            countQuery.withIndices(TableEnums.VULCENTER_REAL_VUL_RESULT);
            int hits = (int) estTemplate.count(countQuery.build());

            switch (RealVulStateEnum.getRealVulStateEnum(stateValue)) {
                case PENDING:
                    counts.put("waiting_process", hits);
                    break;
                case PROCESSING:
                    counts.put("doing", hits);
                    break;
                case RECTIFIED:
                case CHECKED:
                    counts.put("finish", counts.get("finish") + hits);
                    break;
                case SUBMIT:
                case SUBMIT_REJECT:
                    counts.put("wait_submit", counts.get("wait_submit") + hits);
                    break;
                default:
                    counts.put("ignore", hits);
                    break;
            }
        }
        return counts;
    };
}
/**
 * Builds a task that counts documents per risk level.
 *
 * <p>One count query is issued for every {@code VulSeverityEnum} value, restricted by the
 * non-null / non-empty filters. The counts are stored under these keys: {@code unknown}
 * (UNKNOWN), {@code low_risk} (LOW), {@code medium_risk} (MEDIUM), {@code high_risk} (HIGH)
 * and {@code super_risk} (any other level).
 *
 * @param asList IPs to match with a terms query; skipped when empty
 * @param bsId   value matched against {@code bs_id}; skipped when {@code null}
 * @param areaId value matched against {@code area_id}; skipped when {@code null}
 * @param orgId  value matched against {@code org_id}; skipped when {@code null}
 * @return a callable yielding the key-to-count map
 */
private Callable<Map> getVulSeverityMap(List<String> asList, Integer bsId, Integer areaId, Integer orgId) {
    return () -> {
        // Every key starts at zero so absent levels still appear in the result.
        Map<String, Integer> counts = new HashMap<>();
        counts.put("super_risk", 0);
        counts.put("high_risk", 0);
        counts.put("medium_risk", 0);
        counts.put("low_risk", 0);
        counts.put("unknown", 0);

        for (VulSeverityEnum severity : VulSeverityEnum.values()) {
            int severityValue = severity.getValue();

            BoolQueryBuilder filter = QueryBuilders.boolQuery();
            if (!CollectionUtils.isEmpty(asList)) {
                filter.must(QueryBuilders.termsQuery("ip", asList));
            }
            if (bsId != null) {
                filter.must(QueryBuilders.matchQuery("bs_id", bsId));
            }
            if (areaId != null) {
                filter.must(QueryBuilders.matchQuery("area_id", areaId));
            }
            if (orgId != null) {
                filter.must(QueryBuilders.matchQuery("org_id", orgId));
            }
            filter.must(QueryBuilders.matchQuery("vul_severity", severityValue));

            NativeSearchQueryBuilder countQuery = new NativeSearchQueryBuilder();
            countQuery.withQuery(filter);
            countQuery.withCollapseField("code");
            countQuery.withIndices(TableEnums.VULCENTER_REAL_VUL_RESULT);
            int hits = (int) estTemplate.count(countQuery.build());

            switch (VulSeverityEnum.getVulSeverityEnum(severityValue)) {
                case UNKNOWN:
                    counts.put("unknown", hits);
                    break;
                case LOW:
                    counts.put("low_risk", hits);
                    break;
                case MEDIUM:
                    counts.put("medium_risk", hits);
                    break;
                case HIGH:
                    counts.put("high_risk", hits);
                    break;
                default:
                    counts.put("super_risk", hits);
                    break;
            }
        }
        return counts;
    };
}
三、批量修改
// Bulk-update: set the "state" field of every document in idList to PENDING.
// One partial-document request is shared by all updates because they write the same value.
IndexRequest indexRequest = new IndexRequest();
indexRequest.source("state", RealVulStateEnum.PENDING.getValue());

List<UpdateQuery> updateQueryList = new ArrayList<>();
for (String id : idList) {
    updateQueryList.add(new UpdateQueryBuilder()
            .withClass(RealVul.class)
            .withId(id)
            .withIndexRequest(indexRequest)
            .build());
}
// A bulk request without any action fails validation, so skip the call when there is nothing to update.
if (!updateQueryList.isEmpty()) {
    estTemplate.bulkUpdate(updateQueryList);
}
四、对数据量很大的 ES 索引做一次性的分组统计时,可以用 startScroll 分批读取后再计算。文章来源地址:https://www.toymoban.com/news/detail-403030.html
/**
 * Scroll snapshot timeout in milliseconds (60 * 1000).
 */
private static final long SCROLL_TIMEOUT = 60 * 1000;
/**
 * Executor used to run the Callables below asynchronously (a thread pool, per the original note).
 */
@Autowired
public AsyncTaskExecutor asyncExecutor;
/**
 * Submits the task built by vulInsertMysql (with the IP_INSERT type value) to the executor.
 *
 * @throws Exception declared by the method; the visible statement does not show which failures occur
 */
public void selectVul() throws Exception {
Future<Integer> vul = asyncExecutor.submit(this.vulInsertMysql(VulTypeEnum.IP_INSERT.getValue()));
// NOTE(review): the next line is the article's placeholder for omitted code, not Java.
、、、、、、、
}
/**
 * Builds a task that scrolls through RealVul documents 500 at a time, maps each one to a
 * VulIpSeverityInfo (ip, severity) and hands every batch to listToPage together with type.
 * The task returns 1 when the scroll has been exhausted.
 *
 * NOTE(review): the method's closing brace is missing from this excerpt.
 *
 * @param type passed through unchanged to listToPage
 * @return the callable; nothing is executed until it is submitted
 */
public Callable<Integer> vulInsertMysql(Integer type) {
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
// Batch size of each scroll page.
nativeSearchQueryBuilder.withPageable(PageRequest.of(0, 500));
// Group by ip
// NOTE(review): the result of this aggregation is never read in this method — confirm it is needed.
nativeSearchQueryBuilder.addAggregation(
AggregationBuilders.terms("ip_group").field("ip").size(Integer.MAX_VALUE)
// Highest risk level per ip
.subAggregation(AggregationBuilders.max("vul_severity_max").field("vul_severity"))
);
// Open the scroll over the vulnerability documents
AggregatedPage<RealVul> vulIp = (AggregatedPage<RealVul>) elasticsearchRestTemplate.startScroll(SCROLL_TIMEOUT, nativeSearchQueryBuilder.build(), RealVul.class);
while (vulIp.hasContent()) {
List<VulIpSeverityInfo> vulList = vulIp.getContent().stream().map(e -> {
VulIpSeverityInfo vulIpSeverityInfo = new VulIpSeverityInfo(e.getIp(), e.getVulSeverity());
return vulIpSeverityInfo;
}).collect(Collectors.toList());
listToPage(vulList, type);
// Fetch the next page. The scrollId may change on the ES server, so always use the latest one; a continueScroll request also renews the snapshot keep-alive.
vulIp = (AggregatedPage<RealVul>) elasticsearchRestTemplate.continueScroll(vulIp.getScrollId(), SCROLL_TIMEOUT, RealVul.class);
}
// Release the ES server resources promptly
// NOTE(review): clearScroll is not in a finally block, so it is skipped when the loop above throws.
log.info("漏洞表的ip全部取出");
elasticsearchRestTemplate.clearScroll(vulIp.getScrollId());
return 1;
}
};
// Logged when the callable is created, not when it runs.
log.info("漏洞统计ip开始");
return callable;
到了这里,关于实战中关于elasticsearch中的查询方法--高级查询的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!