ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据

这篇具有很好参考价值的文章主要介绍了ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

ElasticSearch

1、ElasticSearch学习随笔之基础介绍
2、ElasticSearch学习随笔之简单操作
3、ElasticSearch学习随笔之java api 操作
4、ElasticSearch学习随笔之SpringBoot Starter 操作
5、ElasticSearch学习随笔之嵌套操作
6、ElasticSearch学习随笔之分词算法
7、ElasticSearch学习随笔之高级检索
8、ELK技术栈介绍
9、Logstash部署与使用
10、ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据
11、ElasticSearch 8.x 弃用了 High Level REST Client,移除了 Java Transport Client,推荐使用 Elasticsearch Java API
12、ElasticSearch 8.x 使用 snapshot(快照)进行数据迁移
13、ElasticSearch 8.x 版本如何使用 SearchRequestBuilder 检索

ElasticSearch,创始人 Shay Banon(谢巴农)



前言

本文主要应用 Rest High Level Client 来进行对 ElasticSearch 进行操作,虽说官方已经不推荐,但是 ES 升级带来的代价也是相当大的,所以,此处略去一万字。

  • 那什么是 BulkProcessor 呢?
    BulkProcessorElasticSearch 客户端中的一个功能,用于批量执行索引、更新或删除操作,BulkProcessor 运行将多个操作打包成一个请求进行发送,以提高效率和性能。

批量操作索引的好处:

  • 性能优势:将多个操作打包成一个请求,这样可以减少网络开销,提高数据传输效率,从而可以加快数据写入索引速度。
  • 减少开销:较少的网络开销和较少的服务器的交互,减少服务器开销,尤其是大规模写入数据时。
  • 原子性:批量操作可以保证一组操作要么全部成功,要么全部失败,报错数据的一致性。
  • 减少开发成本:批量操作,可以简化客户端代码,减少请求和管理连接的操作。

当然,批量操作也是有缺点的:

  • 内存消耗:在执行批量操作时,首先会将数据写入内存,这样会消耗更多的内存。
  • 错误处理复杂性:单条数据上传,如果出错可以重试或者进行记录操作等,但是批量操作中的某个请求失败,需要额外来处理,比单条操作复杂。
  • 延迟响应:批量操作可能导致请求排队等待,会产生一些延迟。

多余的不说,来上代码。

一:引入 pom

首先引入客户端依赖,我的测试 ES 服务是 8.7.0 版本的,这里对应 High Level REST Client 客户端 7.3.2 版本的。

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.2</version>
</dependency>

之所以不用更高版本,是因为版本高了会报如下错误:

java.io.IOException: Unable to parse response body for Response{requestLine=POST /devintcompany@1562219164186/_doc?timeout=1m HTTP/1.1, host=http://192.168.*。*:9200, response=HTTP/1.1 201 Created}
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1473)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)
	at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836)
	at com.example.es.EsTest.addIndex(EsTest.java:97)
	at com.example.es.EsTest.main(EsTest.java:36)
Caused by: java.lang.NullPointerException
	at java.util.Objects.requireNonNull(Objects.java:203)
	at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127)
	at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:50)
	at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:39)
	at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:103)
	at org.elasticsearch.action.index.IndexResponse.fromXContent(IndexResponse.java:85)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1727)
	at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:1395)
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1471)
	... 5 more

亲自测试过的,应该还是版本不兼容的缘故,但是数据已经插入到 Index 了,就很奇怪。

二:创建 ES Client

这里初始化客户端,需要用户名密码进行认证的。

private static RestHighLevelClient createClient(){
    String hostname = "192.168.*.*";
    int port = 9200;
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("your username", "your password"));
    RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(hostname, port))
            .setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    return new RestHighLevelClient(restClientBuilder);
}

三:创建 BulkProcessor

这里创建 BulkProcessor 批量操作对象,通过 High Level REST Client 来绑定,加入监听器 BulkProcessor.Listener,如果批量操作失败或发生异常,在 afterBulk() 方法中处理。
批量处理需要设置的参数代码中已有注释,一般就设置这些参数就可以了,可根据自己的使用场景进行调节。

