Elasticsearch Java REST Client 批量操作(Bulk API)

这篇具有很好参考价值的文章主要介绍了Elasticsearch Java REST Client 批量操作(Bulk API)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

上一篇:Elasticsearch Java REST Client Term Vectors API
下一篇:Elasticsearch Java REST Client Search APIs 查询文章来源地址https://www.toymoban.com/news/detail-515835.html

BulkRequest

BulkRequest可用于使用单个请求执行多个索引、更新和/或删除操作。
它需要至少一个操作添加到 Bulk 请求中:

# 方式一:
@GetMapping("test")
public String test() throws IOException {
    BulkRequest request = new BulkRequest();
    request.add(new IndexRequest("edu-app-user", "doc", "1")
            .source(XContentType.JSON, "name", "foo"));
    request.add(new IndexRequest("edu-app-user", "doc", "2")
            .source(XContentType.JSON, "name", "elastic"));
    request.add(new IndexRequest("edu-app-user", "doc", "3")
            .source(XContentType.JSON, "name", "wdz"));

    BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
    return response(bulk);
}
# 混合操作
@GetMapping("test1")
public String test1() throws IOException {
     BulkRequest request = new BulkRequest();
     request.add(new DeleteRequest("edu-app-user", "doc", "3"));
     request.add(new UpdateRequest("edu-app-user", "doc", "2")
             .doc(XContentType.JSON, "name", "update"));
     request.add(new IndexRequest("edu-app-user", "doc", "4")
             .source(XContentType.JSON, "name", "baz"));
     BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
     return response(bulk);
 }
private String response(BulkResponse bulk){
    String str = "";
    for (BulkItemResponse responses : bulk) {
        System.out.println(responses.toString());
        DocWriteResponse response = responses.getResponse();
        switch (responses.getOpType()) {
            case CREATE:
                System.out.println("创建数据-----------------");
                break;
            case INDEX:
                IndexResponse indexResponse = (IndexResponse) response;
                System.out.println("操作索引数据-----------------"+indexResponse.toString());
                str = indexResponse.toString();
                break;
            case DELETE:
                DeleteResponse deleteResponse = (DeleteResponse) response;
                System.out.println("操作删除数据-----------------"+deleteResponse.toString());
                str = deleteResponse.toString();
                break;
            case UPDATE:
                UpdateResponse updateResponse = (UpdateResponse) response;
                System.out.println("操作更新数据-----------------"+updateResponse.toString());
                str = updateResponse.toString();
                break ;
        }
    }
    return str;
}
# 异步处理
@GetMapping("test3")
public void test3() throws IOException {
    BulkRequest request = new BulkRequest();
    request.add(new DeleteRequest("edu-app-user", "doc", "3"));
    request.add(new UpdateRequest("edu-app-user", "doc", "100")
            .doc(XContentType.JSON, "name", "update"));
    request.add(new IndexRequest("edu-app-user", "doc", "1")
            .source(XContentType.JSON, "name", "baz"));
    restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT,new BulkListen());
}
# 监听
package com.wdz.es.config.es;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;

public class BulkListen implements ActionListener<BulkResponse> {
    @Override
    public void onResponse(BulkResponse bulkItemResponses) {
        String response = response(bulkItemResponses);
        System.out.println("异步成功: "+response);
    }

    @Override
    public void onFailure(Exception e) {
        System.out.println("异步处理失败:"+e.getMessage());
    }

    public static String response(BulkResponse bulk) {
        String str = "";
        for (BulkItemResponse responses : bulk) {
            System.out.println(responses.toString());
            DocWriteResponse response = responses.getResponse();

            switch (responses.getOpType()) {
                case CREATE:
                    System.out.println("创建数据-----------------");
                    break;
                case INDEX:
                    IndexResponse indexResponse = (IndexResponse) response;
                    System.out.println("操作索引数据-----------------" + indexResponse.toString());
                    str = indexResponse.toString();
                    break;
                case DELETE:
                    DeleteResponse deleteResponse = (DeleteResponse) response;
                    System.out.println("操作删除数据-----------------" + deleteResponse.toString());
                    str = deleteResponse.toString();
                    break;
                case UPDATE:
                    UpdateResponse updateResponse = (UpdateResponse) response;
                    System.out.println("操作更新数据-----------------" + updateResponse.toString());
                    str = updateResponse.toString();
                    break;
            }
            // 获取失败的处理
            BulkItemResponse.Failure failure = responses.getFailure();
            System.out.println(failure.toString());
        }
        return str;
    }
}

