ES查询客户端初始化(RestHighLevelClient)

这篇具有很好参考价值的文章主要介绍了ES查询客户端初始化(RestHighLevelClient)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

ElasticSearchProperties  配置类,可以配置在中心中

public class ElasticSearchProperties {

    /**
     * 连接超时时间(毫秒)
     */
    private Integer connectTimeout = 3000;

    /**
     * socket 超时时间
     */
    private Integer socketTimeout = 30000;

    /**
     * 连接请求超时时间
     */
    private Integer connectionRequestTimeout = 500;

    /**
     * keepAlive策略时间
     */
    private Integer keepAliveStrategy = 60 * 1000;
}
EsClientFactory  es客户端工厂
/**
 * @author yangwenxin
 * @Date 2022/5/13
 * @Description
 */
@Slf4j
public class EsClientFactory {

    /**
     * 创建es客户端
     *
     * @param properties 配置信息
     * @param address    地址, 示例:127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203
     * @param userName   用户名
     * @param password   密码
     * @return
     */
    public static RestHighLevelClient createRestHighLevelClient(ElasticSearchProperties properties, String address,
                                                                String userName, String password) {
        //ElasticSearch 连接地址地址
        HttpHost[] httpHosts = getElasticSearchHttpHosts(address);

        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).setRequestConfigCallback(requestConfigBuilder -> {
            //设置连接超时时间
            requestConfigBuilder.setConnectTimeout(properties.getConnectTimeout());
            requestConfigBuilder.setSocketTimeout(properties.getSocketTimeout());
            requestConfigBuilder.setConnectionRequestTimeout(properties.getConnectionRequestTimeout());
            return requestConfigBuilder;
        }).setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.disableAuthCaching();
            httpClientBuilder.setKeepAliveStrategy((response, context) -> properties.getKeepAliveStrategy());
            //设置账密
            return getHttpAsyncClientBuilder(httpClientBuilder, userName, password);
        });
        log.info("init esClient success, address : {}", address);
        return new RestHighLevelClient(restClientBuilder);
    }


    /**
     * ElasticSearch 连接地址
     * 多个逗号分隔
     * 示例:127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203
     */
    private static HttpHost[] getElasticSearchHttpHosts(String address) {
        String[] hosts = address.split(",");
        HttpHost[] httpHosts = new HttpHost[hosts.length];
        for (int i = 0; i < httpHosts.length; i++) {
            String host = hosts[i];
            host = host.replaceAll("http://", "").replaceAll("https://", "");
            httpHosts[i] = new HttpHost(host.split(":")[0], Integer.parseInt(host.split(":")[1]), "http");
        }
        return httpHosts;
    }

    private static HttpAsyncClientBuilder getHttpAsyncClientBuilder(HttpAsyncClientBuilder httpClientBuilder, String username, String password) {
        if (StringUtils.isBlank(username) || StringUtils.isEmpty(password)) {
            return httpClientBuilder;
        }
        //账密设置
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        //es账号密码(一般使用,用户elastic)
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        return httpClientBuilder;
    }
}
ElasticSearchClientService  es客户端初始化,从数据库中读取信息初始化客户端。其中DataSourceConfig就是配置了一些es链接的信息,账号密码地址等。
/**
 * @author yangwenxin
 * @Date 2022/5/13
 * @Description
 */
@Service
@Slf4j
public class ElasticSearchClientService {

    @Autowired
    private ElasticSearchProperties elasticSearchProperties;

    private Map<String, EsConfig> esClientMap = new HashMap<>();

    @PostConstruct
    public void init() {
        initEsClientMap();
    }

    /**
     * 初始化es数据源客户端到缓存
     */
    public void initEsClientMap() {
        // TODO 数据库查询数据源信息 自己项目中查询数据库方法,或者将ES信息配置在配置中心
        List<DataSourceConfig> dataSourceConfigs = Lists.newArrayList();
        if (CollectionUtils.isEmpty(dataSourceConfigs)) {
            return;
        }
        // address相同认为是一组  es数据源初始化address包含账号密码
        Map<String, List<DataSourceConfig>> dataSourceConfigMaps = dataSourceConfigs.stream().collect(
                Collectors.groupingBy(
                        DataSourceConfig::getAddress
                ));

        dataSourceConfigMaps.forEach((key, value) -> {
            DataSourceConfig dataSourceConfig = value.get(0);
            RestHighLevelClient restHighLevelClient = EsClientFactory.createRestHighLevelClient(
                    elasticSearchProperties,
                    dataSourceConfig.getAddress(),
                    dataSourceConfig.getUsername(),
                    dataSourceConfig.getPassword()
            );
            value.forEach(v -> {
                EsConfig esConfig = new EsConfig(restHighLevelClient, v.getInstance());
                esClientMap.put(v.getDataSourceNo(), esConfig);
            });
        });
    }

