离线数仓建设之数据导出

这篇具有很好参考价值的文章主要介绍了离线数仓建设之数据导出。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

为了方便报表应用使用数据,需将ADS各项指标统计结果导出到MySQL,方便熟悉 SQL 人员使用。

1 MySQL建库建表

1.1 创建数据库

创建car_data_report数据库:

CREATE DATABASE IF NOT EXISTS car_data_report
# 字符集
DEFAULT CHARSET utf8mb4
# 排序规则
COLLATE utf8mb4_general_ci;

1.1.2 创建表

① 里程相关统计

创建ads_mileage_stat_last_month表,存储里程相关统计数据。

DROP TABLE IF EXISTS ads_mileage_stat_last_month;

CREATE TABLE ads_mileage_stat_last_month (
  vin VARCHAR(20) COMMENT '汽车唯一ID',
  mon VARCHAR(7) COMMENT '统计月份',
  avg_mileage INT COMMENT '日均里程',
  avg_speed DECIMAL(16, 2) COMMENT '平均时速分子',
  danger_count DECIMAL(16, 2) COMMENT '平均百公里急加减速次数'
) COMMENT '里程相关统计';
② 告警相关统计

创建ads_alarm_stat_last_month表,存储告警相关的统计数据。

DROP TABLE IF EXISTS ads_alarm_stat_last_month;

CREATE TABLE ads_alarm_stat_last_month (
  vin VARCHAR(20) COMMENT '汽车唯一ID',
  mon VARCHAR(7) COMMENT '统计月份',
  alarm_count INT COMMENT '告警次数',
  l1_alarm_count INT COMMENT '一级告警次数',
  l2_alarm_count INT COMMENT '二级告警次数',
  l3_alarm_count INT COMMENT '三级告警次数'
) COMMENT '告警相关统计';

3)温控相关统计

创建ads_temperature_stat_last_month表,存储温控相关的统计数据。

DROP TABLE IF EXISTS ads_temperature_stat_last_month;

CREATE TABLE ads_temperature_stat_last_month (
  vin VARCHAR(20) COMMENT '汽车唯一ID',
  mon VARCHAR(7) COMMENT '统计月份',
  max_motor_temperature INT COMMENT '电机最高温度',
  avg_motor_temperature DECIMAL(16, 2) COMMENT '电机平均温度',
  max_motor_controller_temperature INT COMMENT '电机控制器最高温度',
  avg_motor_controller_temperature DECIMAL(16, 2) COMMENT '电机控制器平均温度',
  max_battery_temperature INT COMMENT '最高电池温度',
  battery_temperature_abnormal_count INT COMMENT '电池温度异常值次数'
) COMMENT '温控相关统计';

4)能耗相关统计

创建ads_consume_stat_last_month表,存储能耗相关的统计数据。

DROP TABLE IF EXISTS ads_consume_stat_last_month;

CREATE TABLE ads_consume_stat_last_month (
  vin VARCHAR(20) COMMENT '汽车唯一ID',
  mon VARCHAR(7) COMMENT '统计月份',
  soc_per_charge DECIMAL(16, 2) COMMENT '次均充电电量',
  duration_per_charge DECIMAL(16, 2) COMMENT '次均充电时长',
  charge_count INT COMMENT '充电次数',
  fast_charge_count INT COMMENT '快充次数',
  slow_charge_count INT COMMENT '慢充次数',
  fully_charge_count INT COMMENT '深度充电次数',
  soc_per_100km DECIMAL(16, 2) COMMENT 'soc百公里平均消耗',
  soc_per_run DECIMAL(16, 2) COMMENT '每次里程soc平均消耗',
  soc_last_100km DECIMAL(16, 2) COMMENT '最近百公里soc消耗'
) COMMENT '能耗主题统计';

2 数据导出

DataX作为数据导出工具,并选择HDFSReader和MySQLWriter作为数据源和目标。

2.1 编写DataX配置文件

