ELK增量同步数据【MySql->ES】

这篇具有很好参考价值的文章主要介绍了ELK增量同步数据【MySql->ES】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、前置条件

        1.  linux,已经搭建好的logstash+es+kibana【系列版本7.0X】,es 的plugs中安装ik分词器

ES版本:ELK增量同步数据【MySql->ES】,ELK,elk,elasticsearch

 Logstash版本:ELK增量同步数据【MySql->ES】,ELK,elk,elasticsearch

 (以上部署,都是运维同事搞的,我不会部署,同事给力)

二、编写Logstash.sh 执行文件

1、在Logstash安装目录下【/usr/share/logstash】,新建XX.sh,内容如下:

/usr/share/logstash/bin/logstash --path.data /usr/share/logstash/case-conf -e 'input {
 jdbc {
    jdbc_driver_library => "/var/local/logstash/etc/lib/mysql-connector-java-8.0.15.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://IP:端口号/数据库?serverTimezone=Asia/Shanghai&autoReconnect=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&characterEncoding=utf8&useSSL=false&tinyInt1isBit=false"
    jdbc_user => "用户名"
    jdbc_password => "密码"
    schedule => "* * * * *"
    use_column_value => true
    tracking_column => "update_at"
    tracking_column_type => "numeric"
    last_run_metadata_path => "/usr/share/logstash/case-last"
statement_filepath => "/usr/share/logstash/case_.sql"
  }
}

output {
  elasticsearch {
    hosts => ["es1:9206","es2:9207","es3:9208"]
	action=>"index"
    index => "case"
    document_id => "%{case_id}"
    template => "/usr/share/logstash/template-case.json"
	template_name=>"template-case.json"
	template_overwrite=>true
    }
}'

2. 在Logstash安装目录下【/usr/share/logstash】,新建case.sql文件:查询字段,根据业务要求书写,不需要全字段查询

SELECT * FROM 表名称 where update_at > :sql_last_value

where条件中的,update_at 是表更新时间,与case.sh 中的 tracking_column 强关联;默认logstash 每分钟执行一次case.sql ,执行后会生成一个case-last文件,里面记录最后一次执行时间;ELK增量同步数据【MySql->ES】,ELK,elk,elasticsearch

3.  关于output,配置项,使用了自定义创建索引的模板方式,在表中有数据为前提,执行case.sh 会用到模板创建索引;如果不配置,es会默认给创建mapping,默认的话,分词都是英文分词器; 执行效果如下:sh  case.sh后:

ELK增量同步数据【MySql->ES】,ELK,elk,elasticsearch

 注意细节: 

output {
  elasticsearch {
    hosts => []
    action=>"index"
    index => "case"   //索引名称
    document_id => "%{case_id}"  //动态数据,数据ID
    template => "/usr/share/logstash/template-case.json"   //模板文件路径
    template_name=>"template-case.json"            //模板名称
    template_overwrite=>true                                 
    }
}

4. 在Logstash安装目录下【/usr/share/logstash】,新建template-case.json,内容如下:

{
  "template" : "case*",
  "settings" : {
    "index.refresh_interval" : "5s",
    "number_of_replicas":"1",
    "number_of_shards":"1"
  },
  "mappings": {
    "date_detection": false,
    "numeric_detection": false,
    "dynamic_templates": [
      {
        "integers": {
          "match_mapping_type": "long",
          "mapping": {
            "type": "integer"
          }
        }
      },
      {
        "strings": {
          "match_mapping_type": "string",
          "unmatch": "*_en",
          "mapping": {
            "type": "text",
            "analyzer":"ik_max_word",
            "search_analyzer": "ik_smart",
            "fields": {
              "raw": {
                "type":  "keyword",
                "ignore_above": 100
              }
            }
          }
        }
      }
    ]
  }
}

注意:

1、模板名称和case.sh 中的index 匹配;模板中的名称后必须带个*:  "template" : "case*",

2、模板比较简单,关闭了日期动态检测和数字格式动态检测;不然创建索引的时候格式会乱;尤其是日期;比如数据库中字段是varchar,但是存的是yyyy-MM-dd hh:mm:ss 日期格式的话,es创建索引是date格式的;

    "date_detection": false,
    "numeric_detection": false,

