elasticsearch升级和索引重建。

这篇具有很好参考价值的文章主要介绍了elasticsearch升级和索引重建。。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.背景描述

es在本公司承载三个部分的业务,站内查询,订单数据统计,elk日志分析。

2020年团队决定对elasticsearch升级。es(elasticsearch缩写,下同)当前版本为1.x,升级到5.x版本。

5.x支持如下新特性:

支持lucene 6.x,磁盘空间少一半,索引时间少一半,查询性能提升25%

Java rest client (high level api)

Painless 脚本相比groovy脚本,更安全,更简洁,更好的性能

 

对于站内查询和订单数据统计,当前业务架构是

mysql -> canal -> kafka -> (es Index server) -> es

(可以考虑使用kafka connector 代替canal)

1.1 如何配置 mysql -> canal -> kafka

1.1.1. 配置mysql

开启binlog

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权给canal用户,让其有复制权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

1.1.2 配置canal

下载 https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

修改 conf/canal.properties

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = kafka # 由kafka消费

kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

修改 conf/example/instance.properties

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=mysql_test # 同步的数据库

# mq config
canal.mq.topic=canal_topic # 在kafka的topic

启动canal

./bin/start.sh

1.1.2 启动zookeeper 和 kafka

brew services start zookeeper
brew services start kafka

1.1.3 测试

在db中添加数据,可以使用kafka 脚本看到同步数据

INSERT INTO `mysql_test`.`user` (`id`, `name`) VALUES ('6', 'Bob');

