【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive

这篇具有很好参考价值的文章主要介绍了【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive

一、mysql全量导入hive[分区表]

需求介绍:

本需求将模拟从MySQL中向Hive数仓中导入数据,数据以时间分区。测试两种导入场景,一种是将数据全量导入,即包含所有时间分区;另一种是每天运行调度,仅导入当天时间分区中的用户数据。


  • mysql表建表语句:
create table t_order(
	id   	 	int   primary key auto_increment,
	amt  	 	decimal(10,2),
	`status` 	int  default 0,
	user_id  	int,
	create_time timestamp DEFAULT CURRENT_TIMESTAMP,
	modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
  • hive
create table t_order(
	id   	 	int,
	amt  	 	decimal(10,2),
	`status` 	int,
	user_id  	int,
	create_time date,
	modify_time date
)partitioned by (dt string)
row format delimited 
fields terminated by '\t'

注意字段时间戳,我们将从以上MySQL向Hive导入数据。

  • 编写datax的json脚本
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order"
                                ]
                            }
                        ],
                        "password": "0000",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
							{"name": "id","type": "int"},
							{"name": "amt","type": "double"},
							{"name": "status","type": "int"},
							{"name": "user_id","type": "int"},
							{"name": "create_time","type": "string"},
							{"name": "modify_time","type": "string"}
                     		],
                        "defaultFS": "hdfs://hadoop10:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
  • 执行导入操作

在mysql中添加测试数据 导入mysql中7-11的数据到hive下7-11分区

insert into t_order(amt,user_id) values(100,1001)
insert into t_order values(null,100,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')
insert into t_order values(null,120,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')

在hive下创建分区

alter table t_order add partition(dt='2023-07-11')

运行dataX脚本

python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-11" /opt/installs/datax/job/mysql2hive.json

此部分的操作是将先插入mysql的三条数据导入到hive。


在mysql中添加测试数据 导入mysql中7-12的数据到hive下7-12分区

insert into t_order values(null,200,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');
insert into t_order values(null,220,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');

在hive下创建分区

alter table t_order add partition(dt='2023-07-12')

运行datax脚本

python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-12" /opt/installs/datax/job/mysql2hive.json

此部分的操作是将先插入mysql的三条数据和本次插入mysql的数据都导入到hive。
根据查询结果可以看到,此时我们重复导入了第一部分的数据,这就是全量导入。

二、mysql增量导入hive

大方向:事实表用增量[订单表] 维度表用全量[商品表]

绝大部分公司采用的方案:全量为主、增量为辅

要想采用增量导入还有一个问题是你的业务库表能够支持增量导入

1. 增量导入的第一种实现方法

根据 id主键,查询hive表中最大的id值,然后去mysql中查询大于上述id值的数据。
如果有些使用uuid的,则不能用id,这种方案不适用于对修改的数据进行同步。

2. 另一种方法是 时间字段

在表中增加一个modify_time字段,如果数据新增或者修改,可以根据这个字段查询数据抽取到hive

3. dataX脚本

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order where date_format(modify_time,'%Y-%m-%d') = '$dt'"
                                ]
                            }
                        ],
                        "password": "0000",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
							{"name": "id","type": "int"},
							{"name": "amt","type": "double"},
							{"name": "status","type": "int"},
							{"name": "user_id","type": "int"},
							{"name": "create_time","type": "string"},
							{"name": "modify_time","type": "string"}
                     		],
                        "defaultFS": "hdfs://hadoop10:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

运行该增量脚本,即可按照分区的日期,每次导入需要的mysql数据到hive。

三、利用Python自动生成Datax的json脚本

1. 创建mysql和hive数据库

create table t_student(
	id   	 int PRIMARY key,
	name  	 	varchar(50),
	`age` 		int
);

create table t_person(
	id   	 		int PRIMARY key,
	name  	 	varchar(50),
	parentid 		int
);

INSERT into t_student values
(1,'zhanmusi',15),
(2,'lisi',55),
(3,'lisi',66);

