学习笔记-elstaciElasticSearch7.17官方文档

这篇具有很好参考价值的文章主要介绍了学习笔记-elstaciElasticSearch7.17官方文档。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

ElasticSearch

介绍(Introduction)

特征

  • 适用于所有 Elasticsearch API 的强类型请求和响应。
  • 所有 API 的阻塞和异步版本。
  • 在创建复杂的嵌套结构时,使用流畅的构建器和功能模式允许编写简洁但可读的代码。
  • 通过使用对象映射器(例如 Jackson 或任何 JSON-B 实现)无缝集成应用程序类。
  • 将协议处理委托给一个 http 客户端,例如Java 低级 REST 客户端 ,它负责处理所有传输级别的问题:HTTP 连接池、重试、节点发现等。

服务器兼容策略

Elasticsearch Java 客户端是向前兼容的;这意味着客户端支持与更大或相等的次要版本的 Elasticsearch 进行通信。Elasticsearch 语言客户端仅向后兼容默认发行版,并且不作任何保证。

入门

安装

安装要求
  • 至少Java8以上的版本
  • 需要一个 JSON 对象映射库,允许您的应用程序类与 Elasticsearch API 无缝集成。
maven
    <dependency>
      <groupId>co.elastic.clients</groupId>
      <artifactId>elasticsearch-java</artifactId>
      <version>7.17.9</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.12.3</version>
    </dependency>

运行时可能会报异常 ClassNotFoundException: jakarta.json.spi.JsonProvider

则需要加入以下依赖

<dependency>
      <groupId>jakarta.json</groupId>
      <artifactId>jakarta.json-api</artifactId>
      <version>2.0.1</version>
    </dependency>

为什么会出现这个问题?

SpringBoot框架带有的maven插件,用于简化开发和依赖管理,这些插件中带有很多知名的库。

1.x的库里面使用了 javax.json的包,2.x里面用的jakarta.json的包,(从 JavaEE 过渡到 JakartaEE),所以导致了 ClassNotFoundException: jakarta.json.spi.JsonProvider

我们只需要添加正确的项目依赖就可以了

连接中(connecting)

Java API 客户端围绕三个主要组件构建

  • API 客户端类。这些为 Elasticsearch API 提供强类型数据结构和方法。由于 Elasticsearch API 很大,它以功能组(也称为“命名空间”)为结构,每个功能组都有自己的客户端类。Elasticsearch 核心功能在类中实现ElasticsearchClient
  • 一个 JSON 对象映射器。这会将您的应用程序类映射到 JSON,并将它们与 API 客户端无缝集成。
  • 传输层实现。这是所有 HTTP 请求处理发生的地方。

以下代码完成了上面的三个组件

// Create the low-level client
RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9200)).build();

// Create the transport with a Jackson mapper
ElasticsearchTransport transport = new RestClientTransport(
    restClient, new JacksonJsonpMapper());

// And create the API client
ElasticsearchClient client = new ElasticsearchClient(transport);

身份验证由Java Low Level REST Client管理。有关配置身份验证的更多详细信息,请参阅 其文档。

第一个 ElasticSearch 请求

需求:查询es数据库中,name = ‘bike’ 的所有Product对象

SearchResponse<Product> search = client.search(s -> s
    .index("products")
    .query(q -> q
        .term(t -> t
            .field("name")
            .value(v -> v.stringValue("bicycle"))
        )),
    Product.class);

for (Hit<Product> hit: search.hits().hits()) {
    processProduct(hit.source());
}

从RestHighLevelClient 迁移

个人理解

大概意思就是,7.15以前的版本用的是RestHighLevelClient ,其以后用的是Elasticsearch Java API Client ,这两种客户端可以并存,你可以同时使用,并把原来RestHighLevelClient 里面的一些废弃的接口、low的写法逐步过渡到Elasticsearch Java API Client 。

官方文档

弃用 RestHighLevelClient 服务端,使用 Elasticsearch Java API Client 客户端。

两个客户端库可以在没有操作开销的情况下共存于单个应用程序中,因为 Elasticsearch Java API Client 客户端 是一个全新的客户端,独立于 Elasticsearch 服务器。

兼容模式

HLRC(RestHighLevelClient )可以启用兼容模式,让HLRC版本的7.17和Elasticsearch 一起使用,在这种模式下,

Java API 客户端不需要此设置,因为兼容模式始终处于启用状态。

将相同的 http 客户端与 HLRC 和 Java API 客户端一起使用

为了避免在应用程序同时使用 HLRC 和新的 Java API 客户端的过渡阶段产生任何操作开销,两个客户端可以共享相同的 Low Level Rest Client,这是管理所有连接、循环策略的网络层,节点嗅探等。

下面的代码显示了如何使用相同的 HTTP 客户端初始化两个客户端:

