数据集成工具 ---- datax 3.0

这篇具有很好参考价值的文章主要介绍了数据集成工具 ---- datax 3.0。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、datax:

        是一个异构数据源离线同步工具,致力于实现关系型数据库(mysql、oracle等)hdfs、hive、hbase等各种异构数据源之间的数据同步

2、参考网址文献:

https://github.com/alibaba/DataX/blob/master/introduction.mdhttps://github.com/alibaba/DataX/blob/master/userGuid.mdhttps://github.com/alibaba/DataX/blob/master/introduction.md

3、Datax的框架设计:

数据集成工具 ---- datax 3.0,数据集成工具,大数据

Datax作为离线数据同步工具,主要的是采用了Framework+plugin架构构成,将数据源的读数据和写数据封装成对应的Reader和Writer插件,纳入到整体的同步框架中。

        1、Reader:作为数据的采集模块,负责采集数据源的数据,将数据发送给Framework

        2、Writer:作为数据写入模块,负责不断的向Framework取出数据,将数据写入到对应的目的端

        3、Framework:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、数据转换等核心技术问题。

4、Datax的核心架构:

Datax3.0 开源版本是支持单机多线程来完成同步作业运行,因为底层是使用java做开发。整体的架构:

数据集成工具 ---- datax 3.0,数据集成工具,大数据

模块的核心介绍:

        1、Datax完成单个数据同步做作业,被称之为job,Datax接收到一个job时就会启动一个进程来完成数据同步工作,所以Datax job 模块是单个作业的中枢管理中心,主要是承担数据清理,子任务切分、TaskGroup 管理。

        2、当Datax启动后,Datax job会根据不同的源数据将job切分成不同的Task,所以Task是Datax的最小作业单位,每一个Task都会负责一部分的数据同步。

        3、切分成多个Task后,Datax job 就会调用scheduler模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup,每一个TaskGroup都负责一定的Task任务的执行,默认TaskGroup并发数量数5个。

        4、每一个Task都是由TaskGroup所监控执行启动,每一个Task启动后都会按照Reader---Channel---Writer的执行顺序执行。

        5、当任务启动后,Datax job就会监控所有的TaskGroup的执行情况,当所有的TaskGroup任务完成后,job就会退出,当出现异常,就会异常退出并且进程退出值非0.

5、Datax的核心优势:

        1、可靠的数据质量监控

        2、丰富的数据转换功能

        3、精准的控制速度

        4、容错机制:

                1、线程内部重试

                        DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

                2、线程级别重试

                        目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

        5、极简的体验

6、Datax与Sqoop的区别:
功能 Datax sqoop
运行模式 单进程多线程 MR
分布式 是不支持分布式 支持
流控 需要定制
统计信息 支持 不支持,分布式的数据收集不方便
数据校验 只有core部分有校验功能 不支持,分布式的数据收集不方便
监控 需要定制 需要定制
7、Datax部署:

1、下载jar包:

        下载路径:https://github.com/alibaba/DataX

2、解压文件,配置环境变量:  

#解压jar包
tar -zxvf datax.tar.gz

#配置环境变量:
vim /etc/profile


export   DATAX_HOME=/user/loacl/soft/datax
export   PATH=.:$PATH:$DATAX_HOME/bin

#配置好环境变量,让配置文件生效
source /etc/profile

3、使执行文件拥有执行权:

添加执行权:

chmod +x  data.py
数据集成工具 ---- datax 3.0,数据集成工具,大数据8、Datax的使用:
在datax中会自动的生成模板的命令:

datax.py -r streamreader -w streamwriter
        1、streamreader  to  streamwriter,数据打印在控制台上面

参数说明:

"sliceRecordCount": 100  #指定打印的个数

"channel": 1  #指定并发度
#创建json文件:

vim streamreadertostreamwriter.json


{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [
                            {
                                "type":"string",
                                "value":"wyz"
                            },
                            {
                                "type":"int",
                                "value":"18"
                            }
                        ], 
                        "sliceRecordCount": 100  #指定打印的个数
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1  #指定并发度
            }
        }
    }
}



#脚本执行命令:
datax.py streamreadertostreamwriter.json
        2、mysql to mysql
1、可以通过命令获取模板:
datax.py  -r mysqlreader -w mysqlwriter 

2、可以通过github上的模板进行编写:分别是mysqlreader和mysqlwriter,参数会比较详细


3、在插入数据的需要注意是在将数据写入的时候如果出现在数据,那么此时可能是创建的表出了问题
    例如:表中的某个字段是主键,主键唯一