# 结果
org.elasticsearch.action.bulk.BulkItemResponse@6ca820cd
操作删除数据-----------------DeleteResponse[index=edu-app-user,type=doc,id=3,version=4,result=deleted,shards=ShardInfo{total=2, successful=1, failures=[]}]
org.elasticsearch.action.bulk.BulkItemResponse@25ae673
操作更新数据-----------------UpdateResponse[index=edu-app-user,type=doc,id=2,version=4,seqNo=5,primaryTerm=1,result=updated,shards=ShardInfo{total=2, successful=1, failures=[]}]
org.elasticsearch.action.bulk.BulkItemResponse@5e6d6373
操作索引数据-----------------IndexResponse[index=edu-app-user,type=doc,id=4,version=1,result=created,seqNo=6,primaryTerm=1,shards={"total":2,"successful":1,"failed":0}]

可选操作

# 设置超时时间(两种方式)
request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 
# 设置在继续执行索引/更新/删除操作之前必须处于活动状态的分片副本数。
request.waitForActiveShards(2); 
# 提供的分片副本数ActiveShardCount: 可以是 ActiveShardCount.ALL,ActiveShardCount.ONE或 ActiveShardCount.DEFAULT(默认)
request.waitForActiveShards(ActiveShardCount.ALL); 	

批处理 BulkProcessor

@GetMapping("test2")
public void test2() throws IOException {
   BulkProcessor.Listener listener = new BulkProcessor.Listener() {
       @Override
       public void beforeBulk(long executionId, BulkRequest request) {
           System.out.println(executionId+"批处理之前request:"+JSONObject.toJSONString(request));
       }
       @Override
       public void afterBulk(long executionId, BulkRequest request,
                             BulkResponse response) {
           System.out.println(executionId+"批处理之后request:"+JSONObject.toJSONString(request));
           System.out.println(executionId+"批处理之后response:"+JSONObject.toJSONString(response));
       }
       @Override
       public void afterBulk(long executionId, BulkRequest request,
                             Throwable failure) {
           System.out.println(executionId+"批处理异常request:"+JSONObject.toJSONString(request));
           System.out.println(executionId+"批处理异常failure:"+failure.getMessage());
       }
   };

   BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
           (request, bulkListener) ->
                   restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
   BulkProcessor bulkProcessor =
            BulkProcessor.builder(bulkConsumer, listener).build();
   bulkProcessor.add(new IndexRequest("edu-app-user","doc","1002").source(XContentType.JSON,"name","BulkProcessor"));
   bulkProcessor.add(new DeleteRequest("edu-app-user","doc","2"));
   bulkProcessor.add(new UpdateRequest("edu-app-user","doc","28").doc(XContentType.JSON,"name","更新测试"));
   // 这两种方法都在关闭处理器之前刷新添加到处理器的请求,并且还禁止向其添加任何新请求
   // 直到所有请求都已处理或指定的等待时间过去
   try {
       bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
   } catch (InterruptedException e) {
       e.printStackTrace();
   }
   // 该close()方法可用于立即关闭BulkProcessor
   bulkProcessor.close();
   System.out.println(JSONObject.toJSONString(bulkConsumer));
   System.out.println(JSONObject.toJSONString(add));
   System.out.println(JSONObject.toJSONString(bulkProcessor));
}
# 异常结果
1批处理之前request:{"description":"requests[1], indices[bulk-test]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批处理异常request:{"description":"requests[1], indices[bulk-test]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批处理异常failure:Validation Failed: 1: source is missing;2: content type is missing;
{}
{}
{}
# 成功结果
1批处理之前request:{"description":"requests[1], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
{}
{}
{}
1批处理之后request:{"description":"requests[1], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批处理之后response:{"fragment":false,"ingestTook":{"days":0,"daysFrac":-1.1574074074074074E-8,"hours":0,"hoursFrac":-2.7777777777777776E-7,"micros":-1000,"microsFrac":-1000.0,"millis":-1,"millisFrac":-1.0,"minutes":0,"minutesFrac":-1.6666666666666667E-5,"nanos":-1000000,"seconds":0,"secondsFrac":-0.001,"stringRep":"-1"},"ingestTookInMillis":-1,"items":[{"failed":false,"fragment":false,"id":"100","index":"edu-app-user","itemId":0,"opType":"INDEX","response":{"fragment":false,"id":"100","index":"edu-app-user","primaryTerm":1,"result":"CREATED","seqNo":1,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":1},"type":"doc","version":1}],"took":{"days":0,"daysFrac":1.0300925925925926E-6,"hours":0,"hoursFrac":2.4722222222222223E-5,"micros":89000,"microsFrac":89000.0,"millis":89,"millisFrac":89.0,"minutes":0,"minutesFrac":0.0014833333333333332,"nanos":89000000,"seconds":0,"secondsFrac":0.089,"stringRep":"89ms"}}

