异步检索在 Elasticsearch 中的理论与实践

这篇具有很好参考价值的文章主要介绍了异步检索在 Elasticsearch 中的理论与实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

异步检索在 Elasticsearch 中的理论与实践

https://www.elastic.co/guide/en/elasticsearch/reference/8.1/async-search.html#submit-async-search

引言

Elasticsearch 是一种强大的分布式搜索和分析引擎,它能够快速地存储、搜索和分析大量数据。在处理大规模数据时,性能和响应时间变得至关重要。为了提高搜索和查询操作的效率,Elasticsearch 支持异步检索。本文将深入探讨异步检索在 Elasticsearch 中的理论原理,展示如何在实践中使用它,并提供使用场景和注意事项。

什么是异步检索?

在传统的同步搜索中,当客户端发出一个查询请求后,它需要等待 Elasticsearch 返回所有匹配结果才能继续处理其他任务。而异步检索允许客户端发起一个查询请求后,不必等待搜索结果立即返回,而是可以继续执行其他操作。Elasticsearch 在后台处理这个查询请求,当查询完成后,客户端会得到一个响应。

异步检索的优点在于它能够显著提高搜索和查询操作的性能和响应时间,特别是在处理大量数据或复杂查询时。

添加测试数据

使用python3脚本完成,根据github修改而来

https://github.com/oliver006/elasticsearch-test-data

生成测试数据脚本见文章末尾

执行命令

python3 es_test_data.py --es_url=http://127.0.0.1:9200 --count=1000000

如何使用异步检索?

1. 创建异步搜索任务

在 Elasticsearch 中,使用异步检索需要创建一个异步搜索任务。你可以通过发送一个异步搜索请求来创建任务。以下是一个使用 Elasticsearch 的 REST API 发起异步搜索请求的示例:

POST /test_data/_async_search?size=0
{
  "sort": [
    { "last_updated": { "order": "asc" } }
  ],
  "aggs": {
    "sale_date": {
      "date_histogram": {
        "field": "last_updated",
        "calendar_interval": "1d"
      }
    }
  }
}

在上述示例中,我们向名为 test_data 的索引提交了一个异步搜索请求,该请求使用简单的匹配查询来查找包含特定值的文档。

相应内容如下,注意ID的值即可

如果看不到ID的值,再加一部分数据量再次检索即可

{
  "id" : "FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg=",
  "is_partial" : true,
  "is_running" : true,
  "start_time_in_millis" : 1690808656033,
  "expiration_time_in_millis" : 1691240656033,
  "response" : {
    "took" : 1001,
    "timed_out" : false,
    "terminated_early" : false,
    "num_reduce_phases" : 0,
    "_shards" : {
      "total" : 1,
      "successful" : 0,
      "skipped" : 0,
      "failed" : 0
    },
    "hits" : {
      "total" : {
        "value" : 0,
        "relation" : "gte"
      },
      "max_score" : null,
      "hits" : [ ]
    }
  }
}

2. 获取异步搜索结果

一旦创建了异步搜索任务,你可以轮询获取任务的结果。Elasticsearch 返回一个任务 ID(上一步返回的ID),你可以使用这个 ID 来检索结果。以下是获取异步搜索结果的示例:

GET /_async_search/<task_id>

GET /_async_search/FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg=

在上述示例中,我们使用 <task_id> 来获取异步搜索任务的状态。

3. 获取异步搜索的状态

获取异步搜索结果后,可以对结果进行处理和解析。通常,结果会以 JSON 格式返回,其中包含搜索的匹配文档、聚合信息等。仅仅是在url中加入status

GET /_async_search/status/FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg=

返回结果如下

{
  "id" : "FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg=",
  "is_running" : false,
  "is_partial" : false,
  "start_time_in_millis" : 1690808656033,
  "expiration_time_in_millis" : 1691240656033,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "completion_status" : 200
}

4. 删除异步检索

DELETE /_async_search/FjU0SDlRSFZ2UTdxZUpkaFdLSF9hOVEdZzBVS3hmd1FTWEc3VmpCc1gzZFZhdzo2NDI0Mzg=

使用场景

异步检索在以下场景中特别有用:

  1. 大数据量搜索: 当索引包含大量数据时,同步搜索可能会导致请求阻塞并增加响应时间。异步检索能够提高搜索性能,让客户端可以并发处理其他任务。

  2. 复杂查询: 复杂的搜索查询可能需要更长的处理时间。通过使用异步检索,可以避免客户端长时间等待,提高用户体验。

  3. 定时任务: 如果你需要定期执行一些查询,并将结果导出或进行其他操作,异步检索可以让你更加灵活地处理这些任务。