我们需要为每个表编写一个DataX配置文件。以ads_alarm_stat_last_month为例:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1  // DataX 作业的并发通道数,一般根据系统资源进行调整,1 表示单通道
      }
    },
    "content": [
      {
        "reader": {
          ...
        },
        "writer": {
          "name": "mysqlwriter",  // 写入数据的插件类型为 MySQL 数据库写入
          "parameter": {
            "writeMode": "replace",  // 写入模式为替换(如果表存在则先删除再写入)
            "username": "root",  // 数据库用户名
            "password": "000000",  // 数据库密码
            "column": [  // 写入的列信息,包括 vin、mon、alarm_count、l1_alarm_count、l2_alarm_count、l3_alarm_count
              "vin",
              "mon",
              "alarm_count",
              "l1_alarm_count",
              "l2_alarm_count",
              "l3_alarm_count"
            ],
            "connection": [  // 数据库连接信息列表,支持多个数据库连接
              {
                "jdbcUrl": "jdbc:mysql://hadoop102:3306/car_data_report?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8",  // MySQL 数据库连接地址,设置了 SSL、公钥检索、Unicode 编码等参数
                "table": [  // 写入的数据库表列表,这里只写入 ads_alarm_stat_last_month 表
                  "ads_alarm_stat_last_month"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

导出路径参数path并未写死,需在提交任务时通过参数动态传入,参数名称为exportdir。

模版配置参数解析:

HDFSReader:

离线数仓建设之数据导出

即:

"reader": {
  "name": "hdfsreader",  // 读取数据的插件类型为 HDFS 文件读取
  "parameter": {
    "path": "${exportdir}",  // HDFS 文件路径,使用 ${exportdir} 变量表示动态路径
    "defaultFS": "hdfs://hadoop102:8020",  // HDFS 默认文件系统地址
    "column": [  // 需要读取的列信息,这里使用通配符 * 表示读取所有列
      "*"
    ],
    "fileType": "text",  // 文件类型为文本文件
    "encoding": "UTF-8",  // 文件编码格式为 UTF-8
    "fieldDelimiter": "\t",  // 字段分隔符为制表符
    "nullFormat": "\\N"  // 空值格式为 \N
  }
},

MySQLWriter:

离线数仓建设之数据导出

"writer": {
  "name": "mysqlwriter",  // 写入数据的插件类型为 MySQL 数据库写入
  "parameter": {
    "writeMode": "replace",  // 写入模式为替换(如果表存在则先删除再写入)
    "username": "root",  // 数据库用户名
    "password": "000000",  // 数据库密码
    "column": [  // 写入的列信息,包括 vin、mon、alarm_count、l1_alarm_count、l2_alarm_count、l3_alarm_count
      "vin",
      "mon",
      "alarm_count",
      "l1_alarm_count",
      "l2_alarm_count",
      "l3_alarm_count"
    ],
    "connection": [  // 数据库连接信息列表,支持多个数据库连接
      {
        "jdbcUrl": "jdbc:mysql://hadoop102:3306/car_data_report?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8",  // MySQL 数据库连接地址,设置了 SSL、公钥检索、Unicode 编码等参数
        "table": [  // 写入的数据库表列表,这里只写入 ads_alarm_stat_last_month 表
          "ads_alarm_stat_last_month"
        ]
      }
    ]
  }
}

2.2 DataX配置文件生成脚本

TODO(在下载的资料压缩包里)datax_config_generator拷贝到/opt/module。

修改/opt/module/datax_config_generator/configuration.properties:

mysql.username=root
mysql.password=000000
mysql.host=hadoop102
mysql.port=3306
mysql.database.import=car_data
mysql.database.export=car_data_report
mysql.tables.import=
mysql.tables.export=
is.seperated.tables=0
hdfs.uri=hdfs://hadoop102:8020
import_out_dir=/opt/module/datax/job/import
export_out_dir=/opt/module/datax/job/export

执行配置文件生成器:

java -jar datax-config-generator-1.0.1-jar-with-dependencies.jar

观察生成的配置文件:

ll /opt/module/datax/job/export/

总用量 20
-rw-rw-r--. 1 atguigu atguigu  961 4月  26 19:47 car_data_report.ads_alarm_stat_last_month.json
-rw-rw-r--. 1 atguigu atguigu 1095 4月  26 19:47 car_data_report.ads_consume_stat_last_month.json
-rw-rw-r--. 1 atguigu atguigu 1062 4月  26 19:47 car_data_report.ads_electric_stat_last_month.json
-rw-rw-r--. 1 atguigu atguigu  939 4月  26 19:47 car_data_report.ads_mileage_stat_last_month.json
-rw-rw-r--. 1 atguigu atguigu 1083 4月  26 19:47 car_data_report.ads_temperature_stat_last_month.json

2.3 测试生成的DataX配置文件

以ads_trans_order_stats为例,测试用脚本生成的配置文件是否可用。

1)执行DataX同步命令

python /opt/module/datax/bin/datax.py -p"-Dexportdir=/warehouse/car_data/ads/ads_order_stats" /opt/module/datax/job/export/tms_report.ads_order_stats.json

2)观察同步结果

观察MySQL目标表是否出现数据。

离线数仓建设之数据导出

2.4 编写导出脚本

创建hdfs_to_mysql.sh

vim hdfs_to_mysql.sh
#!/bin/bash

# 设置 DataX 的安装路径
DATAX_HOME=/opt/module/datax

# 清理指定路径下的空文件
# 参数 $1: 待清理的路径
handle_export_path() {
  for file in $(hadoop fs -ls -R "$1" | awk '{print $8}'); do
    # 检查文件是否为空
    if hadoop fs -test -z "$file"; then
      echo "$file 文件大小为0,正在删除..."
      # 删除空文件
      hadoop fs -rm -r -f "$file"
    fi
  done
}

# 导出数据到指定路径
# 参数 $1: DataX 配置文件路径
# 参数 $2: 导出路径
export_data() {
  datax_config="$1"
  export_dir="$2"
  # 调用清理空文件函数
  handle_export_path "$export_dir"
  # 执行 DataX 导出命令
  $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" "$datax_config"
}

# 主逻辑,根据传入的参数执行数据导出操作
case $1 in
  'ads_mileage_stat_last_month' | 'ads_alarm_stat_last_month' | 'ads_temperature_stat_last_month' | 'ads_electric_stat_last_month' | 'ads_consume_stat_last_month')
    # 导出单个表的数据
    export_data "/opt/module/datax/job/export/car_data_report.$1.json" "/warehouse/car_data/ads/$1"
    ;;
  'all')
    # 导出所有表的数据
    for table in 'ads_mileage_stat_last_month' 'ads_alarm_stat_last_month' 'ads_temperature_stat_last_month' 'ads_electric_stat_last_month' 'ads_consume_stat_last_month'; do
      export_data "/opt/module/datax/job/export/car_data_report.$table.json" "/warehouse/car_data/ads/$table"
    done
    ;;
  *)
    # 未知参数,打印提示信息
    echo "Usage: $0 {ads_table_name | all}"
    echo "Example: $0 ads_mileage_stat_last_month"
    ;;
esac
chmod +x hdfs_to_mysql.sh

hdfs_to_mysql.sh all

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都技术专家兼架构,多家大厂后端一线研发经验,各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。

负责:

  • 中央/分销预订系统性能优化
  • 活动&优惠券等营销中台建设
  • 交易平台及数据中台等架构和开发设计

目前主攻降低软件复杂性设计、构建高可用系统方向。

参考:

  • 编程严选网

本文由博客一文多发平台 OpenWrite 发布!文章来源地址https://www.toymoban.com/news/detail-840655.html

到了这里,关于离线数仓建设之数据导出的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 看这篇就明白大数据实时数仓、离线数仓、数据湖之间的关系

      20世纪70年代,MIT(麻省理工)的研究员致力于研究一种优化的技术架构,该架构试图将业务处理系统和分析系统分开,即将业务处理和分析处理分为不同层次,针对各自的特点采取不同的架构设计原则,MIT的研究员认为这两种信息处理的方式具有显著差别,以至于必须采取完

    2024年02月08日
    浏览(44)
  • 离线数仓(一)【数仓概念、需求架构】

            今天开始学习数仓的内容,之前花费一年半的时间已经学完了 Hadoop、Hive、Zookeeper、Spark、HBase、Flume、Sqoop、Kafka、Flink 等基础组件。把学过的内容用到实践这是最重要的,相信会有很大的收获。         数据仓库( Data Warehouse ),是 为企业制定决策,提供数

    2024年02月20日
    浏览(38)
  • 数仓报表数据导出——Hive数据导出至Clickhouse

    创建database 创建table 使用 spark-sql 查询数据,然后通过 jdbc 写入Clickhouse。 创建Maven项目,pom.xml文件如下 创建HiveToClickhouse类 上传hive.xml,hdfs.xml 以及core-site.xml文件到项目的resource目录下 打包,并上传hive-to-clickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar到hadoop节点 执行如下命令测试 为

    2024年02月16日
    浏览(41)
  • 离线数仓分层

    1、清晰数据结构 :数仓每一层都有对应的作用,方便在使用时更好定位与了解 2、数据血缘追踪 :清晰知道表/任务上下游,方便排查问题,知道下游哪个模块在使用,提升开发效率及后期管理维护 3、减少重复开发 :完善数仓好中间层,减少后期不必要的开发,从而减少资

    2024年02月06日
    浏览(37)
  • 阿里云生态离线数仓

            功能齐全:10多年大数据建设沉淀完整的平台,覆盖数据开发治理的全生命周期         简单易用:全图形化界面,SQL为主的数据开发方式         安全稳定:双11日千万级任务稳定调度,金融级数据安全保障         开放兼容: 支持多种大数据引擎绑定,开放

    2024年02月05日
    浏览(32)
  • 一百八十六、大数据离线数仓完整流程——步骤五、在Hive的DWS层建动态分区表并动态加载数据

    经过6个月的奋斗,项目的离线数仓部分终于可以上线了,因此整理一下离线数仓的整个流程,既是大家提供一个案例经验,也是对自己近半年的工作进行一个总结。 1、Hive的DWS层建库建表语句 --如果不存在则创建hurys_dc_dws数据库 create database if not exists hurys_dc_dws; --使用hurys_

    2024年02月07日
    浏览(45)
  • 【从0开始离线数仓项目】——新能源汽车数仓项目介绍

    目录 1、数据仓库概念 2、项目需求及架构设计 3、集群资源规划设计  4、车辆日志字段说明 数据仓库(Data Warehouse)是为企业提供数据支持,用以协助企业制定决策、改进业务流程和提高产品质量等方面的工具。它可以接收多种类型的输入数据,如业务数据、日志数据和爬虫

    2024年02月13日
    浏览(37)
  • 一百八十二、大数据离线数仓完整流程——步骤一、用Kettle从Kafka、MySQL等数据源采集数据然后写入HDFS

    经过6个月的奋斗,项目的离线数仓部分终于可以上线了,因此整理一下离线数仓的整个流程,既是大家提供一个案例经验,也是对自己近半年的工作进行一个总结。 项目行业属于交通行业,因此数据具有很多交通行业的特征,比如转向比数据就是统计车辆左转、右转、直行

    2024年02月07日
    浏览(50)
  • 离线数仓中,为什么用两个flume,一个kafka

    实时数仓中,为什么没有零点漂移问题? 因为flink直接取的事件时间 用kafka是为了速度快,并且数据不丢,那为什么既用了kafkachannel,也用了kafka,而不只用kafkachannel呢? 因为需要削峰填谷 离线数仓中,为什么用两个flume,一个kafka,直接用taildirsource,kafkachannel,hdfssink不行吗?

    2024年02月14日
    浏览(46)
  • 尚硅谷大数据项目《在线教育之离线数仓》笔记002

     视频地址:尚硅谷大数据项目《在线教育之离线数仓》_哔哩哔哩_bilibili 目录 P025 P026 P027 P028 P029 P030 P031 P032 P033 P034 P035 P036 P037 P038 P025 在Hive所在节点部署Spark P026 3 )Hive on Spark 测试 (1)启动hive客户端 [atguigu@hadoop102 hive]$ hive (2)创建一张测试表 hive (default) create table stud

    2024年02月12日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包