java操作ElasticSearch之批量操作

这篇具有很好参考价值的文章主要介绍了java操作ElasticSearch之批量操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

pom文件

# 版本号 
<version.elasticsearch-rest-high-level-client>6.5.0</version.elasticsearch-rest-high-level-client>       

     <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${version.elasticsearch-rest-high-level-client}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${version.elasticsearch-rest-high-level-client}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${version.elasticsearch-rest-high-level-client}</version>
        </dependency>

RestHighLevelClient(操作ES的客户端,使用时注入bean即可)

 public RestHighLevelClient restHighLevelClient() {

        RestClientBuilder rclientBuilder = RestClient.builder(new HttpHost(host, port, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder
                                .setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
                    }
                });
        return new RestHighLevelClient(rclientBuilder);

DeleteRequest批量删除

    /**
     * @Description: 根据es主键删除数据
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2022/12/2011:04
     */
    @Override
    public void deleteAllByIds(List<String> ids) {
        if (!CollectionUtils.isEmpty(ids)) {
            try {
                List<List<String>> outherList = SubListUtil.splitList(ids, count);
                for (List<String> innerList : outherList) {
                    //批量删除数据
                    BulkRequest request = new BulkRequest();
                    request.timeout("60s");
                    for (String id : innerList) {
// 类型json/_doc
                        DeleteRequest source = new DeleteRequest().index("索引").id(id).type(”_doc“);
                        request.add(source);
                    }
                    BulkResponse response = restHighLevelClient().bulk(request, RequestOptions.DEFAULT);
                    log.debug("ES批量删除数据 是否有失败 : " + response.hasFailures());
                }
            } catch (IOException e) {
                e.printStackTrace();
                log.error("ES批量删除数据:  " + e.toString());
                throw new RuntimeException("ES批量删除数据:  {}", e);
            } finally {
                ids.clear();
            }
        }
    }

BulkRequest批量新增

    /**
     * 批量更新es数据
     *
     * @param esDateList
     */
    private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
        if (CollectionUtils.isEmpty(esDateList)) {
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout("200s");
        try {
            for (int i = 0; i < esDateList.size(); i++) {
                CmsGoodsEntity object = esDateList.get(i);
                Map<String, Object> param = new LinkedHashMap<>();
                String goodid = object.getGoodid();
                param.put("tariffs_rate", object.getTariffsRate());
                param.put("inspection_charges_fee", object.getInspectionChargesFee());
                param.put("rates", object.getRates());
                UpdateRequest updateRequest = new UpdateRequest(”索引“, "类型", goodid);
                updateRequest.doc(param);
                bulkRequest.add(updateRequest);
            }
            // 操作ES
            BulkResponse bulk = restHighLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
            log.info("批量更新ES 是否有失败 {} ", bulk.hasFailures());
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("***********批量更新ES数据异常***********");
        } finally {
            esDateList.clear();
        }
    }

DeleteByQueryRequest根据查询条件删除所有ES数据

    /**
     * @Description: 根据查看条件删除
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2023/2/1415:25
     */
    @Override
    public void deleteBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source) {
        //通过QueryBuilders中的搜索逻辑
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        //1 设置条件
        //设置删除条件: key = value
        TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
        if (!StringUtils.isEmpty(hisp)) {
            TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
            queryBuilder.must(hispTermQueryBuilder);
        }
        TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
        queryBuilder.must(sourceTermQueryBuilder);
        queryBuilder.must(supplierIdTermQueryBuilder);

        //2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
        deleteByQueryRequest.setTimeout("6000s");
        deleteByQueryRequest.setQuery(queryBuilder);
        //指定删除索引
        deleteByQueryRequest.indices(restClientConfig.getIndex());
        deleteByQueryRequest.setConflicts("proceed");
        try {
            //3 通过deleteByQuery来发起删除请求
            BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
            if (deleteResponse.getDeleted() >= 1) {
                log.info("deleteData,删除成功,删除文档条数: " + deleteResponse.getDeleted() + " ,indexName:" + restClientConfig.getIndex());
            }
        } catch (IOException e) {
            e.printStackTrace();
            log.error("无法连接到ES目标服务器");
            throw new TargetServerException("无法连接到ES目标服务器");
        }
    }

