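🌸Introduction
Elastic officially provides clients in a variety of languages for working with ES. In essence, these clients assemble DSL statements and send them to ES over HTTP. Official documentation: https://www.elastic.co/guide/en/elasticsearch/client/index.html
The Java REST Client comes in two flavors:
- Java Low Level Rest Client
- Java High Level Rest Client
This article uses the Java High Level Rest Client as the example.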
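To make the "assemble DSL, send it over HTTP" point concrete, here is a minimal sketch that builds (without sending) the request a match_all search boils down to, using only the JDK's java.net.http package (Java 11 or later). The class name RawDslRequest, the localhost URL and the message index are assumptions for illustration; the official clients add connection pooling, serialization and error handling on top of this.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class RawDslRequest {
    // Builds, but does not send, the HTTP request behind a match_all search.
    static HttpRequest buildSearch(String baseUrl, String index) {
        String dsl = "{\"query\":{\"match_all\":{}}}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(dsl))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildSearch("http://localhost:9200", "message");
        // prints "POST http://localhost:9200/message/_search"
        System.out.println(request.method() + " " + request.uri());
    }
}
```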
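🌻Initializing the RestClient
In the API provided by elasticsearch, all interaction with elasticsearch is encapsulated in a class named RestHighLevelClient. This object must be initialized first to establish the connection to elasticsearch.
There are three steps:
1) Add the RestHighLevelClient dependency:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
2) Spring Boot manages a default ES version (7.6.2 in the Spring Boot release used here), so override it to match the server version:
<properties>
    <java.version>1.8</java.version>
    <elasticsearch.version>7.12.1</elasticsearch.version>
</properties>
3) Initialize the RestHighLevelClient: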
@Value("${es.url}")
private String esUrl;

@Bean
public RestHighLevelClient restHighLevelClient() {
    return new RestHighLevelClient(
            RestClient.builder(HttpHost.create(esUrl))
                    .setRequestConfigCallback(requestConfigBuilder ->
                            requestConfigBuilder
                                    .setConnectTimeout(5 * 1000)    // connect timeout in milliseconds (default is 1 second)
                                    .setSocketTimeout(60 * 1000))); // socket timeout in milliseconds (default is 30 seconds)
}
Setting a password:
@Value("${es.url}")
private String esUrl;

@Value("${es.password}")
private String password;

@Bean
public RestHighLevelClient restHighLevelClient() {
    // Credentials used to authenticate against ES
    final CredentialsProvider provider = new BasicCredentialsProvider();
    // Fill in the username and password
    provider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials("elastic", password));
    return new RestHighLevelClient(
            RestClient.builder(HttpHost.create(esUrl))
                    .setHttpClientConfigCallback(httpClientBuilder -> {
                        httpClientBuilder.disableAuthCaching();
                        return httpClientBuilder.setDefaultCredentialsProvider(provider);
                    })
                    .setRequestConfigCallback(requestConfigBuilder ->
                            requestConfigBuilder
                                    .setConnectTimeout(5 * 1000)    // connect timeout in milliseconds (default is 1 second)
                                    .setSocketTimeout(60 * 1000))); // socket timeout in milliseconds (default is 30 seconds)
}
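🌻Creating an index
A mapping constrains the documents in an index. Common mapping properties include:
- type: the field's data type. Common simple types are:
  - String: text (analyzed, tokenized text), keyword (exact values, e.g. brand, country, IP address)
  - Numeric: long, integer, short, byte, double, float
  - Boolean: boolean
  - Date: date
  - Object: object
- index: whether the field is indexed, default true
- analyzer: which analyzer to use
- properties: the sub-fields of this field
The code takes three steps:
- Create the Request object. Because this operation creates an index, the Request is a CreateIndexRequest.
- Add the request parameters, i.e. the JSON body of the DSL. Because the JSON string is long, it is defined as a static string constant, MAPPING_TEMPLATE, which keeps the code tidier.
- Send the request. client.indices() returns an IndicesClient, which wraps all index-related operations.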
public static final String MAPPING_TEMPLATE = "{\n" +
        "  \"mappings\":{\n" +
        "    \"properties\":{\n" +
        "      \"msgId\":{\n" +
        "        \"type\":\"keyword\"\n" +
        "      },\n" +
        "      \"money\":{\n" +
        "        \"type\":\"double\"\n" +
        "      },\n" +
        "      \"type\":{\n" +
        "        \"type\":\"integer\"\n" +
        "      },\n" +
        "      \"fromUserId\":{\n" +
        "        \"type\":\"integer\",\n" +
        "        \"index\":\"true\"\n" +
        "      },\n" +
        "      \"convType\":{\n" +
        "        \"type\":\"integer\",\n" +
        "        \"index\":\"true\"\n" +
        "      },\n" +
        "      \"convId\":{\n" +
        "        \"type\":\"integer\",\n" +
        "        \"index\":\"true\"\n" +
        "      },\n" +
        "      \"msgType\":{\n" +
        "        \"type\":\"integer\"\n" +
        "      },\n" +
        "      \"msgBody\":{\n" +
        "        \"type\":\"keyword\"\n" +
        "      },\n" +
        "      \"msgTime\":{\n" +
        "        \"type\":\"keyword\"\n" +
        "      },\n" +
        "      \"subMsgType\":{\n" +
        "        \"type\":\"keyword\"\n" +
        "      }\n" +
        "    }\n" +
        "  }\n" +
        "}";
private RestHighLevelClient client;

@BeforeEach
void setUp() {
    this.client = new RestHighLevelClient(RestClient.builder(
            HttpHost.create("http://192.168.123.114:9200")
    ));
}

@AfterEach
void tearDown() throws IOException {
    this.client.close();
}

@Test
public void createMessageIndex() throws IOException {
    // 1. Create the Request object
    CreateIndexRequest request = new CreateIndexRequest("message");
    // 2. Prepare the request parameters: the DSL statement
    request.source(MAPPING_TEMPLATE, XContentType.JSON);
    // 3. Send the request
    CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
    log.info(GsonUtil.toJson(createIndexResponse));
}
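A hand-escaped JSON constant like MAPPING_TEMPLATE is easy to mistype. As an alternative, the sketch below generates the same mappings/properties structure from a field-to-type map. MappingJson and its build method are hypothetical names, not part of the ES client, and the helper assumes field names and types contain no characters that need JSON escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class MappingJson {
    // Renders {"mappings":{"properties":{"<field>":{"type":"<type>"},...}}}
    static String build(Map<String, String> fieldTypes) {
        String properties = fieldTypes.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":{\"type\":\"" + e.getValue() + "\"}")
                .collect(Collectors.joining(","));
        return "{\"mappings\":{\"properties\":{" + properties + "}}}";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the fields in insertion order
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("msgId", "keyword");
        fields.put("money", "double");
        fields.put("fromUserId", "integer");
        // prints {"mappings":{"properties":{"msgId":{"type":"keyword"},"money":{"type":"double"},"fromUserId":{"type":"integer"}}}}
        System.out.println(build(fields));
    }
}
```

The resulting string could be passed to request.source(..., XContentType.JSON) in the same way as the constant.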
🌻Deleting an index
@Test
void testDeleteMessageIndex() throws IOException {
    // 1. Create the Request object
    DeleteIndexRequest request = new DeleteIndexRequest("message");
    // 2. Send the request
    client.indices().delete(request, RequestOptions.DEFAULT);
}
🌻Checking whether an index exists
@Test
void testExistsMessageIndex() throws IOException {
    // 1. Create the Request object
    GetIndexRequest request = new GetIndexRequest("message");
    // 2. Send the request
    boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
    // 3. Print the result
    System.err.println(exists ? "Index already exists!" : "Index does not exist!");
}
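🌻Summary
The flow for operating elasticsearch through the JavaRestClient is much the same each time. The core is the client.indices() method, which returns the object used for index operations.
Basic steps for index operations:
- Initialize the RestHighLevelClient
- Create an XxxIndexRequest, where Xxx is Create, Get or Delete
- Prepare the DSL (needed for Create; the others take no body)
- Send the request by calling RestHighLevelClient#indices().xxx(), where xxx is create, exists or delete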
🌻Document operations with the RestClient
🌻Adding a document
@Test
public void testIndexDocument() throws IOException {
    String json = "{\n" +
            "  \"id\":1,\n" +
            "  \"deleted\":0,\n" +
            "  \"fromUserId\":100,\n" +
            "  \"convType\":0,\n" +
            "  \"convId\":130,\n" +
            "  \"msgType\":1,\n" +
            "  \"msgBody\":\"I am Wang Wu\",\n" +
            "  \"msgId\":\"9421317434423236568961360\",\n" +
            "  \"payload\":\"\",\n" +
            "  \"msgTime\":\"1\",\n" +
            "  \"sendResult\":0,\n" +
            "  \"subMsgType\":0,\n" +
            "  \"money\":200.1,\n" +
            "  \"type\":1\n" +
            "}";
    // 1. Create the Request object
    IndexRequest request = new IndexRequest("message").id("3");
    // 2. Prepare the request parameters: the JSON document
    request.source(json, XContentType.JSON);
    // 3. Send the request
    client.index(request, RequestOptions.DEFAULT);
}
🌻 Querying documents
🌻Query by ID
@Test
void testGetDocumentById() throws IOException {
    // 1. Prepare the Request
    GetRequest request = new GetRequest("message", "3");
    // 2. Send the request and get the response
    GetResponse response = client.get(request, RequestOptions.DEFAULT);
    // 3. Parse the response
    String json = response.getSourceAsString();
}
🌻Query by multiple IDs
public PageResult<XtUserMsgRecord> getListByIds(String... ids) {
    if (null == ids || ids.length == 0) {
        return new PageResult<>();
    }
    SearchRequest request = new SearchRequest(INDEX_NAME);
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    boolQuery.filter(QueryBuilders.idsQuery().addIds(ids));
    request.source().query(boolQuery);
    try {
        SearchResponse search = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        return handleResponse(search);
    } catch (IOException e) {
        log.error("es:list error", e);
    }
    return new PageResult<>();
}
🌻Paginated query
@Data
public class PageResult<T> {
    // Page number, starting at 1
    private int page = 1;
    // Number of documents per page
    private int size = 10;
    private Long total;
    private List<T> list;
    private T queryCondition;

    public PageResult() {
    }

    public PageResult(Long total, List<T> list) {
        this.total = total;
        this.list = list;
    }
}
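public PageResult<XtUserMsgRecord> list(PageResult<XtUserMsgRecord> pageResult) {
    // 1. Create the Request object
    SearchRequest request = new SearchRequest(INDEX_NAME);
    // 2. Prepare the request parameters
    // 2.1. Filter conditions
    buildQuery(pageResult.getQueryCondition(), request);
    // 2.2. Pagination
    int page = pageResult.getPage();
    int size = pageResult.getSize();
    request.source().from((page - 1) * size).size(size);
    // 2.3. Sorting: prefer a keyword (or numeric/date) field; text fields cannot be sorted on by default
    request.source().sort(SortBuilders.fieldSort("msgTime").order(SortOrder.DESC));
    // 3. Send the request
    try {
        SearchResponse search = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        return handleResponse(pageResult, search);
    } catch (IOException e) {
        log.error("es:list error", e);
    }
    return pageResult;
}

/**
 * Converts the search response into a page of results.
 *
 * @param pageResult the page object to fill
 * @param response   the search response from ES
 * @return the filled page object
 */
private PageResult<XtUserMsgRecord> handleResponse(PageResult<XtUserMsgRecord> pageResult, SearchResponse response) {
    SearchHits searchHits = response.getHits();
    long total = searchHits.getTotalHits().value;
    SearchHit[] hits = searchHits.getHits();
    List<XtUserMsgRecord> list = new ArrayList<>(hits.length);
    for (SearchHit hit : hits) {
        String json = hit.getSourceAsString();
        XtUserMsgRecord item = JSON.parseObject(json, XtUserMsgRecord.class);
        list.add(item);
    }
    pageResult.setTotal(total);
    pageResult.setList(list);
    return pageResult;
}

/**
 * Builds the filter conditions of the query.
 *
 * @param params  the query conditions; may be null
 * @param request the search request to configure
 */
private void buildQuery(XtUserMsgRecord params, SearchRequest request) {
    if (null != params) {
        // 1. Prepare the bool query
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // 2. Add a filter for each condition that is set
        Integer fromUserId = params.getFromUserId();
        if (null != fromUserId) {
            boolQuery.filter(QueryBuilders.termQuery("fromUserId", fromUserId));
        }
        // 3. Set the query
        request.source().query(boolQuery);
    }
}
The code here is only an example: XtUserMsgRecord, the msgTime sort field and the fromUserId filter stand in for your own entity type and fields.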
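The from/size arithmetic in list() is easy to get wrong at the edges (page 0, size 0, very deep pages). The hypothetical helper sketched below validates the inputs and also checks Elasticsearch's default index.max_result_window of 10,000, beyond which a from/size search is rejected unless that index setting is raised. The class and method names are illustrative only.

```java
class Paging {
    // Elasticsearch's default value for the index.max_result_window setting
    static final int MAX_RESULT_WINDOW = 10_000;

    // Converts a 1-based page number and a page size into the "from" offset.
    static int from(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        long from = (long) (page - 1) * size; // long avoids int overflow on deep pages
        if (from + size > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds " + MAX_RESULT_WINDOW);
        }
        return (int) from;
    }

    public static void main(String[] args) {
        System.out.println(from(3, 10)); // prints 20
    }
}
```

For result sets deeper than the window, the usual options are search_after or the scroll API rather than larger from values.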
🌻Deleting a document
@Test
void testDeleteDocument() throws IOException {
    // 1. Prepare the Request
    DeleteRequest request = new DeleteRequest("message", "3");
    // 2. Send the request
    client.delete(request, RequestOptions.DEFAULT);
}
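🌻Updating a document
There are two ways to update a document:
- Full update: in essence, delete by ID and then add the document again
- Partial update: change only the specified fields of the document
In the RestClient API, a full update uses exactly the same API as adding a document; which one happens depends on the ID:
- If the ID already exists, the document is replaced
- If the ID does not exist, the document is added
That case is not repeated here; the focus is on partial updates.
As before, there are three steps:
- 1) Prepare the Request object. This is an update, so it is an UpdateRequest
- 2) Prepare the parameters, i.e. the JSON document containing the fields to change
- 3) Update the document by calling client.update()
@Test
void testUpdateDocument() throws IOException {
    // 1. Prepare the Request
    UpdateRequest request = new UpdateRequest("message", "3");
    // 2. Prepare the request parameters as alternating field names and values
    request.doc(
            "fromUserId", 11,
            "msgBody", "Potato, potato, this is Sweet Potato"
    );
    // 3. Send the request
    client.update(request, RequestOptions.DEFAULT);
}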
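🌻 Bulk-importing documents
@Test
void testBulkRequest() throws IOException {
    // Load the data to import (replace with your own data source)
    List<Message> messages = messageService.list();
    // 1. Create the Request
    BulkRequest request = new BulkRequest();
    // 2. Prepare the parameters: add one IndexRequest per document
    for (Message message : messages) {
        request.add(new IndexRequest("message")
                .id(message.getId().toString())
                .source(JSON.toJSONString(message), XContentType.JSON));
    }
    // 3. Send the request and report any per-document failures
    BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
    if (response.hasFailures()) {
        System.err.println(response.buildFailureMessage());
    }
}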
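When the source list is large, putting every document into a single BulkRequest produces one very large HTTP body. A common approach is to split the list into batches and send one bulk request per batch. The sketch below shows only the splitting step with plain JDK collections; Batches and partition are hypothetical names, and a suitable batch size depends on document size and cluster capacity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class Batches {
    // Splits items into consecutive sub-lists of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(partition(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

Each inner list would then be turned into its own BulkRequest in the same way as the loop in testBulkRequest().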
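🌻 Summary
Basic steps for document operations:
- Initialize the RestHighLevelClient
- Create an XxxRequest, where Xxx is Index, Get, Update, Delete or Bulk
- Prepare the parameters (needed for Index, Update and Bulk)
- Send the request by calling RestHighLevelClient#xxx(), where xxx is index, get, update, delete or bulk
- Parse the result (needed for Get)
🌻 Finally, a simple wrapper around these methods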
/**
 * @Author: hrd
 * @CreateTime: 2023/11/27 15:08
 * @Description:
 */
public interface IESIndex<R, P extends BaseModel> {
    /**
     * @param p  the document to save
     * @param id unique ID
     */
    IndexResponse saveOrUpdateById(P p, String id);

    R list(R r);

    R getListByIds(String... ids);

    DeleteResponse deleteById(String id);

    BulkByScrollResponse deleteByQuery(QueryBuilder queryBuilder);

    @NotNull
    default String buildOrder() {
        return "createTime";
    }

    default QueryBuilder getQueryBuilder(P p) {
        return null;
    }
}
/**
 * @Author: hrd
 * @CreateTime: 2023/5/22 14:03
 * @Description:
 */
@EqualsAndHashCode(callSuper = true)
@Getter
@Setter
public class BaseModel extends BasePageModel implements Serializable, ToMap {
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    /**
     * 0: normal, 1: deleted
     */
    @TableField("deleted")
    @TableLogic
    private Boolean deleted;

    /**
     * Creation time
     */
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    @TableField(value = "create_time", fill = FieldFill.INSERT)
    private Date createTime = new Date();

    /**
     * Modification time
     */
    @TableField(value = "modify_time", fill = FieldFill.INSERT_UPDATE)
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date modifyTime = new Date();
}
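BaseModel implements ToMap, and buildQuery in ESIndexServer calls params.toMap(), but the article does not show that interface. One possible shape, purely as an assumption, is a default method that reflects over the object's fields and returns the non-null ones as strings. DemoRecord is a hypothetical class used only to exercise it.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

interface ToMap {
    // Collects every non-static, non-null field (including inherited ones) as name -> String value.
    default Map<String, String> toMap() {
        Map<String, String> result = new TreeMap<>();
        for (Class<?> c = getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                    continue;
                }
                try {
                    field.setAccessible(true);
                    Object value = field.get(this);
                    if (value != null) {
                        // A subclass field wins over a superclass field of the same name
                        result.putIfAbsent(field.getName(), String.valueOf(value));
                    }
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return result;
    }
}

class DemoRecord implements ToMap {
    private Integer fromUserId = 100;
    private String msgId = "abc";
    private Integer convId; // null, so it is left out of the map

    public static void main(String[] args) {
        // prints {fromUserId=100, msgId=abc}
        System.out.println(new DemoRecord().toMap());
    }
}
```

Skipping null values matters for buildQuery, which only adds a term filter when the map holds a value for the key.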
/**
 * @Author: hrd
 * @CreateTime: 2023/11/30 13:57
 * @Description:
 */
@Slf4j
@Component
@RequiredArgsConstructor
public abstract class ESIndexServer<T extends BaseModel> implements IESIndex<PageResult<T>, T> {
    private final RestHighLevelClient restHighLevelClient;

    protected abstract String getIndexName();

    @Override
    public IndexResponse saveOrUpdateById(T p, String id) {
        if (null == p || null == id || "".equals(id)) {
            return null;
        }
        // 1. Create the Request object
        IndexRequest request = new IndexRequest(this.getIndexName()).id(id);
        // 2. Prepare the parameters
        request.source(JSON.toJSONString(p), XContentType.JSON);
        // 3. Send the request
        try {
            IndexResponse index = restHighLevelClient.index(request, RequestOptions.DEFAULT);
            log.info("es:{}", index);
            return index;
        } catch (IOException e) {
            log.error("es:error", e);
        }
        return null;
    }

    @SuppressWarnings("unchecked")
    public Class<T> getTypeClass() {
        Type type = getClass().getGenericSuperclass();
        if (!(type instanceof ParameterizedType)) {
            throw new IllegalStateException("Type must be a parameterized type");
        }
        ParameterizedType parameterizedType = (ParameterizedType) type;
        // Read the actual type argument; this class has a single type parameter
        Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
        if (null == actualTypeArguments || actualTypeArguments.length < 1) {
            throw new IllegalStateException("Number of type arguments must be 1");
        }
        return (Class<T>) actualTypeArguments[0];
    }
    @Override
    public PageResult<T> list(PageResult<T> pageResult) {
        // 1. Create the Request object
        SearchRequest request = new SearchRequest(this.getIndexName());
        // 2. Prepare the request parameters
        // 2.1. Filter conditions
        buildQuery(pageResult.getQueryCondition(), request);
        // 2.2. Pagination
        int page = pageResult.getPage();
        int size = pageResult.getSize();
        request.source().from((page - 1) * size).size(size);
        // 2.3. Sorting, only when a sort field is configured
        String orderField = buildOrder();
        if (!StringUtils.isEmpty(orderField)) {
            request.source().sort(SortBuilders.fieldSort(orderField).order(SortOrder.DESC));
        }
        // 3. Send the request
        try {
            SearchResponse search = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            return handleResponse(pageResult, search);
        } catch (IOException e) {
            log.error("es:list error", e);
        }
        return pageResult;
    }
    @Override
    public PageResult<T> getListByIds(String... ids) {
        if (null == ids || ids.length == 0) {
            return new PageResult<>();
        }
        SearchRequest request = new SearchRequest(this.getIndexName());
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        boolQuery.filter(QueryBuilders.idsQuery().addIds(ids));
        request.source().query(boolQuery);
        try {
            SearchResponse search = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            return handleResponse(search);
        } catch (IOException e) {
            log.error("es:list error", e);
        }
        return new PageResult<>();
    }

    @Override
    public DeleteResponse deleteById(String id) {
        // 1. Create the Request object
        DeleteRequest request = new DeleteRequest(this.getIndexName(), id);
        // 2. Send the request
        try {
            return restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public BulkByScrollResponse deleteByQuery(QueryBuilder queryBuilder) {
        // 1. Create the Request object
        DeleteByQueryRequest request = new DeleteByQueryRequest(this.getIndexName());
        request.setQuery(queryBuilder);
        // 2. Send the request
        try {
            return restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    /**
     * Converts the search response into a page of results.
     *
     * @param pageResult the page object to fill
     * @param response   the search response from ES
     * @return the filled page object
     */
    protected PageResult<T> handleResponse(PageResult<T> pageResult, SearchResponse response) {
        SearchHits searchHits = response.getHits();
        long total = searchHits.getTotalHits().value;
        SearchHit[] hits = searchHits.getHits();
        List<T> xtUserMsgRecordList = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            String json = hit.getSourceAsString();
            xtUserMsgRecordList.add(JSON.parseObject(json, this.getTypeClass()));
        }
        pageResult.setTotal(total);
        pageResult.setList(xtUserMsgRecordList);
        pageResult.setQueryCondition(null);
        return pageResult;
    }

    protected PageResult<T> handleResponse(SearchResponse response) {
        return this.handleResponse(new PageResult<>(), response);
    }

    /**
     * Builds the filter conditions of the query.
     *
     * @param params  the query conditions; may be null
     * @param request the search request to configure
     */
    protected void buildQuery(T params, SearchRequest request) {
        String[] filterQuery = getFilterQuery();
        if (null != params) {
            // 1. Prepare the bool query
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            // 2. Add a term filter for each configured field that has a value
            Map<String, String> map = params.toMap();
            for (String key : filterQuery) {
                String value = map.get(key);
                if (key != null && value != null) {
                    boolQuery.filter(QueryBuilders.termQuery(key, value));
                }
            }
            QueryBuilder queryBuilder = getQueryBuilder(params);
            if (null != queryBuilder) {
                boolQuery.filter(queryBuilder);
            }
            // 3. Set the query
            request.source().query(boolQuery);
        }
    }

    protected abstract String[] getFilterQuery();
}
/**
 * @Author: hrd
 * @CreateTime: 2023/11/27 15:06
 * @Description:
 */
@Slf4j
@Component
public class ESMsgIndexService extends ESIndexServer<XtUserMsgRecord> {
    private static final String INDEX_NAME = "your-index-name";

    public ESMsgIndexService(RestHighLevelClient restHighLevelClient) {
        super(restHighLevelClient);
    }

    @Override
    protected String getIndexName() {
        return INDEX_NAME;
    }

    @Override
    public String buildOrder() {
        return "msgTime";
    }

    @Override
    protected String[] getFilterQuery() {
        return new String[]{"fromUserId", "convId", "msgId", "type"};
    }
}
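getTypeClass() in ESIndexServer relies on the concrete subclass declaring its type argument directly, as ESMsgIndexService does with ESIndexServer<XtUserMsgRecord>. The standalone sketch below, with hypothetical class names, shows the mechanism and one limitation: a class that extends the concrete subclass again no longer has a parameterized direct superclass, so the lookup fails. This may matter if a framework generates a runtime subclass of the service.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

abstract class TypedBase<T> {
    // Resolves T from the direct superclass declaration of the runtime class.
    @SuppressWarnings("unchecked")
    Class<T> typeClass() {
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("Direct superclass is not parameterized: " + superType);
        }
        return (Class<T>) ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
}

// Declares the type argument directly, like ESMsgIndexService does
class StringService extends TypedBase<String> {
}

// One level further down: its direct superclass is the plain class StringService
class NestedService extends StringService {
}

class TypeClassDemo {
    public static void main(String[] args) {
        System.out.println(new StringService().typeClass()); // prints "class java.lang.String"
        try {
            new NestedService().typeClass();
        } catch (IllegalStateException e) {
            System.out.println("lookup failed for NestedService");
        }
    }
}
```

A more defensive variant could walk up the superclass chain until it finds a ParameterizedType, or the subclass could simply pass the Class<T> to the base constructor.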