Elasticsearch Java API 的使用-更新索引(update & upset)与 Bulk的批量更新

这篇具有很好参考价值的文章主要介绍了Elasticsearch Java API 的使用-更新索引(update & upset)与 Bulk的批量更新。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Java更新索引(update & upset)
update
更新使用UpdateRequest(update类型更新,只能更新)

public class EsUpdate{
    public void updateIndex(TransportClient client){
        Date time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                .parse("2016-7-21 00:00:01");

        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("pointdata")
				     .type("pointdata")
				     .id("1")
				     .doc(XContentFactory.jsonBuilder()
							             .startObject()
							             .field("pointid","W3.UNIT1.10LBG01CP302")
						                 .field("pointvalue","0.8029")
										 .field("inputtime",
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
							             .endObject()
                );
		//执行修改
       UpdateResponse response1 = client.update(updateRequest).get();

		//查询修改状态,返回ok表示成功
		System.out.println(response1.status());
    }
}

upset
要用IndexRequest设定添加文档,UpdateRequest设定更新文档,设定upset执行有则修改无则更新(upset类型更新,文档不存在时创建)

public class EsUpSet{
    public void updateIndex(TransportClient client){
         Date time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                .parse("2016-7-21 00:00:01");
		//添加文档
        IndexRequest request1 = new IndexRequest("pointdata", "pointData", "1")
						        .source(
								     XContentFactory.jsonBuilder()
							             .startObject()
							             .field("pointid","W3.UNIT1.10LBG01CP302")
						                 .field("pointvalue","0.8029")
										 .field("inputtime",
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
							             .endObject()
						        );
		
		//修改文档		        
        UpdateRequest updateRequest2 = new UpdateRequest();
        updateRequest.index("pointdata")
				     .type("pointdata")
				     .id("1")
				     .doc(XContentFactory.jsonBuilder()
							             .startObject()
							             .field("pointid","W3.UNIT1.10LBG01CP302")
						                 .field("pointvalue","0.8029")
										 .field("inputtime",
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time))
							             .endObject()
                ).upset(request1);
       
		UpdateResponce responce = client.update(request2).get();
		
        //查询修改状态,返回ok表示成功
	    System.out.println(response2.status());
    }
}

基于Bulk的批量更新(update & upset)
动态的更新一个 documents 中的任意 field(字段),包括原来没有的 field (字段)。文章来源地址https://www.toymoban.com/news/detail-508644.html

public ResultMsg updateIndex(final String index,
			final JSONArray resultList) {
		final ResultMsg resultMsg = new ResultMsg();
		BulkRequestBuilder bulkRequest = client.prepareBulk();
		bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);

		int count = 0;
		int len = resultList.size();
		try {
			for (int i = 0; i < len; i++) {
				final XContentBuilder builder = XContentFactory.jsonBuilder()
						.startObject();
				
				JSONObject json = resultList.getJSONObject(i);
				final String id = json.getString("id");
				json.remove("id");
				
				for (String key : json.keySet()) {
					builder.field(key, json.get(key));
				}

				builder.endObject();

				// update更新
				UpdateRequestBuilder requestBuilder = client.prepareUpdate(
						index, index, id).setDoc(builder);
				
				// upset更新
				UpdateRequestBuilder requestBuilder = client.prepareUpdate(
						index, index, id).setDoc(builder).setUpsert();
				// IndexRequestBuilder requestBuilder = client.prepareIndex(
				// "sampling", indexName, id).setSource(builder);
				bulkRequest.add(requestBuilder);
				count++;

				if (count % BULK_SIZE == 0) {
					BulkResponse bulkResponse = bulkRequest.execute()
							.actionGet();
					if (bulkResponse.hasFailures()) {
						LOGGER.error("批量更新索引数据失败: " + indexName);
						LOGGER.error("批量更新索引数据失败: "
								+ bulkResponse.buildFailureMessage());

						resultMsg.setHasFailures(true);

						BulkItemResponse[] responses = bulkResponse.getItems();
						for (int k = 0; k < responses.length; k++) {
							BulkItemResponse response = responses[k];
							if (response.isFailed()) {
								ErrorDetail errorDetail = new ErrorDetail();
								errorDetail.setIndex(k + (i / BULK_SIZE));
								errorDetail.setId(response.getId());
								errorDetail
										.setMsg(response.getFailureMessage());

								resultMsg.addError(errorDetail);
							}
						}
					}

					bulkRequest = client.prepareBulk();
					bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
					count = 0;
				}
			}

			if (count > 0) {
				BulkResponse bulkResponse = bulkRequest.execute().actionGet();
				if (bulkResponse.hasFailures()) {
					LOGGER.error("批量更新索引数据失败: " + indexName);
					LOGGER.error("批量更新索引数据失败: "
							+ bulkResponse.buildFailureMessage());

					resultMsg.setHasFailures(true);

					BulkItemResponse[] responses = bulkResponse.getItems();
					for (int k = 0; k < responses.length; k++) {
						BulkItemResponse response = responses[k];
						if (response.isFailed()) {
							ErrorDetail errorDetail = new ErrorDetail();
							errorDetail.setIndex(k + (len / BULK_SIZE));
							errorDetail.setId(response.getId());
							errorDetail.setMsg(response.getFailureMessage());

							resultMsg.addError(errorDetail);
						}
					}
				}
			}

			return resultMsg;
		} catch (Exception e) {
			LOGGER.error("批量更新索引数据失败: " + indexName);
			throw new RuntimeException("批量更新索引数据失败: " + indexName, e);
		} finally {
			bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
		}

	}