# 混合批处理结果
1批处理之前request:{"description":"requests[3], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
{}
{}
{}
1批处理之后request:{"description":"requests[3], indices[edu-app-user]","parentTask":{"id":-1,"nodeId":"","set":false},"refreshPolicy":"NONE","shouldStoreResult":false}
1批处理之后response:{"fragment":false,"ingestTook":{"days":0,"daysFrac":-1.1574074074074074E-8,"hours":0,"hoursFrac":-2.7777777777777776E-7,"micros":-1000,"microsFrac":-1000.0,"millis":-1,"millisFrac":-1.0,"minutes":0,"minutesFrac":-1.6666666666666667E-5,"nanos":-1000000,"seconds":0,"secondsFrac":-0.001,"stringRep":"-1"},"ingestTookInMillis":-1,"items":[{"failed":false,"fragment":false,"id":"1002","index":"edu-app-user","itemId":0,"opType":"INDEX","response":{"fragment":false,"id":"1002","index":"edu-app-user","primaryTerm":1,"result":"CREATED","seqNo":16,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":1},"type":"doc","version":1},{"failed":false,"fragment":false,"id":"2","index":"edu-app-user","itemId":1,"opType":"DELETE","response":{"fragment":false,"id":"2","index":"edu-app-user","primaryTerm":1,"result":"DELETED","seqNo":7,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":5},"type":"doc","version":5},{"failed":false,"fragment":false,"id":"28","index":"edu-app-user","itemId":2,"opType":"UPDATE","response":{"fragment":false,"id":"28","index":"edu-app-user","primaryTerm":1,"result":"UPDATED","seqNo":17,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"edu-app-user","uUID":"_na_"},"indexName":"edu-app-user"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"doc","version":8},"type":"doc","version":8}],"took":{"days":0,"daysFrac":1.0069444444444445E-6,"hours":0,"hoursFrac":2.4166666666666667E-5,"micros":87000,"microsFrac":87000.0,"millis":87,"millisFrac":87.0,"minutes":0,"minutesFrac":0.00145,"nanos":87000000,"seconds":0,"secondsFrac":0.087,"stringRep":"87ms"}}

multi Get API 多获取