使用注意事项

虽然异步检索提供了很多好处,但在使用时也需要注意以下事项:

  1. 任务状态管理: 确保正确地管理异步搜索任务的状态。任务可能处于不同的状态,包括运行中、完成和失败。及时清理已经完成或失败的任务,避免资源浪费。

  2. 任务结果有效性: 确保处理异步搜索结果时,对结果进行有效性验证和解析。避免因错误处理结果而导致数据不一致或错误的分析。

  3. 资源限制: 异步检索仍然占用服务器资源,特别是在处理大量并发任务时。确保服务器资源足够以支持异步检索的需求。

  4. 超时和重试: 考虑到网络或其他故障可能导致异步搜索请求失败,需要合理设置超时时间并实现重试机制,以确保请求的可靠性。

结论

异步检索是 Elasticsearch 中一个强大且实用的特性,可以显著提高搜索和查询操作的性能,特别在处理大规模数据或复杂查询时。在使用异步检索时,注意合理管理任务状态、验证结果有效性,并注意资源限制和错误处理。合理地应用异步检索,能为我们的应用程序带来更高效的搜索和分析功能。

测试脚本

#!/usr/bin/python

import nest_asyncio
nest_asyncio.apply()

import json
import csv
import time
import logging
import random
import string
import uuid
import datetime

import tornado.gen
import tornado.httpclient
import tornado.ioloop
import tornado.options

try:
    xrange
    range = xrange
except NameError:
    pass

async_http_client = tornado.httpclient.AsyncHTTPClient()
headers = tornado.httputil.HTTPHeaders({"content-type": "application/json"})
id_counter = 0
upload_data_count = 0
_dict_data = None



def delete_index(idx_name):
    try:
        url = "%s/%s?refresh=true" % (tornado.options.options.es_url, idx_name)
        request = tornado.httpclient.HTTPRequest(url, headers=headers, method="DELETE", request_timeout=240, auth_username=tornado.options.options.username, auth_password=tornado.options.options.password, validate_cert=tornado.options.options.validate_cert)
        response = tornado.httpclient.HTTPClient().fetch(request)
        logging.info('Deleting index  "%s" done   %s' % (idx_name, response.body))
    except tornado.httpclient.HTTPError:
        pass


def create_index(idx_name):
    schema = {
        "settings": {
            "number_of_shards":   tornado.options.options.num_of_shards,
            "number_of_replicas": tornado.options.options.num_of_replicas
        },
        "refresh": True
    }

    body = json.dumps(schema)
    url = "%s/%s" % (tornado.options.options.es_url, idx_name)
    try:
        logging.info('Trying to create index %s' % (url))
        request = tornado.httpclient.HTTPRequest(url, headers=headers, method="PUT", body=body, request_timeout=240, auth_username=tornado.options.options.username, auth_password=tornado.options.options.password, validate_cert=tornado.options.options.validate_cert)
        response = tornado.httpclient.HTTPClient().fetch(request)
        logging.info('Creating index "%s" done   %s' % (idx_name, response.body))
    except tornado.httpclient.HTTPError:
        logging.info('Looks like the index exists already')
        pass


@tornado.gen.coroutine
def upload_batch(upload_data_txt):
    try:
        request = tornado.httpclient.HTTPRequest(tornado.options.options.es_url + "/_bulk",
                                                 method="POST",
                                                 body=upload_data_txt,
                                                 headers=headers,
                                                 request_timeout=tornado.options.options.http_upload_timeout,
                                                 auth_username=tornado.options.options.username, auth_password=tornado.options.options.password, validate_cert=tornado.options.options.validate_cert)
        response = yield async_http_client.fetch(request)
    except Exception as ex:
        logging.error("upload failed, error: %s" % ex)
        return

    result = json.loads(response.body.decode('utf-8'))
    res_txt = "OK" if not result['errors'] else "FAILED"
    took = int(result['took'])
    logging.info("Upload: %s - upload took: %5dms, total docs uploaded: %7d" % (res_txt, took, upload_data_count))