// Create the low-level client
RestClient httpClient = RestClient.builder(
    new HttpHost("localhost", 9200)
).build();

// Create the HLRC
RestHighLevelClient hlrc = new RestHighLevelClientBuilder(httpClient)
    // Enables compatibility mode that allows HLRC 7.17 to work with Elasticsearch 8.x.
    .setApiCompatibilityMode(true) 
    .build();

// Create the Java API Client with the same low level client
ElasticsearchTransport transport = new RestClientTransport(
    httpClient,
    new JacksonJsonpMapper()
);

ElasticsearchClient esClient = new ElasticsearchClient(transport);

// hlrc and esClient share the same httpClient
转型策略

您可以通过多种不同的方式在应用程序代码中开始从 HLRC 过渡。

例如:

  • 保持现有代码不变,并使用新的 Java API 客户端实现应用程序中的新功能,然后迁移现有代码,
  • 重写应用程序中新的 Java API 客户端比 HLRC 客户端更容易使用的部分,例如与搜索相关的所有内容,
  • 通过利用新的 Java API 客户端与 JSON 对象映射器的紧密集成,重写那些需要将应用程序对象映射到 JSON 或从 JSON 映射到 JSON 的部分。

Api约定

包结构和命名空间客户端

大概是说Java api和ElasticSearch Api 的分组,都是以包的结构和命名空间的方式

// Create the "products" index
ElasticsearchClient client = ...
client.indices().create(c -> c.index("products"));

tips:如果数据量较大,可能会按月创建索引,比如 xx_2023_02,xx_2023_3 … 等,你往es上传数据的时候,会设置一个indexName,如果现在的es中没有这个索引,则创建索引并保存数据。

方法命名约定

  • 作为 API 一部分的方法和属性,例如 ElasticsearchClient.search()SearchResponse.maxScore()。它们是使用标准 Java 约定从 Elasticsearch JSON API 中各自的名称派生而来的 camelCaseNaming
  • 作为构建 Java API 客户端的框架的一部分的方法和属性,例如Query._kind(). 这些方法和属性以下划线为前缀,以避免与 API 名称发生任何命名冲突,并作为区分 API 和框架的简单方法。

阻塞和异步 client

API 客户端有两种形式:阻塞式和异步式。异步客户端上的所有方法都返回一个标准的CompletableFuture.

ElasticsearchTransport transport = ...

// Synchronous blocking client
ElasticsearchClient client = new ElasticsearchClient(transport);

if (client.exists(b -> b.index("products").id("foo")).value()) {
    logger.info("product exists");
}

// Asynchronous non-blocking client
ElasticsearchAsyncClient asyncClient =
    new ElasticsearchAsyncClient(transport);

asyncClient
    .exists(b -> b.index("products").id("foo"))
    .whenComplete((response, exception) -> {
        if (exception != null) {
            logger.error("Failed to index", exception);
        } else {
            logger.info("Product exists");
        }
    });

切记,异步时一定要对异常情况进行处理!!!

构建Api对象

建造者对象(Builder Project)
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices().create(
    new CreateIndexRequest.Builder()
        .index("my-index")
        .aliases("foo",
            new Alias.Builder().isWriteIndex(true).build()
        )
        .build()
);

在调用构建器的方法后,不能重用构建器。

构建器lambda表达式(感觉比上面那个清爽)
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices()
    .create(createIndexBuilder -> createIndexBuilder
        .index("my-index")
        .aliases("foo", aliasBuilder -> aliasBuilder
            .isWriteIndex(true)
        )
    );

比上面那个更清爽,lambda表达式的入参要简洁明了,不需要和fori循环一样望文生义。

ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices()
    .create(c -> c
        .index("my-index")
        .aliases("foo", a -> a
            .isWriteIndex(true)
        )
    );

Builder 的 lambda 在复杂嵌套查询下的应用.

刚入门,看不懂下面这个查询啥意思

ElasticsearchClient client = ...
SearchResponse<SomeApplicationData> results = client
    .search(b0 -> b0
        .query(b1 -> b1
            .intervals(b2 -> b2
                .field("my_text")
                .allOf(b3 -> b3
                    .ordered(true)
                    .intervals(b4 -> b4
                        .match(b5 -> b5
                            .query("my favorite food")
                            .maxGaps(0)
                            .ordered(true)
                        )
                    )
                    .intervals(b4 -> b4
                        .anyOf(b5 -> b5
                            .intervals(b6 -> b6
                                .match(b7 -> b7
                                    .query("hot water")
                                )
                            )
                            .intervals(b6 -> b6
                                .match(b7 -> b7
                                    .query("cold porridge")
                                )
                            )
                        )
                    )
                )
            )
        ),
    // 搜索结果将映射到SomeApplicationData实例,以便应用程序随时可用
    SomeApplicationData.class 
);