到了这里,关于Elasticsearch Java API 的使用-更新索引(update & upset)与 Bulk的批量更新的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ElasticSearch - 批量更新bulk死锁问题排查 | 京东云技术团队

    一、问题系统介绍 监听商品变更MQ消息,查询商品最新的信息,调用BulkProcessor批量更新ES集群中的商品字段信息; 由于商品数据非常多,所以将商品数据存储到ES集群上,整个ES集群共划分了256个分片,并根据商品的三级类目ID进行分片路由。 比如一个SKU的商品名称发生变化,

    2024年02月12日
    浏览(40)
  • elasticsearch使用脚本 滚动关闭索引,更新index setting

         在旧的索引中更新mapping时,新增了分词器(分词器已经在模板中添加),但是在更新mapping时报错: 查看elasticsearch官网,发现不允许在已经存在的索引中动态更新分词器,只能先将索引close,更新分词器,然后再打开 Update index settings API | Elasticsearch Guide [8.3] | Elastic 2.1 由

    2024年02月08日
    浏览(51)
  • PHP 如何使用 Elasticsearch 的 索引 API 接口

    目录 一、实战场景 二、知识点 PHP Elasticsearch 索引 index MySQL 三、菜鸟实战 如何在 PHP 中使用 Elasticsearch 的索引 API 接口 PHP Elasticsearch 索引 index MySQL Elasticsearch 本质上是一个数据库,但并不是 MySQL 这种关系型数据库,查询语言也不是 SQL,而是 Elasticsearch 自己的一套查询语言。

    2024年02月13日
    浏览(52)
  • Elasticsearch 索引文档时create、index、update的区别【学习记录】

    本文基于elasticsearch7.3.0版本。 一、思维导图 elasticsearch中create、index、update都可以实现插入功能,但是实现原理并不相同。 二、验证index和create 由上面思维导图可以清晰的看出create、index的大致区别,下面我们来验证下思维导图中的场景: 1、首先明确一点:如何指定是creat

    2024年01月20日
    浏览(46)
  • ElasticSearch7.3学习(十六)----RestHighLevelClient Java api实现索引的创建、删除、是否存在、关闭、开启

    注意:导入的包区别,不同的包创建索引的方式不同。博主亲身实践,具体体现在createIndexRequest.mapping()里面。读者可自行试验。  由此可以猜想一下: 可以看到上述两种方式导入的包的子类名是相同的,但是具体对索引的操作方式可能是不同的。具体的区别博主暂时还不清

    2024年02月16日
    浏览(58)
  • Elasticsearch Document Update API详解、原理与示例

    private int retryOnConflict = 0:更新冲突时重试次数。 private RefreshPolicy refreshPolicy = RefreshPolicy.NONE:刷新策略。NONE:代表不重试; private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT:执行操作之前需要等待激活的副本数,已在《Elasticsearch Document Get API详解、原理与示例》中详

    2024年04月22日
    浏览(42)
  • Elasticsearch更新指定字段操作_update_by_query

    MYSQL语句:update index_name set name = ‘wb’ where id = ‘20132112534’; MYSQL语句:update index_name set name = ‘wb’ where (a_time - b_time = 100000) MYSQL语句:update index_name set sort_time = update_time where sort_time is null;

    2024年02月11日
    浏览(38)
  • elasticsearch索引操作,索引创建、索引更新、索引删除

    创建索引 更新索引,添加字段 注意更新索引时与创建索引大致一样,只是更新索引时候的url不同,需要在后面加一个 _mapping 路径,同时请求的json里面不需要 mappings 路径,只需要 properties 即可 更新索引,修改配置 同理在更新setting的时候和更新maping的时候一样 获取索引结构

    2024年02月11日
    浏览(46)
  • 使用kettle同步全量数据到Elasticsearch(es)--elasticsearch-bulk-insert-plugin应用

    为了前端更快地进行数据检索,需要将数据存储到es中是一个很不错的选择。由于公司etl主要工具是kettle,这里介绍如何基于kettle的elasticsearch-bulk-insert-plugin插件将数据导入es。在实施过程中会遇到一些坑,这里记录解决方案。 可能会遇到的报错: 1、No elasticSearch nodes found 2、

    2024年02月01日
    浏览(69)
  • es elasticsearch 新增更新索引,新增更新文档

    先新增索引 新增映射  或者上述两步和为一步(创建索引,及创建mapping) 只能增加原有不存在的字段 创建一个全新的索引,映射包含调整后的字段或类型 将原有索引的数据迁移到新的索引 删除原有索引 将新的索引的别名设置为原来索引相同名称 创建一个 重建文档(全量

    2024年02月11日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包