UpdateByQueryRequest批量更新

    /**
     * @Description: 根据查看条件更新
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2023/2/1415:25
     */
    public void updateBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source,String value) {
        //通过QueryBuilders中的搜索逻辑
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        //1 设置条件
        //设置删除条件: key = value
        TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
        if (!StringUtils.isEmpty(hisp)) {
            TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
            queryBuilder.must(hispTermQueryBuilder);
        }
        TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
        queryBuilder.must(sourceTermQueryBuilder);
        queryBuilder.must(supplierIdTermQueryBuilder);

        //2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
        updateByQueryRequest.setTimeout("6000s");
        updateByQueryRequest.setQuery(queryBuilder);

        // 这个实际上就是对应给脚本中传参数的对象。
        HashMap<String, Object> params = new HashMap<>(16);
        params.put("hobby", "修改的参数value");
        final Script script = new Script(
                ScriptType.INLINE, "painless",
                "ctx._source.hobby = params.hobby",
                params);
        updateByQueryRequest.setScript(script);
        //指定索引
        updateByQueryRequest.indices(restClientConfig.getIndex());
        updateByQueryRequest.setConflicts("proceed");
        try {
            //3 通过deleteByQuery来发起删除请求
            BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
            if (deleteResponse.getUpdated() >= 1) {
                log.info("更新成功,更新文档条数: " + deleteResponse.getUpdated() + " ,indexName:" + restClientConfig.getIndex());
            }
        } catch (IOException e) {
            e.printStackTrace();
            log.error("无法连接到ES目标服务器");
            throw new TargetServerException("无法连接到ES目标服务器");
        }
    }

BulkRequest批量删除


    /**
     * 批量更新es数据
     *
     * @param esDateList
     */
    private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
        if (CollectionUtils.isEmpty(esDateList)) {
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout("600s");
        RestHighLevelClient restHighLevelClient = restClientConfig.restHighLevelClient();
        try {
            for (int i = 0; i < esDateList.size(); i++) {
                CmsGoodsEntity object = esDateList.get(i);
                Map<String, Object> param = new LinkedHashMap<>();
                String goodid = object.getGoodid();
                param.put("tariffs_rate", object.getTariffsRate());
                param.put("inspection_charges_fee", object.getInspectionChargesFee());
                param.put("rates", object.getRates());
                UpdateRequest updateRequest = new UpdateRequest(restClientConfig.getIndex(), "_doc", goodid);
                updateRequest.doc(param);
                bulkRequest.add(updateRequest);
            }
            // 操作ES
            BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            log.warn("批量更新ES 是否有失败 {} ", bulk.hasFailures());
        } catch (IOException e) {
            log.error("批量更新ES数据异常,e", e);
            e.printStackTrace();
            throw new RuntimeException("ES批量更新数据异常:{}" + e);
        } finally {
            try {
                restHighLevelClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            esDateList.clear();
        }
    }

Script更新

public void updateHobby(String user, ESEntity esEntity) throws IOException {
        final BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("user", user));
        // 这个实际上就是对应给脚本中传参数的对象。
        HashMap<String, Object> params = new HashMap<>(16);
        params.put("hobby", "和女朋友爬山");
        final Script script = new Script(
                ScriptType.INLINE, "painless",
                "ctx._source.hobby = params.hobby",
                params);
        updateByQuery(queryBuilder, "索引名称", script);
    }

出现: 版本冲突、文档类型不对、JAR包与使用的API不一致或其他问题。都可参考以下连接。

ElasticSearch超级实用API描述文章来源地址https://www.toymoban.com/news/detail-571649.html

以上代码需要变动一下,将一些参数替换掉。