    @PreDestroy
    public void closeEsClient() {
        if (CollectionUtils.isEmpty(esClientMap)) {
            return;
        }
        esClientMap.forEach((key, value) -> {
            try {
                RestHighLevelClient client = value.getRestHighLevelClient();
                if (client != null) {
                    client.close();
                }
            } catch (Exception e) {
                log.error("close restHighLevelClient error! key: {},errMsg : {}", key, e.getMessage());
            }
        });
    }

    public EsConfig getEsConfigByNo(String dateSourceNo) {
        return esClientMap.get(dateSourceNo);
    }

    public synchronized void refreshEsDataSource(){
        initEsClientMap();
        log.info("refresh esDataSource success!");
    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class EsConfig {
        /**
         * es客户端
         */
        private RestHighLevelClient restHighLevelClient;
        /**
         * 索引名称
         */
        private String dataSourceName;
    }
}
EsClientUtil 查询工具类
@Slf4j
public class EsClientUtil {

    private static final String DEFAULT_TYPE = "_doc";

    /**
     * query string 查询, queryString语法。语法自行百度
     * @param request
     * @return
     * @throws IOException
     */
    public static LogDataResp queryStringQuery(RestHighLevelClient client, QueryLogDataReq request) {
        String indexName = request.getQueryIndex();
        try {
            SearchRequest searchRequest = new SearchRequest(indexName);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            // 查询语句
            BoolQueryBuilder must =
                    QueryBuilders.boolQuery().must(
                            QueryBuilders.queryStringQuery(request.getQueryScript())
                    );

            // 时间范围  TODO 时区
            if(StringUtils.isNotBlank(request.getTimeFieldName()) && request.getTimeFrameMinutes() != null){
                RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(request.getTimeFieldName())
                        .gte("now-" + request.getTimeFrameMinutes() + "m")
                        .lte("now")
                        // es时区默认是UTC
                        .timeZone("UTC");
                must.must(rangeQueryBuilder);
                // 根据时间降序排列
                searchSourceBuilder.sort(request.getTimeFieldName(), SortOrder.DESC);
            }
            searchSourceBuilder.query(must).from(0).size(request.getQuerySize());
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            // 返回值解析
            return buildLogDataResponse(searchResponse,null);

        } catch (Exception e) {
            String errMsg = MessageFormat.format("EsClientUtil#queryStringQuery failed, params={0}", request);
            log.error(errMsg, e);
            return LogDataResp.buildFailed(e.getMessage());
        }
    }

    /**
     * 查询所有的索引
     * @param client
     * @return
     * @throws IOException
     */
    public static List<String> queryAllIndexAlias(RestHighLevelClient client) {
        try {
            GetAliasesRequest getAliasesRequest = new GetAliasesRequest();
            GetAliasesResponse getAliasesResponse = client.indices().getAlias(getAliasesRequest, RequestOptions.DEFAULT);
            Map<String, Set<AliasMetaData>> aliases = getAliasesResponse.getAliases();
            if(!aliases.isEmpty()){
                return new ArrayList<>(aliases.keySet());
            }
            return Arrays.asList();
        } catch (Exception e) {
            String errMsg = "EsClientUtil#queryAllIndexAlias error";
            log.error("errMsg" , e);
            throw new RuntimeException(errMsg, e);
        }
    }

