【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

这篇具有很好参考价值的文章主要介绍了【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据,问题随笔,# Elasticsearch,Java,java,elasticsearch,restClient
【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据,问题随笔,# Elasticsearch,Java,java,elasticsearch,restClient
【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据,问题随笔,# Elasticsearch,Java,java,elasticsearch,restClient

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为在 Grafana 查看了生产环境的集群监控情况,凌晨两点至四点之间的集群、索引的查询以及写入 QPS 都比较低。

【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据,问题随笔,# Elasticsearch,Java,java,elasticsearch,restClient

  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**
 * 清理ES历史数据定时任务
 */
@Component
public class CleanESHistoryDataTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**
     * 根据索引名称删除当前日期两个月前的那一天的历史文档数据
     * @param jobContext
     */
    @Scheduled
    public void cleanESHistoryData(JobContext jobContext) {
    	// jobContext为定时任务中回传数据
        String indexName = jobContext.getData();
        if (StringUtils.isBlank(indexName)) {
            LOGGER.warn("ES索引名称不能为空!");
            return;
        }
        long startTimeMillis = System.currentTimeMillis();
        String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);
        try {
            String startTimeStr = twoMonthsAgoDate + " 00:00:00";
            // 初始化时间,形如2023-08-06 00:00:00
            Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);
            // 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据
            for (int i = 0; i < 24; i++) {
                Date startTime = initialStartTime;
                startTime = DateUtils.addHours(startTime, i);
                Date endTime = DateUtils.addHours(startTime, 1);
                LOGGER.info("正在清理索引:[{}],时间:{} 至 {}的历史文档数据...", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));
                long currentStartTimeMillis = System.currentTimeMillis();
                // 指定操作的索引库
                SearchRequest searchRequest = new SearchRequest(indexName);
                // 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("createTimeStr.keyword")
                                                                                    .from(DateTool.format(startTime, DF_FULL))
                                                                                    .to(DateTool.format(endTime, DF_FULL)))
                                                                                    .size(1000);
                // 设置滚动查询结果在内存中的过期时间为1min
                Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
                // 将滚动以及构造的查询条件放入查询请求
                searchRequest.scroll(scroll).source(searchSourceBuilder);
                SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                // 记录要滚动的ID
                String scrollId = searchResponse.getScrollId();
                SearchHit[] hits = searchResponse.getHits().getHits();
                while (hits != null && hits.length > 0) {
                    // 创建批量处理请求对象
                    BulkRequest bulkRequest = new BulkRequest();
                    for (SearchHit hit : hits) {
                        DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());
                        bulkRequest.add(deleteRequest);
                    }
                    // 执行批量删除请求操作
                    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                    // 构造滚动查询条件,继续滚动查询
                    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                    scrollRequest.scroll(scroll);
                    searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                    scrollId = searchResponse.getScrollId();
                    hits = searchResponse.getHits().getHits();
                }
                // 当前滚动查询结束,清除滚动,释放服务器内存资源
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                LOGGER.info("清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));
            }
            LOGGER.info("[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));
        } catch (Exception e) {
            LOGGER.error(String.format("[cleanESHistoryData] 定时任务-清理索引:[%],时间:%的历史文档数据失败,耗时%ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);
        }
    }
}

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据,问题随笔,# Elasticsearch,Java,java,elasticsearch,restClient

🎈通过观察监控可以发现,在凌晨三点执行定时任务清理 ES 历史数据期间,集群、索引查询 QPS 以及 CPU 利用率指标都明显飙升。因此,清理 ES 数据时一定要避开流量高峰期,避免在流量高峰期清理数据时造成资源实例宕机,造成生产事故。文章来源地址https://www.toymoban.com/news/detail-632172.html