List And Map(两种数据类型)

添加的 builder setters
// Prepare a list of index names
List<String> names = Arrays.asList("idx-a", "idx-b", "idx-c");

// Prepare cardinality aggregations for fields "foo" and "bar"
Map<String, Aggregation> cardinalities = new HashMap<>();
cardinalities.put("foo-count", Aggregation.of(a -> a.cardinality(c -> c.field("foo"))));
cardinalities.put("bar-count", Aggregation.of(a -> a.cardinality(c -> c.field("bar"))));

// Prepare an aggregation that computes the average of the "size" field
final Aggregation avgSize = Aggregation.of(a -> a.avg(v -> v.field("size")));

SearchRequest search = SearchRequest.of(r -> r
    // Index list:
    // - add all elements of a list
    .index(names)
    // - add a single element
    .index("idx-d")
    // - add a vararg list of elements
    .index("idx-e", "idx-f", "idx-g")

    // Sort order list: add elements defined by builder lambdas
    .sort(s -> s.field(f -> f.field("foo").order(SortOrder.Asc)))
    .sort(s -> s.field(f -> f.field("bar").order(SortOrder.Desc)))

    // Aggregation map:
    // - add all entries of an existing map
    .aggregations(cardinalities)
    // - add a key/value entry
    .aggregations("avg-size", avgSize)
    // - add a key/value defined by a builder lambda
    .aggregations("price-histogram",
        a -> a.histogram(h -> h.field("price")))
);
List And Map 永远不为 null
NodeStatistics stats = NodeStatistics.of(b -> b
    .total(1)
    .failed(0)
    .successful(1)
);

// The `failures` list was not provided.
// - it's not null
assertNotNull(stats.failures());
// - it's empty
assertEquals(0, stats.failures().size());
// - and if needed we can know it was actually not defined
assertFalse(ApiTypeHelper.isDefined(stats.failures()));

Variant types(变种类型)

大概意思就是说,比如查询条件,可以允许多种类型的查询条件拼接,允许在构建查询条件中的各种骚操作。

Elasticsearch API 有很多变体类型:查询、聚合、字段映射、分析器等等。在如此庞大的集合中找到正确的类名可能具有挑战性。

Java API 客户端构建器使这变得简单:变体类型的构建器(例如 Query)具有适用于每个可用实现的方法。我们已经在上面的操作中看到了intervals(一种查询)和allOfmatch以及 anyOf(各种间隔)。

这是因为 Java API 客户端中的变体对象是“标记联合”的实现:它们包含它们持有的变体的标识符(或标记)以及该变体的值。例如,一个Query对象可以包含一个 IntervalsQuerywith tag intervals、一个TermQuerywith tagterm等等。这种方法允许编写流畅的代码,您可以让 IDE 完成功能指导您构建和导航复杂的嵌套结构:

变体构建器为每个可用的实现都提供了 setter 方法。它们使用与常规属性相同的约定,并接受构建器 lambda 表达式和变体实际类型的现成对象。下面是构建术语查询的示例:

Query query = new Query.Builder()
    // 选择term变体以构建术语查询。
    .term(t -> t       
         // 使用构建器 lambda 表达式构建术语查询。
        .field("name")                    
        .value(v -> v.stringValue("foo"))
    )
    // 构建Query现在持有一个TermQuery对象的 kind term。
    .build();    

变体对象还提供有关它们当前持有的变体种类的信息:

  • 具有is针对每种变体类型的方法:isTerm()isIntervals()isFuzzy()等。

  • 带有Kind定义所有变体类型的嵌套枚举。

  • // 测试变体是否属于特定种类
    if (query.isTerm()) { 
        doSomething(query.term());
    }
    
    // 测试一组更大的变体类型。
    switch(query._kind()) { 
        case Term:
            doSomething(query.term());
            break;
        case Intervals:
            doSomething(query.intervals());
            break;
        default:
            // 获取变体对象所持有的种类和值。
            doSomething(query._kind(), query._get()); 
    }
    

对象生命周期和线程安全

Java API Client 中有五种不同生命周期的对象:

Object mapper

无状态和线程安全,但创建成本高。它通常是在应用程序启动时创建并用于创建传输的单例。

Transport

线程安全,通过底层 HTTP 客户端持有网络资源。传输对象与 Elasticsearch 集群相关联,必须显式关闭才能释放底层资源,例如网络连接。

Clients

不可变的、无状态的和线程安全的。这些是非常轻量级的对象,仅包装传输并提供 API 端点作为方法。

Builders

可变的,非线程安全的。生成器是临时对象,不应在调用 build().

Requests & other API objects

