springboot+es批量新增、批量修改、根据内部id批量查询

这篇具有很好参考价值的文章主要介绍了springboot+es批量新增、批量修改、根据内部id批量查询。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

pom.xml配置

<dependency>
    <!--  ElasticSearch-->
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.8.0</version>
    <exclusions>
        <exclusion>
            <artifactId>elasticsearch</artifactId>
            <groupId>org.elasticsearch</groupId>
        </exclusion>
        <exclusion>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>2.3.9.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.8.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.8.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty4-client</artifactId>
    <version>7.8.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>7.8.0</version>
</dependency>

yml配置

elasticsearch:
  hosts: ${ES_HOST:ip:端口}
es_user_name: ${ES_USER_NAME:xxxx}
es_password: ${ES_PASSWORD:xxxxx}

EsConfig配置

@Configuration
@Slf4j
public class EsConfig {
    @Value("${es_user_name}")
    private String username;
    @Value("${es_password}")
    private String password;
    public static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder=RequestOptions.DEFAULT.toBuilder();
        COMMON_OPTIONS=builder.build();
    }
    @Bean
    RestHighLevelClient restHighLevelClient(@Value("${spring.elasticsearch.hosts}") String hosts) {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username,password));
        String[] hostsWithPort = hosts.split(",");
        HttpHost[] httpHosts = new HttpHost[hostsWithPort.length];
        for (int i = 0; i < hostsWithPort.length; i++) {
            String hostWithPort = hostsWithPort[i];
            String[] hostPort = hostWithPort.split(":");
            String host = hostPort[0];
            String port = hostPort[1];
            httpHosts[i] = new HttpHost(host, Integer.parseInt(port));
        }
        return new RestHighLevelClient(RestClient.builder(httpHosts).setHttpClientConfigCallback(f->f.setDefaultCredentialsProvider(credentialsProvider)));
    }
}

ElasticSearchConfig 配置

@Configuration
public class ElasticSearchConfig {

    /**
     * 防止netty的bug
     * java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]
     */
    @PostConstruct
    void init() {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    }
}

启动类配置

public static void main(String[] args) {
    System.setProperty("es.set.netty.runtime.available.processors", "false");
    SpringApplication.run(xxxx.class, args);
}

//批量操作的对象

private  BulkProcessor bulkProcessor=createBulkProcessor();
@Autowired
RestHighLevelClient restHighLevelClient;
private  BulkProcessor createBulkProcessor() {

    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              BulkResponse response) {
            if (!response.hasFailures()) {
                log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
            } else {
                BulkItemResponse[] items = response.getItems();
                for (BulkItemResponse item : items) {
                    if (item.isFailed()) {
                        log.info("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
                        break;
                    }
                }
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              Throwable failure) {

            List<DocWriteRequest<?>> requests = request.requests();
            List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
            log.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
        }
    };

    BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
        restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
    }), listener);
    //到达10000条时刷新
    builder.setBulkActions(10000);
    //内存到达8M时刷新
    builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
    //设置的刷新间隔10s
    builder.setFlushInterval(TimeValue.timeValueSeconds(10));
    //设置允许执行的并发请求数。
    builder.setConcurrentRequests(8);
    //设置重试策略
    builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
    return builder.build();
}

批量修改

public void bulkUpdate(EsUpdateBO esUpdateBO){
    List<UpdateRequest> updateRequests=new ArrayList<>();

    esUpdateBO.getIds().forEach(e->{
        //获取id
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("es索引名称");
        //更新的id
        updateRequest.id(e);
        //更新的数据
        Map<String,Object> map=new HashMap<>();
        map.put(esUpdateBO.getIsWarning(),"1");

        updateRequest.doc(map);
        updateRequests.add(updateRequest);
    });
    updateRequests.forEach(bulkProcessor::add);
}

批量新增

public void bulkAdd(List<Map<String, Object>> result) {
    List<IndexRequest> indexRequests = new ArrayList<>();
    result.forEach(e -> {
        IndexRequest request = new IndexRequest("es索引名称");
     
        request.source(JSON.toJSONString(e), XContentType.JSON);
        request.opType(DocWriteRequest.OpType.CREATE);
        indexRequests.add(request);
    });
    indexRequests.forEach(bulkProcessor::add);

}

根据es内部id批量查询数据文章来源地址https://www.toymoban.com/news/detail-514316.html