def get_data_for_format(format):
    split_f = format.split(":")
    if not split_f:
        return None, None

    field_name = split_f[0]
    field_type = split_f[1]

    return_val = ''

    if field_type == 'arr':
        return_val = []
        array_len_expr = split_f[2]
        if '-' in array_len_expr:
            (min,max) = array_len_expr.split('-')
            array_len = generate_count(int(min), int(max))
        else:
            array_len = int(array_len_expr)

        single_elem_format = field_name + ':' + format[len(field_name) + len(field_type) + len(array_len_expr) + 3 : ]
        for i in range(array_len):
            x = get_data_for_format(single_elem_format)
            return_val.append(x[1])

    elif field_type == "bool":
        return_val = random.choice([True, False])

    elif field_type == "str":
        min = 3 if len(split_f) < 3 else int(split_f[2])
        max = min + 7 if len(split_f) < 4 else int(split_f[3])
        length = generate_count(min, max)
        return_val = "".join([random.choice(string.ascii_letters + string.digits) for x in range(length)])

    elif field_type == "int":
        min = 0 if len(split_f) < 3 else int(split_f[2])
        max = min + 100000 if len(split_f) < 4 else int(split_f[3])
        return_val = generate_count(min, max)
    
    elif field_type == "ipv4":
        return_val = "{0}.{1}.{2}.{3}".format(generate_count(0, 245),generate_count(0, 245),generate_count(0, 245),generate_count(0, 245))

    elif field_type in ["ts", "tstxt"]:
        now = int(time.time())
        per_day = 24 * 60 * 60
        min = now - 30 * per_day if len(split_f) < 3 else int(split_f[2])
        max = now + 30 * per_day if len(split_f) < 4 else int(split_f[3])
        ts = generate_count(min, max)
        return_val = int(ts * 1000) if field_type == "ts" else datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S.000-0000")

    elif field_type == "words":
        min = 2 if len(split_f) < 3 else int(split_f[2])
        max = min + 8 if len(split_f) < 4 else int(split_f[3])
        count = generate_count(min, max)
        words = []
        for _ in range(count):
            word_len = random.randrange(3, 10)
            words.append("".join([random.choice(string.ascii_letters + string.digits) for x in range(word_len)]))
        return_val = " ".join(words)

    elif field_type == "dict":
        global _dict_data
        min = 2 if len(split_f) < 3 else int(split_f[2])
        max = min + 8 if len(split_f) < 4 else int(split_f[3])
        count = generate_count(min, max)
        return_val = " ".join([random.choice(_dict_data).strip() for _ in range(count)])

    elif field_type == "text":
        text = ["text1", "text2", "text3"] if len(split_f) < 3 else split_f[2].split("-")
        min = 1 if len(split_f) < 4 else int(split_f[3])
        max = min + 1 if len(split_f) < 5 else int(split_f[4])
        count = generate_count(min, max)
        words = []
        for _ in range(count):
            words.append(""+random.choice(text))
        return_val = " ".join(words)

    return field_name, return_val


def generate_count(min, max):
    if min == max:
        return max
    elif min > max:
        return random.randrange(max, min);
    else:
        return random.randrange(min, max);


def generate_random_doc(format):
    global id_counter

    res = {}

    for f in format:
        f_key, f_val = get_data_for_format(f)
        if f_key:
            res[f_key] = f_val

    if not tornado.options.options.id_type:
        return res

    if tornado.options.options.id_type == 'int':
        res['_id'] = id_counter
        id_counter += 1
    elif tornado.options.options.id_type == 'uuid4':
        res['_id'] = str(uuid.uuid4())

    return res


def set_index_refresh(val):

    params = {"index": {"refresh_interval": val}}
    body = json.dumps(params)
    url = "%s/%s/_settings" % (tornado.options.options.es_url, tornado.options.options.index_name)
    try:
        request = tornado.httpclient.HTTPRequest(url, headers=headers, method="PUT", body=body, request_timeout=240, auth_username=tornado.options.options.username, auth_password=tornado.options.options.password, validate_cert=tornado.options.options.validate_cert)
        http_client = tornado.httpclient.HTTPClient()
        http_client.fetch(request)
        logging.info('Set index refresh to %s' % val)
    except Exception as ex:
        logging.exception(ex)


def csv_file_to_json(csvFilePath):
    data = []

    # Open a csv reader called DictReader
    with open(csvFilePath, encoding='utf-8') as csvf:
        csvReader = csv.DictReader(csvf)
        for rows in csvReader:
            data.append(rows)

    return json.dumps(data)