不可变的,线程安全的。如果您的应用程序反复使用相同的请求或请求的相同部分,则可以提前准备好这些对象,并在使用不同传输的多个客户端的多个调用中重复使用。

从JSON数据创建API对象(7.17.2开始支持的)

从JSON中加载的属性会设置上文中未出现的属性值,也会替换上文中已出现的值

可以读取文件中JSON数据,进行创建索引、查询等操作。

比如有个some-index.json文件内容如下

{
  "mappings": {
    "properties": {
      "field1": { "type": "text" }
    }
  }
}

我们可以这样创建一个索引

InputStream input = this.getClass()
    .getResourceAsStream("some-index.json"); 

CreateIndexRequest req = CreateIndexRequest.of(b -> b
    .index("some-index")
    .withJson(input) 
);

boolean created = client.indices().create(req).acknowledged();

也可以的自定义JSON格式的查询、添加条件,进行操作

Reader queryJson = new StringReader(
    "{" +
    "  \"query\": {" +
    "    \"range\": {" +
    "      \"@timestamp\": {" +
    "        \"gt\": \"now-1w\"" +
    "      }" +
    "    }" +
    "  }" +
    "}");

SearchRequest aggRequest = SearchRequest.of(b -> b
    .withJson(queryJson) 
    .aggregations("max-cpu", a1 -> a1 
        .dateHistogram(h -> h
            .field("@timestamp")
            .calendarInterval(CalendarInterval.Hour)
        )
        .aggregations("max", a2 -> a2
            .max(m -> m.field("host.cpu.usage"))
        )
    )
    .size(0)
);

Map<String, Aggregate> aggs = client
    .search(aggRequest, Void.class) 
    .aggregations();

也支持聚合

Reader queryJson = new StringReader(
    "{" +
    "  \"query\": {" +
    "    \"range\": {" +
    "      \"@timestamp\": {" +
    "        \"gt\": \"now-1w\"" +
    "      }" +
    "    }" +
    "  }," +
    "  \"size\": 100" + 
    "}");

Reader aggregationJson = new StringReader(
    "{" +
    "  \"size\": 0, " + 
    "  \"aggregations\": {" +
    "    \"hours\": {" +
    "      \"date_histogram\": {" +
    "        \"field\": \"@timestamp\"," +
    "        \"interval\": \"hour\"" +
    "      }," +
    "      \"aggregations\": {" +
    "        \"max-cpu\": {" +
    "          \"max\": {" +
    "            \"field\": \"host.cpu.usage\"" +
    "          }" +
    "        }" +
    "      }" +
    "    }" +
    "  }" +
    "}");

SearchRequest aggRequest = SearchRequest.of(b -> b
    .withJson(queryJson) 
    .withJson(aggregationJson) 
    // 默认Es中不存在目标索引会报错,加此配置不报错
    .ignoreUnavailable(true) 
);

Map<String, Aggregate> aggs = client
    .search(aggRequest, Void.class)
    .aggregations();

当各个JSON中都有通用属性时,顺序很重要,就像第二个set会覆盖第一个set的值

两种异常

  1. ElasticsearchException

    服务器收到但是被拒绝报此异常,比如验证错误、服务器内部超时。

  2. TransportException

    未达到服务器报此异常,比如网络错误、服务器不可用。该异常原因是较低级别的实现抛出的异常。在这种情况下,RestClientTransport 它将是一个ResponseException包含低级别 HTTP 响应的。

使用Java Api 客户端

索引单个文档

using the fluent DSL(使用DSL语句)
Product product = new Product("bk-1", "City bike", 123.0);

IndexResponse response = esClient.index(i -> i
    .index("products")
    .id(product.getSku())
    .document(product)
);

logger.info("Indexed with version " + response.version());
Product product = new Product("bk-1", "City bike", 123.0);

IndexRequest<Product> request = IndexRequest.of(i -> i
    .index("products")
    .id(product.getSku())
    .document(product)
);

IndexResponse response = esClient.index(request);

logger.info("Indexed with version " + response.version());
using classic builders(使用经典构建器)
Product product = new Product("bk-1", "City bike", 123.0);

IndexRequest.Builder<Product> indexReqBuilder = new IndexRequest.Builder<>();
indexReqBuilder.index("product");
indexReqBuilder.id(product.getSku());
indexReqBuilder.document(product);

// 注释:个人觉得这样用链式调用比较好看
      Query termQuery = new TermQuery.Builder()
                .field()
                .value()
                .build();

IndexResponse response = esClient.index(indexReqBuilder.build());

logger.info("Indexed with version " + response.version());
Using the asynchronous client 使用异步客户端
ElasticsearchAsyncClient esAsyncClient = new ElasticsearchAsyncClient(transport);

Product product = new Product("bk-1", "City bike", 123.0);