vim mysqlreaderTomysqlwriter.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                                "id",
							 "name",
							 "age",
							 "clazz",
							 "gender"
                        
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.226.1:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""      #不是必须要写的,作用是可以在读数据时进行一次过滤
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                                "id",
							 "name",
							 "age",
							 "clazz",
							 "gender"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.226.1:3306/bigdata25", 
                                "table": ["data_test"]
                            }
                        ], 
                        "password": "123456", 
                        "preSql": [],   #不是必须写的,作用是再写入数据前可以执行该sql
                        "session": [], 
                        "username": "root", 
                        "writeMode": "insert"   #必选,指定数据写入的模式,分成三种:insert(一般默认)、replace、update
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}




#执行脚本:
datax.py   mysqlreaderTomysqlwriter.json

 将数据写入到mysql时,写入的表是需要提前创建的。数据集成工具 ---- datax 3.0,数据集成工具,大数据

        3、mysql to hdfs

参数解释:

使用datax的时候hdfswriter只支持两种文件形式,分别是text和orc

"fileType": "text",  #支持两种方式:text和orc,text表示的是textfile,orc表示的orcfile

"compress": "", #指定文件的压缩形式,不指定代表不用压缩,
                text支持的压缩方式:gzip,bzip2,
                orc支持的压缩方式有NONE和SNAPPY

 "writeMode": "append"   #表示的是数据在写入的操作,分成三种:
                            append,写入前不做任何处理  
                            nonconflit 如果文件存在,直接报错  
                            truncate:如果文件存在,那就先删除在写入

"path": "/bigdata25/datax/datax_mysqltohdfs/"  #文件不存在是需要提前创建的

vim mysqlTohdfs.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "stu_mysqltohdfs", 
                        "fileType": "text",  
                        "path": "/bigdata25/datax/datax_mysqltohdfs/", 
                        "writeMode": "append"                         												
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

#执行脚本:
datax.py mysqlTohdfs.json
        4、mysql to hive

 原理思想:

实际上还是将数据存入到hdfs上面,hive通过记录元数据信息来获取数据
原理:
	创建好hive表在保存在hdfs上,是有文件路径,然后通过写入到指定的hdfs文件路径就能将数据写入到hive表中
当开启hive的时候,在hive中创建的表会默认的存储hdfs的/user/hive/warehouse/目录下

前期准备:

前期准备:
启动hive(后台启动):nohup hive --service metastore &
连接hive:hive

创建hive表(在没有说明的情况下一般在都是创建一个外部表)
创建一个datax数据库:
	create database datax;
切换数据库:use datax
创建外部表:
	create external table if not exists datax_mysqltohive(
    	 id string,
         name string,
         age int,
         clazz string,
		gender string
    		)
row format delimited  fields terminated  by ',' stored as textfile

编写脚本:

vim mysqlTohive.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "datax_mysqltohive1", 
                        "fileType": "text", 
                        "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}


#执行脚本命令:
datax.py  mysqlTohive.json
        5、mysql to hbase(Hbase11XWriter)

参数解释:

"mode": "normal" 写hbase的模式,目前只支持normal模式

"hbaseConfig"  {"hbase.zookeeper.quorum": "***"} 描述:连接HBase集群需要的配置信息,JSON格式

"table": "writer" 表的名称,大小写比较敏感

"encoding" 编码方式

"rowkeyColumn" 描述:要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量


"versionColumn" 表示指定写入hbase的时间戳,支持当前时间、指定时间列、指定时间

前期准备:

前期准备:

启动zookeeper:
	zkServer.sh start(每一个节点上都是需要启动的)
查看zk的状态:
	zkServer.sh status 

启动hbase:
	start-hbase.sh

连接hbase:
	sqlline.py master,node1,node2

进入hbase的客户端:hbase shell 

hbase中查看表的命令:!table
退出命令 !quit
查看表:list
在hbase中创建创建表,指定表名和列簇:create 'student','cf1'
				

编写脚本:

vim mysqlTohbase.json



{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                 "writer": {
                      "name": "hbase11xwriter",
                      "parameter": {
                        "hbaseConfig": {
                          "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
                        },
                        "table": "NEW_STU",
                        "mode": "normal",
                        "rowkeyColumn": [
                            {
                              "index":0,
                              "type":"string"
                            },
                            {
                              "index":-1,
                              "type":"string",
                              "value":"_"
                            }
                        ],
                        "column": [
                          {
                            "index":1,
                            "name": "cf1:name",
                            "type": "string"
                          },
                          {
                            "index":2,
                            "name": "cf1:age",
                            "type": "int"
                          },
                          {
                            "index":3,
                            "name": "cf1:clazz",
                            "type": "string"
                          },
                          {
                            "index":4,
                            "name": "cf1:gender",
                            "type": "string"
                          }
                        ],
                        "versionColumn":{
                          "index": -1,
                          "value":"123456789"
                        },
                        "encoding": "utf-8"
                      
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }  
    }
}

#执行脚本
datax.py mysqlTohbase.json
        6、mysql增量同步数据到hive中。

最主要的工作就是在原先的mysql数据导入的hive中的基础上进行where过滤

vim mysqlTohive.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": "id >20"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "datax_mysqltohive1", 
                        "fileType": "text", 
                        "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}


