DataX同步达梦数据到HDFS

这篇具有很好参考价值的文章主要介绍了DataX同步达梦数据到HDFS。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

DataX同步达梦数据到HDFS
1、 前提条件

  1. 安装达梦数据库客户端
  2. 安装Python3.5 以上
  3. 导入dmPython模块

导入dmPython流程

  1. 在达梦数据库客户端 \drivers\python\dmPython这个路径下执行
python setup.py install 

● 如果报错在PATH中加入E:\dmdbms\bin 达梦数据库的安装路径,并重新装载dmPython
Traceback (most recent call last):

File "setup.py", line 103, in

raise DistutilsSetupError("cannot locate an Dameng software " /

distutils.errors.DistutilsSetupError: cannot locate an Dameng software installation

● 如果报下面错误则需要自己下载Microsoft Visual C++ 14.0

error: Microsoft Visual C++ 14.0 is required. Get it with "Mi
  1. 在命令行里 输入 impot dmPython
1.  impot dmPython
报错 :ImportError: DLL load failed while importing dmPython: 找不到指定的模块。

2.  import sys
	sys.path
在末尾等到C:\\Users\\lee\\AppData\\Local\\Pro
Python38\\lib\\site-packages\\dmpython-2.3-py3.8-win-amd64.egg
这个路径

3. 将达梦数据库安装目录下 E:\dmdbms\drivers\dpi下的所有文件,
拷贝到sys.path的最后一个目录下面,再次导入import dmPython成功

2、 生成Job脚本

# ecoding=utf-8
import json
import getopt
import os
import sys
import dmPython

#DM相关配置,需根据实际情况作出修改
DM_host = "124.**.**.249"
DM_port = "5**6"
DM_user = "MES_****_**_SP"
DM_passwd = "*******"

#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "pt101"
hdfs_nn_port = "8020"

#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "D:/"


def get_connection():
    return dmPython.connect(user=DM_user, password=DM_passwd, host=DM_host, port=int(DM_port))


def get_DM_meta(owner, table):
    connection = get_connection()
    cursor = connection.cursor()
    DMsql="SELECT COLUMN_NAME,DATA_TYPE FROM table_name WHERE owner="+"'"+owner+"'"+" AND TABLE_NAME="+"'"+table+"'"
    cursor.execute(DMsql)
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_DM_columns(owner, table):
    return list(map(lambda x: x[0], get_DM_meta(owner, table)))


def get_hive_columns(owner, table):
    def type_mapping(dm_type):
        mappings = {
            "NUMBER": "string",
            "INT": "int",
            "BLOB": "string",
            "CLOB": "string",
            "BIGINT": "bigint",
            "DOUBLE": "double",
            "FLOAT": "float",
            "TEXT": "string",
            "VARCHAR2": "string",
            "VARCHAR": "string",
            "TINYINT": "tinyint",
            "CHAR": "char",
            "TIMESTAMP": "string",
            "DECIMAL": "string",
            "DATETIME": "string",
            "DATE": "string",
            "SMALLINT": "smallint",
            "BIT": "boolean",
            "DEC": "string"
        }
        return mappings[dm_type]

    meta = get_DM_meta(owner, table)
    json.dumps(list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, get_DM_meta(owner, table)))))
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1])}, meta))