➜  bin ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canal_topic
{"data":[{"id":"6","name":"Bob","age":null}],"database":"mysql_test","es":1684221427000,"id":5,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(32)","age":"int"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"user","ts":1684221427082,"type":"INSERT"}

2.难点

1.在升级的时候如何不影响当前业务。

2.如果升级失败,能够快速进行回滚。

3.具体步骤

主体方案采用双写机制

3.1.部署es新集群

下载5.x版本的es,在新的机器上部署新的集群。

配置机器:

disable swapping:swapoff -a

内存锁定:mlockall: true

修改文件句柄数:ulimit -a

分配一半的内存给es,留下一半的内存给文件系统, ES_JAVA_OPTS="-Xms16g -Xmx16g"

3.2.pull代码,升级代码到es新版本

由于从1.x到5.x版本跨度比较大,许多java api都发生了变化,需要修复。  

常见字段类型修改:  text/keyword 代替 string

不再支持type类型

java api alias语义变化

3.3.重建索引

  我们使用索引重建程序来新建索引。重建索引具体步骤如下,我们称线上索引为online index, 新创建的索引为new index。

  3.3.1.init

    刷新索引名映射关系,检查当前alias只有一个物理索引。

    根据预定义的mapping,创建索引new index。

    设置在线索引记录数据变更日志,即记录线上索引消费kafka数据,并存储为change log文件.

  3.3.2.全量索引数据库上的数据到new index

    从mysql查出数据同步到es中,如果有多个分表,就按照表顺序同步。可以开启多线程批量插入。

  3.3.3.对new index索引优化

    refresh, flush 索引。调用force-merge api,进行段合并。

  3.3.4.重放change log到new index中

    根据change log 转换为es query,写入到new index。    

  3.3.5.暂停线上索引的写入

    因为online index和new index 使用的是相同的kafka consumer group,所以必须停掉online index的消费功能。

  3.3.6.关闭change log

    停止记录在线索引记录数据变更日志。

  3.3.7.第二阶段重放change log

    根据change log 转换为es query,写入到new index。 

  3.3.8.删除change log 

    删除线索引记录数据变更日志。

  3.3.9.设置副本数 

    new index创建索引的时候默认副本数为0,现在动态调整副本数为业务需要的值。比如对现实搜索业务设置两个副本,对订单统计类索引不需要副本。

PUT /new_index/_settings
{
    "number_of_replicas": 2
}

    此阶段可能会比较耗时,需要等待几分钟才能进行下一步操作。更好的做法是调用health api 查看分片状态。

GET _cluster/health

{
  "cluster_name" : "testcluster",
  "status" : "yellow",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 1,
  "active_shards" : 1,
  "relocating_shards" : 0, // 重新定位的分片
  "initializing_shards" : 0, // 初始化中的分片
  "unassigned_shards" : 1, // 未分配的分片
  "delayed_unassigned_shards": 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch": 0,
  "task_max_waiting_in_queue_millis": 0,
  "active_shards_percent_as_number": 50.0
}

  3.3.10.别名切换 

POST /_aliases
{
    "actions": [
        { "remove": { "index": "online_index", "alias": "my_index" }},
        { "add":    { "index": "new_index", "alias": "my_index" }}
    ]
}

  3.3.11.运行在线索引 (从kafka里面读取数据)

    new_index 开始从kafka里面消费最新数据。由于之前的操作可能会有延时,需要等待几分钟才能同步到最新数据。

  3.3.12.删除旧的索引

    删除old_index

详细代码步骤如下

        // 1.init
        logger.info("初始化");
        ESHighLevelFactory esHighLevelFactory = ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName());
        logger.info("刷新索引名映射关系");
        if (!indexContext.refreshIndexName()) {
            throw new IndexException("刷新索引映射关系失败");
        }

        rebuildIndexName = indexContext.getPhysicalRebuildIndexName();

        logger.info("初始化重建索引环境,当前重建索引名:" + rebuildIndexName);
        logger.info("创建索引,索引名:" + rebuildIndexName);
        boolean isCreate = false;
        try {
            isCreate = indexContext.getIndex().createIndex(rebuildIndexName);
        } catch (Throwable t) {
            logger.info("创建索引失败,本次失败可以不处理,将会自动重试 ...");
        }

        logger.info("设置在线索引记录数据变更日志");
        indexContext.startChangeLog();

        // 2. 重建索引
        logger.info("全量索引数据库上的数据 ...");
        long startRebulidTime = System.currentTimeMillis();
        rebuild();
        logger.info(" ------  完成全量索引数据库上的数据,对应索引" + rebuildIndexName + ",耗时" + ((System.currentTimeMillis() - startRebulidTime) / 1000)
            + " 秒    ------  ");

        // 3. 索引优化 -- 是否调到变更重放完毕后做优化
        logger.info("优化索引 ...");
        long startOptimizeTime = System.currentTimeMillis();
        ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, 1);
        logger.info(" ------  完成" + rebuildIndexName + "索引优化,耗时 " + ((System.currentTimeMillis() - startOptimizeTime) / 1000)
            + " 秒    ------  ");

        // TODO 字符集设置
        BufferedReader logReader = new BufferedReader(new FileReader(indexContext.getChangeLogFilePath()));

        // 4. 重放变更日志
        logger.info("重放本地数据变更日志[第一阶段] ...");
        long startReplay1Time = System.currentTimeMillis();
        int replayChangeLogCount = replayChangeLogFirst(logReader);
        logger.info(" ------  完成[第一阶段]的变更日志重放,行数" + replayChangeLogCount + " 耗时 "
            + ((System.currentTimeMillis() - startReplay1Time) / 1000) + " 秒    ------  ");

        // 5. 暂停在线索引
        logger.info("暂停在线索引");
        indexContext.pauseOnlineIndex();
        isPauseOnline.set(true);

        // 6. 设置 在线索引只做索引更新 以及 关闭 change log
        logger.info("停止变更日志");
        indexContext.stopChangeLog();

        // 7. 继续重放 change log
        logger.info("重放本地数据变更日志[第二阶段] ...");
        long startReplay2Time = System.currentTimeMillis();
        replayChangeLogCount = replayChangeLogCount + replayChangeLogSecond(logReader);
        if ((indexContext.getWriteChangeLogCount() - replayChangeLogCount) != 0) {
            logger.error("变更日志,处于错误的状态,统计的日志行数:" + indexContext.getWriteChangeLogCount() + ", 但实际只有:" + replayChangeLogCount);
        }
        logger.info(" ------  完成[第二阶段]的变更日志重放,行数" + replayChangeLogCount + " 耗时 "
            + ((System.currentTimeMillis() - startReplay2Time) / 1000) + " 秒    ------  ");

        // 8. 删除变更日志, OnlineIndex.startChangeLog 有做环境清理,这里不执行
        logger.info("简单优化索引 ...");
        long startSimpleOptimizeTime = System.currentTimeMillis();
        ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, null);

        logger.info(" ------  完成" + rebuildIndexName + "索引简单优化,耗时 " + ((System.currentTimeMillis() - startSimpleOptimizeTime) / 1000)
            + " 秒    ------  ");

        // 9. 设置副本数 (怀疑比较耗时~~~待确认)
        logger.info("设置副本数 ...");
        int replicas = 3;
        if (rebuildIndexName.startsWith(IndexNameConst.ORDER_INDEX_PREFIX)) {
            replicas = 1;
        } else if (rebuildIndexName.startsWith(IndexNameConst.IndexName.activityTicket.getIndexName())) {
            replicas = 2;
        } else {
            String replicasStr = Configuration.getInstance().loadDiamondProperty(Configuration.ES_INDEX_REPLICAS);
            if (NumberUtils.isNumber(replicasStr)) {
                replicas = NumberUtils.toInt(replicasStr);
            }
        }
        ESHighLevelFactory.getInstance(rebuildIndexName).setReplicas(rebuildIndexName, replicas);

        // 执行索引切换流程
        // 预发、线上环境阻塞等待2分钟同步数据后,再执行索引切换和删除旧索引逻辑
        try {
            if(IDCUtil.isBuildOrProduction()){
                Thread.sleep(120 * 1000);
            }
        } catch (InterruptedException e) {
        }
        // 10. 别名切换
        logger.info("索引切换:将" + rebuildIndexName + "设置为线上索引");
        if (!indexContext.switchIndex(rebuildIndexName)) {
            throw new IndexException("索引切换失败:将" + rebuildIndexName + "设置为线上索引失败");
        }

        // 11. 运行在线索引
        logger.info("运行在线索引");
        indexContext.keepRuningOnlineIndex();
        isPauseOnline.set(false);

        // 12. 删除原有在线索引
        String oldOnlineIndexName = indexContext.getPhysicalRebuildIndexName();
        logger.info("删除原有在线索引,索引名:" + oldOnlineIndexName);
        if (!ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName()).deleteIndex(oldOnlineIndexName)) {
            throw new IndexException("删除索引失败,索引名:" + oldOnlineIndexName);
        }