#执行脚本命令:
datax.py  mysqlTohive.json
9、在使用datax过程中出现的错误:

1、配置文件出现错误,脚本不完整:

数据集成工具 ---- datax 3.0,数据集成工具,大数据文章来源地址https://www.toymoban.com/news/detail-840016.html

到了这里,关于数据集成工具 ---- datax 3.0的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据ETL工具对比(Sqoop, DataX, Kettle)

    前言 在实习过程中,遇到了数据库迁移项目,对于数据仓库,大数据集成类应用,通常会采用 ETL 工具辅助完成,公司和客户使用的比较多的是 Sqoop , DataX 和 Kettle 这三种工具。简单的对这三种ETL工具进行一次梳理。 ETL工具,需要完成对源端数据的抽取(exat), 交互转换(

    2024年02月11日
    浏览(41)
  • Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架

    Flink CDC 是基于数据库日志 CDC(Change Data Capture)技术的实时数据集成框架,支持了全增量一体化、无锁读取、并行读取、表结构变更自动同步、分布式架构等高级特性。配合 Flink 优秀的管道能力和丰富的上下游生态,Flink CDC 可以高效实现海量数据的实时集成。Flink CDC 社区发

    2024年02月04日
    浏览(40)
  • 阿里云开源离线同步工具DataX3.0,用于数据仓库、数据集市、数据备份

    DataX是阿里云开源的一款离线数据同步工具,支持多种数据源和目的地的数据同步,包括但不限于MySQL、Oracle、HDFS、Hive、ODPS等。它可以通过配置文件来定义数据源和目的地的连接信息、数据同步方式、数据过滤等,从而实现数据的高效、稳定、可扩展的同步。 例如,如果您

    2024年02月10日
    浏览(42)
  • 业务数据同步工具介绍和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介绍 Sqoop : SQ L-to-Had oop ( Apache已经终止Sqoop项目 ) 用途:把关系型数据库的数据转移到HDFS(Hive、Hbase)(重点使用的场景);Hadoop中的数据转移到关系型数据库中。Sqoop是java语言开发的,底层使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是数据块的转移,没有使

    2024年02月15日
    浏览(69)
  • 数据同步工具调研选型:SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

    Apache SeaTunnel 是一个非常易用的超高性能分布式数据集成产品,支持海量数据的离线及实时同步。每天可稳定高效同步万亿级数据,已应用于数百家企业生产,也是首个由国人主导贡献到 Apache 基金会的数据集成顶级项目。 SeaTunnel 主要解决数据集成领域的常见问题: * 数据源

    2024年02月04日
    浏览(37)
  • 开源DataX集成可视化项目Datax-Web的使用

    上一篇文章我们已经搭建好了 Datax-Web 后台,这篇文章我们具体讲一下如何通过Datax-Web来配置,同步MySQL数据库。 1、\\\"调度中心OnLine:\\\"右侧显示在线的\\\"调度中心\\\"列表, 任务执行结束后, 将会以failover的模式进行回调调度中心通知执行结果, 避免回调的单点风险; 2、“执行器列表”

    2024年02月08日
    浏览(32)
  • 使用DataX工具连接hive数据库:java.sql.SQLException: Could not establish connection to jdbc:hive2://master:1000

    Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).]. - 具体错误信息为:java.sql.SQLException: Could not establish connection to jdbc:hive2://master:10000/datax: Required field \\\'serverProtocolVersion\\\' is unset! Struct:TOpenSessionRe

    2024年04月09日
    浏览(49)
  • datax工具介绍及简单使用

    Datax是一个异构数据源离线同步工具,致力于实现包括关系数据库、HDFS、Hive、ODPS、Hbase等各种异构数据源之间稳定高效的数据同步功能 设计理念 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当

    2024年02月09日
    浏览(33)
  • 【ETL工具】Datax-ETL-SqlServerToHDFS

    🪁🍁🪁🍁🪁🍁🪁🍁 感谢点赞和关注 ,每天进步一点点!加油! 🪁🍁🪁🍁🪁🍁🪁🍁 目录 🦄 个人主页——🎐个人主页 🎐✨🍁 一、DataX概览 1.1 DataX 简介 1.2 DataX框架 1.3 功能限制 1.4 Support Data Channels 二、配置样例 2.1 环境信息 2.2 SQLServer数据同步到HDFS 2.2 参数说明

    2024年02月08日
    浏览(43)
  • 【大数据】什么是数据集成?(SeaTunnel 集成工具介绍)

    数据集成是指将来自不同数据源的数据整合到一起形成一个统一的数据集 。这个过程包括从不同的数据源中收集数据,对数据进行清洗、转换、重构和整合,以便能够在一个统一的数据仓库或数据湖中进行存储和管理。 数据集成可以帮助企业更好地理解和利用他们的数据,

    2024年02月08日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包