@tornado.gen.coroutine
def generate_test_data():

    global upload_data_count

    if tornado.options.options.force_init_index:
        delete_index(tornado.options.options.index_name)

    create_index(tornado.options.options.index_name)

    # todo: query what refresh is set to, then restore later
    if tornado.options.options.set_refresh:
        set_index_refresh("-1")

    if tornado.options.options.out_file:
        out_file = open(tornado.options.options.out_file, "w")
    else:
        out_file = None

    if tornado.options.options.dict_file:
        global _dict_data
        with open(tornado.options.options.dict_file, 'r') as f:
            _dict_data = f.readlines()
        logging.info("Loaded %d words from the %s" % (len(_dict_data), tornado.options.options.dict_file))

    format = tornado.options.options.format.split(',')
    if not format:
        logging.error('invalid format')
        exit(1)

    ts_start = int(time.time())
    upload_data_txt = ""

    if tornado.options.options.data_file:
        json_array = ""
        if tornado.options.options.data_file.endswith(".csv"):
            json_array = json.loads(csv_file_to_json(tornado.options.options.data_file))
        else:
            with open(tornado.options.options.data_file, 'r') as f:
                json_array = json.load(f)
            logging.info("Loaded documents from the %s", tornado.options.options.data_file)

        for item in json_array:
            cmd = {'index': {'_index': tornado.options.options.index_name}}
                             # '_type': tornado.options.options.index_type}}
            if '_id' in item:
                cmd['index']['_id'] = item['_id']

            upload_data_txt += json.dumps(cmd) + "\n"
            upload_data_txt += json.dumps(item) + "\n"

        if upload_data_txt:
            yield upload_batch(upload_data_txt)
    else:
        logging.info("Generating %d docs, upload batch size is %d" % (tornado.options.options.count,
                                                                      tornado.options.options.batch_size))
        for num in range(0, tornado.options.options.count):

            item = generate_random_doc(format)

            if out_file:
                out_file.write("%s\n" % json.dumps(item))

            cmd = {'index': {'_index': tornado.options.options.index_name}}
                             # '_type': tornado.options.options.index_type}}
            if '_id' in item:
                cmd['index']['_id'] = item['_id']

            upload_data_txt += json.dumps(cmd) + "\n"
            upload_data_txt += json.dumps(item) + "\n"
            upload_data_count += 1

            if upload_data_count % tornado.options.options.batch_size == 0:
                yield upload_batch(upload_data_txt)
                upload_data_txt = ""

        # upload remaining items in `upload_data_txt`
        if upload_data_txt:
            yield upload_batch(upload_data_txt)

    if tornado.options.options.set_refresh:
        set_index_refresh("1s")

    if out_file:
        out_file.close()

    took_secs = int(time.time() - ts_start)

    logging.info("Done - total docs uploaded: %d, took %d seconds" % (tornado.options.options.count, took_secs))


if __name__ == '__main__':
    tornado.options.define("es_url", type=str, default='http://localhost:9200', help="URL of your Elasticsearch node")
    tornado.options.define("index_name", type=str, default='test_data', help="Name of the index to store your messages")
    tornado.options.define("index_type", type=str, default='test_type', help="Type")
    tornado.options.define("batch_size", type=int, default=1000, help="Elasticsearch bulk index batch size")
    tornado.options.define("num_of_shards", type=int, default=2, help="Number of shards for ES index")
    tornado.options.define("http_upload_timeout", type=int, default=3, help="Timeout in seconds when uploading data")
    tornado.options.define("count", type=int, default=100000, help="Number of docs to generate")
    tornado.options.define("format", type=str, default='name:str,age:int,last_updated:ts', help="message format")
    tornado.options.define("num_of_replicas", type=int, default=0, help="Number of replicas for ES index")
    tornado.options.define("force_init_index", type=bool, default=False, help="Force deleting and re-initializing the Elasticsearch index")
    tornado.options.define("set_refresh", type=bool, default=False, help="Set refresh rate to -1 before starting the upload")
    tornado.options.define("out_file", type=str, default=False, help="If set, write test data to out_file as well.")
    tornado.options.define("id_type", type=str, default=None, help="Type of 'id' to use for the docs, valid settings are int and uuid4, None is default")
    tornado.options.define("dict_file", type=str, default=None, help="Name of dictionary file to use")
    tornado.options.define("data_file", type=str, default=None, help="Name of the documents file to use")
    tornado.options.define("username", type=str, default=None, help="Username for elasticsearch")
    tornado.options.define("password", type=str, default=None, help="Password for elasticsearch")
    tornado.options.define("validate_cert", type=bool, default=True, help="SSL validate_cert for requests. Use false for self-signed certificates.")
    tornado.options.parse_command_line()

    tornado.ioloop.IOLoop.instance().run_sync(generate_test_data)

本文由 mdnice 多平台发布文章来源地址https://www.toymoban.com/news/detail-621373.html