到了这里,关于【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spring data elasticsearch使用7.x客户端兼容es 8.x和使用ssl构建RestHighLevelClient

    es在7.x中默认加入elastic security组件所以java client需要使用ssl连接es server. es 8.x 中废弃了 RestHighLevelClient ,使用新版的 java api client ,但是spring data elasticsearch还未更新到该版本.所以需要兼容es 8.x 如下是RestHighLevelClient构建方法: spring data elasticsearch客户端依赖(基于spring boot2.7使用最新

    2024年02月13日
    浏览(53)
  • Elasticsearch8.x版本中RestHighLevelClient被弃用,新版本中全新的Java客户端Elasticsearch Java API Client中常用API练习

    在Es7.15版本之后,es官方将它的高级客户端RestHighLevelClient标记为弃用状态。同时推出了全新的java API客户端Elasticsearch Java API Client,该客户端也将在Elasticsearch8.0及以后版本中成为官方推荐使用的客户端。 Elasticsearch Java API Client支持除Vector title search API和Find structure API之外的所有

    2023年04月08日
    浏览(52)
  • java用es客户端创建索引

    先用java创建esClient 创建es索引模板 新建索引 批量插入数据 1.先批量生成数据 2批量导入方法

    2024年02月11日
    浏览(53)
  • es最大相似度检索(原生与java客户端)

    原生rest: 对“不好”进行分词, \\\"operator\\\": \\\"and\\\" 意思是同时满足。 结果: java RestHighLevelClient

    2024年02月11日
    浏览(49)
  • 【U8+】使用天联高级版客户端登录用友U8,指定U8服务器地址。

    【问题描述】 当使用U8客户端电脑作为天联高级版软件服务器的时候, 即:U8应用服务器和远程服务器不是同一台电脑。 每次新建天高用户后, 新的天高用户,登录天高客户端后并打开U8登录界面, 用友U8的登录窗口服务器地址即为天高服务器的计算机名称, 而不是用友U

    2024年02月13日
    浏览(47)
  • Redis的Java客户端-Java客户端以及SpringDataRedis的介绍与使用

    Spring Data Redis底层支持同时兼容Jedis和Lettuce两种不同的Redis客户端,可以根据需要任意选择使用其中的一种。这样既可以保留现有代码使用的Jedis方式,也可以通过使用基于Netty的高性能Lettuce客户端,提升应用的性能和吞吐量。 Jedis是一个传统的Java Redis客户端,使用BIO进行So

    2024年02月08日
    浏览(65)
  • java webSocket服务端、客户端、心跳检测优雅解决

    项目分为三个端,项目之间需要webSocket通信。 WebSocketConfig WebSocketServer

    2024年02月17日
    浏览(40)
  • es(Elasticsearch)客户端Elasticsearch-head安装使用(04Elasticsearch-head安装篇)

    elasticsearch-head是一款专门针对于elasticsearch的客户端工具,用来展示数据。elasticsearch-head是基于JavaScript语言编写的,可以使用npm部署,npm是Nodejs下的包管理器 安裝方式利用npm和nodejs进行安装启动,github中给出的安装方法也是这种,本文就是以这种方式进行解说 es(Elasticsearc

    2024年01月17日
    浏览(52)
  • es相关的官方客户端与spring客户端对比与介绍

    es提供的 TransportClient 传统的客户端,基于TCP传输协议与Elasticsearch通信。 已经被弃用,不推荐使用。 适用于Elasticsearch 5.x及以前的版本 因为Elasticsearch 6.x及以上版本已不再支持TCP Transport协议,TransportClient无法连接Elasticsearch集群。 RestHighLevelClient 是一个高级的REST客户端,主要用于与

    2024年02月02日
    浏览(69)
  • JAVA使用WebSocket实现多客户端请求

    工作前提:两个服务之间实现聊天通讯,因为介于两个服务,两个客户端 方案1:多个服务端,多个客户端,使用redis把用户数据ip进行存储,交互拿到redis数据进行推送 方案2: 一个服务端,多个客户端,拿到客户端的id和需要推送的id进行拼接存储 此文章使用的是方案2 1. 引

    2024年02月11日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包