思考

如果只是简单地新建索引,完全可以这样做(使用不同的消费组) 

  1.记录时间戳 

  2.全量索引数据的数据

  3.根据前面的时间戳找到kafka中的下标,下标得时间戳必须 < 记录的时间戳    

  sh kafka_2.11-2.3.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list broker1:9092,broker2:9092 -topic topicName -time 1585186237000

  4.根据上一步的下标开始索引数据

3.4.使用新集群进行业务测试

部署新的客户端服务调用新的es集群,检查业务是否正常。对站内查询检查搜索结果是否一致,对统计类查询查看统计结果是否一致。

针对不同业务场景下,做不同的测试

1.对比新老集群,索引数据量是否一致

2.搜索业务,查看热门关键词搜索结果

3.统计业务,对比索引数据量,常用聚合统计查询结果是否一致

4.对于elk业务,可以单独升级

3.5.发布线上客户端搜索代码,修改es地址为新集群地址

  上线,观察业务是否稳定。

3.6.下线旧的es集群

  释放旧的es集群的资源。

4.总结

  es升级这份工作是两年之前做的,现在来进行总结,部分细节可能会有疏漏。但是总结起来,依然后很多收获,从架构,代码细节上都有改进的空间。es重建代码可以做得更通用,然后开源出来。文章来源地址https://www.toymoban.com/news/detail-414294.html