到了这里,关于异步检索在 Elasticsearch 中的理论与实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 探索图像检索:从理论到实战的应用

    本文深入探讨了图像检索技术及其在主流APP中的应用,涵盖了特征提取、相似度计算、索引技术,以及在电商、社交媒体和云服务中的实际应用案例。 关注TechLead,分享AI全维度知识。作者拥有10+年互联网服务架构、AI产品研发经验、团队管理经验,同济本复旦硕,复旦机器人

    2024年01月21日
    浏览(34)
  • 自然语言处理 Paddle NLP - 检索式文本问答-理论

    基础 自然语言处理(NLP) 自然语言处理PaddleNLP-词向量应用展示 自然语言处理(NLP)-前预训练时代的自监督学习 自然语言处理PaddleNLP-预训练语言模型及应用 自然语言处理PaddleNLP-文本语义相似度计算(ERNIE-Gram) 自然语言处理PaddleNLP-词法分析技术及其应用 自然语言处理Pa

    2024年02月11日
    浏览(53)
  • Elasticsearch 全文检索 分词检索-Elasticsearch文章四

    https://www.elastic.co/guide/en/enterprise-search/current/start.html https://www.elastic.co/guide/en/elasticsearch/reference/7.17/query-dsl-match-query.html Full text Query中,我们只需要把如下的那么多点分为3大类,你的体系能力会大大提升 很多api都可以查得到,我们只要大概知道有支持哪些功能 Elasticsearch 执行

    2024年02月14日
    浏览(52)
  • 全文检索-Elasticsearch-进阶检索

    本文记录谷粒商城高级篇的 Elasticsearch 进阶检索部分,续上之前记录的 Elasticsearch入门篇。 ES 支持两种基本方式检索 : 一个是通过使用 REST request URI 发送搜索参数(uri + 检索参数) 另一个是通过使用 REST request body 来发送它们(uri + 请求体) 请求体中写查询条件,语法: 示例

    2024年02月03日
    浏览(88)
  • FormData异步发送文件,后端SpringBoot接收

    平时我们用表单提交数据时,所有的数据都是以键值对的形式提交给后端的,对于文件二进制流数据也是以键值对提交的,只是说此时值的内容是二进制数据罢了。如果我们想给后端上传文件,一般都是利用表单里的File控件去提交的,但这时候有一个问题,这种上传方式不是

    2024年02月15日
    浏览(47)
  • 后端架构师必知必会系列:搜索引擎与全文检索

    作者:禅与计算机程序设计艺术 什么是搜索引擎?它是指在互联网上搜集、整理并快速索引海量信息的数据结构与数据库,对用户的查询请求进行快速、准确的响应的技术。搜索引擎又分为互联网搜索引擎(英语: internet search engine)和本地搜索引擎(英语: local search engine)。 全文

    2024年02月05日
    浏览(50)
  • Elasticsearch 8.X 向量检索和普通检索能否实现组合检索?如何实现?

    向量组合条件查询,报 [vector] malformed query, expected [END_OBJECT] but found [FIELD_NAME] 错误, 向量查询是不支持复合条件查询吗? ——问题来自:死磕 Elasticsearch 知识星球 https://t.zsxq.com/18skX0ZS6 类似问题在社群里被问到 2 次以上了! 向量搜索热度不减,所以我们非常有必要将向量搜

    2024年04月11日
    浏览(45)
  • Flink理论—容错之状态后端(State Backends)

    Flink 使用流重放 和 检查点 的组合来实现容错。检查点标记每个输入流中的特定点以及每个运算符的相应状态。通过恢复运算符的状态并从检查点点重放记录,可以从检查点恢复流数据流,同时保持一致性 容错机制不断地绘制分布式流数据流的快照。对于小状态的流式应用程

    2024年02月20日
    浏览(37)
  • ElasticSearch 实战:ElasticSearch文档全文检索

    Elasticsearch 实战:Elasticsearch 文档全文检索 全文检索是 Elasticsearch 的核心功能之一,它允许用户对文本内容进行高效的模糊搜索、词组匹配、同义词处理、停用词过滤等操作。以下是如何进行文档全文检索的详细步骤: **1. **全文匹配查询(Match Query) 最基础的全文检索查询是

    2024年04月11日
    浏览(51)
  • 使用 Elasticsearch 和 LlamaIndex 进行高级文本检索:句子窗口检索

    2023 年是检索增强生成 (RAG) 的一年,人们探索了许多用例,并使用该技术开发了数百种产品。 从 Q/A 聊天机器人到基于上下文的代理,RAG 的使用一直是 LLM 申请快速增长的主要因素。 支持不断发展的社区以及 Langchain 和 LlamaIndex 等强大框架的可用性,使开发人员可以更轻松地

    2024年01月18日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包