pom文件
# 版本号
<version.elasticsearch-rest-high-level-client>6.5.0</version.elasticsearch-rest-high-level-client>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${version.elasticsearch-rest-high-level-client}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${version.elasticsearch-rest-high-level-client}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${version.elasticsearch-rest-high-level-client}</version>
</dependency>
RestHighLevelClient(操作ES的客户端,使用时注入bean即可)
public RestHighLevelClient restHighLevelClient() {
RestClientBuilder rclientBuilder = RestClient.builder(new HttpHost(host, port, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
.setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
}
});
return new RestHighLevelClient(rclientBuilder);
DeleteRequest批量删除
/**
* @Description: 根据es主键删除数据
* @Params:
* @Return:
* @Author: Mr.myq
* @Date: 2022/12/2011:04
*/
@Override
public void deleteAllByIds(List<String> ids) {
if (!CollectionUtils.isEmpty(ids)) {
try {
List<List<String>> outherList = SubListUtil.splitList(ids, count);
for (List<String> innerList : outherList) {
//批量删除数据
BulkRequest request = new BulkRequest();
request.timeout("60s");
for (String id : innerList) {
// 类型json/_doc
DeleteRequest source = new DeleteRequest().index("索引").id(id).type(”_doc“);
request.add(source);
}
BulkResponse response = restHighLevelClient().bulk(request, RequestOptions.DEFAULT);
log.debug("ES批量删除数据 是否有失败 : " + response.hasFailures());
}
} catch (IOException e) {
e.printStackTrace();
log.error("ES批量删除数据: " + e.toString());
throw new RuntimeException("ES批量删除数据: {}", e);
} finally {
ids.clear();
}
}
}
BulkRequest批量新增
/**
* 批量更新es数据
*
* @param esDateList
*/
private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
if (CollectionUtils.isEmpty(esDateList)) {
return;
}
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout("200s");
try {
for (int i = 0; i < esDateList.size(); i++) {
CmsGoodsEntity object = esDateList.get(i);
Map<String, Object> param = new LinkedHashMap<>();
String goodid = object.getGoodid();
param.put("tariffs_rate", object.getTariffsRate());
param.put("inspection_charges_fee", object.getInspectionChargesFee());
param.put("rates", object.getRates());
UpdateRequest updateRequest = new UpdateRequest(”索引“, "类型", goodid);
updateRequest.doc(param);
bulkRequest.add(updateRequest);
}
// 操作ES
BulkResponse bulk = restHighLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
log.info("批量更新ES 是否有失败 {} ", bulk.hasFailures());
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("***********批量更新ES数据异常***********");
} finally {
esDateList.clear();
}
}
DeleteByQueryRequest根据查询条件删除所有ES数据
/**
* @Description: 根据查看条件删除
* @Params:
* @Return:
* @Author: Mr.myq
* @Date: 2023/2/1415:25
*/
@Override
public void deleteBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source) {
//通过QueryBuilders中的搜索逻辑
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
//1 设置条件
//设置删除条件: key = value
TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
if (!StringUtils.isEmpty(hisp)) {
TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
queryBuilder.must(hispTermQueryBuilder);
}
TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
queryBuilder.must(sourceTermQueryBuilder);
queryBuilder.must(supplierIdTermQueryBuilder);
//2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
deleteByQueryRequest.setTimeout("6000s");
deleteByQueryRequest.setQuery(queryBuilder);
//指定删除索引
deleteByQueryRequest.indices(restClientConfig.getIndex());
deleteByQueryRequest.setConflicts("proceed");
try {
//3 通过deleteByQuery来发起删除请求
BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
if (deleteResponse.getDeleted() >= 1) {
log.info("deleteData,删除成功,删除文档条数: " + deleteResponse.getDeleted() + " ,indexName:" + restClientConfig.getIndex());
}
} catch (IOException e) {
e.printStackTrace();
log.error("无法连接到ES目标服务器");
throw new TargetServerException("无法连接到ES目标服务器");
}
}
UpdateByQueryRequest批量更新
/**
* @Description: 根据查看条件更新
* @Params:
* @Return:
* @Author: Mr.myq
* @Date: 2023/2/1415:25
*/
public void updateBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source,String value) {
//通过QueryBuilders中的搜索逻辑
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
//1 设置条件
//设置删除条件: key = value
TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
if (!StringUtils.isEmpty(hisp)) {
TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
queryBuilder.must(hispTermQueryBuilder);
}
TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
queryBuilder.must(sourceTermQueryBuilder);
queryBuilder.must(supplierIdTermQueryBuilder);
//2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.setTimeout("6000s");
updateByQueryRequest.setQuery(queryBuilder);
// 这个实际上就是对应给脚本中传参数的对象。
HashMap<String, Object> params = new HashMap<>(16);
params.put("hobby", "修改的参数value");
final Script script = new Script(
ScriptType.INLINE, "painless",
"ctx._source.hobby = params.hobby",
params);
updateByQueryRequest.setScript(script);
//指定索引
updateByQueryRequest.indices(restClientConfig.getIndex());
updateByQueryRequest.setConflicts("proceed");
try {
//3 通过deleteByQuery来发起删除请求
BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
if (deleteResponse.getUpdated() >= 1) {
log.info("更新成功,更新文档条数: " + deleteResponse.getUpdated() + " ,indexName:" + restClientConfig.getIndex());
}
} catch (IOException e) {
e.printStackTrace();
log.error("无法连接到ES目标服务器");
throw new TargetServerException("无法连接到ES目标服务器");
}
}
BulkRequest批量删除
/**
* 批量更新es数据
*
* @param esDateList
*/
private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
if (CollectionUtils.isEmpty(esDateList)) {
return;
}
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout("600s");
RestHighLevelClient restHighLevelClient = restClientConfig.restHighLevelClient();
try {
for (int i = 0; i < esDateList.size(); i++) {
CmsGoodsEntity object = esDateList.get(i);
Map<String, Object> param = new LinkedHashMap<>();
String goodid = object.getGoodid();
param.put("tariffs_rate", object.getTariffsRate());
param.put("inspection_charges_fee", object.getInspectionChargesFee());
param.put("rates", object.getRates());
UpdateRequest updateRequest = new UpdateRequest(restClientConfig.getIndex(), "_doc", goodid);
updateRequest.doc(param);
bulkRequest.add(updateRequest);
}
// 操作ES
BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
log.warn("批量更新ES 是否有失败 {} ", bulk.hasFailures());
} catch (IOException e) {
log.error("批量更新ES数据异常,e", e);
e.printStackTrace();
throw new RuntimeException("ES批量更新数据异常:{}" + e);
} finally {
try {
restHighLevelClient.close();
} catch (IOException e) {
e.printStackTrace();
}
esDateList.clear();
}
}
Script更新
public void updateHobby(String user, ESEntity esEntity) throws IOException {
final BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("user", user));
// 这个实际上就是对应给脚本中传参数的对象。
HashMap<String, Object> params = new HashMap<>(16);
params.put("hobby", "和女朋友爬山");
final Script script = new Script(
ScriptType.INLINE, "painless",
"ctx._source.hobby = params.hobby",
params);
updateByQuery(queryBuilder, "索引名称", script);
}
出现: 版本冲突、文档类型不对、JAR包与使用的API不一致或其他问题。都可参考以下连接。
ElasticSearch超级实用API描述文章来源地址https://www.toymoban.com/news/detail-571649.html
以上代码需要变动一下,将一些参数替换掉。
文章来源:https://www.toymoban.com/news/detail-571649.html
到了这里,关于java操作ElasticSearch之批量操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!