Datax同步MySQL到ES

这篇具有很好参考价值的文章主要介绍了Datax同步MySQL到ES。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、在MySQL中建表

  • 建表语句

    • CREATE TABLE `user` (
        `id` int(11) NOT NULL,
        `name` varchar(255) DEFAULT NULL,
        `age` varchar(255) DEFAULT NULL,
        `create_date` datetime DEFAULT NULL,
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
      
  • 插入数据

    • INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (1, '小明1', '22','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (2, '小明2', '22','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (3, '小明3', '22','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (4, '小明4', '22','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (5, '小明5', '23','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (6, '小明6', '23','2023-06-02 11:26:05');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (7, '小明7', '23','2023-06-02 11:26:05');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (8, '小明8', '23','2023-06-02 11:26:05');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (9, '小明9', '23','2023-06-02 11:26:05');
      

2、在ES建立索引

  • 建立索引语句

    • 我这里使用Kibana工具连接ES进行操作的,也可以使用Postman进行操作

    • Kibana操作语句

      # 创建索引
      PUT /user
      {
          "mappings" : {
            "properties" : {
              "id" : {
                "type" : "keyword"
              },
              "name" : {
                "type" : "text"
              },
              "age" : {
                "type" : "keyword"
              },
              "create_date" : {
                "type" : "date",
                "format": "yyyy-MM-dd HH:mm:ss"
              }
      	}	
        }
      }
      
    • Postman操作语句

      • 地址输入
      http://localhost:9200/user
      

datax mysql到es,ETL,mysql,elasticsearch,数据库
Json文本输入

    {
        "mappings" : {
          "properties" : {
            "id" : {
              "type" : "keyword"
            },
            "name" : {
              "type" : "text"
            },
            "age" : {
              "type" : "keyword"
            },
            "create_date" : {
              "type" : "date",
              "format": "yyyy-MM-dd HH:mm:ss"
            }
    	}	
      }
    }

  • 当出现以下信息代表创建索引成功
    {
        "acknowledged": true,
        "shards_acknowledged": true,
        "index": "user"
    }
    

3、构建从MySQL到ES的Datax的Json任务

[root@hadoop101 ~]# vim mysql2es.json
# 添加以下内容
{
  "job": {
    "setting": {
      "speed": {
        "channel": 8
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "id",
              "name",
              "age",
              "date_format(create_date,'%Y-%m-%d %H:%I:%s')"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://192.168.xx.xxx:3306/bigdata"
                ],
                "table": [
                  "user"
                ]
              }
            ],
            "password": "xxxxxx",
            "username": "root",
            "where": "",
            "splitPk": "id"
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://192.168.xx.xxx:9200",
            "accessId": "root",
            "accessKey": "root",
            "index": "user",
            "type": "_doc",
            "settings": {"index" :{"number_of_shards": 5, "number_of_replicas": 1}},
            "batchSize": 5000,
            "splitter": ",",
            "column": [
              {
                "name": "pk",
                "type": "id"
              },
              {
                "name": "id",
                "type": "keyword"
              },
              {
                "name": "name",
                "type": "text"
              },
              {
                "name": "age",
                "type": "keyword"
              },
              {
                "name": "create_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
              }
            ]
          }
        }
      }
    ]
  }
}
  • 参数介绍

    • reader:datax的source(来源)端
    • reader.cloumn::读取mysql的字段名
    • reader.connection.jdbcUrl:MySQL连接的url
    • reader.connection.table:读取MySQL的表名
    • reader.password:连接MySQL的用户名
    • reader.username:连接MySQL的密码
    • reader.where:读取MySQL的过滤条件
    • reader.splitPk:读取MySQL时按照哪个字段进行切分
    • writer:datax的sink(去处)端
    • writer.endpoint:ElasticSearch的连接地址
    • writer.accessId:http auth中的user
    • writer.accessKey:http auth中的password

    注意:假如Elasticsearch没有设置用户名,也需要给accessId和accessKey值,不然就报错了,可以给赋值root,root

    • writer.index:Elasticsearch中的index名

    • writer.type:Elasticsearch中index的type名

    • writer.settings:创建index时候的settings, 与Elasticsearch官方相同

    • writer.batchSize:每次批量数据的条数

    • writer.splitter:如果插入数据是array,就使用指定分隔符

    • writer.column:Elasticsearch所支持的字段类型

      • 下面是column样例字段类型
      "column": [
                    # 使用数据库id作为es中记录的_id,业务主键(pk):_id 的值指定为某一个字段。
                    {"name": "pk", "type": "id"}, 
                    { "name": "col_ip","type": "ip" },
                    { "name": "col_double","type": "double" },
                    { "name": "col_long","type": "long" },
                    { "name": "col_integer","type": "integer" },
                    { "name": "col_keyword", "type": "keyword" },
                    { "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
                    { "name": "col_geo_point", "type": "geo_point" },
                    # ES日期字段创建需指定格式 yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis
                    { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
                    { "name": "col_nested1", "type": "nested" },
                    { "name": "col_nested2", "type": "nested" },
                    { "name": "col_object1", "type": "object" },
                    { "name": "col_object2", "type": "object" },
                    { "name": "col_integer_array", "type":"integer", "array":true},
                    { "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
                  ]
      
    • 如果时间类型需要精细到yyyy-MM-dd HH:mm:ss.SSSSSS,比如时间为:2023-06-02 13:39:57.000000

      • MySQL的cloumn端填写
      "date_format(create_date,'%Y-%m-%d %H:%I:%s.%f')"
      
      • Elasticsearch的cloumn填写
      { "name": "create_date", "type": "text"}
      
      • Elasticsearch创建索引时修改如下
      { "name": "create_date","type": "text"}
      

4、运行mysql2es.json脚本

  • 问题1: ConfigParser - 插件[mysqlreader,elasticsearchwriter]加载失败
    datax mysql到es,ETL,mysql,elasticsearch,数据库

  • 解决问题:

    下面是编译好的elasticsearchwriter插件,放在DATAX_HOME/plugin/writer目录下面,就可以运行成功了,也可以自己从官网下载,然后自己编译好也可以用的

    • elasticsearchwriter百度网盘资源下载链接:
    链接:https://pan.baidu.com/s/1C_OeXWf_t5iVNiLquvEhjw 
    提取码:tutu 
    
  • 问题2:在运行从MySQL抽取500万条数据到Elasticsearch时,出现了datax传输200多万条数据卡住的情况,报错日志:I/O Exception … Broken pipe (write failed)

datax mysql到es,ETL,mysql,elasticsearch,数据库

  • 解决问题:

    datax传输mysql到es卡在200多万这个问题和channel: 8和es的oom有一定的关系

    • 测试样例

    • 1、es batchsize为5000,不开启mysql切分"splitPk": “id”,卡住

    • 2、es batchsize为1000,不开启mysql切分"splitPk": “id”,不卡住

    • 3、es batchsize为5000,开启mysql切分"splitPk": “id”,不卡住

  • 总结:这个问题和mysql读取速率,es 写入速率有关,开启切分提高一下读取速率就不会卡住了

  • 成功运行截图:
    datax mysql到es,ETL,mysql,elasticsearch,数据库

以下是工作中做过的ETL,如有需要,可以私信沟通交流,互相学习,一起进步

datax mysql到es,ETL,mysql,elasticsearch,数据库文章来源地址https://www.toymoban.com/news/detail-540743.html

到了这里,关于Datax同步MySQL到ES的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)

    1.mongodb版本:3.6.3。(有点老了,后来发现flinkcdc都只能监控一张表,多张表无法监控) 2.datax版本:自己编译的DataX-datax_v202210 3.hdfs版本:3.1.3 4.hive版本:3.1.2 1.增量数据:需要每隔1小时将mongodb中17个集合的数据同步至hive,因为有数据生成时间,才用datax查询方式,将上一个

    2023年04月23日
    浏览(33)
  • 5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)

    1.1 DataX概述 源码地址:https://github.com/alibaba/DataX 1.2 DataX支持的数据源 DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图。 2.1 DataX设计理念 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星

    2024年02月11日
    浏览(37)
  • DataX mysql同步到mysql

    创建数据源 配置数据库相关信息 创建执行器 配置执行器执行地址相关信息 1.1 SQL语句 (querySql) 在json文件中此部分配置就是 querySql 在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就

    2024年02月09日
    浏览(22)
  • 【ElasticSearch】ES与MySQL数据同步方案及Java实现

    elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。 操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用 同步调用方式

    2024年02月15日
    浏览(38)
  • ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)

    目录 一、数据同步 1.1、什么是数据同步 1.2、解决数据同步面临的问题 1.3、解决办法 1.3.1、同步调用 1.3.2、异步通知(推荐) 1.3.3、监听 binlog 1.3、基于 RabbitMQ 实现数据同步 1.3.1、需求 1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听 1.3.3、在“酒店

    2024年02月08日
    浏览(38)
  • 使用Logstash同步mysql数据到Elasticsearch(亲自踩坑)_将mysql中的数据导入es搜索引擎利用logstash(1)

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月28日
    浏览(38)
  • DataX将MySQL数据同步到HDFS中时,空值不处理可以吗

    DataX将MySQL数据同步到HDFS中时,空值(NULL)存到HDFS中时,默认是存储为空字符串(‘’)。 HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(‘’),而Hive默认的null值存储格式为N。所以

    2024年02月12日
    浏览(36)
  • DataX-阿里开源离线同步工具在Windows上实现Sqlserver到Mysql全量同步和增量同步

    Kettle-开源的ETL工具集-实现SqlServer到Mysql表的数据同步并部署在Windows服务器上: Kettle-开源的ETL工具集-实现SqlServer到Mysql表的数据同步并部署在Windows服务器上_etl实现sqlserver报表服务器_霸道流氓气质的博客-CSDN博客 上面讲过Kettle的使用,下面记录下阿里开源异构数据源同步工具

    2024年02月08日
    浏览(33)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(35)
  • ELK(Elasticsearch、Kibana、Logstash)以及向ES导入mysql数据库数据或CSV文件数据,创建索引和可视化数据

    地址:Past Releases of Elastic Stack Software | Elastic 在Products和version处分别选择需要下载的产品和版本,E(elasticsearch)L(logstash)K(kibana)三者版本必须相同 将下载好的elk分别解压到相同路径下 本文中elasticsearch=E=ES=es;L=logstash;K=kibana 一般情况下使用默认配置即可,下面对我的

    2024年02月15日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包