public static BulkProcessor getBulkProcessor(RestHighLevelClient client) {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            System.out.println("开始执行批量操作,ID: " + executionId);
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                System.out.println("批量操作完成,ID: " + executionId);
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            System.out.println("批量操作失败,ID: " + executionId);
            failure.printStackTrace();
        }
    };
    BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        bulkRequest.timeout(TimeValue.timeValueSeconds(100));
        client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
    }), listener);
    // 当达到1000个操作时触发批量请求
    builder.setBulkActions(1000);
    // 当达到5MB大小时触发批量请求
    builder.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB));
    // 每5秒触发一次批量请求,无论大小和操作数如何
    builder.setFlushInterval(TimeValue.timeValueSeconds(5));
    // 设置退避策略,以防服务器过载或拒绝请求
    builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3));
    // 设置并发请求的数量为1,即同时只有一个批量请求在执行
    builder.setConcurrentRequests(1);
    return builder.build();
}

四:批量推数据

我们在 main 方法中进行测试,代码如下:文章来源地址https://www.toymoban.com/news/detail-795879.html

public static void main(String[] args) throws IOException {
    RestHighLevelClient client = createClient();
    BulkProcessor bulkProcessor = getBulkProcessor(client);
    for (int i = 0; i < 10; i++) {
        String source = "{\"ApplianceType\":[{\"ApplianceTypeCn\":\"国产\",\"ApplianceTypeEn\":\"Domestic\",\"ApplianceTypeId\":\"1\"}],\"ApplicationCount\":0,\"ClassICount\":17,\"ClassIICount\":1,\"ClassIIICount\":0,\"Classification\":[{\"Cn\":\"2002版分类\",\"En\":\"2002 reg. category of relevant app.\",\"Id\":\"Class2002\",\"Items\":[{\"Cn\":\"Ⅰ类\",\"En\":\"Class Ⅰ\",\"Id\":\"1\",\"Id2\":\"I\",\"Items\":[{\"Cn\":\"进口第一类医疗器械(含第一类体外诊断试剂)备案信息\",\"En\":\"Information on imported ClassⅠmedical devices (including ClassⅠ IVD reagents)\",\"Id\":\"100\"}]},{\"Cn\":\"Ⅱ类\",\"En\":\"Class Ⅱ\",\"Id\":\"2\",\"Id2\":\"II\",\"Items\":[{\"Cn\":\"妇产科、辅助生殖和避孕器械\",\"En\":\"Obstetrics and gynecology, assisted reproductive and contraceptive devices\",\"Id\":\"201818\"}]}]},{\"Class1Code\":[{\"Id\":\"02\"}],\"Class2Code\":[{\"Id\":\"03\"}],\"DataType\":[{\"Id\":\"1\"},{\"Id\":\"3\"}],\"ProductClassificationCode\":[{\"Id\":\"09\"}],\"ProductClassificationNameCode\":[]}],\"Company\":{\"Cn\":\"海南创鑫医药科技发展有限公司\",\"En\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"Id\":\"1000002388\"},\"CompanyAliasCn\":[\"海南创鑫医药科技发展有限公司\"],\"CompanyAliasEn\":[\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\"],\"CompanyCn\":\"海南创鑫医药科技发展有限公司\",\"CompanyCnSearch\":\"海南创鑫医药科技发展有限公司\",\"CompanyEn\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"CompanyEnSearch\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"CompanyId\":\"1000002388\",\"CompanyType\":{\"Cn\":\"国内公司\",\"En\":\"Domestic company\",\"Id\":\"Domestic company\"},\"CompanyTypeCn\":\"国内公司\",\"CompanyTypeEn\":\"Domestic company\",\"CompanyTypeId\":\"Domestic company\",\"DomesticCount\":18,\"EffectiveRegistrationCount\":18,\"FirstApplicationYear\":null,\"FirstRegistrationYear\":\"2017\",\"IVD\":\"0\",\"ImportCount\":0,\"LatestApplicationYear\":null,\"LatestRegistrationYear\":\"2020\",\"Listing\":{\"Cn\":null,\"En\":null,\"Id\":null},\"ListingCn\":null,\"ListingEn\":null,\"ListingId\":null,\"TotalCount\":18,\"company_registration_relation\":{\"name\":\"company\"},\"website_url\":\"\"}";
        bulkProcessor.add(new IndexRequest("devintcompany@1562219164186").source(source, XContentType.JSON));
        System.out.println("添加第 " + i + "条数据!");
    }
    try {
        bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        client.close();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("添加完成!");
}

到了这里,关于ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Elasticsearch进行数据批量操作

    Elasticsearch是一个开源的搜索和分析引擎,基于Lucene库开发。它可以用来实现文本搜索、数据分析、实时数据处理等功能。在大数据时代,Elasticsearch成为了处理和分析大量数据的首选工具之一。 数据批量操作是Elasticsearch中的一种常见操作,它可以用来对大量数据进行创建、更

    2024年02月22日
    浏览(45)
  • ElasticSearch(7.8版本)聚合查询使用javaHighLevelRestClient实现(从MySQL聚合查询概念->ES聚合概念及实操)

    申明:本文是在实现ES聚合功能中,将过程中查找的多篇博客文献拼接在一起,参考到的博文全部在标题中附上了原文的超链接,分享出来仅是为了提做一个笔记以防忘记,并给大家提供一个参考。 聚合操作指的是在数据查找基础上对于数据进一步整理筛选行为,聚合操作也

    2023年04月24日
    浏览(58)
  • Elasticsearch 8.X 如何优雅的实现字段名称批量修改?

    写入es前,数据格式如下 需求:单纯用pipeline可不可以实现,如果写入key包含json_提换为空,包含tmp提换为core,因为key字段有很多不考虑穷举,最终效果要如下: ——问题来源:死磕Elasticsearch知识星球 https://t.zsxq.com/0bzWL3w1X Elasticsearch mapping 一旦创建是不允许修改的!允许更

    2023年04月09日
    浏览(39)
  • SpringBoot 整合ElasticSearch实现模糊查询,批量CRUD,排序,分页,高亮

    准备一个空的SpringBoot项目 写入依赖 注意你的SpringBoot和你的es版本,一定要对应,如果不知道的可以查看这篇文章:https://blog.csdn.net/u014641168/article/details/130386872 我的版本是2.2.6,所以用的ES版本是 6.8.12,安装es请看这篇文章:https://blog.csdn.net/u014641168/article/details/130622430 查看

    2024年02月08日
    浏览(53)
  • elasticsearch-JAVA-使用UpdateByQueryRequest进行条件批量修改

    最近项目中用到了 es 搜索引擎 ,需求用到了 根据条件修改es 的字段数据 ,网上查了很久 很多都是 查询出要修改的id 然后再根据id 进行单个修改 这样太费事了 又看了看 es 是有批量修改的语法的 Java api 就这样写 这样写 应该是有些缺陷的 但最总也实现了 类似于 sql 的 UPDA

    2024年02月11日
    浏览(50)
  • 最新ThinkPHP版本实现证书查询系统,实现批量数据导入,自动生成电子证书

    前提:朋友弄了一个培训机构,培训考试合格后,给发证书,需要一个证书查询系统。委托我给弄一个,花了几个晚上给写的证书查询系统。 实现功能: 前端按照姓名+手机号码进行证书查询 证书信息展示+证书展示,支持点击下载 后端证书信息录入+带一寸照片数据批量导入

    2024年01月23日
    浏览(48)
  • Python图像处理实战:使用PIL库批量添加水印的完整指南【第27篇—python:Seaborn】

    在日常图像处理中,为图片添加水印是一项常见任务。有多种方法和工具可供选择,而今天我们将专注于使用Python语言结合PIL库批量添加水印。 需要注意的是,所选用的图片格式不应为JPG或JPEG,因为这两种格式的图片不支持透明度设置。 先前的文章已经详细介绍过PIL库,这

    2024年01月16日
    浏览(51)
  • Spring Boot 集成 ElasticSearch:实现模糊查询、批量 CRUD、排序、分页和高亮功能

    文章来源:https://blog.csdn.net/qq_52355487/article/details/123805713 在pom.xml里加入如下依赖 非常重要:检查依赖版本是否与你当前所用的版本是否一致,如果不一致,会连接失败! 1.创建、判断存在、删除索引 2.对文档的CRUD 创建文档: 注意:如果添加时不指定文档ID,他就会随机生成

    2024年02月04日
    浏览(42)
  • vue实现动态路由添加(简单无废话版本)

    最近练习vue的项目,有关于后台管理系统的动态添加路由部分,根据思路实现了基本的功能,在这里记录一下,等后面学习后在进行优化。 这里只是记录我个人最后实现的思路,本人由于是初学者,可能思路和代码有不正确地方,还求多见谅。也请能不吝赐教,一同进步。 我

    2023年04月08日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包