multiGetAPI 在单个 http 请求中并行执行多个请求get 。
MultiGetRequest,添加 `MultiGetRequest.Item 来配置要获取的内容:

@GetMapping("get")
public MultiGetResponse get(){
     MultiGetRequest request = new MultiGetRequest();
     MultiGetRequest.Item item = new MultiGetRequest.Item(index, type, "28");
     MultiGetRequest.Item item1 = new MultiGetRequest.Item(index, type, "1");
     MultiGetRequest.Item item2 = new MultiGetRequest.Item(index, type, "3");
     request.add(item);
     request.add(item1);
     request.add(item2);
     MultiGetResponse mget = null;
     try {
         mget = restHighLevelClient.mget(request, RequestOptions.DEFAULT);
     } catch (IOException e) {
         e.printStackTrace();
     }
     return mget;
 }

可选参数:

# 禁用抓取_source
new MultiGetRequest.Item("index", "type", "example_id").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)

# 过滤数据
String[] includes = new String[] {"name", "age"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(item.fetchSourceContext(fetchSourceContext));
    

多获取异步处理方式与其他异步一致更新泛型即可

上一篇:Elasticsearch Java REST Client Term Vectors API
下一篇:Elasticsearch Java REST Client Search APIs 查询

到了这里,关于Elasticsearch Java REST Client 批量操作(Bulk API)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • elasticsearch bulk 批量操作

    bulk 请求体如下: { action: { metadata }}n { request body }n { action: { metadata }}n { request body }n 测试索引示例 PUT batch_test { “mappings”: { “properties”: { “id”:{ “type”: “keyword” }, “name”:{ “type”: “text” }, “age”:{ “type”: “integer” } } } } 测试原始数据 PUT /_bulk {“index”:{“_i

    2024年02月07日
    浏览(41)
  • ElasticSearch中批量操作(批量查询_mget、批量插入删除_bulk)

    有时候可以通过批量操作来减少网络请求。如:批量查询、批量插入数据。 当某一条数据不存在,不影响整体响应,需要通过found的值进行判断是否查询到数据。          在Elasticsearch中,支持批量的插入、修改、删除操作,都是通过_bulk的api完成的。 请求格式如下:(

    2024年02月12日
    浏览(49)
  • Elasticsearch学习--索引的批量操作mget、bulk

    1. 基本用法 查询id是1、2的数据 2. 提取index  3. ids的用法 4. 指定source create:不存在则创建,存在则报错 delete:删除文档 update:全量替换或部分更新 index:索引(动词) 1. 自动生成id   2. 删除操作是懒删除 并没有真正的删除,只是标记为删除  3. index(可以是创建,也可以

    2024年02月10日
    浏览(53)
  • 初识ElasticSearch(5) -批量操作之bulk | 条件查询 | 其它查询

    本系列笔记结合HTTP请求(使用postman调用,源文件见GitHub)和ElasticsearchRestTemplate进行调用学习 ElasticsearchRestTemplate封装了RestHighLevelClient,有些场景还得用RestHighLevelClient来操作 版本说明:使用的SpringBoot-2.3.5,对应的ElasticSearch-7.6.2;所以还是可以用RestHighLevelClient ElasticSearch-7

    2023年04月08日
    浏览(76)
  • 最新版ES8的client API操作 Elasticsearch Java API client 8.0

    作者:ChenZhen 本人不常看网站消息,有问题通过下面的方式联系: 邮箱:1583296383@qq.com vx: ChenZhen_7 我的个人博客地址:https://www.chenzhen.space/🌐 版权:本文为博主的原创文章,本文版权归作者所有,转载请附上原文出处链接及本声明。📝 如果对你有帮助,请给一个小小的s

    2024年02月04日
    浏览(41)
  • Java API批量操作Elasticsearch

    @Test public void batchAddIndex() throws IOException { BulkRequestBuilder bulkRequest = client .prepareBulk(); bulkRequest.add( client .prepareIndex( “batch_test1” , “batch” , “1” ) .setSource( jsonBuilder () .startObject() .field( “user” , “lzq” ) .field( “postDate” , new Date()) .field( “message” , “trying out Elasticsearch”

    2024年04月09日
    浏览(45)
  • Elasticsearch rest-high-level-client 基本操作

    本篇主要讲解一下 rest-high-level-client 去操作 Elasticsearch , 虽然这个客户端在后续版本中会慢慢淘汰,但是目前大部分公司中使用Elasticsearch 版本都是6.x 所以这个客户端还是有一定的了解 前置准备 准备一个SpringBoot环境 2.2.11 版本 准备一个Elasticsearch 环境 我这里是8.x版本 引入依赖

    2024年01月22日
    浏览(53)
  • 【Elasticsearch学习笔记五】es常用的JAVA API、es整合SpringBoot项目中使用、利用JAVA代码操作es、RestHighLevelClient客户端对象

    目录 一、Maven项目集成Easticsearch 1)客户端对象 2)索引操作 3)文档操作 4)高级查询 二、springboot项目集成Spring Data操作Elasticsearch 1)pom文件 2)yaml 3)数据实体类 4)配置类 5)Dao数据访问对象 6)索引操作 7)文档操作 8)文档搜索 三、springboot项目集成bboss操作elasticsearch

    2023年04月09日
    浏览(51)
  • SpringBoot整合ElasticSearch之Java High Level REST Client

    1 搭建SpringBoot工程 2 引入ElasticSearch相关坐标。 3 编写核心配置类 编写核心配置文件: 这里可以不写在配置,可以直接写在代码中,只是一般都是写在配置文件中 编写核心配置类 4 测试客户端对象 记得把maven的单元测试关了 注意:使用@Autowired注入RestHighLevelClient 如果报红线

    2024年02月05日
    浏览(52)
  • (Rest风格API)Elasticsearch索引操作、映射配置、数据操作、查询操作

    1.请求方式:put 2.请求路径:索引库名 3.请求参数:json格式 number_of_shards 是指索引要做多少个分片,只能在创建索引时指定,后期无法修改。 number_of_replicas 是指每个分片有多少个副本,后期可以动态修改 什么是分片? ES中所存数据的文件块,也是数据的最小单元块。假如有

    2024年04月26日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包