    /**
     * 向索引插入单条记录
     * @param client
     * @param indexName    索引
     * @param docId        文档ID
     * @param docSource    文档 JSON格式
     * @return
     * @throws IOException
     */
    public static boolean insertDoc(RestHighLevelClient client, String indexName, String docId, Object docSource) {
        try {
            IndexRequest request = new IndexRequest(indexName,DEFAULT_TYPE);
            if (null != docId) {
                request.id(docId);
            }
            request.source(JSON.toJSONString(docSource), XContentType.JSON);
            IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
            if ((indexResponse.status() == RestStatus.CREATED) || (indexResponse.status() == RestStatus.OK)) {
                return true;
            }
            log.error("insertDoc fail status:{} => {}", indexResponse.status(), indexResponse);
            return false;
        } catch (Exception e) {
            String errMsg =
                    MessageFormat.format("EsClientUtil#insertDoc failed, indexName={0},docId={1}, docSource={2}",
                            indexName,docId,docSource);
            log.error(errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }

    /**
     * 向索引插入批量记录  不指定id
     * @param client
     * @param indexName   索引名称
     * @param docSources  数据 JSON格式
     * @return
     */
    public static boolean insertDocBulk(RestHighLevelClient client,String indexName, List<Object> docSources) {
        try {
            if (CollectionUtils.isEmpty(docSources)) {
                return false;
            }
            BulkRequest bulkRequest = new BulkRequest();
            docSources.forEach(docSource -> {
                IndexRequest request = new IndexRequest(indexName,DEFAULT_TYPE);
                request.source(JSON.toJSONString(docSource), XContentType.JSON);
                bulkRequest.add(request);
            });
            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (!bulkResponse.hasFailures()) {
                return true;
            }
            log.error("insertDocBulk fail status:{}, errorMsg:{}", bulkResponse.status(),
                    bulkResponse.buildFailureMessage());
            return false;
        } catch (Exception e) {
            String errMsg =
                    MessageFormat.format("EsClientUtil#insertDocBulk failed, indexName={0}",
                            indexName);
            log.error(errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }

    /**
     * 向索引插入批量记录  指定id
     * @param client
     * @param indexName      索引名称
     * @param docSourceMap   key->Id value->docSource JSON格式
     * @return
     */
    public static boolean insertDocBulk(RestHighLevelClient client,String indexName, Map<String,Object> docSourceMap) {
        try{
            if (CollectionUtils.isEmpty(docSourceMap)) {
                return false;
            }
            BulkRequest bulkRequest = new BulkRequest();
            docSourceMap.forEach((key,value) -> {
                IndexRequest request = new IndexRequest(indexName,DEFAULT_TYPE);
                request.id(key);
                request.source(JSON.toJSONString(value), XContentType.JSON);
                bulkRequest.add(request);
            });
            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (!bulkResponse.hasFailures()) {
                return true;
            }
            log.error("insertDocBulk fail status:{}, errorMsg:{}", bulkResponse.status(),
                    bulkResponse.buildFailureMessage());
            return false;
        } catch (Exception e) {
            String errMsg =
                    MessageFormat.format("EsClientUtil#insertDocBulk failed, indexName={0}",
                            indexName);
            log.error(errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }

    /**
     * 根据ID 删除文档
     * @param client
     * @param indexName  索引名称
     * @param docId
     * @return
     */
    public static boolean deleteDocById(RestHighLevelClient client, String indexName, String docId) {
        try{
            DeleteRequest deleteRequest = new DeleteRequest(indexName,DEFAULT_TYPE,docId);
            DeleteResponse delete = client.delete(deleteRequest, RequestOptions.DEFAULT);
            int total = delete.getShardInfo().getTotal();
            return total > 0;
        } catch (Exception e) {
            String errMsg =
                    MessageFormat.format("EsClientUtil#deleteDocById failed, indexName={0},docId={1} ",
                            indexName, docId);
            log.error(errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }

    /**
     * 根据ID更新文档
     * @param client
     * @param indexName   索引名称
     * @param docId       文档ID
     * @param docSource   文档数据 JSON格式
     * @return
     */
    public static boolean updateDocById(RestHighLevelClient client, String indexName, String docId, Object docSource) {
        try{
            UpdateRequest updateRequest = new UpdateRequest(indexName,DEFAULT_TYPE,docId);
            updateRequest.doc(JSON.toJSONString(docSource),XContentType.JSON);
            // 执行更新
            UpdateResponse update = client.update(updateRequest, RequestOptions.DEFAULT);
            int total = update.getShardInfo().getTotal();
            return total > 0;
        } catch (Exception e) {
            String errMsg =
                    MessageFormat.format("EsClientUtil#updateDoc failed, indexName={0},docSource={1} ",
                            indexName, docSource);
            log.error(errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }

    /**
     * 根据ID查询
     * @param client
     * @param indexName
     * @param docId
     * @return
     */
    public static <T> T queryDocById(RestHighLevelClient client, String indexName, String docId, Class<T> clazz) {
        try {
            GetRequest request = new GetRequest(indexName,DEFAULT_TYPE,docId);
            GetResponse response = client.get(request, RequestOptions.DEFAULT);
            if(response.isSourceEmpty()){
                return null;
            }
            return JSON.parseObject(response.getSourceAsString(), clazz);
        } catch (Exception e) {
            String errMsg =
                    MessageFormat.format("EsClientUtil#queryDocById failed, indexName={0},docId={1} ",
                            indexName, docId);
            log.error(errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }

    /**
     * 根据条件查询分页
     * 注:from-size分页 可以随机跳转页面,在深度分页的情况下,效率是非常低的。深度分页参考search_after 不支持随机跳转,只支持
     * @param index    索引
     * @param pageNo   页码(第几页)
     * @param pageSize 页容量- Elasticsearch默认配置单次最大限制10000
     */
    public static <T> List<T> searchPageByIndex(RestHighLevelClient client, String index, SearchSourceBuilder searchSourceBuilder, Integer pageNo, Integer pageSize, Class<T> clazz) {
        if(searchSourceBuilder == null){
            searchSourceBuilder = new SearchSourceBuilder();
        }
        searchSourceBuilder.from((pageNo-1)*pageSize);
        searchSourceBuilder.size(pageSize);
        return searchByQuery(client, index, searchSourceBuilder, clazz);
    }

    /**
     * 条件查询
     *
     * @param indexName         索引
     * @param sourceBuilder 条件查询构建起
     * @param <T>           数据类型
     * @return T 类型的集合
     */
    public static <T> List<T> searchByQuery(RestHighLevelClient client, String indexName, SearchSourceBuilder sourceBuilder, Class<T> clazz) {
        try {
            // 构建查询请求
            SearchRequest searchRequest = new SearchRequest(indexName).source(sourceBuilder);
            // 获取返回值
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            SearchHit[] hits = response.getHits().getHits();
            if (null == hits || hits.length == 0) {
                return new ArrayList<>(0);
            }
            List<T> resultList = new ArrayList<>(hits.length);
            for (SearchHit hit : hits) {
                resultList.add(JSON.parseObject(hit.getSourceAsString(), clazz));
            }
            return resultList;
        } catch (Exception e) {
            String errMsg =
                    MessageFormat.format("EsClientUtil#searchByQuery failed, indexName={0}, sourceBuilder={1} ",
                            sourceBuilder);
            log.error(errMsg, e);
            throw new RuntimeException(errMsg, e);
        }
    }


    /**
     * 结果解析构建返回结果
     * @param searchResponse
     * @param aggName
     * @return
     */
    private static LogDataResp buildLogDataResponse(SearchResponse searchResponse, String aggName){
        LogDataResp logDataResponse = new LogDataResp();
        if(searchResponse.status().getStatus() != RestStatus.OK.getStatus()){
            logDataResponse.setSucceeded(Boolean.FALSE);
            return logDataResponse;
        }

        long totalHits = searchResponse.getHits().getTotalHits();
        logDataResponse.setTotalHit(totalHits);
        SearchHit[] hits = searchResponse.getHits().getHits();
        if(hits.length > 0){
            List<Map> hitSourceList = new ArrayList<>();
            for(SearchHit hit : hits){
                hitSourceList.add(hit.getSourceAsMap());
            }
            logDataResponse.setHitSourceList(hitSourceList);
        }

        if(aggName != null){
            Aggregations aggregations = searchResponse.getAggregations();
            Terms aggregation = (Terms)aggregations.get(aggName);
            List<? extends Terms.Bucket> buckets = aggregation.getBuckets();
            Map map = Maps.newHashMap();
            List<Map> aggregationBucketList = new ArrayList<>();
            for (Terms.Bucket bucket : buckets) {
                map.put(bucket.getKeyAsString(),bucket.getDocCount());
            }
            aggregationBucketList.add(map);
            logDataResponse.setAggregationBucketList(aggregationBucketList);
        }
        return logDataResponse;
    }


}

另外,如果说把es当成一个数据库使用,可以看下开源项目easy-es,操作更方便。文档地址:快速开始 | Easy-Es文章来源地址https://www.toymoban.com/news/detail-505683.html

到了这里,关于ES查询客户端初始化(RestHighLevelClient)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spring data elasticsearch使用7.x客户端兼容es 8.x和使用ssl构建RestHighLevelClient

    es在7.x中默认加入elastic security组件所以java client需要使用ssl连接es server. es 8.x 中废弃了 RestHighLevelClient ,使用新版的 java api client ,但是spring data elasticsearch还未更新到该版本.所以需要兼容es 8.x 如下是RestHighLevelClient构建方法: spring data elasticsearch客户端依赖(基于spring boot2.7使用最新

    2024年02月13日
    浏览(34)
  • 【Elasticsearch学习笔记五】es常用的JAVA API、es整合SpringBoot项目中使用、利用JAVA代码操作es、RestHighLevelClient客户端对象

    目录 一、Maven项目集成Easticsearch 1)客户端对象 2)索引操作 3)文档操作 4)高级查询 二、springboot项目集成Spring Data操作Elasticsearch 1)pom文件 2)yaml 3)数据实体类 4)配置类 5)Dao数据访问对象 6)索引操作 7)文档操作 8)文档搜索 三、springboot项目集成bboss操作elasticsearch

