Hive 拉链表详解及实例

这篇具有很好参考价值的文章主要介绍了Hive 拉链表详解及实例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

拉链表

  • 版本迭代:hive 0.14 slowly changing dimension => hive 2.6.0 merge 事务管理

    • 原来采用分区表,用户分区存储历史增量数据,缺点是重复数据太多
  • 定义:数仓用于解决持续增长且存在一定时间时间范围内重复的数据

  • 存储:创建拉链表时使用列式存储ORC
    不能使用load加载数据
    压缩比高 效率高

  • 场景:【数据规模庞大】,新数据【在有限的时间】内存在多种状态变化

  • 优点:节约空间(一份订单只有一条数据)

  • 举例:

原始表订单:
		order_id,order_timestamp,user_id,order_status
		1,2024_01_21 16:12:37.259,87986321,0
		1,2024_01_21 16:12:47.003,87986321,1
		1,2024_01_22 09:00:28.022,87986321,2
		1,2024_01_24 15:00:00.123,87986321,3
		1,2024_02_01 00:30:00.227,87986321,4
		order_detail_id,fk_order_id,goods_id,buy_count,goods_price

		拉链表订单:
		order_id,user_id,order_create_timestamp,order_modify_timestamp,order_amount,order_current_status
		1,87986321,2024_01_21 16:12:37.259,2024_02_01 00:30:00.227,3242.66,4
  • 配置:
set hive.support.concurrency=true;
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true; -- 表合并开启
set hive.compactor.worker.threads=1; -- 表合并线程必须为一
set hive.auto.convert.join=false; -- 关闭 mapjoin
set hive.merge.cardinality.check=false; -- 关闭检查数据列的基数(列值的差异性)
set mapreduce.job.reduces=4;
  • 拉链表实例:
// 创建原始表格
	create table yb12211_2.hive_zipper_order(
		order_id bigint,
		user_id bigint,
		order_modify_dt timestamp,
		order_money decimal(10,2),
		current_status int
	)
	row format delimited fields terminated by ',';
	// 将数据文件导入原始表格
	load data local inpath '/root/hive/data/course/order_record.log'
	overwrite into table yb12211_2.hive_zipper_order;
	
	// 创建拉链表
	// 操作历史全量数据用动态分区
	set hive.support.concurrency=true;
	set hive.enforce.bucketing=true;
	set hive.exec.dynamic.partition.mode=nonstrict;
	set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
	set hive.compactor.initiator.on=true;
	set hive.compactor.worker.threads=1;
	set hive.auto.convert.join=false;
	set hive.merge.cardinality.check=false;
	set mapreduce.job.reduces=4;

	drop table if exists yb12211_2.hive_zipper_pc_order;
	create table yb12211_2.hive_zipper_pc_order(
		order_id bigint,
		user_id bigint,
		order_create_dt timestamp,
		order_modify_dt timestamp,
		order_money decimal(10,2),
		current_status int
	) partitioned by(year int,month int,day int)
	clustered by(order_create_dt) into 4 buckets
	row format delimited fields terminated by ','
	stored as orc
	tblproperties("transactional"="true");
	
	// 对拉链表的数据进行聚合,获取订单信息的创建日期、修改日期和订单状态
	with zip_src as (
		select order_id,user_id,order_money,
			min(order_modify_dt) as order_create_dt,
			max(order_modify_dt) as order_modify_dt,
			max(current_status) as current_status
		from yb12211_2.hive_zipper_order
		group by order_id,user_id,order_money
	)
	
	// 将原始数据灌入拉链表
	insert overwrite table yb12211_2.hive_zipper_pc_order partition(year,month,day)
	select
		order_id,
		user_id,
		order_create_dt,
		order_modify_dt,
		order_money,
		current_status,
		year(order_create_dt) as year,
		month(order_create_dt) as month,
		day(order_create_dt) as day
	from zip_src;
	
	// 拉链表查询 查询之前必须先有这两句配置
	set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
	set hive.support.concurrency=true;
	select * from yb12211_2.hive_zipper_pc_order
	where to_date(order_modify_dt)='2021-02-04'
	order by order_modify_dt desc;
	
	// 对于追加增量数据,将增量数据覆盖在原始数据表中
	load data local inpath '/root/hive/data/course/order_record_2021_02_05.log'
	overwrite into table yb12211_2.hive_zipper_order;	
	
	// 将原始数据表中的增量数据插入拉链表
	// 利用源数据和目标表的order_id进行匹配,若匹配则更新现有订单信息,若不匹配则插入新订单。
	merge into yb12211_2.hive_zipper_pc_order as O
	using (
		select
			order_id,
			user_id,
			order_create_dt,
			order_modify_dt,
			order_money,
			current_status,
			year(order_create_dt) as year,
			month(order_create_dt) as month,
			day(order_create_dt) as day
		from (
			select order_id,user_id,order_money,
				min(order_modify_dt) as order_create_dt,
				max(order_modify_dt) as order_modify_dt,
				max(current_status) as current_status
			from yb12211_2.hive_zipper_order
			group by order_id,user_id,order_money
		)T
	) as H
	on O.order_id=H.order_id
	when matched then
	update set order_modify_dt=H.order_modify_dt,current_status=H.current_status
	when not matched then
	insert values(H.order_id,H.user_id,H.order_create_dt,H.order_modify_dt,H.order_money,H.current_status,H.year,H.month,H.day);
	
	// 验证拉链结果:最后修改时间是否大于创建时间
	select * from yb12211_2.hive_zipper_pc_order
	where to_date(order_modify_dt)>to_date(order_create_dt);
  • 验证数据变化的三种情况:
    • 新增数据,插入原始表中的所有字段信息。
    • 更改数据,更改修改时间|结束时间|数据状态。
    • 删除数据:只需将结束日期改为删除当天即可。