3、string格式的字段,默认给转出text, 并且使用ik分词器;排除了_en结尾的字段,存英文的字段建表的时候注意下,使用默认分词就好;

   "unmatch": "*_en"

三、Kibana验证

1、GET case_/_mapping:

{
  "case" : {
    "mappings" : {
      "dynamic_templates" : [
        {
          "integers" : {
            "match_mapping_type" : "long",
            "mapping" : {
              "type" : "integer"
            }
          }
        },
        {
          "strings" : {
            "unmatch" : "*_en",
            "match_mapping_type" : "string",
            "mapping" : {
              "analyzer" : "ik_max_word",
              "fields" : {
                "raw" : {
                  "ignore_above" : 100,
                  "type" : "keyword"
                }
              },
              "search_analyzer" : "ik_smart",
              "type" : "text"
            }
          }
        }
      ],
      "date_detection" : false,
      "numeric_detection" : false,
      "properties" : {
        "@timestamp" : {
          "type" : "text",
          "fields" : {
            "raw" : {
              "type" : "keyword",
              "ignore_above" : 100
            }
          },
          "analyzer" : "ik_max_word",
          "search_analyzer" : "ik_smart"
        },
        "@version" : {
          "type" : "text",
          "fields" : {
            "raw" : {
              "type" : "keyword",
              "ignore_above" : 100
            }
          },
          "analyzer" : "ik_max_word",
          "search_analyzer" : "ik_smart"
        },
        "apply_education" : {
          "type" : "text",
          "fields" : {
            "raw" : {
              "type" : "keyword",
              "ignore_above" : 100
            }
          },
          "analyzer" : "ik_max_word",
          "search_analyzer" : "ik_smart"
        },  
        "apply_major_en" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "apply_school_name" : {
          "type" : "text",
          "fields" : {
            "raw" : {
              "type" : "keyword",
              "ignore_above" : 100
            }
          },
          "analyzer" : "ik_max_word",
          "search_analyzer" : "ik_smart"
        },
        "apply_school_name_en" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
     
        "begin_learn_time_at" : {
          "type" : "integer"
        },
  
        "case_id" : {
          "type" : "integer"
        },     
   
        "case_result_time_at" : {
          "type" : "integer"
        },
    
        "country_name" : {
          "type" : "text",
          "fields" : {
            "raw" : {
              "type" : "keyword",
              "ignore_above" : 100
            }
          },
          "analyzer" : "ik_max_word",
          "search_analyzer" : "ik_smart"
        },
  
   
    
        "school_id" : {
          "type" : "integer"
        },
          
        "update_at" : {
          "type" : "integer"
        }
      }
    }
  }
}

查看,映射字段是否满足要求;

2.  验证索引分词结果:

GET case/_analyze
{
  "field": "apply_education",
  "text": "马斯特里赫特大学" 
}

分词结果:

{
  "tokens" : [
    {
      "token" : "马斯特里赫特",
      "start_offset" : 0,
      "end_offset" : 6,
      "type" : "CN_WORD",
      "position" : 0
    },
    {
      "token" : "马斯",
      "start_offset" : 0,
      "end_offset" : 2,
      "type" : "CN_WORD",
      "position" : 1
    },
    {
      "token" : "特里",
      "start_offset" : 2,
      "end_offset" : 4,
      "type" : "CN_WORD",
      "position" : 2
    },
    {
      "token" : "赫",
      "start_offset" : 4,
      "end_offset" : 5,
      "type" : "CN_CHAR",
      "position" : 3
    },
    {
      "token" : "特大",
      "start_offset" : 5,
      "end_offset" : 7,
      "type" : "CN_WORD",
      "position" : 4
    },
    {
      "token" : "大学",
      "start_offset" : 6,
      "end_offset" : 8,
      "type" : "CN_WORD",
      "position" : 5
    }
  ]
}

验证完成;

四、相关资料

1. Elastic:开发者上手指南_elastic.show_Elastic 中国社区官方博客的博客-CSDN博客

2. Mutate filter plugin | Logstash Reference [8.8] | Elastic

五、注意事项

1. 此种方法,只能针对表数值为逻辑删除的情况,若业务是物理删除,则需要同步删除索引中的数据;