    2023年04月09日
    浏览(38)
  • RestHighLevelClient初始化http参数的含义

    high-level-rest-client 初始化 一般初始化时需要设置验证信息、http相关参数; Http相关参数介绍 keepalive keepalive用以维护长连接,长连接可以复用,但一定情况下需要中断长连接,如长连接长时间没有被使用的场景,需要中断长连接来节省资源。如上述ES客户端初始化案例中,则是

    2024年02月12日
    浏览(43)
  • Java项目初始化ES、MYSQL表结构及表数据

    其中SystemMapper为  如果索引不存在就创建es索引    根据需要、添加es别名

    2024年02月12日
    浏览(29)
  • Elasticsearch8.x版本中RestHighLevelClient被弃用,新版本中全新的Java客户端Elasticsearch Java API Client中常用API练习

    在Es7.15版本之后,es官方将它的高级客户端RestHighLevelClient标记为弃用状态。同时推出了全新的java API客户端Elasticsearch Java API Client,该客户端也将在Elasticsearch8.0及以后版本中成为官方推荐使用的客户端。 Elasticsearch Java API Client支持除Vector title search API和Find structure API之外的所有

    2023年04月08日
    浏览(36)
  • es相关的官方客户端与spring客户端对比与介绍

    es提供的 TransportClient 传统的客户端,基于TCP传输协议与Elasticsearch通信。 已经被弃用,不推荐使用。 适用于Elasticsearch 5.x及以前的版本 因为Elasticsearch 6.x及以上版本已不再支持TCP Transport协议,TransportClient无法连接Elasticsearch集群。 RestHighLevelClient 是一个高级的REST客户端,主要用于与