esAsyncClient.index(i -> i
    .index("products")
    .id(product.getSku())
    .document(product)
).whenComplete((response, exception) -> {
    if (exception != null) {
        logger.error("Failed to index", exception);
    } else {
        logger.info("Indexed with version " + response.version());
    }
});

api客户端有两种形式,阻塞式和异步式,两种客户端都返回一个标准的CompletableFuture.

如下:

ElasticsearchTransport transport = ...

// Synchronous blocking client
ElasticsearchClient client = new ElasticsearchClient(transport);

if (client.exists(b -> b.index("products").id("foo")).value()) {
    logger.info("product exists");
}

// Asynchronous non-blocking client
ElasticsearchAsyncClient asyncClient =
    new ElasticsearchAsyncClient(transport);

asyncClient
    .exists(b -> b.index("products").id("foo"))
    .whenComplete((response, exception) -> {
        if (exception != null) {
            logger.error("Failed to index", exception);
        } else {
            logger.info("Product exists");
        }
    });

异步客户端一定要记得处理异步失败

Using raw JSON data(使用原始JSON数据)

当您要索引的数据来自外部来源时,必须创建域对象可能很麻烦,或者对于半结构化数据来说是完全不可能的。(个人理解:假如你现在有一个筛选条件筛选数据,且已经上到了线上环境,现在需要修改这个查询条件,不应该代码里修改然后再打包发布。应该用文本写好查询条件,修改的时候直接修改文本即可)。

方法withJson(),此方法读取源并用于索引请求的document属性,案例Creating API objects from JSON data

Reader input = new StringReader(
    "{'@timestamp': '2022-04-08T13:55:32Z', 'level': 'warn', 'message': 'Some log message'}"
    .replace('\'', '"'));

IndexRequest<JsonData> request = IndexRequest.of(i -> i
    .index("logs")
    .withJson(input)
);

IndexResponse response = esClient.index(request);

logger.info("Indexed with version " + response.version());

以上的源码案例来自于Java API Client tests.

批量:索引多个文档

批量请求允许在一个请求中向ElasticSearch发送多个文档相关的操作。

批量请求说明:Elasticsearch API 文档。

批量请求可以包含多种操作

  • 创建document,确保它不存在后对其进行索引
  • 索引文档,在需要时创建它
  • 使用脚本或部分文档更新已存在的文档
  • 删除文档
indexing application projects(索引应用程序对象)

一个BulkRequest包含一组操作,每个操作都是一个variant type,主要请求使用构建器对象,每个操作使用DSL。

如下:

List<Product> products = fetchProducts();

BulkRequest.Builder br = new BulkRequest.Builder();

for (Product product : products) {
    // 添加一个操作(记住列表属性是可加的)。opis 是一个构建器,BulkOperation它是一个变体类型。此类型有index、create和变update体delete。
    br.operations(op -> op    
       // 选择index操作变体,idx是 的构建器IndexOperation。
        .index(idx -> idx     
            // 设置索引操作的属性,类似于单文档索引:索引名称、标识符和文档。
            .index("products")       
            .id(product.getSku())
            .document(product)
        )
    );
}

BulkResponse result = esClient.bulk(br.build());

// Log errors, if any
if (result.errors()) {
    logger.error("Bulk had errors");
    for (BulkResponseItem item: result.items()) {
        if (item.error() != null) {
            logger.error(item.error().reason());
        }
    }
}
indexing raw json data(索引原始JSON数据)
// List json log files in the log directory
File[] logFiles = logDir.listFiles(
    file -> file.getName().matches("log-.*\\.json")
);

BulkRequest.Builder br = new BulkRequest.Builder();

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    br.operations(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}
使用Bulk Ingester 进行流式摄取

通过提供允许索引/更新/删除操作在批量请求中透明分组的实用程序类,简化BulkIngester了批量 API 的使用。您只需要add()对 ingester 进行批量操作,它会根据其配置负责分组并批量发送它们。

当满足以下条件之一时,ingester 将发送批量请求:

  • 操作次数超过最大值(默认为 1000)
  • 以字节为单位的批量请求大小超过最大值(默认为 5 MiB)
  • 自上次请求过期以来的延迟(定期刷新,无默认值)

此外,您可以定义等待 Elasticsearch 执行的并发请求的最大数量(默认为 1)。当达到最大值并且已收集到最大操作数时,向索引器添加新操作将阻塞。这是为了避免通过对客户端应用程序施加背压来使 Elasticsearch 服务器过载。

BulkIngester<Void> ingester = BulkIngester.of(b -> b
    // 	设置用于发送批量请求的 Elasticsearch 客户端
    .client(esClient)    
    // 设置在发送批量请求之前要收集的最大操作数。
    .maxOperations(100)  
    // 设置刷新间隔。
    .flushInterval(1, TimeUnit.SECONDS) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);
    // 向摄取器添加批量操作
    ingester.add(op -> op 
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}
// 关闭 ingester 以刷新挂起的操作并释放资源。
ingester.close(); 