def generate_json(owner, source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "rdbmsreader",
                    "parameter": {
                        "username": DM_user,
                        "password": DM_passwd,
                        "column": get_DM_columns(owner, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_database+"."+source_table],
                            "jdbcUrl": ["jdbc:dm://" + DM_host + ":" + DM_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}${system.biz.date}",
                        "fileName": source_table,
                        "column": get_hive_columns(owner, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    owner = ""
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-u:-d:-t:', ['owner=', 'sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-u', '--owner'):
            owner = opt_value
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(owner,source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

3、 通过命令生成Job

python data_platform_datax_import_config.py -u user -d datebase-t table

4、启动DataX进行数据传输文章来源地址https://www.toymoban.com/news/detail-780108.html

  1. 将Job上传在DataX的job目录下
  2. 提前创建好文件夹
hadoop fs -mkdir -p /origin_data/datebase/table/2023-05-05
  1. 使用命令执行DataX
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/datebase/table/2023-05-05" /opt/module/datax/job/路径/脚本名称.json

到了这里,关于DataX同步达梦数据到HDFS的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 数据同步工具DataX、Sqoop、Maxwell、Canal

    常见的数据库同步同步主要有:DataX、Sqoop、Maxwell、Canal 数据同步工具种类繁多,大致可分为两类,一类是以DataX、Sqoop为代表的基于Select查询的离线、批量同步工具,另一类是以Maxwell、Canal为代表的基于数据库数据变更日志(例如MySQL的binlog,其会实时记录所有的insert、upda

    2024年02月11日
    浏览(28)
  • DataX实现Mysql与ElasticSearch(ES)数据同步

    jdk1.8及以上 python2 查看是否安装成功 查看python版本号,判断是否安装成功 在datax/job下,json格式,具体内容及主要配置含义如下 mysqlreader为读取mysql数据部分,配置mysql相关信息 username,password为数据库账号密码 querySql:需要查询数据的sql,也可通过colums指定需要查找的字段(

    2024年02月05日
    浏览(43)
  • 阿里巴巴开源DataX全量同步多个MySQL数据库

    上次 写了阿里巴巴高效的离线数据同步工具DataX: https://mp.weixin.qq.com/s/_ZXqA3H__Kwk-9O-9dKyOQ 安装DataX这个开源工具,并且同步备份了几张数据表。但是发现一个问题,就是每张表都需要单独写一个 job。如果数据表有几百张是不是要写几百个,这个不太现实了。 正当一筹莫展之际

    2024年02月02日
    浏览(52)
  • Centos7.9通过datax-web2.0_用Datax3.0进行增量同步_增量删除_数据更新---大数据之DataX工作笔记006

     1.注意这里的增量同步,不像之前用的DBsyncer或者是,NIFI中的利用binlog的形式,实现真正的实时的数据同步.  2.这里的增量是,指定通过ID,或者时间来进行增量,比如大于2023-07-03 11:44:56的数据仅仅同步这个,或者是,id大于多少的这样,这里建议用时间,因为如果有id用的字符串咋弄来

    2024年02月10日
    浏览(30)
  • DolphinScheduler 调度 DataX 实现 MySQL To ElasticSearch 增量数据同步实践

    基于SQL查询的 CDC(Change Data Capture): 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据。也就是我们说的基于SQL查询抽取; 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更; 不保障实时性,基于离线调度存在天然的

    2024年02月03日
    浏览(35)
  • DataX将MySQL数据同步到HDFS中时,空值不处理可以吗

    DataX将MySQL数据同步到HDFS中时,空值(NULL)存到HDFS中时,默认是存储为空字符串(‘’)。 HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(‘’),而Hive默认的null值存储格式为N。所以

    2024年02月12日
    浏览(37)
  • datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)

    1.mongodb版本:3.6.3。(有点老了,后来发现flinkcdc都只能监控一张表,多张表无法监控) 2.datax版本:自己编译的DataX-datax_v202210 3.hdfs版本:3.1.3 4.hive版本:3.1.2 1.增量数据:需要每隔1小时将mongodb中17个集合的数据同步至hive,因为有数据生成时间,才用datax查询方式,将上一个

    2023年04月23日
    浏览(33)
  • 业务数据同步工具介绍和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介绍 Sqoop : SQ L-to-Had oop ( Apache已经终止Sqoop项目 ) 用途:把关系型数据库的数据转移到HDFS(Hive、Hbase)(重点使用的场景);Hadoop中的数据转移到关系型数据库中。Sqoop是java语言开发的,底层使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是数据块的转移,没有使

    2024年02月15日
    浏览(69)
  • 数据同步工具调研选型:SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

    Apache SeaTunnel 是一个非常易用的超高性能分布式数据集成产品,支持海量数据的离线及实时同步。每天可稳定高效同步万亿级数据,已应用于数百家企业生产,也是首个由国人主导贡献到 Apache 基金会的数据集成顶级项目。 SeaTunnel 主要解决数据集成领域的常见问题: * 数据源

    2024年02月04日
    浏览(37)
  • datax同步数据到ClickHouse时同步时间特别长,原因:Too many partitions for single INSERT block (more than 100).

    今天将 Hive 分区中数据同步到 ClickHouse 时,发现有的任务运行时间很短,但是有的任务运行时间特别长,看了一下数据量,发现有的接近千万条数据,但是几分钟就同步完了,但是有的才几万条数据,要同步半个多小时,还有的任务几百万条数据,甚至要同步四五个小时。开

    2023年04月08日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包