    2024年02月02日
    浏览(44)
  • 【JavaScript】JavaScript 变量 ① ( JavaScript 变量概念 | 变量声明 | 变量类型 | 变量初始化 | ES6 简介 )

    JavaScript 变量 是用于 存储数据 的 容器 , 通过 变量名称 , 可以 获取 / 修改 变量 中的数据 ; 变量 的 本质 是 存放数据 的 一块内存空间 ; 在 JavaScript 中, 使用 var / let / const 来声明变量 , 每个变量都有一个 变量名 和 一个 变量值 ; JavaScript 变量声明 : var : 使用

    2024年03月15日
    浏览(40)
  • Scala连接ES客户端

    大家好,我是楚生辉,在未来的日子里我们一起来学习大数据相关的技术,一起努力奋斗,遇见更好的自己! 本文详细的介绍了如何使用Scala语言连接上Elasticsearch客户端,有需要的小伙伴可以自行获取与学习~ 使用方法 写入ES中,就要规划,是写入到一个索引中,还是分割索

    2024年01月21日
    浏览(37)
  • java用es客户端创建索引

    先用java创建esClient 创建es索引模板 新建索引 批量插入数据 1.先批量生成数据 2批量导入方法

    2024年02月11日
    浏览(37)
  • 【ES实战】ES创建Transports客户端时间过长分析

    2023年10月19日 在创建ES Transport客户端的时,当出现以下场景时,影响连接速度。 使用ES Transport 客户端创建与集群的链接。 连接地址里面有不存在的IP 在增加ES节点时,采用逐个增加的方式 整个建立链接的过程会非常耗时。 采用jar依赖如下 创建连接代码如下 输出结果 是否可

    2024年02月07日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包