此外,批量摄取器接受一个侦听器,以便您的应用程序可以收到发送的批量请求及其结果的通知。为了允许将批量操作关联到应用程序上下文,该add()方法可选择接受一个context参数。此上下文参数的类型用作对象的通用参数BulkIngester 。您可能已经注意到上面Void的类型BulkIngester<Void>:这是因为我们没有注册监听器,因此不关心上下文值。

以下示例显示了如何使用上下文值来实现批量摄取侦听器:与以前一样,它批量发送 JSON 日志文件,但跟踪批量请求错误和失败的操作。当操作失败时,根据错误类型,您可能希望将其重新添加到 ingester。

BulkListener<String> listener = new BulkListener<String>() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
        // The request was accepted, but may contain failed items.
        // The "context" list gives the file name for each bulk item.
        logger.debug("Bulk request " + executionId + " completed");
        for (int i = 0; i < contexts.size(); i++) {
            BulkResponseItem item = response.items().get(i);
            if (item.error() != null) {
                // Inspect the failure cause
                logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
        // The request could not be sent
        logger.debug("Bulk request " + executionId + " failed", failure);
    }
};

BulkIngester<String> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(100)
    .flushInterval(1, TimeUnit.SECONDS)
    .listener(listener) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        ),
        file.getName() 
    );
}

ingester.close();

批量摄取还公开了允许监视摄取过程并调整其配置的统计信息:

  • 添加的操作数,
  • add()由于达到最大并发请求数(争用)而被阻止的 调用数,
  • 发送的批量请求数,
  • 由于达到最大并发请求数而被阻止的批量请求数。

通过id读取文档

Reading a domain object

从索引product中获取id = bk-1的对象,此get请求有两个参数

GetResponse<Product> response = esClient.get(g -> g
    .index("products") 
    .id("bk-1"),
    // 结果需要转成的对象                                         
    Product.class      
);

if (response.found()) {
    Product product = response.source();
    logger.info("Product name " + product.getName());
} else {
    logger.info ("Product not found");
}
Reading raw JSON
GetResponse<ObjectNode> response = esClient.get(g -> g
    .index("products")
    .id("bk-1"),
    // 	The target class is a raw JSON object.
    ObjectNode.class     
);

if (response.found()) {
    ObjectNode json = response.source();
    String name = json.get("name").asText();
    logger.info("Product name " + name);
} else {
    logger.info("Product not found");
}

搜索文件(Search for document)

索引文档可用于近乎实时的搜索

简单的搜索查询
String searchText = "bike";

SearchResponse<Product> response = esClient.search(s -> s
    .index("products") 
    .query(q -> q      
        .match(t -> t   
            .field("name")  
            .query(searchText)
        )
    ),
    Product.class      
);

TotalHits total = response.hits().total();
boolean isExactResult = total.relation() == TotalHitsRelation.Eq;

if (isExactResult) {
    logger.info("There are " + total.value() + " results");
} else {
    logger.info("There are more than " + total.value() + " results");
}

