批量插入千万数据到Elasticsearch之bulkProcessor

这篇具有很好参考价值的文章主要介绍了批量插入千万数据到Elasticsearch之bulkProcessor。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.下面是我的代码,之前测试400万的数据到es成功,后面到生产数据1300万的数据导入es的时候出现连接超时错误,io错误;

public static void bulkDeleteByUserNoRequest(String index, List<String> userNos) throws IOException {
    //创建ES客户端
    try (RestHighLevelClient client = getClient()) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // 执行之前调用
                System.out.println("操作" + request.numberOfActions() + "条数据");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // 执行之后调用
                System.out.println("成功" + request.numberOfActions() + "条数据,用时" + response.getTook());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // 失败时调用
                System.out.println("失败" + request.numberOfActions() + "条数据");
                System.out.println("失败" + failure);
            }
        };
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener)
                .setBulkActions(5000)
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(10L))
                .setConcurrentRequests(10)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();

        SearchRequest searchRequest = new SearchRequest(index);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.termsQuery("userNo", userNos));
        searchSourceBuilder.size(10000);
        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.getHits().getTotalHits().value == 0) {
            return;
        }
        if (searchResponse.getHits().getTotalHits().value > 0) {
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                bulkProcessor.add(new DeleteRequest(index).id(hit.getId()));
            }
            bulkProcessor.flush();
            bulkProcessor.close();
            client.close();
        }

2.遇到的问题连接超时和io问题,这是因为发起的线程太多了上面我设置了10个线程,一个线程5000的数据,.setBulkActions(5000)  .setConcurrentRequests(10) 因为线程太多导致有一个线程不成功的话就会导致缺少数据,后面解决把.setBulkActions(2000)  .setConcurrentRequests(1)就成功导入1000万数据成功了

bulkprocessor使用,elasticsearch,java,大数据

3.遇到的坑

最后一次如果没有达到5000的不会发送请求,手动刷新一次.flush,关闭客户端不能立马关闭不然会出现I/O异常,使用bulkProcessor.awaitClose( timeout: executeTime/1000,TimeUnit.SECoNDS);关闭,加在flush的后面一行

bulkprocessor使用,elasticsearch,java,大数据

 4.最终代码成功导入一千万数据到es完整代码耗时30分钟文章来源地址https://www.toymoban.com/news/detail-766312.html

 public static void bulksaveByUserNoRequest(String index, List<String> userNos) throws IOException {
        //创建ES客户端
        try (RestHighLevelClient client = getClient()) {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    // 执行之前调用
                    System.out.println("操作" + request.numberOfActions() + "条数据");
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    // 执行之后调用
                    System.out.println("成功" + request.numberOfActions() + "条数据,用时" + response.getTook());
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    // 失败时调用
                    System.out.println("失败" + request.numberOfActions() + "条数据");
                    System.out.println("失败" + failure);
                }
            };
            BulkProcessor bulkProcessor = BulkProcessor.builder(
                            (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                            listener)
                    .setBulkActions(2000)
                    .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                    .setFlushInterval(TimeValue.timeValueSeconds(10L))
                    .setConcurrentRequests(2)
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .build();

            try {
                for (Map tempMap : list) {
                    IndexRequest indexRequest = new IndexRequest(indexName)
                            .id((String) tempMap.get("userNo")).source(JSON.toJSONString(tempMap), XContentType.JSON);
                    bulkProcessor.add(indexRequest);
                }
                bulkProcessor.flush();
                bulkProcessor.awaitClose(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                log.error("错误信息:{}", e.getMassage)
            } finally {
                if (bulkProcessor != null) {
                    bulkProcessor.close();
                }
                if (client != null) {
                    client.close();
                }
            }
        } catch (Exception e) {
            log.error("错误信息:{}", e.getMassage)
        }
    }

