elasticsearch使用脚本 滚动关闭索引,更新index setting

这篇具有很好参考价值的文章主要介绍了elasticsearch使用脚本 滚动关闭索引,更新index setting。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一 问题

     在旧的索引中更新mapping时,新增了分词器(分词器已经在模板中添加),但是在更新mapping时报错:

elasticsearch使用脚本 滚动关闭索引,更新index setting

查看elasticsearch官网,发现不允许在已经存在的索引中动态更新分词器,只能先将索引close,更新分词器,然后再打开

Update index settings API | Elasticsearch Guide [8.3] | Elastic

You can only define new analyzers on closed indices.

To add an analyzer, you must close the index, define the analyzer, and reopen the index.

二 问题解决方式步骤(已经验证)

2.1 由暂停数据写入&关闭分片分配

    暂停数据写入,可以避免恢复阶段translog中大量数据回放,提升索引恢复速度。

关闭分片分配

 PUT _cluster/settings
  { "persistent" : { "cluster.routing.rebalance.enable": "none" } }

2.2 对所有需要进行更新分词器的索引执行flush操作。

    作用是将内存buffer flush到硬盘中,避免恢复阶段translog中大量数据回放,提升索引恢复速度。

   (1)获取所有索引

          GET /_cat/indices

 (2)获取指定索引对应的setting

         GET /prod_candidatev2_chunk_{i}/_settings

        根据setting内容判断索引是否包含分词器 back_edge_ngram_analyzer word_analyzer 作为更新索引判断依据

  (3)对指定索引进行flush操作以及sync操作,加速恢复,避免从主分片全量拉取数据同步至副本。

     POST /index{i}/_flush

    POST /index{i}/_flush/synced

2.3 对索引进行关闭操作

  POST /index{i}/_close

2.4 更新索引分词器

PUT index{i}/_settings
{
    
    "analysis": {
      "filter":{
        "back_edge_ngram_filter": {
          "min_gram": "1",
          "side": "back",
          "type": "edgeNGram",
          "max_gram": "256"
        }
      },
      "analyzer": {
        "back_edge_ngram_analyzer": {
          "filter": [
            "standard",
            "lowercase",
            "back_edge_ngram_filter"
          ],
          "tokenizer": "keyword"
        },
        "word_analyzer": {
          "filter": [
            "standard",
            "lowercase"
          ],
          "tokenizer": "keyword"
        },
        "mk_edge2": { 
              "type": "mk_edge",
              "extra_prarm": "cb,3,-1"
            },
            "mk_edge1": {  
              "type": "mk_edge",
              "extra_prarm": "wb,1,-1"
            }
      }
    }
  }

2.5 轮询等待集群恢复green状态

GET  _cluster/health

2.6 重复2.2至2.5步骤,直至所有索引恢复

python处理脚本:

 文章来源地址https://www.toymoban.com/news/detail-482833.html

import time

from tqdm import trange

from elasticsearch import Elasticsearch

ES_HOST = ["http://elastic:"]

client = Elasticsearch(ES_HOST)

# index_pattern = "thoth_candidatev2_chunk_*"
index_pattern = "prod_candidatev2_chunk_*"
# index_pattern = "test_candidatev2_chunk_*"

put_all_index = False

def main():
    all_indices = get_all_indices()
    print("Number of indices: ", len(all_indices))
    all_indices = [index for index in all_indices if not is_updated_index_settings(index) ]
    print("Number of not updated indices: ", len(all_indices))
    for index in all_indices:
        if put_all_index or select():
            print(f"Start put {index} settings")
            put_index_settings(index)
            check_cluster_health(index)
        else:
            break
    print('Finished')

def select():
    global put_all_index

    text = input("continue(all/y/n): ")
    if text == 'y':
        return True
    elif text == 'n':
        return False
    elif text == 'all':
        put_all_index = True
        return True
        
    
def is_updated_index_settings(index):
    settings = client.indices.get_settings(index=index)
    analysis = settings[index]["settings"]["index"]["analysis"]
    if "word_analyzer" in analysis["analyzer"] and "back_edge_ngram_analyzer" in analysis["analyzer"] and "back_edge_ngram_filter" in analysis["filter"]:
        print(f"{index} done")
        return True
    else:
        return False

def put_index_settings(index):
    
    if client.cat.indices(index=index,params={"h": "status"}).strip() != 'close':
        print(f"flush {index}")
        client.indices.flush(index=index)
        print(f"flush {index} done")
    
    close_index(index)

    body = '{"analysis":{"filter":{"back_edge_ngram_filter":{"min_gram":"1","side":"back","type":"edgeNGram","max_gram":"256"}},"analyzer":{"back_edge_ngram_analyzer":{"filter":["standard","lowercase","back_edge_ngram_filter"],"tokenizer":"keyword"},"word_analyzer":{"filter":["standard","lowercase"],"tokenizer":"keyword"},"mk_edge2":{"type":"mk_edge","extra_prarm":"cb,3,-1"},"mk_edge1":{"type":"mk_edge","extra_prarm":"wb,1,-1"}}}}'
    client.indices.put_settings(index=index, body=body)
    
    if not is_updated_index_settings(index):
        print(f"put index error: {index}")
        put_index_settings(index)
    
    open_index(index)

def close_index(index):
    print(f"{index} status: ", client.cat.indices(index=index,params={"h": "status"}).strip())
    client.indices.close(index)
    print(f"{index} status: ", client.cat.indices(index=index,params={"h": "status"}).strip())

def open_index(index):
    print(f"{index} status: ", client.cat.indices(index=index,params={"h": "status"}).strip())
    client.indices.open(index)
    print(f"{index} status: ", client.cat.indices(index=index,params={"h": "status"}).strip())