构造增量数据

此处提供了订单日增量数据的自动生成代码,读者可利用代码实现对增量数据文件的生成,以便于体验拉链表的作用。文章来源地址https://www.toymoban.com/news/detail-817841.html

  • HiveZipMaker类
package cn.ybg;

import java.io.*;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;


public class HiveZipTableMaker {
    static Properties pro = new Properties();
    static Random rand = new Random();
    static Calendar calendar = Calendar.getInstance(Locale.CHINA);
    static SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd.SSS");
    static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    static long orderId = 0;
    static long timestamp = 0;
    static String order_log = "order/order_log.properties";
    static String order_record = "order/order_record_2021_02_05.log";
    static String order_list = "order/order_list.log";
    static BufferedWriter bw;
    static long size;
    static LinkedList<Order> orders;

    static {
        try {
            pro.load(new FileReader(order_log));
            timestamp = Long.parseLong(pro.getProperty("timestamp"));
            calendar.setTimeInMillis(timestamp);
            orderId = Long.parseLong(pro.getProperty("orderId"));
            size = new File(order_record).length();
            bw = new BufferedWriter(new FileWriter(order_record,true));
            File orderList = new File(order_list);
            if (orderList.exists() && orderList.length()>0){
                ObjectInputStream ois = new ObjectInputStream(new FileInputStream(orderList));
                orders = (LinkedList<Order>) ois.readObject();
                ois.close();
            }else{
                orders = new LinkedList<>();
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    static int userId(){
        return rand.nextInt(10000000)+1;
    }

    static long orderId(){
        return ++orderId;
    }

    static Date now(){
        calendar.add(Calendar.MILLISECOND,rand.nextInt(5000));
        return calendar.getTime();
    }

    static void write(String line) throws IOException {
        if (orderId>1){
            bw.newLine();
        }
        bw.write(line);
    }

    static void close(){
        try {
            bw.close();
            pro.setProperty("timestamp",String.valueOf(timestamp));
            pro.setProperty("orderId",String.valueOf(orderId));
            pro.store(new FileWriter(order_log,false),timeFormat.format(new Date()));
            if (orders.size()>0){
                ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(order_list,false));
                oos.writeObject(orders);
                oos.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static double randMoney(){
        return new BigDecimal(rand.nextInt(500000)/100+10)
                .setScale(2, RoundingMode.HALF_UP)
                .doubleValue();
    }

    static boolean percentage(){
        return rand.nextInt(100)<65;
    }

    public static void main(String[] args) throws IOException, ParseException {
        /**
         * 订单编号:
         * 用户编号:
         * 操作日期:
         * 订单金额:
         * 当前状态:0(suspending),1(paid),2(signed),3(completed),4(return)
         */

        final long TWO_WEEKS = 14*24*60*60*1000;
        String format = dateFormat.format(now());
        Order order;
        Date now;
        Date dateEnd = dateFormat.parse("2021-02-06");
        while ((now = now()).before(dateEnd)){
            timestamp = now.getTime();
            String nowFormat = dateFormat.format(now);
            if (!format.equals(nowFormat)){
                format = nowFormat;
                if (orders.size()>0) {
                    Date finalNow = now;
                    orders.removeIf(o2-> finalNow.getTime()-o2.getModifyDate().getTime()>TWO_WEEKS);
                    orders.remove(rand.nextInt(orders.size()));
                }
            }
            if (percentage() && orders.size()>=20+rand.nextInt(21)) {
                for (int j = 0; j < rand.nextInt(orders.size()/15) ; j++){
                    int index = rand.nextInt(orders.size());
                    order = orders.get(index);
                    if (order.getStatus()<2){
                        order.setStatus(order.getStatus()+1);
                    }else{
                        order.setStatus(percentage()?3:4);
                        orders.remove(index);
                    }
                    order.setModifyDate(now);
                    write(order.toString());
                }
            }else{
                orderId = orderId();
                int userId = userId();
                double money = randMoney();
                order = new Order(orderId, userId, now, money, 0);
                orders.add(order);
                write(order.toString());
            }
        }
        close();
    }
}

  • Order类
package cn.ybg;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 订单编号:
 * 用户编号:
 * 操作日期:
 * 订单金额:
 * 当前状态:0(suspending),1(paid),2(signed),3(completed),4(return)
 */
public class Order implements Serializable {
    static SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd.SSS");
    private long orderId;
    private int userId;
    private Date modifyDate;
    private double money;
    private int status;

    public Order(long orderId, int userId, Date modifyDate, double money, int status) {
        this.orderId = orderId;
        this.userId = userId;
        this.modifyDate = modifyDate;
        this.money = money;
        this.status = status;
    }

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public int getUserId() {
        return userId;
    }

    public void setUserId(int userId) {
        this.userId = userId;
    }

    public Date getModifyDate() {
        return modifyDate;
    }

    public void setModifyDate(Date modifyDate) {
        this.modifyDate = modifyDate;
    }

    public double getMoney() {
        return money;
    }

    public void setMoney(double money) {
        this.money = money;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return String.format("%d,%d,%s,%.2f,%d",orderId,userId,timeFormat.format(modifyDate),money,status);
    }
}

到了这里,关于Hive 拉链表详解及实例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【大数据hive】hive 拉链表设计与实现

    目录 一、前言 二、拉链表业务背景 2.1 数据同步引发的问题 2.1.1 解决方案1

    2024年02月09日
    浏览(35)
  • 详解数据库、Hive以及Hadoop之间的关系

    数据库是一个用于存储和管理数据的系统。 数据库管理系统(DBMS)是用于管理数据库的软件。 数据库使用表和字段的结构来组织和存储数据。 关系型数据库是最常见的数据库类型,使用SQL(Structured Query Language)进行数据操作和查询。 数据库管理系统(DBMS):数据库管理系

    2024年03月15日
    浏览(58)
  • Hive---拉链表

    拉链表是一种数据模型,主要是针对数据仓库设计中表存储数据的方式而定义的,顾名思义,所谓拉链,就是记录历史。记录一个事物从开始,一直到当前状态的所有变化的信息。拉链表可以避免按每一天存储所有记录造成的海量存储问题,同时也是处理缓慢变化数据(SCD

    2024年02月10日
    浏览(71)
  • 大数据期资料2023 Beta版 - Hadoop、HDFS、MapReduce、Hive、ZooKeeper、Kafka、HBase详解

    了解大数据概念、Hadoop、HDFS、MapReduce、Hive、ZooKeeper、Kafka、HBase等技术,包括特点、命令操作和启动关闭方法。获取2023年大数据资料Beta版。

    2024年02月06日
    浏览(238)
  • hive 全量表、增量表、快照表、切片表和拉链表

    全量表 :记录每天的所有的最新状态的数据, 增量表 :记录每天的新增数据,增量数据是上次导出之后的新数据。 快照表 :按日分区,记录截止数据日期的全量数据 切片表 :切片表根据基础表,往往只反映某一个维度的相应数据。其表结构与基础表结构相同,但数据往往

    2024年02月13日
    浏览(33)
  • Hive时间日期函数一文详解+代码实例

    目录 前言 一、HiveSQL运行过程 二、Hive时间函数 1.获取当前时间 1.current_date() 2. current_timestamp() 3. unix_timestamp() 2.获取指定时间维度 1. year() 2.quarter() 3.month() 4.day() 5.hour() 6.minute() 7.second 8.weekofyear() 9. dayofweek()  10.last_day()  11.next_day() 12.trunc()  3.时间格式转换  1.to_date() 2. from_un

    2024年02月02日
    浏览(46)
  • Hive数据仓库---Hive的安装与配置

    Hive 官网地址:https://hive.apache.org/ 下载地址:http://www.apache.org/dyn/closer.cgi/hive/ 把安装文件apache-hive-3.1.2-bin.tar.gz上传到master节点的/opt/software目 录下,执行以下命令把安装文件解压到/opt/app目录中 进入/opt/app目录,为目录apache-hive-3.1.2-bin建立软件链接 即输入hive就相当于输入a

    2024年02月02日
    浏览(43)
  • Hive数据仓库

    数据仓库(英语:Data Warehouse,简称数仓、DW),是一个用于存储、分析、报告的数据系统。 数据仓库的目的是构建面相分析的集成化数据环境,分析结果为企业提供决策支持(Decision Support)。 数据仓库本身并不“产生”任何数据,其数据来源不同外部系统; 同时数据仓库

    2024年02月15日
    浏览(40)
  • hive数据仓库工具

    1、hive是一套操作数据仓库的应用工具,通过这个工具可实现mapreduce的功能 2、hive的语言是hql[hive query language] 3、官网hive.apache.org 下载hive软件包地址  Welcome! - The Apache Software Foundation https://archive.apache.org/ 4、hive在管理数据时分为元数据和真数据,其中元数据要保存在数据库中

    2024年02月04日
    浏览(35)
  • 安装hive数据仓库

    需要安装部署完成的Hadoop的环境如果不会搭建的可以参考: 卸载Centos7自带的mariadb mariadb-libs-5.5.64-1.el7.x86_64是使用 rpm -qa|grep mariadb 查询出来的名称 安装mysql 安装mysql时可能会出现的问题 1、依赖检测失败 问题很明显了就是依赖的问题,下载他说的依赖就好了 安装hive 上传并且

    2024年02月14日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包