List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {
    Product product = hit.source();
    logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
嵌套搜索查询
String searchText = "bike";
double maxPrice = 200.0;

// Search by product name
Query byName = MatchQuery.of(m -> m 
    .field("name")
    .query(searchText)
)._toQuery(); 

// Search by max price
Query byMaxPrice = RangeQuery.of(r -> r
    .field("price")
    .gte(JsonData.of(maxPrice)) 
)._toQuery();

// Combine name and price queries to search the product index
SearchResponse<Product> response = esClient.search(s -> s
    .index("products")
    .query(q -> q
        .bool(b -> b 
            .must(byName) 
            .must(byMaxPrice)
        )
    ),
    Product.class
);

List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {
    Product product = hit.source();
    logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
模板搜索

搜索模板是一种存储的搜索,您可以使用不同的变量运行它。搜索模板让您无需修改应用程序代码即可更改搜索。

在运行模板搜索之前,您首先必须创建模板。这是一个返回搜索请求主体的存储脚本,通常定义为 Mustache 模板。这个存储的脚本可以在应用程序外部创建,也可以使用 Java API 客户端创建:

// Create a script
esClient.putScript(r -> r
    // 模板脚本标识符/名称
    .id("query-script") 
    .script(s -> s
        .lang("mustache")
        .source("{\"query\":{\"match\":{\"{{field}}\":\"{{value}}\"}}}")
    ));

要使用搜索模板,请使用方法searchTemplate引用脚本并为其参数提供值:

SearchTemplateResponse<Product> response = esClient.searchTemplate(r -> r
        .index("some-index")
        // 要使用的模板脚本标识符
        .id("query-script") 
        // 模板脚本里面需要的参数
        .params("field", JsonData.of("some-field")) 
        .params("value", JsonData.of("some-data")),
    Product.class
);

List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {
    Product product = hit.source();
    logger.info("Found product " + product.getSku() + ", score " + hit.score());
}

聚合

Elasticsearch 文档。

一个简单的聚合

分析类的聚合,一般不使用document,所以查询的size为0,返回结果的目标类为Void。

例如-查询:

String searchText = "bike";

Query query = MatchQuery.of(m -> m
    .field("name")
    .query(searchText)
)._toQuery();

// 查询的是价格直方图,价格以50为步进递增分区查询
SearchResponse<Void> response = esClient.search(b -> b
    .index("products")
    // 不需要document返回数据,纯聚合
    .size(0) 
    .query(query) 
    // 聚合名称为 'price-histogram'
    .aggregations("price-histogram", a -> a 
        .histogram(h -> h 
            .field("price")
            .interval(50.0)
        )
    ),
    Void.class 
);

例如-响应:

List<HistogramBucket> buckets = response.aggregations()
    // 聚合名称
    .get("price-histogram") 
    // 聚合类型/聚合函数,例如avg、sum、max、min
    .histogram() 
    // 数组或者map
    .buckets().array(); 

for (HistogramBucket bucket: buckets) {
    logger.info("There are " + bucket.docCount() +
        " bikes under " + bucket.key());
}

Java API Client tests.

故障排除

MissingRequiredPropertyException in a response

Java API客户端区分两种属性,可选属性和必须属性,可选属性用@Nullable注释标记,未被@Nullable标记的必然不会为null。

但是,Elasticsearch API 规范可能存在错误,错误的需要响应对象的属性,导致MissingRequiredPropertyException反序列化响应时出现错误。如果发生这种情况,您可以按照以下方法解决:

  • 确保使用最新版本的 Java API 客户端。该问题可能已经得到解决。(新版本可能没有这个问题)
  • 如果问题仍然存在于最新版本中,请打开一个问题,以便我们可以在下一个版本中修复它。请帮助我们改进 Java API 客户端。(如果新版本有,请反馈给官方)
  • 暂时禁用违规请求所需的属性检查(干掉属性检查)
   ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(true);
    SomeRequest request = SomeRequest.of(...);
    SomeResponse response = esClient.someApi(request);
    ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(false);
    // Do something with response
}

DANGEROUS_disableRequiredPropertiesCheck方法禁用当前线程所需的属性检查,以及异步请求中的响应反序列化。顾名思义,它是危险的,因为它取消了对非 . 属性的保证@Nullable。在问题得到解决之前,这是一个临时解决方法。

请注意,此方法的结果是一个AutoCloseable将所需属性检查重置为其先前设置的对象。因此,您可以在 try-with-resource 块中使用它,如下所示:

try (ApiTypeHelper.DisabledChecksHandle h =
        ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(true)) {
    SomeRequest request = SomeRequest.of(...);
    SomeResponse response = esClient.someApi(request);
    // Do something with response
}
Serializing aggregations and suggestions without typed keys

大概意思就是

当你不需要格式化序列对象的时候,typed_keys您可以通过将JsonpMapperFeatures.SERIALIZE_TYPED_KEYS属性设置为false映射器对象来禁用序列化。

Elasticsearch 搜索请求接受一个typed_key参数,该参数允许返回类型信息以及聚合和建议结果中的名称(有关更多详细信息,请参阅聚合文档)。

Java API 客户端总是将此参数添加到搜索请求中,因为需要类型信息来了解应该用于反序列化聚合和建议结果的具体类。

对称地,Java API Client 也使用这种typed_keys格式序列化聚合和建议结果,以便它可以正确地反序列化自己序列化的结果。

代码如下,默认有序列化

ElasticsearchClient esClient = ...
JsonpMapper mapper = esClient._jsonpMapper();

StringWriter writer = new StringWriter();
try (JsonGenerator generator = mapper.jsonProvider().createGenerator(writer)) {
    mapper.serialize(searchResponse, generator);
}
String result = writer.toString();

// The aggregation property provides the "avg" type and "price" name
assertTrue(result.contains("\"aggregations\":{\"avg#price\":{\"value\":3.14}}}"));

设置没序列化文章来源地址https://www.toymoban.com/news/detail-625133.html

ElasticsearchClient esClient = ...
// Create a new mapper with the typed_keys feature disabled
JsonpMapper mapper = esClient._jsonpMapper()
    .withAttribute(JsonpMapperFeatures.SERIALIZE_TYPED_KEYS, false);

StringWriter writer = new StringWriter();
try (JsonGenerator generator = mapper.jsonProvider().createGenerator(writer)) {
    mapper.serialize(searchResponse, generator);
}
String result = writer.toString();

// The aggregation only provides the "price" name
assertTrue(result.contains("\"aggregations\":{\"price\":{\"value\":3.14}}}"));

到了这里,关于学习笔记-elstaciElasticSearch7.17官方文档的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Docker学习笔记17

    跨主机容器间网络: 实现跨主机容器间通信的工具: 1)Pipework 2)Flannel 3)Weave 4)Open V Switch (OVS) 5)Calico 1. Weave: 在每个宿主机上布置一个特殊的route的容器,不同宿主机的route容器连接起来,route拦截所有普通容器的ip请求,并通过 udp包 发送到其它宿主机上的普通容器

    2024年02月12日
    浏览(33)
  • 企业架构LNMP学习笔记17

    反向代理: 反向代理服务器和真实访问的服务器是在一起的,有关联的。 根据实际业务需求,分发代理页面到不同的解释器。常见于代理后端服务器。 安装apache服务器: 修改配置文件:   由nginx反向代理给后端的apache服务器处理,apache处理完成后再交给nginx返回给客户端。

    2024年02月09日
    浏览(38)
  • ros2官方文档(基于humble版本)学习笔记

    由于市面上专门讲ROS 2开发的书籍不多,近期看完了《ROS机器人开发实践》其中大部分内容还是基于ROS 1写的,涉及topic,service,action等一些重要的概念,常用组件,建模与仿真,应用(机器视觉,机器语音,SLAM,机械臂),最后一章写了ROS 2的安装,话题通信和服务通信的示

    2024年02月11日
    浏览(42)
  • Spring MVC官方文档学习笔记(二)之DispatcherServlet

    1.DispatcherServlet入门 (1) Spring MVC是以前端控制器模式(即围绕着一个中央的Servelt, DispatcherServlet)进行设计的,这个DispatcherServlet为请求的处理提供了一个共用的算法,即它都会将实际的请求处理工作委托给那些可配置的组件进行执行,说白了,DispatcherServlet的作用就是进行统一调度,并

    2024年02月07日
    浏览(82)
  • ROS 2官方文档(基于humble版本)学习笔记(二)

    今天继续总结CLI 工具章的学习 理解节点(node) ROS 2图是一个ROS 2元件同时处理数据的网络,如果将它们全部映射并可视化它们,则包括所有可执行文件以及它们之间的连接。 ROS中的每个节点(node)都应该只为了单个的、模块化的目的而设计的,例如控制车轮电动机或从激光

    2024年02月10日
    浏览(44)
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

    学习文档:概念透析 - 及时流处理 学习笔记如下: 及时流处理时有状态流处理的扩展,其中时间在计算中起着一定的作用。 及时流的应用场景: 时间序列分析 基于特定时间段进行聚合 对发生时间很重要的事件进行处理 处理时间(processing time) 处理时间的即数据到达各个

    2024年02月03日
    浏览(53)
  • Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 概览 学习笔记如下: Flink 的 DataStream API: 数据里的起始是各种 source,例如消息队列、socket 流、文件等; 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等; 结果通过 sink 返回,例如可以将数据写入文件或标准输出。 Da

    2024年01月23日
    浏览(55)
  • Spring MVC官方文档学习笔记(一)之Web入门

    注: 该章节主要为原创内容,为后续的Spring MVC内容做一个先行铺垫 1.Servlet的构建使用 (1) 选择Maven - webapp来构建一个web应用 (2) 构建好后,打开pom.xml文件,一要注意打包方式为war包,二导入servlet依赖,如下 (3) 替换webapp/WEB-INF/web.xml文件为如下内容,采用Servlet 3.1版本 (4) 在

    2024年02月03日
    浏览(43)
  • Flink|《Flink 官方文档 - 概念透析 - Flink 架构》学习笔记

    学习文档:概念透析 - Flink 架构 学习笔记如下: 客户端(Client):准备数据流程序并发送给 JobManager(不是 Flink 执行程序的进程) JobManager:协调 Flink 应用程序的分布式执行 ResourceManager:负责 Flink 集群中的资源提供、回收、分配 Dispatcher:提供了用来提交 Flink 应用程序执行

    2024年01月19日
    浏览(48)
  • Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记

    学习文档:《Flink 官方文档 - DataStream API - 算子 - 窗口》 学习笔记如下: 窗口(Window):窗口是处理无界流的关键所在。窗口可以将数据流装入大小有限的 “桶” 中,再对每个 “桶” 加以处理。 Keyed Windows 在 Keyed Windows 上使用窗口时,要调用 keyBy(...) 而后再调用 window(..

    2024年01月18日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包