INSERT into t_person values
(1,'miky',06),
(2,'tom',16),
(3,'jakcon',26);
create table ods_t_student(
	id   	 	int,
	name  	 	string,
	`age` 		int
)partitioned by (dt string)
row format delimited 
fields terminated by '\t'

create table ods_t_person(
	id   	 		int,
	name  	 		string,
	parentid 		int
)partitioned by (dt string)
row format delimited 
fields terminated by '\t'

2. 修改python脚本里面的密码(2处)和hdfs端口

import json
import sys
import pymysql


def gen_json(dbname, tablename):
 s1 = {
     "job": {
         "content": [
             {
                 "reader": {
                     "name": "mysqlreader",
                     "parameter": {
                         "connection": [
                             {
                                 "jdbcUrl": ["jdbc:mysql://hadoop10:3306/" + dbname + "?useSSL=false"],
                                 "table": [tablename]
                             }
                         ],
                         "password": "0000",  # 密码
                         "username": "root",
                         "column": getColumn(dbname, tablename)
                     }
                 },
                 "writer": {
                     "name": "hdfswriter",
                     "parameter": {
                         "column": getColumnAndType(dbname, tablename),
                         "defaultFS": "hdfs://hadoop10:8020",  # hdfs端口
                         "fileType": "text",
                         "path": "/user/hive/warehouse/ods_" + tablename + "/dt=$dt",
                         "fieldDelimiter": "\t",
                         "fileName": tablename,
                         "writeMode": "append"
                     }
                 }
             }
         ],
         "setting": {
             "speed": {
                 "channel": "1"
             }
         }
     }
 }

 with open('d:/test/' + tablename + '.json', 'w') as f:
     json.dump(s1, f)


def queryDataBase(dbname, tablename):
 conn = pymysql.connect(user='root', password='0000', host='hadoop10')  # 密码
 cursor = conn.cursor()
 cursor.execute(
     "select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = %s and table_name = %s order by ordinal_position",
     [dbname, tablename])
 fetchall = cursor.fetchall()
 cursor.close()
 conn.close()
 return fetchall


def getColumn(dbname, tablename):
 k1 = queryDataBase(dbname, tablename)
 k2 = list(map(lambda x: x[0], k1))
 return k2


def getColumnAndType(dbname, tablename):
 k1 = queryDataBase(dbname, tablename)
 mappings = {
     'bigint': 'bigint',
     'varchar': 'string',
     'int': 'int',
     'datetime': 'string',
     'text': 'string'
 }
 k2 = list(map(lambda x: {"name": x[0], "type": mappings[x[1].lower()]}, k1))
 return k2


if __name__ == '__main__':
 l = sys.argv[1:]
 dbname = l[0]  # mysql数据库名
 tablename = l[1]  # 表名
 gen_json(dbname, tablename)

3. 运行python脚本

(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_student

(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_person

4. 将生成的json文件上传到linux

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oB30wKR6-1689299346463)(上课笔记-day13.assets\1689068747698.png)]

5. 编写shell脚本 b.sh

#! /bin/bash

dt=$1

if [ ''$1 == '' ]
then
  dt=$(date -d yesterday +%Y-%m-%d)
fi

echo $dt

s=$(hive -e "show partitions ods_t_student partition(dt='$dt')")

echo === $s ====

if [ "$s" == "partition" ]
then
 hive -e "alter table ods_t_student add partition(dt='$dt')"
else
 echo "$dt分区已经存在"
fi

python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_student.json



s=$(hive -e "show partitions ods_t_person partition(dt='$dt')")

echo === $s ====

if [ "$s" == "partition" ]
then
 hive -e "alter table ods_t_person add partition(dt='$dt')"
else
 echo "$dt分区已经存在"
fi

python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_person.json

6. 运行shell

root@hadoop10 app]# sh b.sh 2023-07-13文章来源地址https://www.toymoban.com/news/detail-772063.html

任务启动时刻                    : 2023-07-13 02:31:38
任务结束时刻                    : 2023-07-13 02:31:50
任务总计耗时                    :                 12s
任务平均流量                    :                2B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0

  • hive