到了这里,关于java操作ElasticSearch之批量操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ElasticSearch中批量操作(批量查询_mget、批量插入删除_bulk)

    有时候可以通过批量操作来减少网络请求。如:批量查询、批量插入数据。 当某一条数据不存在,不影响整体响应,需要通过found的值进行判断是否查询到数据。          在Elasticsearch中,支持批量的插入、修改、删除操作,都是通过_bulk的api完成的。 请求格式如下:(

    2024年02月12日
    浏览(49)
  • es head 新增字段、修改字段、批量修改字段、删除字段、删除数据、批量删除数据

    目录 一、新增字段 二、修改字段值 三、批量修改字段值 ​四、删除字段 五、删除数据/文档 六、批量删除数据/文档 put   http://{ip}:{port}/{index}/_mapping/{type}     其中,index是es索引、type是类型 数据: 例子: 注意:如果报错Types cannot be provided in put mapping requests, unless the in

    2024年02月04日
    浏览(56)
  • es使用java来批量创建文档和批量删除文档(基于es7.8)

    批量操作实际就是执行 bulk命令 先引入pom依赖: 创建User类 3、然后在名为user的索引里,插入多条数据,在main方法里进行测试: 打印结果是: 下面执行postman进行查询 http://127.0.0.1:9200/user/_search : 下面对指定的id进行批量删除: 执行结果是:

    2024年02月13日
    浏览(49)
  • Elasticsearch(七)--ES文档的操作(下)---删除文档

    上篇文章我们了解了ES的修改文档的操作,也同样分别通过ES的kibana客户端以及Java高级Rest客户端进行学习,那么本篇末尾要给大家介绍的是对文档的删除操作,同新修改文档,也有删除单条文档和批量删除文档操作,根据条件删除文档,我们本篇均会涉及到。 在ES中删除文档

    2023年04月12日
    浏览(34)
  • elasticsearch删除大批量数据方法

    一般回用如下: 示例: 如果删除任务完成了,返回如下: 数据查询任务:

    2024年02月12日
    浏览(68)
  • 解决在使用 Elasticsearch(ES)多线程批量操作时导致并发一致性的问题!!

    先说一下什么是数据库数据库中 并发一致性 问题! 1、在并发环境下,事务的隔离性很难保证,因此会出现很多并发一致性问题。 数据丢失 T1 和 T2 两个事务都对一个数据进行修改,T1 先修改,T2 随后修改,T2 的修改覆盖了 T1 的修改。 读脏数据 T1 修改一个数据,T2 随后读取

    2024年02月04日
    浏览(54)
  • java操作ElasticSearch之批量操作

    出现: 版本冲突、文档类型不对、JAR包与使用的API不一致或其他问题。都可参考以下连接。 ElasticSearch超级实用API描述 以上代码需要变动一下,将一些参数替换掉。

    2024年02月16日
    浏览(46)
  • Java SpringBoot API 实现ES(Elasticsearch)搜索引擎的一系列操作(超详细)(模拟数据库操作)

    小编使用的是elasticsearch-7.3.2 基础说明: 启动:进入elasticsearch-7.3.2/bin目录,双击elasticsearch.bat进行启动,当出现一下界面说明,启动成功。也可以访问http://localhost:9200/ 启动ES管理:进入elasticsearch-head-master文件夹,然后进入cmd命令界面,输入npm run start 即可启动。访问http

    2024年02月04日
    浏览(57)
  • ES批量上传数据 - Python操作ES

    2024年02月11日
    浏览(39)
  • Java API批量操作Elasticsearch

    @Test public void batchAddIndex() throws IOException { BulkRequestBuilder bulkRequest = client .prepareBulk(); bulkRequest.add( client .prepareIndex( “batch_test1” , “batch” , “1” ) .setSource( jsonBuilder () .startObject() .field( “user” , “lzq” ) .field( “postDate” , new Date()) .field( “message” , “trying out Elasticsearch”

    2024年04月09日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包