到了这里,关于批量插入千万数据到Elasticsearch之bulkProcessor的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ElasticSearch中批量操作(批量查询_mget、批量插入删除_bulk)

    有时候可以通过批量操作来减少网络请求。如:批量查询、批量插入数据。 当某一条数据不存在,不影响整体响应,需要通过found的值进行判断是否查询到数据。          在Elasticsearch中,支持批量的插入、修改、删除操作,都是通过_bulk的api完成的。 请求格式如下:(

    2024年02月12日
    浏览(37)
  • 18. ElasticSearch系列之批量插入与更新

    本文介绍工作中Python版常用的高效ES批量插入、更新数据方式 1. 批量插入 2.批量更新 批量更新只需要改动action的以下内容即可 欢迎关注公众号算法小生或沈健的技术博客

    2024年02月11日
    浏览(32)
  • mybatis批量插入数据list超过一定长度时报错的解决办法(批量插入数据,数据过多时报错解决和批量修改报错

    在使用MyBatis进行批量新增时,如果数据量较大,可以考虑分批次插入以减少数据库的负载压力。这里提供一种基于MyBatis的分批次插入的方法: 创建一个新的Mapper XML文件(例如:BatchInsertMapper.xml)来定义批量插入的SQL语句。在该XML文件中,添加如下内容:   请将上述代码中

    2024年02月16日
    浏览(37)
  • java实现批量插入数据

    日常工作或者学习中,可能会遇到批量插入数据的需求,一般情况下数据量少的时候,我们会直接调用批量接口插入数据即可,当数据量特别大时,可能由于数据库限制,插入的数据最多不能超过100条(假如限制100条),就算是数据库支持一次性插入千条也会耗内存,如果使用

    2024年02月11日
    浏览(47)
  • java 批量插入数据

    批量插入数据,常见的使用mybatis foreach 插入的方式,原始的方式和批处理 xml mapper: 对于数据量不是很大的,基本够用。如果同步数据特别慢,再考虑其它的方式。或者晚上凌晨再同步数据。 批量插入 数据库连接: 原始的方法写起来麻烦些。 MybatisGeneralBatchUtils  SpringUtil 

    2024年02月11日
    浏览(70)
  • Mysql大数据批量插入方法

    MySQL是当前最流行的关系型数据库之一,大数据批量插入是MySQL中常用的操作之一。在处理大量数据时,如果一条一条地插入会极大地影响效率,因此批量插入是一个更好的选择,可以大大提高数据的处理速度。下面介绍几种MySQL大数据批量插入的方法。 使用LOAD DATA INFILE语句

    2024年02月10日
    浏览(31)
  • 使用saveOrUpdateBatch实现批量插入更新数据

    saveOrUpdateBatch 是 Hibernate 中的一个方法,可以用来批量插入或更新数据。这个方法的参数是一个 List,里面可以存储多个实体对象。当 Hibernate 执行这个方法时,会检查每个实体对象是否存在主键,如果存在主键就执行 update 操作,否则执行 insert 操作。 使用 saveOrUpdateBatch 的代

    2024年02月11日
    浏览(38)
  • 批量插入数据、MVC三层分离

    八、批量插入数据 1、使用Statement() 2、使用PreparedStatement() 3、使用批量操作API 4、优化 九、MVC三层分离  

    2024年02月14日
    浏览(28)
  • python批量插入数据到mysql

    使用python批量插入数据到mysql的三种方法 单条insert的话插入5w条数据大约用时5秒左右,相对来说效率不高

    2024年02月10日
    浏览(38)
  • 数据库批量插入数据的三种方法

    测试环境:SpringBoot项目+MybatisPlus框架+MySQL数据库+Lombok 方法一: for循环插入(单条) (总耗时:n分钟,每次都要获取连接Connection、释放连接和关闭资源等操作,比较耗时,这里就没测了) 方法二: 批量插入saveBatch (4~7秒,这里用到了MybatisPLus的saveBatch批量插入方法,实际

    2024年02月14日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包