2. 所有update 操作,都需要同时修改 update_at 字段文章来源地址https://www.toymoban.com/news/detail-516610.html

到了这里,关于ELK增量同步数据【MySql->ES】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • [Java Framework] [ELK] Spring 整合ES (ElasticSearch7.15.x +)

    ElasticSearch7.15.x 版本后,废弃了高级Rest客户端的功能 2.1 配置文件 2.2 配置类 3.1 索引的相关操作 3.2 实体映射相关操作 3.2.1 创建实体类 3.2.2 Doc实体操作API 3.3 聚合相关操作 3.3.1 创建实体类 3.3.2 创建操作类 [1] Elasticsearch Clients [2] Elasticsearch Clients - Aggregations

    2023年04月08日
    浏览(46)
  • DataX实现Mysql与ElasticSearch(ES)数据同步

    jdk1.8及以上 python2 查看是否安装成功 查看python版本号,判断是否安装成功 在datax/job下,json格式,具体内容及主要配置含义如下 mysqlreader为读取mysql数据部分,配置mysql相关信息 username,password为数据库账号密码 querySql:需要查询数据的sql,也可通过colums指定需要查找的字段(

    2024年02月05日
    浏览(60)
  • ELK实战,Linux版docker安装ElasticSearch、ES-head、Logstash、Kiabana入门,无坑详细图解

            项目需要,记录一次ELK日志分析系统无坑初始安装过程,并给大家整理出了操作elasticsearch的主要命令,elasticsearch!伙伴们都懂得哦!别的不多说,看过内容概览,直接开整!!! 1-1 修改/etc/security/limits.conf limits.conf文件限制着用户可以使用的最大文件数,最大线

    2023年04月09日
    浏览(37)
  • 【ElasticSearch】ES与MySQL数据同步方案及Java实现

    elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。 操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用 同步调用方式

    2024年02月15日
    浏览(51)
  • ELK之从Logstash读取数据到Elasticsearch

    前置条件: Elasticsearch 集群正常 Elasticsearch集群配置直通车:ELK之Elasticsearch7.17.4安装(yum方式)和三节点集群配置 Filebeat和logstash打通 ELK之LogStash接收Filebeat的数据:ELK之LogStash接收Filebeat的数据 修改Logstash 配置文件,将output改为如下地址,注释掉控制台输出,添加elasticsearc

    2024年01月22日
    浏览(47)
  • 【ELK03】ES 索引的Mapping映射详解、数据类型和settings属性设置

    映射(MAPPING)就是es中一个决定了文档如何存储,如何生成索引,字段各种类型定义的过程.类似于我们在关系型数据库中创建一个 表格数据之前先定义表格有哪些字段,每个字段是什么类型 ,然后数据会按照这个配置写入表格,ES中同样是这个过程,它由两种映射组成.一个是 动态映射

    2024年02月03日
    浏览(54)
  • ELK日志系统实战(五):安装vector并将数据输出到es、clickhouse案例

    目录 安装vector(0.22.0) 方式一:docker安装 方式二:手动下载安装包 编写配置文件 1.从syslog将日志进

    2024年02月06日
    浏览(48)
  • elk的时候logstash传送elasticsearch的数据一直失败报错403

          message: \\\"blocked by: [FORBIDDEN/8/index write (api)];: [cluster_block_exception] blocked by: [FORBIDDEN/8/index write (api)];\\\" kibana中输入执行下边的 PUT /_all/_settings {   \\\"index.blocks.write\\\": null } logstash.outputs.elasticsearch] Encountered a retryable error. Will Retry with exponential backoff  {:code=403, :url=  以上问题最主要

    2024年02月15日
    浏览(44)
  • ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)

    目录 一、数据同步 1.1、什么是数据同步 1.2、解决数据同步面临的问题 1.3、解决办法 1.3.1、同步调用 1.3.2、异步通知(推荐) 1.3.3、监听 binlog 1.3、基于 RabbitMQ 实现数据同步 1.3.1、需求 1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听 1.3.3、在“酒店

    2024年02月08日
    浏览(53)
  • 使用Logstash同步mysql数据到Elasticsearch(亲自踩坑)_将mysql中的数据导入es搜索引擎利用logstash(1)

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月28日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包