HIVE 3 使用 MR 引擎多表关联 (JOIN) 导致丢数的问题复现、问题根源及解决方案 (附代码)

这篇具有很好参考价值的文章主要介绍了HIVE 3 使用 MR 引擎多表关联 (JOIN) 导致丢数的问题复现、问题根源及解决方案 (附代码)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

本文意图解决 HIVE 3 版本中使用 MR 作为运算引擎进行 JOIN 操作时导致的丢数情况。

问题描述

Apache Hive 在 2.3 版本后宣布放弃维护 MapReduce 作为底层执行引擎,并转而使用 Tez 作为默认的查询引擎。但是由于 Tez 在大作业量和高并发时的严重性能问题,导致许多任务不得不继续使用 MapReduce 进行操作,因此就需要开发者自行维护 Hive 对于 MR 的可用性。

然而,在 Hive 升级至 Hive 3 版本中,继续使用 MapReduce 会导致非常严重的恶性错误。例如,即使进行非常简单的 JOIN 操作,都会导致部分应该被关联上的数据丢失。

本文档意图提供测试场景浮现上述恶性漏洞,并阐述其根本原因,最后对出现问题部分的源代码进行修改,以彻底修复该问题。

问题复现

场景1: 多表 (超过三张表) 时数据丢失

在复现开始之前先对 Hive 的部分参数进行设置:

SET hive.execution.engine=mr;
SET mapred.reduce.tasks=2;
SET hive.auto.convert.join=false;

首先,创建三张表。这三张表除了表名不一样,其他包括列信息甚至数据在内完全相同。

建表语句如下。我们使用文件的形式快速插入,当然为了复现这个问题您也可以手动插入如下数据:

USE default;

create table table_a(id string, name string, addr string) stored as orc;
create table table_b(id string, name string, addr string) stored as orc;
create table table_c(id string, name string, addr string) stored as orc;

LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_a_data.orc" INTO TABLE table_a;
LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_b_data.orc" INTO TABLE table_b;
LOAD DATA LOCAL INPATH "/home/hadoop/reproduce_hive/Scenario1/table_c_data.orc" INTO TABLE table_c;
  

通过以下语句查看三张表的内容,可以看到其中的数据完全一致。

hive> select * from table_a;
OK
11      a       aaa
22      b       bbb
33      c       ccc
44      d       ddd
55      e       eee
66      f       fff
77      g       ggg
88      h       hhh
99      i       iii
00      j       jjj
Time taken: 0.157 seconds, Fetched: 10 row(s)
hive> select * from table_b;
OK
11      a       aaa
22      b       bbb
33      c       ccc
44      d       ddd
55      e       eee
66      f       fff
77      g       ggg
88      h       hhh
99      i       iii
00      j       jjj
Time taken: 0.471 seconds, Fetched: 10 row(s)
hive> select * from table_c;
OK
11      a       aaa
22      b       bbb
33      c       ccc
44      d       ddd
55      e       eee
66      f       fff
77      g       ggg
88      h       hhh
99      i       iii
00      j       jjj
Time taken: 0.186 seconds, Fetched: 10 row(s)  

在确认三张表的数据准确无误后,使用如下关联语句对三张表进行关联:

select a.id as a_id, b.name as b_name, c.addr as c_addr from table_a a join table_b b on(a.id=b.id) join table_c c on(c.name=b.name);  

关联结果如下,数据丢失的结果令人咋舌。

MapReduce Jobs Launched: 
Stage-Stage-1:  HDFS Read: 23439 HDFS Write: 5508 SUCCESS
Stage-Stage-2:  HDFS Read: 26292 HDFS Write: 5508 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
22      b       bbb
66      f       fff
88      h       hhh
55      e       eee
99      i       iii
Time taken: 3.343 seconds, Fetched: 5 row(s)

可以非常明显地看到,本来应该被完全关联在一起的 10 条数据,居然出现了严重的数据丢失。有一半的数据竟然没有被成功关联。如果多次运行关联语句,可以发现这不是偶然情况。每次关联 2 张表以上的数据都会出现极为严重的数据丢失问题。

场景2: 表的某些属性 (e.g. bucketing_version) 不同时,即使两张表关联也会导致数据丢失

使用如下数据进行数据建表关联。在建表时使用不同的 bucketing_version 进行表的初始化。

数据文件如下:

0,Kurt,vulnedcasey@yahoo.co.uk
1,Rolland,naejose@gmx.com
2,Cortez,blategarfield@yahoo.com
3,Tyron,tameprobes@gmail.com
4,Matthew,wellezekiel@yahoo.co.uk
5,Jeffrey,fabingeborg@comcast.net
6,Gerard,oughtoutgo@att.net
7,Hal,coursedmauro@hotmail.com
8,Virgil,squintprude@gmail.com
9,Hector,lewddillon@email.com  

利用如下语句建表,并在建表时使用不同的 bucketing_version 属性。

CREATE TABLE `join_test_1`(`id` string, `first` string, `email` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' TBLPROPERTIES ('bucketing_version'='1');

LOAD DATA LOCAL INPATH '/home/hadoop/reproduce_hive/Scenario2/test_data.csv' OVERWRITE INTO TABLE join_test_1;

CREATE TABLE `join_test_2`(`id` string, `first` string, `email` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' TBLPROPERTIES ('bucketing_version'='2');

LOAD DATA LOCAL INPATH '/home/hadoop/reproduce_hive/Scenario2/test_data.csv' OVERWRITE INTO TABLE join_test_2;  

运行关联操作的 SQL 语句:

SET hive.execution.engine=mr;
SET mapred.reduce.tasks=2;
SET hive.auto.convert.join=false;
SELECT * from (SELECT id from join_test_1) as tbl1 LEFT JOIN (SELECT id from join_test_2) as tbl2 on tbl1.id = tbl2.id;  

查询关联结果,令人惊讶的事情再一次发生:

Ended Job = job_local184369678_0005
MapReduce Jobs Launched: 
Stage-Stage-1:  HDFS Read: 28434 HDFS Write: 7956 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
0       NULL
2       NULL
4       NULL
6       NULL
8       8
1       NULL
3       NULL
5       5
7       NULL
9       NULL

蕴含同样数据的两张表,仅仅由于建表时的某些属性不同,就导致了绝大部分数据的关联都不成功。数据最基本的准确性都无法得到保障,这毫无疑问是 HIVE 3 中非常致命的问题。

问题根源

由于该问题影响过于严重,导致许多使用 HIVE 的开发者第一时间发现了本问题并及时进行了 Bug Report。在 HIVE Jira 上面可以看到非常多的针对该问题的问题报告和可能的解决方案。

本文主要采用了 HIVE JIRA 中编号为 HIVE-22098 的问题描述和相应解决思路。

根据 HIVE-22098 的问题描述,究其根源,是由于 HIVE 2 与 HIVE 3 在 JOIN 操作时使用了不同的 Hash 算法,导致同样的值在关联时被不同的 Hash 算法映射成了不同的值,而这些不同的 Hash 值在进行关联时无法被相互匹配。最终导致本来该被关联在一起的数据由于 Hash 值得不同未能被关联在一起。而决定到底应用哪套 Hash 值算法则是根据 bucketing_version 的值来进行评判的。

特别地,在进行多表关联时,即使相同 bucketing_version 的 Hive 表,由于其关联的中间过程所产生的中间表,在源代码中 bucketing_version 值会被置为 -1,因此该中间表再与第三张乃至更多的表关联时会直接导致 Hash 算法的混乱计算。

因此,为了保障关联的数据准确性,必须要确保 bucketing_version 在进行多表关联或者多版本表关联时的稳定。即,保障 bucketing_version 的稳定性就是保证 Hive 3 数据关联时的准确性。

此外,HIVE 社区已经针对 bucketing_version 不稳定的问题进行了集中的问题汇总和修改建议指导。可以通过查看 JIRA: HIVE-21304 了解系统性的 bucketing_version 稳定性提高方法,此处不做过多赘述。

Ps: 该问题还可能导致许多其他异常的出现,比如 HIVE-18983HIVE-20164HIVE-22429 等诸多问题的出现。因此该 BUG 的严重级别是最高的。修复了本问题,其余数十个问题也就都可以迎刃而解。

解决思路

由于 HIVE 中 JOIN 操作执行流程的本质是一个二叉树,因此我们只需要通过算法在关联时遍历每个节点,并将每个节点的 bucketing_version 在关联前手动设置为该二叉树中的最高版本,即可保证 bucketing_version 在关联时的稳定,也就可以保障关联不丢数。

源码修改及编译上传

将如下代码替换至原代码即可修复本问题。

下面给出 Patch 中的 Git 代码,对相应的类进行修复。Patch 的代码在 Jira HIVE-22098 中都可以找到。在此衷心感谢各位 Code Contributors 对于 HIVE 社区的贡献。

 From c0774da927451008ba78ed7b8637a1a4899d9e12 Mon Sep 17 00:00:00 2001
From: luguangming <luguangming1@huawei.com>
Date: Mon, 12 Aug 2019 14:24:05 +0800
Subject: [PATCH]HIVE-22098
---
 .../apache/hadoop/hive/ql/exec/mr/ExecMapper.java  | 41 ++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 99b33a3..d0c847e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -20,10 +20,12 @@

 import java.io.IOException;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;

+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -104,6 +106,10 @@ public void configure(JobConf job) {
       // initialize map operator
       mo.initialize(job, null);
       mo.setChildren(job);
+
+      // defined self balance ReduceSinkOperator of bucketVersion
+      balanceRSOpbucketVersion(mo);
+
       l4j.info(mo.dump(0));
       // initialize map local work
       localWork = mrwork.getMapRedLocalWork();
@@ -138,6 +144,41 @@ public void configure(JobConf job) {
       }
     }
   }
+
+  /**
+   * defined-self balance ReduceSinkOperator of bucketVersion, keep values to sameness
+   * @param rootOp
+   */
+  private static void balanceRSOpbucketVersion(Operator rootOp){
+    List<Operator<? extends OperatorDesc>> needDealOps = new ArrayList<Operator<? extends OperatorDesc>>();
+    visitChildGetRSOps(rootOp, needDealOps);
+    int bucketVersion = -1;
+    for(Operator<? extends OperatorDesc> rsop : needDealOps){
+      if(rsop.getBucketingVersion() != 2 && rsop.getBucketingVersion() != 1){
+        rsop.setBucketingVersion(-1);
+      }
+      if(rsop.getBucketingVersion() > bucketVersion){
+        bucketVersion = rsop.getBucketingVersion();
+      }
+    }
+    for(Operator<? extends OperatorDesc> rsop : needDealOps){
+      l4j.info("update reduceSinkOperator name="+rsop.getName()+", opId="+rsop.getOperatorId()+", oldBucketVersion="+rsop.getBucketingVersion()+", newBucketVersion="+bucketVersion);
+      rsop.setBucketingVersion(bucketVersion);
+    }
+    needDealOps.clear();
+  }
+  private static void visitChildGetRSOps(Operator rootOp, List<Operator<? extends OperatorDesc>> needDealOps){
+    List<Operator<? extends OperatorDesc>> ops = rootOp.getChildOperators();
+    if(ops == null || ops.isEmpty()){
+      return;
+    }
+    for(Operator<? extends OperatorDesc> op : ops) {
+      if (op instanceof ReduceSinkOperator) {
+        needDealOps.add(op);
+      }
+      visitChildGetRSOps(op, needDealOps);
+    }
+  }
   @Override
   public void map(Object key, Object value, OutputCollector output,
       Reporter reporter) throws IOException {
-- 
2.9.2

编译对应的模块 hive-exec-3.1.2.jar,并将该 Jar 包替换 Hive 3 自带的 Jar 包。编译 HIVE 的命令可以去查询 HIVE 的官方文档,这里不做过多赘述。

重启 Hive 让其重新加载我们修改源码后的 Jar 包,再次重复上述两个场景,即可观察到 MapReduce 的结果正常,该问题被成功修复。

关联结果示例:

Ended Job = job_local184369678_0006
MapReduce Jobs Launched: 
Stage-Stage-1:  HDFS Read: 22434 HDFS Write: 7966 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
0       0
1       1
2       2
4       4
6       6
8       8
3       3
5       5
7       7
9       9

小结

HIVE JIRA 中有许多关于异常信息和报错的讨论。经常性地浏览社区,配合阅读源代码可以对 HIVE 的理解更加深入。

再次,HIVE 社区已经针对 bucketing_version 不稳定的问题进行了集中的问题汇总和修改建议指导。可以通过查看 JIRA: HIVE-21304 了解系统性的 bucketing_version 稳定性提高方法,此处不做过多赘述。

希望本篇文章对您的 HIVE 使用有所帮助。文章来源地址https://www.toymoban.com/news/detail-644660.html

References

  • JIRA: HIVE-22098
  • JIRA: HIVE-21304

到了这里,关于HIVE 3 使用 MR 引擎多表关联 (JOIN) 导致丢数的问题复现、问题根源及解决方案 (附代码)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hive 基于Tez引擎 map和reduce数的参数控制原理与调优经验

    主要对基于Tez的map数和reduce数测试与调优 如果需要查看基于MapReduce的调优可以看这篇: Hive 基于MapReduce引擎 map和reduce数的参数控制原理与调优经验 https://blog.csdn.net/qq_35260875/article/details/110181866?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%22%3A%22article%22%2C%22rId%22%3A%22110181866%22%2C

    2024年02月04日
    浏览(31)
  • MySQL多表查询内连接外连接详解,join、left join、right join、full join的使用

    目录 1、多表查询概览 1.1、分类 1.2、外连接的分类 1.3、常用的SQL语法标准 2、内外联接案例 2.1、初始化表 2.2、内连接 2.3、外连接案例 2.4、全连接案例 2.5、union和union all 2.6、实现MySQL全连接 2.7、内外连接面试基础 2.8、SQL99多表查询新特性 1.1、分类 可以根据3个角度进行分类

    2024年02月05日
    浏览(59)
  • Elasticsearch 之 join 关联查询及使用场景

    在Elasticsearch这样的分布式系统中执行类似SQL的join连接是代价是比较大的,然而,Elasticsearch却给我们提供了基于水平扩展的两种连接形式 。这句话摘自Elasticsearch官网,从“然而”来看,说明某些场景某些情况下我们还是可以使用的 在关系型数据库中,以MySQL为例,尤其B端类

    2024年02月06日
    浏览(46)
  • 使用Mongoose populate实现多表关联存储与查询,内附完整代码

    mongodb 不是传统的关系型数据库,我们可以使用 monogoose 方便的将多个表关联起来,实现一对多、多对多的数据表存储和查询功能。 本文已最常见的一对多关系模型,介绍简单的数据模型定义、存储、查询。 我们创建一个 Person 模型和一个 Story 模型,其中一个 Person 对应多个

    2024年02月03日
    浏览(34)
  • MySQL基础篇补充 | 多表查询中使用SQL99实现7种JOIN操作、SQL99语法新特性

    目录 一:多表查询中使用SQL99实现7种JOIN操作  二:SQL99语法新特性 1. 自然连接Natural 2. USING连接 在多表查询中,除了遇到最多的内连接、左外连接和右外连接,还有其它的连接方式;接下来就聊聊其它的连接方式,如下图:  ​​​​​​ 并且在正式讲解之前,需要先了解

    2024年02月03日
    浏览(45)
  • hive之Map Join使用方法

    目录 介绍 mapjoin的使用方法 结语         MAPJION会把小表全部加载到内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map端是进行了join操作,省去了reduce运行的时间,算是hive中的一种优化。    如上图中的流程,首先Task A在客户端本地执行,负责

    2024年02月05日
    浏览(26)
  • 业务数据LEFT JOIN 多表查询慢--优化操作

    首先你会想到,给表加索引,那么mysql会给主键自动建立索引吗? 会的,当然会。 在我们查询的业务表操作的时候,表业务数据庞大起来的时候,以及left join多的时候,甚至多表关联到几十张表的时候,查询是慢到不行。 这时候,只需要给表join查询的字段,及表结构,进行索

    2024年02月02日
    浏览(40)
  • Hadoop 多表关联

    一、实例描述 多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息。下面进入这个实例。 输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址列,包含地址名列和地址编号列。要求从输入数据中找出工厂

    2023年04月08日
    浏览(69)
  • MySQL的多表关联查询

    多表关联查询是使用一条SQL语句,将关联的多张表的数据查询出来。 交叉查询就是将多张表的数据没有条件地连接在一起进行展示。 1.1.1 语法 使用交叉查询类别和商品 通过查询结果可以看到,交叉查询其实是一种错误的做法,在查询到的结果集中有大量的错误数据,称交叉

    2024年02月14日
    浏览(37)
  • FLink多表关联实时同步

    Oracle-Debezium-Kafka-Flink-PostgreSQL Flink消费Kafka中客户、产品、订单(ID)三张表的数据合并为一张订单(NAME)表。 Oracle内创建三张表 PostgreSQL内创建一张表 其他前置环境 Oracle、PostgreSQL、Kafka、FLink、Debezium-Server的部署参见本系列其他文章搭建。 采用前置条件中的语句建表即可,

    2023年04月25日
    浏览(63)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包