id|name    |age|dt        |
--|--------|---|----------|
 1|zhanmusi| 15|2023-07-13|
 2|lisi    | 55|2023-07-13|
 3|lisi    | 66|2023-07-13|

到了这里,关于【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • DolphinScheduler 调度 DataX 实现 MySQL To ElasticSearch 增量数据同步实践

    基于SQL查询的 CDC(Change Data Capture): 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据。也就是我们说的基于SQL查询抽取; 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更; 不保障实时性,基于离线调度存在天然的

    2024年02月03日
    浏览(34)
  • 电商数仓项目需求及架构设计

    1.用户行为数据采集平台搭建 2.业务数据采集平台搭建 3.数仓维度建模 4.统计指标 5.即席查询工具,随时进行指标分析 6.对集群性能进行监控,发生异常时报警(第三方信息) 7.元数据管理 8.质量监控 9.权限管理(表级别、字段级别) 数据量大小、业务需求、行内经验、技术

    2024年02月10日
    浏览(22)
  • Spark 增量抽取 Mysql To Hive

    抽取ds_db01库中customer_inf的增量数据进入Hive的ods库中表customer_inf。根据ods.user_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用h

    2024年02月05日
    浏览(24)
  • sqoop(DataX)-MySQL导入HIVE时间格问题

    用公司的大数据平台(DataX)导数,已经开发上线一个多月的一批报表,突然有同事说有个报表数据不准。出在时间字段上。 分析: 1、先看了原数据MySQL字段类型为datetime,目标字段为timestamp类型; 2、经发现所有时间的差距都是8小时,怀疑是因为时区转换的原因; 3、对比其他

    2024年02月02日
    浏览(36)
  • 使用DataX实现mysql与hive数据互相导入导出

             DataX 是 阿里巴巴开源 的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等 各种异构数据源之间稳定高效的数据同步 功能。          为了解决异构数据源同步问题,DataX 将复杂的 网状 的同步链路变成了

    2024年02月08日
    浏览(29)
  • 大数据之使用Spark全量抽取MySQL的数据到Hive数据库

    前言 一、读题分析 二、使用步骤 1.导入配置文件到pom.xml 2.代码部分 三、重难点分析 总结 本题来源于全国职业技能大赛之大数据技术赛项赛题-离线数据处理-数据抽取(其他暂不透露) 题目:编写Scala代码,使用Spark将MySQL的shtd_industry库中表EnvironmentData,ChangeRecord,BaseMach

    2024年02月11日
    浏览(36)
  • 使用DataX实现mysql与hive数据互相导入导出 一、概论

             DataX 是 阿里巴巴开源 的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等 各种异构数据源之间稳定高效的数据同步 功能。          为了解决异构数据源同步问题,DataX 将复杂的 网状 的同步链路变成了

    2024年02月14日
    浏览(29)
  • elasticsearch+canal增量、全量同步

    目录 一、搭建环境: 1.1 下载软件上传到linux目录/data/soft下 1.2  把所有软件解压到/data/es-cluster 二、单节点(多节点同理)集群部署elasticsearch 2.1 创建es用户 2.2 准备节点通讯证书 2.3 配置elasticsearch,编辑/data/es-cluster/elasticsearch-7.9.0-node1/config/elasticsearch.yml文件 2.4 在每一台集群

    2024年01月24日
    浏览(37)
  • 离线数据仓库-关于增量和全量

    应用系统所产生的业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。 为了方便上层指标的统计,数据的同步策略有 全量同步 和 增量同步 。 同步方式是针对对应的表而言的! 为什么要做数据

    2024年01月17日
    浏览(33)
  • 如何选择离线数据集成方案 - 全量&增量

    1 前言 我在上一篇中介绍了实时集成与离线集成该怎么选择,接着介绍一下离线集成中的增量与全量的选择问题。 要设计方案,我们先分析一下数据产生的方式。我们把音视频流这种非结构化的数据集成从这里排除出去,因为这种音视频流一般都是专业的厂商和系统来处理。

    2024年02月02日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包