[Java Framework] [ELK] Spring 整合ES (ElasticSearch7.15.x +)

这篇具有很好参考价值的文章主要介绍了[Java Framework] [ELK] Spring 整合ES (ElasticSearch7.15.x +)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

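自 Elasticsearch 7.15 起,官方已将 Java High Level REST Client(高级 REST 客户端)标记为废弃,推荐改用新的 Elasticsearch Java API Client(即下文依赖中的 elasticsearch-java)。本文基于该新客户端进行整合。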
[Java Framework] [ELK] Spring 整合ES (ElasticSearch7.15.x +)

方法 / 步骤

一:POM依赖

        <!-- Elasticsearch Java API client; the Java examples import its co.elastic.clients.* packages. -->
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>7.17.5</version>
        </dependency>
        <!-- Jakarta JSON Processing API; presumably needed by the client's JSON mapping layer (confirm). -->
        <dependency>
            <groupId>jakarta.json</groupId>
            <artifactId>jakarta.json-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- Jackson databind; the configuration class wires a JacksonJsonpMapper into the transport. -->
        <!-- NOTE(review): the EntryRecord example imports com.fasterxml.jackson.datatype.jsr310.*, -->
        <!-- which is not declared here; confirm jackson-datatype-jsr310 arrives some other way. -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.2</version>
        </dependency>

二:相关配置

2.1 配置文件

# Elasticsearch host name and HTTP port; ElasticsearchConfig reads them via
# @Value("${elasticsearch.hostname}") and @Value("${elasticsearch.port}").
elasticsearch:
  hostname: mws.com
  port: 9200
#  username: elastic
#  password: elastic

2.2 配置类

@Configuration
public class ElasticsearchConfig {

    /** Host name of the Elasticsearch node, bound from {@code elasticsearch.hostname}. */
    @Value("${elasticsearch.hostname}")
    private String hostname;

    /** HTTP port of the Elasticsearch node, bound from {@code elasticsearch.port}. */
    @Value("${elasticsearch.port}")
    private Integer port;

    /**
     * Exposes an {@link ElasticsearchClient} bean that talks to the configured host and port
     * through a low-level {@link RestClient} and a Jackson-based JSON mapper.
     *
     * @return the client used by the controllers in this project
     */
    @Bean
    public ElasticsearchClient elasticsearchClient() {
        /*
         * If the cluster is password protected, build the low-level client like this instead:
         *
         * CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
         * credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("username", "password"));
         * RestClient restClient = RestClient.builder(new HttpHost("xx.xx.xx.xx", 9200))
         *         .setHttpClientConfigCallback(httpAsyncClientBuilder -> {
         *             httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
         *             return httpAsyncClientBuilder;
         *         }).build();
         * ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
         * return new ElasticsearchClient(transport);
         */

        // Address of the Elasticsearch node (host + port from the configuration file).
        HttpHost node = new HttpHost(hostname, port);
        RestClient lowLevelClient = RestClient.builder(node).build();
        JacksonJsonpMapper jsonMapper = new JacksonJsonpMapper();

        // NOTE(review): nothing in this class closes the RestClient; confirm shutdown cleanup
        // is handled elsewhere if that matters for the deployment.
        return new ElasticsearchClient(new RestClientTransport(lowLevelClient, jsonMapper));
    }
}

三: Java 客户端操作ES的API

3.1 索引的相关操作

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.mapping.IntegerNumberProperty;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.TextProperty;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * REST endpoints demonstrating index-level operations (create, exists, get, delete) with the
 * Elasticsearch Java API client.
 *
 * <p>All endpoints are served under the {@code /es} prefix. The prefix is declared with
 * {@code @RequestMapping}: the value of {@code @RestController} is only a bean name and does
 * not contribute to the URL.
 *
 * @author YangGC
 */
@Slf4j
@RestController
@RequestMapping("/es")
public class IndexController {

    @Resource
    ElasticsearchClient elasticsearchClient;

    /**
     * Creates an index with explicit mappings for {@code id}, {@code user_name} and {@code age},
     * plus a write alias named {@code "aliases" + indexName}.
     *
     * @param indexName name of the index to create
     * @return the index name that was passed in
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @PostMapping("/createIndex")
    public String createIndex(@RequestParam String indexName) throws IOException {
        // Minimal alternative without explicit mappings:
        //   elasticsearchClient.indices().create(c -> c.index(indexName));

        // Analyzers are applied in two places:
        //   1. at index time, when a text field is tokenized and written to the inverted index
        //      ("analyzer");
        //   2. at query time, when the search input is tokenized before the lookup
        //      ("searchAnalyzer").
        // NOTE(review): "ik_max_word" / "ik_smart" are presumably provided by the IK analysis
        // plugin; confirm it is installed on the target cluster.
        Map<String, Property> documentMap = new HashMap<>();
        documentMap.put("id", Property.of(p -> p.integer(IntegerNumberProperty.of(n -> n.index(true)))));
        documentMap.put("user_name", Property.of(p -> p.text(TextProperty.of(t -> t
                .index(true)
                .analyzer("ik_max_word")
                .searchAnalyzer("ik_smart")
                .store(true)))));
        documentMap.put("age", Property.of(p -> p.integer(IntegerNumberProperty.of(n -> n.index(true)))));

        CreateIndexResponse createIndexResponse = elasticsearchClient.indices().create(c -> c
                .index(indexName)
                .mappings(m -> m.properties(documentMap))
                .aliases("aliases" + indexName, a -> a.isWriteIndex(true)));
        boolean acknowledged = createIndexResponse.acknowledged();
        log.info("acknowledged = {}", acknowledged);
        return indexName;
    }

    /**
     * Checks whether the hard-coded index {@code user} exists.
     *
     * @return {@code true} if Elasticsearch reports the index as existing
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @GetMapping("/existsIndex")
    boolean existsIndex() throws IOException {
        // Build the "index exists" request for the fixed index name.
        ExistsRequest existsRequest = new ExistsRequest.Builder().index("user").build();
        // Execute it; the boolean response carries the answer.
        BooleanResponse booleanResponse = elasticsearchClient.indices().exists(existsRequest);
        return booleanResponse.value();
    }

    /**
     * Fetches the definition of the hard-coded index {@code user}.
     *
     * @return the index states returned by Elasticsearch, keyed by index name
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @GetMapping("/getIndex")
    Map<String, IndexState> getIndex() throws IOException {
        // Build the "get index" request for the fixed index name.
        GetIndexRequest indexRequest = new GetIndexRequest.Builder().index("user").build();
        // Execute it and hand back the per-index state map.
        GetIndexResponse indexResponse = elasticsearchClient.indices().get(indexRequest);
        return indexResponse.result();
    }

    /**
     * Deletes the given index.
     *
     * <p>NOTE(review): this destructive operation is exposed via GET; the verb is kept so
     * existing callers continue to work, but a DELETE mapping would be more conventional.
     *
     * @param indexName name of the index to delete
     * @return {@code "true"} or {@code "false"} depending on whether the deletion was acknowledged
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @GetMapping("/deleteIndex")
    public String deleteIndex(@RequestParam String indexName) throws IOException {
        DeleteIndexResponse deleteIndexResponse = elasticsearchClient.indices().delete(d -> d.index(indexName));
        boolean acknowledged = deleteIndexResponse.acknowledged();
        log.info("acknowledged = {}", acknowledged);
        return String.valueOf(acknowledged);
    }

}

3.2 实体映射相关操作

3.2.1 创建实体类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Staff document used by the document CRUD examples against the {@code staff} index.
 *
 * <p>Lombok generates the accessors and both constructors. The all-args constructor is called
 * by the examples as {@code new Staff(staffNo, name, age)}, so the field declaration order
 * below must not be changed.
 *
 * @author YangGC
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Staff {
    // Staff number; the index-creation example maps "staffNo" as a long field.
    private Long staffNo;
    // Staff name; the index-creation example maps "name" as an analyzed text field.
    private String name;
    // Age; the index-creation example maps "age" as an integer field.
    private Integer age;
}
3.2.2 Doc实体操作API
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.mapping.*;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Demo endpoints that walk through document-level operations (index, exists, get, search,
 * update, delete) against the {@code staff} index. Every endpoint prints its result to
 * standard output and returns nothing.
 *
 * @author YangGC
 */
@RestController
@RequestMapping("/esdoc")
public class EsDocController {

    @Autowired
    private ElasticsearchClient elasticsearchClient;

    /**
     * Creates the {@code staff} index with mappings for {@code name}, {@code staffNo} and
     * {@code age}, then prints whether the request was acknowledged.
     *
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @RequestMapping("/createIndex")
    public void createIndex() throws IOException {
        Map<String, Property> fields = new HashMap<>();
        fields.put("name", new Property(TextProperty.of(t -> t
                .analyzer("ik_max_word").searchAnalyzer("ik_smart").index(true).store(true))));
        fields.put("staffNo", new Property(LongNumberProperty.of(n -> n.index(true).store(true))));
        fields.put("age", new Property(IntegerNumberProperty.of(n -> n.index(true).store(true))));
        TypeMapping mapping = TypeMapping.of(m -> m.properties(fields));

        // The same create-index request can also carry shard settings, aliases and so on.
        CreateIndexResponse response = elasticsearchClient.indices()
                .create(c -> c.index("staff").mappings(mapping));
        System.out.println(response.acknowledged());
    }

    /**
     * Indexes one sample staff document under id {@code "001"} and prints the response followed
     * by its result field.
     *
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @RequestMapping("/createDocument")
    public void createDocument() throws IOException {
        Staff sample = new Staff(1L, "员工001-王大力", 22);
        IndexRequest<Staff> request = IndexRequest.of(b -> b.index("staff").id("001").document(sample));

        IndexResponse response = elasticsearchClient.index(request);
        // Whole response, e.g.
        // IndexResponse: {"_id":"001","_index":"staff","_primary_term":1,"result":"created","_seq_no":0,"_shards":{"failed":0.0,"successful":1.0,"total":2.0},"_type":"_doc","_version":1}
        System.out.println(response);
        // Operation result reported by Elasticsearch.
        System.out.println(response.result());
    }

    /**
     * Prints whether the document with id {@code "001"} exists in the {@code staff} index.
     *
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @RequestMapping("/existsDocument")
    public void existsDocument() throws IOException {
        GetResponse<Staff> response =
                elasticsearchClient.get(g -> g.index("staff").id("001"), Staff.class);
        System.out.println(response.found());
    }

    /**
     * Loads the document with id {@code "001"} and prints it.
     *
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @RequestMapping("/getDocument")
    public void getDocument() throws IOException {
        GetResponse<Staff> response =
                elasticsearchClient.get(g -> g.index("staff").id("001"), Staff.class);

        Staff staff = response.source();
        // Sample output: staff = Staff(staffNo=1, name=员工001-王大力, age=22)
        System.out.println("staff = " + staff);
    }

    /**
     * Fetches the first page (offset 0, page size 10) of the {@code staff} index and prints the
     * source of every hit.
     *
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @RequestMapping("/getDocumentByPage")
    public void getDocumentByPage() throws IOException {
        SearchResponse<Staff> response =
                elasticsearchClient.search(s -> s.index("staff").from(0).size(10), Staff.class);

        List<Hit<Staff>> hits = response.hits().hits();
        for (Hit<Staff> hit : hits) {
            System.out.println(hit.source());
        }
    }

    /**
     * Sends a partial update that sets a new name on document {@code "001"} and prints the
     * result reported by Elasticsearch.
     *
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @RequestMapping("/updateDocument")
    public void updateDocument() throws IOException {
        Staff patch = new Staff();
        patch.setName("员工001-出奇迹");
        UpdateRequest<Staff, Staff> request =
                UpdateRequest.of(b -> b.index("staff").id("001").doc(patch));

        UpdateResponse<Staff> response = elasticsearchClient.update(request, Staff.class);
        // Sample output: Updated
        System.out.println(response.result());
    }

    /**
     * Deletes document {@code "001"} from the {@code staff} index and prints the result followed
     * by the whole response.
     *
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @RequestMapping("/deleteDocument")
    public void deleteDocument() throws IOException {
        DeleteResponse response = elasticsearchClient.delete(d -> d.index("staff").id("001"));
        // Sample output: Deleted
        System.out.println(response.result());
        // Sample output:
        // DeleteResponse: {"_id":"001","_index":"staff","_primary_term":1,"result":"deleted","_seq_no":2,"_shards":{"failed":0.0,"successful":1.0,"total":2.0},"_type":"_doc","_version":3}
        System.out.println(response);
    }

}

3.3 聚合相关操作

3.3.1 创建实体类
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * Description:
 *
 * @author: YangGC
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class EntryRecord implements Serializable {

    /**
     * 索引字段应以业务字段为依据使用驼峰命名约定
     */
    private Long id;

//    @TableId
//    @JsonSerialize(using = ToStringSerializer.class)
    private Long member_be_present_id;

    private Long member_id;
    private Long club_id;
    private Long store_id;

    private String store_name;

    private String name;
    private String mobile;

    //进店方式,1 saas进店,2 人脸进店,3 手环进店(设备进店)
    private Integer arrival_mode;

    private String face_img;

    private Integer stu;
    /**
     * 进店时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    private LocalDateTime time;

    /**
     * 离店时间
     */

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    private LocalDateTime end_time;

    private Long card_id;
    private Long card_detail_id;
    private String card_name;

//    private String chipCode;
    private String remark;

    //入场会员类型  1 会员 2 员工
    private Integer member_type;
3.3.2 创建操作类
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.aggregations.LongTermsBucket;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import com.yanggc.pojo.doc.EntryRecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;

/**
 * Demo endpoints for sorting, range filtering, nested terms aggregations and match queries
 * against the {@code entry_record_v1} index. Results are written to the class logger.
 *
 * @author YangGC
 */
@Slf4j
@RestController
public class EntryRecordController {

    /** Index queried by every endpoint of this controller. */
    private static final String INDEX = "entry_record_v1";

    /** Inclusive lower bound of the fixed time window used by the range queries. */
    private static final String RANGE_FROM = "2022-07-01 00:00:00";

    /** Exclusive upper bound of the fixed time window used by the range queries. */
    private static final String RANGE_TO = "2022-08-01 00:00:00";

    @Resource
    ElasticsearchClient client;

    /**
     * Logs the newest records whose {@code time} falls inside the fixed window
     * [{@value #RANGE_FROM}, {@value #RANGE_TO}), newest first.
     *
     * @param topNum maximum number of records to fetch (request parameter {@code newTopNum})
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @GetMapping("/entryRecordNewTopN")
    public void entryRecordNewTopN(@RequestParam("newTopNum") Integer topNum) throws IOException {
        SortOptions newestFirst = new SortOptions.Builder()
                .field(f -> f.field("time").order(SortOrder.Desc))
                .build();

        SearchResponse<EntryRecord> response = client.search(s -> s
                .index(INDEX)
                .size(topNum)
                .sort(newestFirst)
                .query(timeWindow()), EntryRecord.class);

        for (Hit<EntryRecord> hit : response.hits().hits()) {
            // Log the source directly: calling toString() on it would throw when a hit
            // carries no source.
            log.info("{}", hit.source());
        }
    }

    /**
     * For the fixed time window, groups records by {@code store_id} (up to 200 buckets) and,
     * inside each store, by {@code member_id} (up to {@code topNum} buckets), then logs every
     * member bucket with its document count.
     *
     * <p>Equivalent query DSL:
     * <pre>{@code
     * GET /entry_record_v1/_search
     * {
     *   "size": 0,
     *   "query": {
     *     "range": {
     *       "time": { "gte": "2022-07-01 00:00:00", "lt": "2022-08-01 00:00:00" }
     *     }
     *   },
     *   "aggs": {
     *     "group_store": {
     *       "terms": { "field": "store_id", "size": 200 },
     *       "aggs": {
     *         "count_store_member": {
     *           "terms": { "field": "member_id", "size": <newTopNum> }
     *         }
     *       }
     *     }
     *   }
     * }
     * }</pre>
     *
     * @param topNum number of member buckets to return per store (request parameter
     *               {@code newTopNum})
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @GetMapping("/newTopNByStore")
    public void newTopNByStore(@RequestParam("newTopNum") Integer topNum) throws IOException {
        SearchResponse<Void> response = client.search(s -> s
                .index(INDEX)
                .query(timeWindow())
                .size(0)
                .aggregations("group_store", store -> store
                        .terms(t -> t.field("store_id").size(200))
                        .aggregations("count_store_member", member -> member
                                .terms(t -> t.field("member_id").size(topNum)))), Void.class);

        // The aggregate is read as long-terms directly; unwrapping it with _get() and wrapping
        // it again with _toAggregate() yields the same object kind and is not needed.
        List<LongTermsBucket> storeBuckets =
                response.aggregations().get("group_store").lterms().buckets().array();
        for (LongTermsBucket storeBucket : storeBuckets) {
            List<LongTermsBucket> memberBuckets =
                    storeBucket.aggregations().get("count_store_member").lterms().buckets().array();
            for (LongTermsBucket memberBucket : memberBuckets) {
                log.info("store_id = {}, member_id = {}, doc_count = {}",
                        storeBucket.key(), memberBucket.key(), memberBucket.docCount());
            }
        }
    }

    /**
     * Runs a match query on the {@code name} field and logs up to 10 matching records.
     *
     * @param name text to match against the {@code name} field
     * @throws IOException if the request to Elasticsearch fails at the transport level
     */
    @GetMapping("/searchMemberByName")
    public void searchMemberByName(@RequestParam("name") String name) throws IOException {
        Query byName = MatchQuery.of(m -> m.field("name").query(name))._toQuery();
        SearchResponse<EntryRecord> response =
                client.search(s -> s.index(INDEX).size(10).query(byName), EntryRecord.class);
        for (Hit<EntryRecord> hit : response.hits().hits()) {
            log.info("{}", hit.source());
        }
    }

    /** Builds the range query on {@code time} covering [RANGE_FROM, RANGE_TO). */
    private static Query timeWindow() {
        return Query.of(q -> q.range(r -> r
                .field("time")
                .gte(JsonData.of(RANGE_FROM))
                .lt(JsonData.of(RANGE_TO))));
    }

}

参考资料 & 致谢

[1] Elasticsearch Clients
[2] Elasticsearch Clients - Aggregations 文章来源地址https://www.toymoban.com/news/detail-403029.html

到了这里,关于[Java Framework] [ELK] Spring 整合ES (ElasticSearch7.15.x +)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包