请耐心阅读,并多留意代码中的注释。其中 keyid 即 ES 文档中的 _id。
目录
耐心看,多看代码中写的注释,keyid是文档中的_id
1.导入包
2.插入格式
3.插入类
--------附录(新增,删除,更新,插入等)
1.导入包
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.9.3</elasticsearch.version>
</properties>
<!-- elasticsearch相关 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
2.插入格式
将对象转换为 Map(文章来源:https://www.toymoban.com/news/detail-720722.html)
/**
 * Copies every entry of the bean's {@code BeanMap} view into a new map keyed by property name.
 *
 * @param bean the bean to read; {@code null} yields an empty map
 * @param <T>  bean type
 * @return a new map of property name to property value
 */
public static <T> Map<String, Object> beanToMap(T bean) {
    Map<String, Object> result = Maps.newHashMap();
    if (bean == null) {
        return result;
    }
    BeanMap properties = BeanMap.create(bean);
    for (Object name : properties.keySet()) {
        result.put(name.toString(), properties.get(name));
    }
    return result;
}
创建 Map 列表并调用插入方法(文章来源地址:https://www.toymoban.com/news/detail-720722.html)
try {
    // Load the rows to push into ES; replace this with your own data source.
    List<DyingVideoDO> allVideoList = videoMapper.findAllVideoList();
    List<Map<String, Object>> mapList = new ArrayList<>(allVideoList.size());
    for (DyingVideoDO row : allVideoList) {
        mapList.add(BeanUtil.beanToMap(row));
    }
    // Hand the converted rows to the ES insert method.
    syncVideoEsService.createData(mapList);
} catch (Exception e) {
    // Optional handling. The exception is passed as the last argument with no matching "{}",
    // so SLF4J logs its stack trace instead of leaving a literal placeholder in the message.
    log.error("sync video data error", e);
    return 0;
}
/** Name of the ES index the video documents belong to. */
private final String index = "t_dying_video";
/** Name the documents are written through (same value as {@link #index} here). */
private final String alias = "t_dying_video";
@Autowired
private ElasticsearchService elasticsearchService;

/**
 * Converts the given rows into ES documents and bulk-inserts them through {@link #alias}.
 *
 * <p>Each row's {@code "id"} value is copied into {@code "keyid"}, which
 * {@code ElasticsearchService.batchInsertRequest} uses as the document {@code _id}.
 * A list that {@code CollectionUtil.isEmpty} considers empty is a no-op.
 *
 * @param list rows to index; each map is read by the snake_case keys in {@link #toVideoDocument}
 * @throws IllegalStateException if the bulk insert reports a failure
 * @throws Exception any other error raised while writing to Elasticsearch (logged, then rethrown)
 */
public void createData(List<Map<String, Object>> list) throws Exception {
    if (CollectionUtil.isEmpty(list)) {
        return;
    }
    try {
        List<Map<String, Object>> documents = list.stream()
                .map(this::toVideoDocument)
                .collect(Collectors.toList());
        // batchInsertRequest signals item failures through its return value instead of throwing,
        // so check it rather than silently treating a failed sync as success.
        if (!elasticsearchService.batchInsertRequest(alias, documents)) {
            throw new IllegalStateException("批量插入ES数据失败, alias: " + alias);
        }
    } catch (Exception e) {
        log.error("插入数据到ES异常", e);
        throw e;
    }
}

/**
 * Builds the ES document for one row: {@code "keyid"} from the row's {@code "id"}, plus the copied fields.
 * A plain HashMap is used instead of double-brace initialization, which would create an anonymous
 * subclass per row holding a reference to this service.
 */
private Map<String, Object> toVideoDocument(Map<String, Object> row) {
    // NOTE(review): these keys are snake_case. If rows come from BeanUtil.beanToMap on a DO with
    // camelCase fields, keys such as "video_url" may be absent and yield null values; confirm.
    Map<String, Object> doc = new HashMap<>();
    doc.put("keyid", row.get("id"));
    doc.put("title", row.get("title")); // title
    doc.put("video_url", row.get("video_url"));
    doc.put("pic_url", row.get("pic_url"));
    doc.put("small_pic_url", row.get("small_pic_url"));
    doc.put("media_content", row.get("media_content"));
    doc.put("update_content", row.get("update_content"));
    doc.put("publish_time", row.get("publish_time"));
    doc.put("label", row.get("label"));
    doc.put("title_desc", row.get("title_desc")); // cleaned from update_content
    doc.put("voice_content", row.get("voice_content")); // cleaned from media_content
    return doc;
}
3.插入类
/**
 * Bulk-indexes documents into {@code index}, using each item's {@code "keyid"} as the document _id.
 *
 * @param index target index (or alias) name
 * @param list  documents to index; a null or empty list is a no-op and returns {@code true}
 * @return {@code true} if the bulk request completed without item failures
 * @throws Exception if the bulk call itself fails
 */
public boolean batchInsertRequest(String index, List<Map<String, Object>> list) throws Exception {
    // An empty BulkRequest fails the client's request validation ("no requests added").
    if (list == null || list.isEmpty()) {
        return true;
    }
    BulkRequest request = new BulkRequest();
    for (Map<String, Object> item : list) {
        // NOTE(review): a missing "keyid" becomes the literal id "null", shared by all such items.
        request.add(new IndexRequest(index).id(String.valueOf(item.get("keyid"))).source(item, XContentType.JSON));
    }
    BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
    if (bulk.status().getStatus() == 200) {
        if (!bulk.hasFailures()) {
            return true;
        }
        // Include the per-item failure reasons, as the bulk update/delete methods do.
        log.error("批量创建索引{}文档失败,失败原因:{}", index, bulk.buildFailureMessage());
    }
    return false;
}
--------附录(新增,删除,更新,插入等)
package com.dengtacj.synces.service;
import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Elasticsearch service: index management (create/delete/clean/refresh) and single or bulk
 * document writes on top of {@link RestHighLevelClient}.
 */
@Slf4j
@Service
public class ElasticsearchService {

    @Autowired
    public RestHighLevelClient restHighLevelClient;

    /** Request options whose heap response buffer is capped at 30MB. */
    protected static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        // The default buffer limit is 100MB; lower it to 30MB here.
        builder.setHttpAsyncResponseConsumerFactory(
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }

    /**
     * Creates an index with 3 shards and 0 replicas.
     *
     * @param index index name
     * @return {@code true} if the create call returned, {@code false} on I/O failure
     */
    public boolean createIndexRequest(String index) {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(index)
                .settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0));
        try {
            CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, COMMON_OPTIONS);
            logCreateIndexResponse(index, response);
            return true;
        } catch (IOException e) {
            log.error("创建索引库【{}】失败", index, e);
        }
        return false;
    }

    /**
     * Creates an index with 3 shards, 0 replicas and the given mapping (registered under type {@code _doc}).
     *
     * @param index   index name
     * @param mapping mapping JSON
     * @return {@code true} if the create call returned, {@code false} on any exception
     */
    public boolean createIndexRequest(String index, String mapping) {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
        try {
            createIndexRequest.settings(Settings.builder()
                    .put("index.number_of_shards", 3)
                    .put("index.number_of_replicas", 0));
            createIndexRequest.mapping("_doc", mapping, XContentType.JSON);
            CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
            logCreateIndexResponse(index, response);
            return true;
        } catch (Exception e) {
            log.error("创建索引库【{}】失败", index, e);
        }
        return false;
    }

    /**
     * Creates an index from settings JSON and mapping JSON.
     *
     * @param index    index name
     * @param settings settings JSON
     * @param mapping  mapping JSON (registered under type {@code _doc})
     * @return {@code true} if the create call returned, {@code false} on any exception
     */
    public boolean createIndexRequest(String index, String settings, String mapping) {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
        try {
            createIndexRequest.settings(settings, XContentType.JSON);
            createIndexRequest.mapping("_doc", mapping, XContentType.JSON);
            CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
            logCreateIndexResponse(index, response);
            return true;
        } catch (Exception e) {
            log.error("创建索引库【{}】失败", index, e);
        }
        return false;
    }

    /**
     * Creates an index from settings JSON and mapping JSON and attaches an alias to it.
     *
     * @param index    index name
     * @param settings settings JSON
     * @param mapping  mapping JSON (registered under type {@code _doc})
     * @param alias    alias to attach to the new index
     * @return {@code true} if the create call returned, {@code false} on any exception
     */
    public boolean createIndexRequest(String index, String settings, String mapping, String alias) {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
        try {
            createIndexRequest.settings(settings, XContentType.JSON);
            createIndexRequest.mapping("_doc", mapping, XContentType.JSON);
            createIndexRequest.alias(new Alias(alias));
            CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
            logCreateIndexResponse(index, response);
            return true;
        } catch (Exception e) {
            log.error("创建索引库【{}】失败", index, e);
        }
        return false;
    }

    /** Logs the acknowledgement flags of a create-index response followed by the success message. */
    private static void logCreateIndexResponse(String index, CreateIndexResponse response) {
        log.info("所有节点确认响应 : {}", response.isAcknowledged());
        log.info("所有分片的复制未超时 :{}", response.isShardsAcknowledged());
        log.info("创建索引【{}】成功", index);
    }

    /**
     * Deletes an index if it exists.
     *
     * @param index index name
     * @return {@code true} if the index did not exist or its deletion was acknowledged
     */
    public boolean deleteIndexRequest(String index) {
        try {
            boolean exists = restHighLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
            if (!exists) {
                log.info("索引【{}】不存在", index);
                return true;
            }
            AcknowledgedResponse response = restHighLevelClient.indices().delete(new DeleteIndexRequest(index), COMMON_OPTIONS);
            return response.isAcknowledged();
        } catch (IOException e) {
            log.error("删除索引库【{}】失败", index, e);
        }
        return false;
    }

    /**
     * Indexes one bean (converted with {@code BeanUtil.beanToMap}) under the given id.
     *
     * @return {@code true} if the document was created or overwritten
     */
    public boolean insertRequest(String index, String id, Object object) {
        IndexRequest indexRequest = new IndexRequest(index).id(id).source(BeanUtil.beanToMap(object), XContentType.JSON);
        try {
            IndexResponse indexResponse = restHighLevelClient.index(indexRequest, COMMON_OPTIONS);
            if (isCreatedOrUpdated(indexResponse)) {
                log.info("创建索引{}文档成功", index);
                return true;
            }
        } catch (IOException e) {
            log.error("创建索引文档 {" + index + "} 数据 {" + object + "} 失败", e);
        }
        return false;
    }

    /**
     * Indexes one map, using its {@code "id"} value (any type, via toString) as the document id;
     * a missing id leaves the id unset.
     *
     * @return {@code true} if the document was created or overwritten
     */
    public boolean insertRequest(String index, Map<String, Object> item) {
        // toString instead of a (String) cast, so numeric ids do not throw ClassCastException.
        IndexRequest indexRequest = new IndexRequest(index)
                .id(Objects.toString(item.get("id"), null))
                .source(item, XContentType.JSON);
        try {
            IndexResponse indexResponse = restHighLevelClient.index(indexRequest, COMMON_OPTIONS);
            if (isCreatedOrUpdated(indexResponse)) {
                log.info("创建索引{}文档成功", index);
                return true;
            }
        } catch (IOException e) {
            log.error("创建索引文档 {" + index + "} 数据 {" + item + "} 失败", e);
        }
        return false;
    }

    /**
     * An index call answers 201 CREATED for a new document and 200 OK for an overwrite; checking
     * only 200 would report every newly created document as a failure.
     */
    private static boolean isCreatedOrUpdated(IndexResponse response) {
        RestStatus status = response.status();
        return status == RestStatus.OK || status == RestStatus.CREATED;
    }

    /**
     * Bulk-indexes documents, using each item's {@code "keyid"} as the document _id.
     *
     * @param list documents to index; a null or empty list is a no-op and returns {@code true}
     * @return {@code true} if the bulk request completed without item failures
     */
    public boolean batchInsertRequest(String index, List<Map<String, Object>> list) throws Exception {
        // An empty BulkRequest fails the client's request validation ("no requests added").
        if (list == null || list.isEmpty()) {
            return true;
        }
        BulkRequest request = new BulkRequest();
        for (Map<String, Object> item : list) {
            request.add(new IndexRequest(index).id(String.valueOf(item.get("keyid"))).source(item, XContentType.JSON));
        }
        BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        if (bulk.status() == RestStatus.OK) {
            if (!bulk.hasFailures()) {
                return true;
            }
            log.error("批量创建索引{}文档失败,失败原因:{}", index, bulk.buildFailureMessage());
        }
        return false;
    }

    /**
     * Partially updates one document with the bean's properties. I/O errors are logged, not thrown.
     */
    public void updateRequest(String index, String id, Object object) {
        UpdateRequest updateRequest = new UpdateRequest(index, id);
        updateRequest.doc(BeanUtil.beanToMap(object), XContentType.JSON);
        try {
            restHighLevelClient.update(updateRequest, COMMON_OPTIONS);
        } catch (IOException e) {
            log.error("更新索引文档 {" + index + "} 数据 {" + object + "} 失败", e);
        }
    }

    /**
     * Bulk partial-updates documents, addressing each by its {@code "keyid"} value (any type, via toString).
     *
     * @param list update payloads; a null or empty list is a no-op and returns {@code true}
     * @return {@code true} if the bulk request completed without item failures
     */
    public boolean batchUpdateRequest(String index, List<Map<String, Object>> list) throws Exception {
        if (list == null || list.isEmpty()) {
            return true;
        }
        BulkRequest request = new BulkRequest();
        for (Map<String, Object> item : list) {
            // toString instead of a (String) cast, so numeric ids do not throw ClassCastException.
            request.add(new UpdateRequest(index, Objects.toString(item.get("keyid"), null)).doc(item, XContentType.JSON));
        }
        BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        if (bulk.status() == RestStatus.OK) {
            if (!bulk.hasFailures()) {
                log.info("批量修改索引{}文档成功", index);
                return true;
            }
            log.error("批量修改索引{}文档失败,失败原因:{}", index, bulk.buildFailureMessage());
        }
        return false;
    }

    /** Deletes one document by id. I/O errors are logged, not thrown. */
    public void deleteRequest(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        try {
            restHighLevelClient.delete(deleteRequest, COMMON_OPTIONS);
        } catch (IOException e) {
            log.error("删除索引文档 {" + index + "} 数据id {" + id + "} 失败", e);
        }
    }

    /**
     * Bulk-deletes documents by id.
     *
     * @param list document ids; a null or empty list is a no-op and returns {@code true}
     * @return {@code true} if the bulk request completed without item failures
     */
    public boolean batchDeleteRequest(String index, List<String> list) throws Exception {
        if (list == null || list.isEmpty()) {
            return true;
        }
        BulkRequest request = new BulkRequest();
        for (String id : list) {
            request.add(new DeleteRequest(index, id));
        }
        BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        if (bulk.status() == RestStatus.OK) {
            if (!bulk.hasFailures()) {
                log.info("批量删除索引{}文档成功 count:{}", index, list.size());
                return true;
            }
            // Failure path: the message previously said "成功" (success).
            log.error("批量删除索引{}文档失败,失败原因:{}", index, bulk.buildFailureMessage());
        }
        return false;
    }

    /**
     * Deletes every document in the index (the index itself is kept) and refreshes it.
     *
     * @return {@code true} if the delete-by-query call returned, {@code false} on I/O failure
     */
    public boolean cleanIndex(String index) {
        try {
            DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
            // DeleteByQueryRequest has no default query and fails validation without one;
            // match_all selects every document.
            deleteRequest.setQuery(QueryBuilders.matchAllQuery());
            deleteRequest.setRefresh(true);
            BulkByScrollResponse response = restHighLevelClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
            log.info("clean index :{} deleted : {}", index, response.getDeleted());
            return true;
        } catch (IOException e) {
            log.error("clean index :{} error", index, e);
            return false;
        }
    }

    /**
     * Refreshes the given indices.
     *
     * @return {@code true} if the refresh call returned, {@code false} on I/O failure
     */
    public boolean refreshIndex(String... index) {
        try {
            RefreshRequest refreshRequest = new RefreshRequest();
            refreshRequest.indices(index);
            RefreshResponse response = restHighLevelClient.indices().refresh(refreshRequest, RequestOptions.DEFAULT);
            log.info("refresh {} response status : {}", index, response.getStatus());
            return true;
        } catch (IOException e) {
            log.error("refresh index :{} error", index, e);
            return false;
        }
    }
}
创建索引的调用方法如下。mapping 和 settings 只是从你的索引配置文件中读取的内容,比如我的配置文件是:
{ "settings": { "number_of_shards": 5, "number_of_replicas": 1, "index": { "analysis.analyzer.default.type" : "ik_max_word"} } }
/**
 * Creates {@code index} (with alias {@code alias}) from the mapping and settings JSON files on the classpath.
 *
 * @return the result of {@code ElasticsearchService#createIndexRequest}, or {@code false} if a file
 *         is blank or cannot be read
 */
public boolean createIndex() {
    try {
        // Read through the class loader: ResourceUtils.getFile("classpath:...") only resolves resources
        // that exist as plain files on disk and fails once they are packaged inside a JAR.
        String mapping = cn.hutool.core.io.resource.ResourceUtil.readUtf8Str("index/mapping/t_dying_video.json");
        String settings = cn.hutool.core.io.resource.ResourceUtil.readUtf8Str("index/settings/t_dying_video.json");
        // NOTE(review): settings is passed to CreateIndexRequest.settings(String, XContentType); confirm the
        // file holds the bare settings object rather than one wrapped in a top-level "settings" key.
        if (StrUtil.isNotBlank(mapping) && StrUtil.isNotBlank(settings)) {
            return elasticsearchService.createIndexRequest(index, settings, mapping, alias);
        }
    } catch (Exception e) {
        log.error("读取索引【{}】的Mapping文件失败", index, e);
    }
    return false;
}
到了这里,关于JAVA elasticsearch批量插入的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!