public List<Map<String, Object>> getWaringEventList(List<String> ids) throws IOException {
    String[] idsStr = ids.toArray(new String[ids.size()]);
    List<Map<String, Object>> details=new ArrayList<>();
    SearchRequest searchRequest = new SearchRequest("es索引名称");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    IdsQueryBuilder idsQueryBuilder = QueryBuilders.idsQuery().addIds(idsStr);
    boolQueryBuilder.must(idsQueryBuilder);
    searchSourceBuilder.query(boolQueryBuilder);
    searchSourceBuilder.size(10000);
    //将所有的条件进行整合
    searchRequest.source(searchSourceBuilder);
    //根据restHighLevelClient进行查询
    SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    SearchHits searchHits = search.getHits();
    SearchHit[] hits = searchHits.getHits();
    for (SearchHit inhit : hits) {
Map<String, Object> thirdSourceAsMap = inhit.getSourceAsMap();
    }

到了这里,关于springboot+es批量新增、批量修改、根据内部id批量查询的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot+JPA实现批量处理新增、修改

    jpa的sava与saveAll save()方法 根据源码我们可以看出来,save是先通过判断这个对象是不是新的,新的便会新增,否则就是执行的修改。整个是有分两步进行的,先查询再新增 saveAll()方法 saveAll()方法是一种更新多条的一种方式,里面传的存对象的集合。分析源码我们可以看出

    2024年02月09日
    浏览(30)
  • ElasticSearch|ES 快速批量查询 doc 的 _id 的方法

    已更新整合到新文章:https://dataartist.blog.csdn.net/article/details/130139631 比较慢的查询方法:如果使用如下 body 查询 ES 索引中内容的话,实际上应该会遍历索引中所有字段,如果字段内容很长的话,速度会比较慢: 结果形如: 比较快的查询方法:如果使用如下 body 查询 ES 索引中

    2024年02月14日
    浏览(37)
  • 笔记|ElasticSearch|ES 快速批量查询 doc 的 _id 的方法

    已更新整合到新文章:https://dataartist.blog.csdn.net/article/details/130139631 比较慢的查询方法:如果使用如下 body 查询 ES 索引中内容的话,实际上应该会遍历索引中所有字段,如果字段内容很长的话,速度会比较慢: 结果形如: 比较快的查询方法:如果使用如下 body 查询 ES 索引中

    2024年02月12日
    浏览(35)
  • Java查询es数据,根据指定id检索(in查询),sql权限过滤,多字段匹配检索,数据排序

    Java集成Elasticsearch,进行索引数据查询,并进行sql权限过滤,指定id检索(in查询),多字段匹配检索,数据排序。由于权限过滤是根据sql语句判断当前用户或其部门可查询的数据,所以采用以下方法: 1.通过sql过滤出当前用户可查询的数据id集合idsList; 2.将当前用户可查询的

    2024年02月22日
    浏览(54)
  • Mybatis 批量新增 只返回第一主键ID 其他返回null

    出现 问题 是加 on duplicate key update 受了影响, 去掉 on duplicate key update代码 ON DUPLICATE key update是根据索引字段是否重复来判断是否执行,如果重复则执行update,否则则执行insert。 优先级主键唯一索引 当主键重复时则执行update 当主键不重复,唯一索引重复时也执行update 当主键

    2024年02月16日
    浏览(31)
  • 交友项目【根据id查询单条动态&发布评论&查询评论列表】

    目录 1:根据id查询单条动态 1.1:接口分析 1.2:流程分析 1.3:代码实现 2:发布评论 2.1:接口分析 2.2:流程分析 2.3:代码实现 3:查询评论列表 3.1:接口分析 3.2:流程分析 3.3:代码实现 1.1:接口分析 API 接口文档: http://192.168.136.160:3000/project/19/interface/api/151 注意: 单条动

    2023年04月23日
    浏览(38)
  • 根据指定端口查询进程id,并杀掉进程

    @echo off  echo ================================================ set port=8080 netstat -nao|findstr !port! echo ================================================ for /f \\\"tokens=2,5\\\" %%i in (\\\'netstat -nao^|findstr :%%port%%\\\') do (     ::if \\\"!processed[%%j]!\\\" == \\\"\\\" (     if not defined processed[%%j] (         set pname=N/A         for /f \\\"to

    2024年02月09日
    浏览(30)
  • Java根据id对elasticsearch查询操作

    一、根据一个id查询 二、根据多个ids查询

    2024年02月12日
    浏览(40)
  • 03 SpringBoot实战 -微头条之首页门户模块(跳转某页面自动展示所有信息+根据hid查询文章全文并用乐观锁修改阅读量)

    需求描述: 进入新闻首页portal/findAllType, 自动返回所有栏目名称和id 接口描述 url地址:portal/findAllTypes 请求方式:get 请求参数:无 响应数据: 成功 代码编写 PortalController : TypeService: TypeServiceImpl: 达到的效果是,不需要任何参数, 只要访问portal/findAllType, 就返回news_type表中的所有

    2024年01月24日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包