def check_cluster_health(index):
    t = trange(100, desc="recover: ", leave=True)
    last_progress = 0
    while client.cluster.health()["status"] != "green":
        t.set_description(client.cluster.health()["status"])
        current_progress = client.cluster.health()["active_shards_percent_as_number"]
        t.update(current_progress - last_progress)
        last_progress = current_progress
        recovery_status = client.cat.recovery(index=index, params={"h": "index,shard,translog_ops_percent,files_percent,stage", "v": "true"})
        output = []
        for idx, item in enumerate(recovery_status.split('\n')):
            if idx == 0:
                output.append(item)
            else:
                output.append(item) if not item.endswith('done') else None
        if len(output) > 1:
            print('\n'.join(output))
        time.sleep(2)
    t.set_description(client.cluster.health()["status"])
    t.update(100 - last_progress)
    t.close()

def get_all_indices():
    return client.indices.get_alias(index_pattern) 

def test_put_index():
    index = "test_candidatev2_chunk_{chunk}"
    body = ''
    for chunk in range(0, 10):
        client.indices.create(index=index.format(chunk=chunk), body=body)


if __name__ == "__main__":
    # test_put_index()
    main()

到了这里,关于elasticsearch使用脚本 滚动关闭索引,更新index setting的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Pycharm一直更新索引(Scanning files to index)的解决办法

    之前打开pycharm的时候,下方的进度条一直显示在扫描文件,如下图所示,当时看了很多教程也没解决问题。 但奇怪的是,并不是打开每个项目都会花费这么长时间来sacnning files to index,有的很快,一分钟就好了,有的很慢,半个小时都索引不完。 后来的有一天,我突然想明

    2024年04月26日
    浏览(34)
  • ElasticSearch 关闭索引

     ElasticSearch  数据库作为日志服务器收集日志,随着时间,日志量增加,集群会变得越来越慢,我们就需要去关闭历史的索引,每天去执行计划任务关闭索引,如果日志量过大也可以删除日志

    2024年02月16日
    浏览(33)
  • elasticsearch中创建索引模板_index_template

            索引模版是创建索引时自动应用提前设置好的settings、mappings和aliases,通过索引的名称进行匹配。         对索引模版的更改时不会影响目前已经根据索引模版创建的索引。         使用索引模版可以省去创建索引时再次指定settings、mappings、aliases的步骤,

    2024年02月01日
    浏览(73)
  • Elasticsearch Index Templates(索引模板),如何实现分布式锁

    “type”: “keyword” }, “created_at”: { “type”: “date”, “format”: “” } } } } } 代码@1:触发条件。 代码@2:索引配置定义。 代码@3:索引映射配置。 上述示例对应的JAVA示例如下: public static final void createIndexTemp() { RestHighLevelClient client = EsClient.getClient(); try { PutIndexTemplateRequ

    2024年04月22日
    浏览(37)
  • Elasticsearch之Index Setting:索引的静态配置与动态配置

            索引的配置项按是否可以更改分为静态配置与动态配置,所谓的 静态配置即索引创建后不能修改。 索引静态配置  【索引创建后不能修改】 index.number_of_shards:索引分片的数量。在ES层面可以通过es.index.max_number_of_shards属性设置索引最大的分片数,默认为1024,in

    2024年02月16日
    浏览(47)
  • Elasticsearch 索引文档时create、index、update的区别【学习记录】

    本文基于elasticsearch7.3.0版本。 一、思维导图 elasticsearch中create、index、update都可以实现插入功能,但是实现原理并不相同。 二、验证index和create 由上面思维导图可以清晰的看出create、index的大致区别,下面我们来验证下思维导图中的场景: 1、首先明确一点:如何指定是creat

    2024年01月20日
    浏览(46)
  • ElasticSearch第十六讲 ES 索引模板Index Template与Dynamic Template

    Index Templates可以帮助你设定Mappings和Settings,并按照一定的规则,自动匹配到 新创建的索引之上。模版仅在一个索引被新创建时,才会产生作用。修改模版不会影响已创建的索引,你可以设定多个索引模版,这些设置会被“merge”在一起,你可以指定“order”的数值,控制“

    2024年02月15日
    浏览(38)
  • es elasticsearch 九 索引index 定制分词器 type结构后期弃用原因 定制动态映射 动态映射模板 零停机重建索引

    目录 索引index 定制分词器 Type底层结构及弃用原因 定制 dynamic mapping 定制dynamic mapping template 动态映射模板 零停机重建索引 生产环境应该度别名数据 索引index Put /index Stings 分片 Mapping 映射 Aliases 别名 增加 Put my_index2 {        \\\"settings\\\":{           \\\"number_of_shards\\\":3,      

    2024年02月06日
    浏览(40)
  • elasticsearch索引操作,索引创建、索引更新、索引删除

    创建索引 更新索引,添加字段 注意更新索引时与创建索引大致一样,只是更新索引时候的url不同,需要在后面加一个 _mapping 路径,同时请求的json里面不需要 mappings 路径,只需要 properties 即可 更新索引,修改配置 同理在更新setting的时候和更新maping的时候一样 获取索引结构

    2024年02月11日
    浏览(44)
  • Elasticsearch集群索引写入失败[FORBIDDEN/12/index read-only / allow delete (api)]处理流程

    操作系统:CentOS 7.3 软件版本:elasticsearch-6.7.2 正常将数据写入到Elasticsearch时,发现写入失败,出现如下报错 检查Elasticsearch集群的active master节点的日志,并没有发现error,但有WARN告警,显示与 flood stage disk watermark [90%] 有关。 上下文有 low disk watermark [80%] 的INFO日志信息,再次

    2024年02月10日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包