到了这里,关于elasticsearch升级和索引重建。的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ES 如何重建索引

            ES索引中,为了效率和存储空间,有些字段可以设定为不被索引,然后某一天又需要改成能索引,此时就需要对ES进行重建索引,操作如下 cd /data/elk/logstash/es-template/ vim event.json 将 mask以及其他需要放开查询的字段的\\\"index\\\": false 去掉(注意需要把上方\\\"type\\\": \\\"keyword\\\",的逗

    2024年02月11日
    浏览(52)
  • 【数据处理】建立数据库索引并定时重建索引

    给表 建立索引能加速查询 (我的习惯是给经常查询的列建立索引,如果经常查询的是id列,我会给将id设置为主键),长时间查询后会变慢(具体原因目前不清楚),公司前辈说 定期重建索引就可以解决问题 ,我就在 Microsoft Sql Server Management Studio 里设置了定时“自动重建索

    2024年01月25日
    浏览(48)
  • ES索引重建reindex详解

    1. 分片数变更 :当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。 2. mapping字段变更 :当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;

    2024年02月06日
    浏览(45)
  • 【HBZ分享】ES中的Reindex重建索引

    滚动索引 + 批量复制 如果新的索引没有提前创建好,并指定字段类型,那么重建后的新索引类型极有可能会和旧的索引不一致,因为ES他会推断类型,而推断错误率从实战来说那是相当的高 字段类型设置错了 旧的索引分片不合理,想重新分 某批数据存错了,或只想保留具备

    2024年02月13日
    浏览(41)
  • ES索引修改mappings与重建reindex详解之修改字段类型

    elasticsearch一直在使用,这里总结一下mappings的修改方法,分为两种情况: 增加新的字段,这种很简单; 修改已有的字段类型,这种就比较麻烦了,需要reindex,对索引进行迁移重建。 1.1、获取mappings 增加一个 new_stocks 字段,如下: 再查一下: 可以看到new_stocks字段已经加上去

    2024年02月17日
    浏览(44)
  • 基于JAVA公司介绍网站设计与实现(Springboot框架) 研究背景与意义、国内外研究现状

     博主介绍 :黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者,CSDN博客专家,在线教育专家,CSDN钻石讲师;专注大学生毕业设计教育和辅导。 所有项目都配有从入门到精通的基础知识视频课程,免费 项目配有对应开发文档、开题报告、任务书、

    2024年02月03日
    浏览(43)
  • kibana重建es索引报错request body is required

    业务需要把mysql的数据同步到es,用es来查。公司用于同步mysql和es的组件,在mysql表新增字段时会对同步的es索引新增字段,但新增的字段类型可能不是我们想要的,因为es不支持索引字段类型的修改和删除,这时就需要重建es索引。这里的重建索引简单说就是新建一个字段正确

    2024年02月16日
    浏览(55)
  • openGauss学习笔记-204 openGauss 数据库运维-常见故障定位案例-重建索引失败

    204.1 重建索引失败 204.1.1 问题现象 当Desc表的索引出现损坏时,无法进行一系列操作,可能的报错信息如下。 204.1.2 原因分析 在实际操作中,索引会由于软件或者硬件问题引起崩溃。例如,当索引分裂完发生磁盘空间不足、出现页面损坏等问题时,会导致索引损坏。 204.1.3 处

    2024年01月24日
    浏览(48)
  • 替代升级虚拟化 | ZStack Cloud云平台助力中节能镇江公司核心业务上云

    数字经济正加速推动各行各业的高质量升级发展,云计算是数字经济的核心底层基础设施。作为云基础软件企业,云轴科技ZStack 坚持自主创新,自研架构,产品矩阵可全面覆盖数据中心云基础设施,针对虚拟化资源实现纳管、替代和升级,涵盖以下四大场景: ZStack 虚拟化平

    2024年02月05日
    浏览(45)
  • ICT通信运营企业的重建之服务升级(二)----ICT技术及产品种类

    ICT服务商要借助作为刚需产品的传统通信网络接入主导阶段和内容流量主导阶段,探索智能化主导的可能性。实际上,这三个阶段在很大程度上反映了 ICT 技术的变迁以及 ICT 业务的核心变化。 网络接入主导阶段作为 ICT 技术及业务起步、探索的时期,业务重心还体现在最根本

    2024年02月16日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包