Analyzing Hive's TBLPROPERTIES, Starting from 'discover.partitions'='true'

Preface

First, create two tables in Hive 3.1.2:

show databases ;

use db_lzy;

show tables ;

create external table if not exists test_external_20230502(
    id int,
    comment1 string,
    comment2 string
)
stored as parquet 
;

create external table if not exists test_external_par_20230502(
    id int,
    comment1 string,
    comment2 string
)
partitioned by (
    dt string
    )
stored as parquet
;

Then look at the generated DDL:

show create table test_external_20230502;
show create table test_external_par_20230502;

The output is:

hive (db_lzy)> show create table test_external_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'transient_lastDdlTime'='1683028671')
Time taken: 0.181 seconds, Fetched: 15 row(s)
hive (db_lzy)>
             > show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'transient_lastDdlTime'='1683028770')
Time taken: 0.121 seconds, Fetched: 17 row(s)
hive (db_lzy)>

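Both tables carry TBLPROPERTIES information that was never specified in the DDL, so it was evidently generated automatically.

On CDP, creating a table sometimes adds one more table property by default: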

'discover.partitions'='true'

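This table property enables automatic partition discovery. Starting from it, the rest of this post analyzes Hive's TBLPROPERTIES.

Adding a table property via JDBC

Start with a simple JDBC program: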

package com.zhiyong;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @program: zhiyong_study
 * @description: Test Hive TBLPROPERTIES
 * @author: zhiyong
 * @create: 2023-05-02 19:31
 **/
public class TblpropertiesTest1 {
    public static void main(String[] args) throws Exception {
        final String HIVE_JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
        final String HIVE_HOST = "192.168.88.101";
        final String HIVE_PORT = "10000";
        final String HIVE_USER = "root";
        final String HIVE_PASSWORD = "123456";

        String HIVE_URL = "jdbc:hive2://" + HIVE_HOST + ":" + HIVE_PORT + "/default";

        String sql_Alter_TBLPROPERTIES = "alter table db_lzy.test_external_par_20230502 set TBLPROPERTIES('zhiyong_key1' = 'zhiyong_value1')";

        Class.forName(HIVE_JDBC_DRIVER);

        // try-with-resources closes the statement and connection even if execute() throws
        try (Connection conn = DriverManager.getConnection(HIVE_URL, HIVE_USER, HIVE_PASSWORD);
             Statement stmt = conn.createStatement()) {
            System.out.println("Altering table properties:");
            stmt.execute(sql_Alter_TBLPROPERTIES);
        }
    }
}

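Setting a breakpoint and inspecting the call stack shows the path HiveConnection → TCLIService → TBinaryProtocol → TSocket. The call flow is essentially the same as the one analyzed earlier in:

https://lizhiyong.blog.csdn.net/article/details/129742904

Under the hood it is the Thrift protocol. The language-agnostic SQL client side therefore reveals very little. The more useful source code is on the server side, which is set aside for now.

At this point: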

hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'last_modified_by'='hadoop',
  'last_modified_time'='1683030293',
  'transient_lastDdlTime'='1683030293',
  'zhiyong_key1'='zhiyong_value1')
Time taken: 0.155 seconds, Fetched: 20 row(s)
hive (db_lzy)>

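The key-value pair has been written into the table properties. Next, query the metastore database (MySQL in this setup):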

show databases ;
use db_hive_metastore;
show tables ;
select * from TBLS;

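The result (a screenshot in the original post) lists the table records held in the metastore.

Matching on the table name shows that the table whose properties were just modified has TBL_ID=17.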

select * from TABLE_PARAMS
where TBL_ID=17;

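The result (a screenshot in the original post) lists the parameters of that table.

Evidently the table properties that were set are recorded in the TABLE_PARAMS table of the MetaStore database (the author's Apache Hive 3.1.2 stores it in MySQL).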
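The lookup by a hard-coded TBL_ID can be generalized by joining the metastore tables on database and table name. The following is a minimal Java sketch, assuming the MySQL-backed metastore schema used in this post (DBS, TBLS, TABLE_PARAMS). The class and method names are made up for illustration, and the caller is assumed to supply an open JDBC connection to the metastore database.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: read all TBLPROPERTIES of one table straight from the metastore database. */
class TableParamsReader {
    // Joining DBS, TBLS and TABLE_PARAMS avoids hard-coding the TBL_ID.
    static final String QUERY =
        "SELECT p.PARAM_KEY, p.PARAM_VALUE "
      + "FROM TABLE_PARAMS p "
      + "JOIN TBLS t ON p.TBL_ID = t.TBL_ID "
      + "JOIN DBS d ON t.DB_ID = d.DB_ID "
      + "WHERE d.NAME = ? AND t.TBL_NAME = ? "
      + "ORDER BY p.PARAM_KEY";

    /** Returns key/value pairs in key order; conn must point at the metastore database. */
    static Map<String, String> readTableParams(Connection conn, String db, String table)
            throws SQLException {
        Map<String, String> params = new LinkedHashMap<>();
        try (PreparedStatement ps = conn.prepareStatement(QUERY)) {
            ps.setString(1, db);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    params.put(rs.getString(1), rs.getString(2));
                }
            }
        }
        return params;
    }
}
```

Given a connection to db_hive_metastore, calling readTableParams(conn, "db_lzy", "test_external_par_20230502") should return the same pairs that show create table prints under TBLPROPERTIES, plus any internal entries the metastore keeps there.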

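This table also records whether a table is external. That is why SQL users sometimes run the following before deleting a table's data:

alter table db_name.table_name set TBLPROPERTIES ('EXTERNAL'='FALSE');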

Oddly, the metadata table contained no row corresponding to the property set by:

alter table db_name.table_name set TBLPROPERTIES  ("external.table.purge"="true");

However, HiveStrictManagedMigration.java contains:

boolean migrateToExternalTable(Table tableObj, TableType tableType) throws HiveException {
  String msg;
  switch (tableType) {
  case MANAGED_TABLE:
    if (AcidUtils.isTransactionalTable(tableObj)) {
      msg = createExternalConversionExcuse(tableObj,
          "Table is a transactional table");
      LOG.debug(msg);
      return false;
    }
    LOG.info("Converting {} to external table ...", getQualifiedName(tableObj));
    if (!runOptions.dryRun) {
      String command = String.format(
          "ALTER TABLE %s SET TBLPROPERTIES ('EXTERNAL'='TRUE', 'external.table.purge'='true')",
          getQualifiedName(tableObj));
      runHiveCommand(command);
    }
    return true;
  case EXTERNAL_TABLE:
    msg = createExternalConversionExcuse(tableObj,
        "Table is already an external table");
    LOG.debug(msg);
    break;
  default: // VIEW/MATERIALIZED_VIEW
    msg = createExternalConversionExcuse(tableObj,
        "Table type " + tableType + " cannot be converted");
    LOG.debug(msg);
    break;
  }
  return false;
}

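So the source code does reference this property, which indicates that it is valid in Hive 3.1.2. The migration sets 'EXTERNAL'='TRUE' together with 'external.table.purge'='true'. With purge enabled, dropping the external table also deletes its data files, so the converted table keeps the drop behavior of a managed table.

Next, manually insert one more row into TABLE_PARAMS (a screenshot in the original post; judging from the output below, the row is zhiyong_key2 = zhiyong_value2).

At this point: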

hive (db_lzy)> show create table test_external_par_20230502;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test_external_par_20230502`(
  `id` int,
  `comment1` string,
  `comment2` string)
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://zhiyong-1/zhiyong-1/user/hive/warehouse/db_lzy.db/test_external_par_20230502'
TBLPROPERTIES (
  'bucketing_version'='2',
  'last_modified_by'='hadoop',
  'last_modified_time'='1683030293',
  'transient_lastDdlTime'='1683030293',
  'zhiyong_key1'='zhiyong_value1',
  'zhiyong_key2'='zhiyong_value2')
Time taken: 0.098 seconds, Fetched: 21 row(s)
hive (db_lzy)>

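The DDL retrieved again now contains this entry.

This explains why Spark, when writing in Overwrite mode, recreates the table and adds a large batch of spark.sql properties. It also explains why Iceberg and Hudi choose Hive's MetaStore for metadata storage, and why Flink uses Hive's MetaStore as a Catalog.

The table is simply reused as a key-value store: it can hold all kinds of table attributes, and reading and writing them is easy. Hive's MetaStore can be reused directly, and parsing other tables' metadata is just as convenient. Whether through Thrift RPC calls or JDBC reads of the metadata tables, components across the Hive ecosystem plug in seamlessly. This may also be why, in the cloud-native era, Hadoop and YARN are losing ground to OSS and K8S while Hive still holds strong: storage, compute, and resource scheduling can all be replaced, but the MetaStore that maps files to tables has no good substitute for the time being.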

Searching on GitHub

The author had already seen the following property when creating tables on CDP 7.1.5:

'discover.partitions'='true'

So it certainly did not come out of nowhere. Since it cannot be found in Apache Hive 3.1.2, the next step is to check on GitHub whether other versions have it.

The PartitionManagementTask class

At: https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java

the source reads:

package org.apache.hadoop.hive.metastore;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.TimeValidator;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Partition management task is primarily responsible for partition retention and discovery based on table properties.
 *
 * Partition Retention - If "partition.retention.period" table property is set with retention interval, when this
 * metastore task runs periodically, it will drop partitions with age (creation time) greater than retention period.
 * Dropping partitions after retention period will also delete the data in that partition.
 *
 * Partition Discovery - If "discover.partitions" table property is set, this metastore task monitors table location
 * for newly added partition directories and create partition objects if it does not exist. Also, if partition object
 * exist and if corresponding directory does not exists under table location then the partition object will be dropped.
 *
 */
public class PartitionManagementTask implements MetastoreTaskThread {
  private static final Logger LOG = LoggerFactory.getLogger(PartitionManagementTask.class);
  public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
  public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
  private static final Lock lock = new ReentrantLock();
  // these are just for testing
  private static int completedAttempts;
  private static int skippedAttempts;

  private Configuration conf;

  @Override
  public long runFrequency(TimeUnit unit) {
    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_FREQUENCY, unit);
  }

  @Override
  public void setConf(Configuration configuration) {
    // we modify conf in setupConf(), so we make a copy
    conf = new Configuration(configuration);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  private static boolean partitionDiscoveryEnabled(Map<String, String> params) {
    return params != null && params.containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
            params.get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true");
  }

  @Override
  public void run() {
    if (lock.tryLock()) {
      skippedAttempts = 0;
      String qualifiedTableName = null;
      IMetaStoreClient msc = null;
      try {
        msc = new HiveMetaStoreClient(conf);
        String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME);
        String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
        String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
        String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
        Set<String> tableTypesSet = new HashSet<>();
        for (String type : tableTypes.split(",")) {
          try {
            tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
          } catch (IllegalArgumentException e) {
            // ignore
            LOG.warn("Unknown table type: {}", type);
          }
        }
        // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
        // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables.
        if (tableTypesSet.isEmpty()) {
          LOG.info("Skipping partition management as no table types specified");
          return;
        }

        StringBuilder filterBuilder = new StringBuilder()
            .append(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS)
            .append("discover__partitions").append(" like \"true\" ");
        boolean external = tableTypesSet.contains(TableType.EXTERNAL_TABLE.name());
        boolean managed = tableTypesSet.contains(TableType.MANAGED_TABLE.name());
        if (!managed && external) {
          // only for external tables
          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
              .append(" = \"").append(TableType.EXTERNAL_TABLE.name()).append("\" ");
        } else if (managed && !external) {
          // only for managed tables
          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
              .append(" = \"").append(TableType.MANAGED_TABLE.name()).append("\" ");
        }
        if (!tablePattern.trim().isEmpty()) {
          filterBuilder.append(" and ")
              .append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME)
              .append(" like \"").append(tablePattern.replaceAll("\\*", ".*")).append("\"");
        }

        List<String> databases = msc.getDatabases(catalogName, dbPattern);
        List<TableName> candidates = new ArrayList<>();
        for (String db : databases) {
          Database database = msc.getDatabase(catalogName, db);
          if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
            LOG.debug("Skipping table under database: {}", db);
            continue;
          }
          if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
            LOG.info("Skipping table belongs to database {} being failed over.", db);
            continue;
          }
          List<String> tablesNames = msc.listTableNamesByFilter(catalogName, db,
              filterBuilder.toString(), -1);
          tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
        }

        if (candidates.isEmpty()) {
          LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);
          return;
        }

        // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also
        // will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also
        // defeats the purpose of thread pooled msck repair.
        int threadPoolSize = MetastoreConf.getIntVar(conf,
          MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE);
        final ExecutorService executorService = Executors
          .newFixedThreadPool(Math.min(candidates.size(), threadPoolSize),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build());
        CountDownLatch countDownLatch = new CountDownLatch(candidates.size());
        LOG.info("Found {} candidate tables for partition discovery", candidates.size());
        setupMsckPathInvalidation();
        Configuration msckConf = Msck.getMsckConf(conf);
        for (TableName table : candidates) {
          // this always runs in 'sync' mode where partitions can be added and dropped
          MsckInfo msckInfo = new MsckInfo(table.getCat(), table.getDb(), table.getTable(),
            null, null, true, true, true, -1);
          executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch));
        }
        countDownLatch.await();
        executorService.shutdownNow();
      } catch (Exception e) {
        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
      } finally {
        if (msc != null) {
          msc.close();
        }
        lock.unlock();
      }
      completedAttempts++;
    } else {
      skippedAttempts++;
      LOG.info("Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts);
    }
  }

  public static long getRetentionPeriodInSeconds(final Table table) {
    String retentionPeriod;
    long retentionSeconds = -1;
    if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
      retentionPeriod = table.getParameters().get(PARTITION_RETENTION_PERIOD_TBLPROPERTY);
      if (retentionPeriod.isEmpty()) {
        LOG.warn("'{}' table property is defined but empty. Skipping retention period..",
          PARTITION_RETENTION_PERIOD_TBLPROPERTY);
      } else {
        try {
          TimeValidator timeValidator = new TimeValidator(TimeUnit.SECONDS);
          timeValidator.validate(retentionPeriod);
          retentionSeconds = MetastoreConf.convertTimeStr(retentionPeriod, TimeUnit.SECONDS, TimeUnit.SECONDS);
        } catch (IllegalArgumentException e) {
          LOG.warn("'{}' retentionPeriod value is invalid. Skipping retention period..", retentionPeriod);
          // will return -1
        }
      }
    }
    return retentionSeconds;
  }

  private void setupMsckPathInvalidation() {
    // if invalid partition directory appears, we just skip and move on. We don't want partition management to throw
    // when invalid path is encountered as these are background threads. We just want to skip and move on. Users will
    // have to fix the invalid paths via external means.
    conf.set(MetastoreConf.ConfVars.MSCK_PATH_VALIDATION.getVarname(), "skip");
  }

  private static class MsckThread implements Runnable {
    private MsckInfo msckInfo;
    private Configuration conf;
    private String qualifiedTableName;
    private CountDownLatch countDownLatch;

    MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) {
      this.msckInfo = msckInfo;
      this.conf = conf;
      this.qualifiedTableName = qualifiedTableName;
      this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
      try {
        Msck msck = new Msck( true, true);
        msck.init(conf);
        msck.repair(msckInfo);
      } catch (Exception e) {
        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
      } finally {
        // there is no recovery from exception, so we always count down and retry in next attempt
        countDownLatch.countDown();
      }
    }
  }

  @VisibleForTesting
  public static int getSkippedAttempts() {
    return skippedAttempts;
  }

  @VisibleForTesting
  public static int getCompletedAttempts() {
    return completedAttempts;
  }
}

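Persistence pays off: the table property finally turns up in this class. The task is evidently a background task that periodically refreshes Hive table metadata in the MetaStore. It drops stale partitions (those that exist in the MetaStore but whose directories no longer exist under the table location) and adds missing ones (those whose directories exist under the table location but that are not registered in the MetaStore).

After acquiring the lock (a ReentrantLock obtained with tryLock), it runs MSCK repair in sync mode for each candidate table.

The frequency returned by runFrequency is evidently how often the thread pool triggers the task.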
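The effect of running MSCK in sync mode can be pictured as a two-way set difference between the partition directories on the file system and the partition objects in the MetaStore. The sketch below is an in-memory illustration only: it is not the Msck implementation, and the class name and the partition names are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Sketch of the sync idea: compare partition directories with metastore partitions. */
class PartitionSyncSketch {

    /** Partitions that have a directory on the file system but no metastore entry. */
    static Set<String> toAdd(Set<String> dirsOnFs, Set<String> partsInMetastore) {
        Set<String> result = new TreeSet<>(dirsOnFs);
        result.removeAll(partsInMetastore);
        return result;
    }

    /** Partitions registered in the metastore whose directory no longer exists. */
    static Set<String> toDrop(Set<String> dirsOnFs, Set<String> partsInMetastore) {
        Set<String> result = new TreeSet<>(partsInMetastore);
        result.removeAll(dirsOnFs);
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical state: one new directory, one partition whose directory is gone.
        Set<String> fs = new TreeSet<>(Arrays.asList("dt=2023-05-01", "dt=2023-05-02"));
        Set<String> hms = new TreeSet<>(Arrays.asList("dt=2023-04-30", "dt=2023-05-01"));
        System.out.println("add:  " + toAdd(fs, hms));
        System.out.println("drop: " + toDrop(fs, hms));
    }
}
```

With the sample data, dt=2023-05-02 would be added and dt=2023-04-30 would be dropped, which matches the two cases described in the class Javadoc.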

package org.apache.hadoop.hive.metastore;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * Any task that will run as a separate thread in the metastore should implement this
 * interface.
 */
public interface MetastoreTaskThread extends Configurable, Runnable {

  /**
   * Get the frequency at which the thread should be scheduled in the thread pool.  You must call
   * {@link #setConf(Configuration)} before calling this method.
   * @param unit TimeUnit to express the frequency in.
   * @return frequency
   */
  long runFrequency(TimeUnit unit);
}

Unfortunately, Hive 3.1.2 does not have this class yet (the original post shows a screenshot of the empty search result).

Consequently, the configuration class has no corresponding configuration item either.

A newer version on GitHub is needed.

The MetastoreConf class

At: https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java

the source reads:

// Partition management task params
    PARTITION_MANAGEMENT_TASK_FREQUENCY("metastore.partition.management.task.frequency",
      "metastore.partition.management.task.frequency",
      6, TimeUnit.HOURS, "Frequency at which timer task runs to do automatic partition management for tables\n" +
      "with table property 'discover.partitions'='true'. Partition management include 2 pieces. One is partition\n" +
      "discovery and other is partition retention period. When 'discover.partitions'='true' is set, partition\n" +
      "management will look for partitions in table location and add partitions objects for it in metastore.\n" +
      "Similarly if partition object exists in metastore and partition location does not exist, partition object\n" +
      "will be dropped. The second piece in partition management is retention period. When 'discover.partition'\n" +
      "is set to true and if 'partition.retention.period' table property is defined, partitions that are older\n" +
      "than the specified retention period will be automatically dropped from metastore along with the data."),

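Evidently this configuration item does not exist in Hive 3.1.2 either, only in newer versions. Note also that the default here is 6 hours, unlike the 300 seconds quoted from the Apache wiki later in this post.

HiveMetaStore

In HiveMetaStore: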

private static void startRemoteOnlyTasks(Configuration conf) throws Exception {
    if (MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)) {
        ThreadPool.initialize(conf);
        Collection<String> taskNames = MetastoreConf.getStringCollection(conf, ConfVars.TASK_THREADS_REMOTE_ONLY);
        Iterator var2 = taskNames.iterator();

        while(var2.hasNext()) {
            String taskName = (String)var2.next();
            MetastoreTaskThread task = (MetastoreTaskThread)JavaUtils.newInstance(JavaUtils.getClass(taskName, MetastoreTaskThread.class));
            task.setConf(conf);
            long freq = task.runFrequency(TimeUnit.MILLISECONDS);
            ThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
        }

    }
}

The tasks are scheduled on the thread pool at their configured frequency, expressed in milliseconds.
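The scheduling pattern in startRemoteOnlyTasks can be reproduced with the JDK alone. The sketch below assumes a plain ScheduledExecutorService as a stand-in for Hive's ThreadPool; the class name, the one-second demo period, and the printed message are illustrative only.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch: fixed-rate scheduling with the period expressed in milliseconds. */
class FixedRateSketch {

    /** Same idea as task.runFrequency(TimeUnit.MILLISECONDS): convert a period to milliseconds. */
    static long frequencyMillis(long value, TimeUnit unit) {
        return TimeUnit.MILLISECONDS.convert(value, unit);
    }

    /** Initial delay equals the period, so the first run happens one full period after startup. */
    static ScheduledFuture<?> schedule(ScheduledExecutorService pool, Runnable task, long freqMillis) {
        return pool.scheduleAtFixedRate(task, freqMillis, freqMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        long freq = frequencyMillis(1, TimeUnit.SECONDS); // short period for the demo only
        schedule(pool, () -> System.out.println("partition management tick"), freq);
        Thread.sleep(2500); // let a couple of runs happen
        pool.shutdownNow();
    }
}
```

Because the initial delay equals the period, a task configured with a 6-hour frequency would first run 6 hours after the metastore starts, not immediately.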

COMPACTOR_INITIATOR_ON("metastore.compactor.initiator.on", "hive.compactor.initiator.on", false, "Whether to run the initiator and cleaner threads on this metastore instance or not.\nSet this to true on one instance of the Thrift metastore service as part of turning\non Hive transactions. For a complete list of parameters required for turning on\ntransactions, see hive.txn.manager."),

This in turn is related to:

/**
 * An interface that allows Hive to manage transactions.  All classes
 * implementing this should extend {@link HiveTxnManagerImpl} rather than
 * implementing this directly.
 */
public interface HiveTxnManager {
}

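Evidently this is the interface through which Hive manages transactions.

To sum up so far: Hive keeps some code-level configuration in the MetastoreConf class (held in heap memory; some values can be set within a session) and persists other key-value settings in the metadata table TABLE_PARAMS. The MetaStore background tasks can then look up values by key from both stores and perform the corresponding operations.

A configuration item that CDP 7.1.5 (or an even earlier release) already supports only arrives on the Apache side with the 4.0 alpha. The free edition tends to lag behind the paid enterprise edition in both stability and features. The same goes for Kylin, where features of the 4.0 enterprise edition arrived late in the open-source 5.0. Paying customers are served first and free users get what is left over, but free is still free.

Using automatic partition discovery

Apache version

According to the official Apache Hive documentation: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-DiscoverPartitions

this is a Hive 4.0 feature.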

Discover Partitions

Table property “discover.partitions” can now be specified to control automatic discovery and synchronization of partition metadata in Hive Metastore.

When Hive Metastore Service (HMS) is started in remote service mode, a background thread (PartitionManagementTask) gets scheduled periodically every 300s (configurable via metastore.partition.management.task.frequency config) that looks for tables with “discover.partitions” table property set to true and performs MSCK REPAIR in sync mode. If the table is a transactional table, then Exclusive Lock is obtained for that table before performing MSCK REPAIR. With this table property, “MSCK REPAIR TABLE table_name SYNC PARTITIONS” is no longer required to be run manually.

Version information

As of Hive 4.0.0 (HIVE-20707).

The corresponding issue is:

https://issues.apache.org/jira/browse/HIVE-20707

Partition Retention

Table property “partition.retention.period” can now be specified for partitioned tables with a retention interval. When a retention interval is specified, the background thread running in HMS (refer Discover Partitions section), will check the age (creation time) of the partition and if the partition’s age is older than the retention period, it will be dropped. Dropping partitions after retention period will also delete the data in that partition. For example, if an external partitioned table with ‘date’ partition is created with table properties “discover.partitions”=“true” and “partition.retention.period”=“7d” then only the partitions created in last 7 days are retained.

Version information

As of Hive 4.0.0 (HIVE-20707).

This second feature is partition retention.
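The retention rule quoted above boils down to comparing a partition's age with the configured period. The sketch below uses a deliberately simplified parser that accepts only a number followed by d, h, m, or s; Hive itself validates and converts the value through TimeValidator and MetastoreConf.convertTimeStr, so the parser here is a stand-in, and all names are assumptions made for the example.

```java
import java.util.concurrent.TimeUnit;

/** Sketch: decide whether a partition is past its retention period. */
class RetentionSketch {

    /** Simplified parser for values such as "7d", "12h", "30m", "45s"; returns -1 if invalid. */
    static long parseRetentionSeconds(String value) {
        if (value == null || value.trim().length() < 2) {
            return -1;
        }
        String v = value.trim().toLowerCase();
        char unit = v.charAt(v.length() - 1);
        long amount;
        try {
            amount = Long.parseLong(v.substring(0, v.length() - 1));
        } catch (NumberFormatException e) {
            return -1;
        }
        switch (unit) {
            case 'd': return TimeUnit.DAYS.toSeconds(amount);
            case 'h': return TimeUnit.HOURS.toSeconds(amount);
            case 'm': return TimeUnit.MINUTES.toSeconds(amount);
            case 's': return amount;
            default:  return -1;
        }
    }

    /** A negative retention value means retention is disabled, so nothing expires. */
    static boolean isExpired(long createTimeSec, long nowSec, long retentionSec) {
        return retentionSec >= 0 && (nowSec - createTimeSec) > retentionSec;
    }
}
```

For a table with "partition.retention.period"="7d", parseRetentionSeconds yields 604800, and a partition created more than 604800 seconds ago would be treated as expired.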

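CDP version

For now the feature is practically only usable on CDP. The alpha release of Apache Hive 4.0 is unlikely to see much use any time soon.

When using Hive on CDP, the official documentation says: https://docs.cloudera.com/runtime/7.0.3/using-hiveql/topics/hive-automate-msck.html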

Automated partition discovery and repair is useful for processing log data, and other data, in Spark and Hive catalogs. You learn how to set the partition discovery parameter to suit your use case. An aggressive partition discovery and repair configuration can delay the upgrade process.

Apache Hive can automatically and periodically discover discrepancies in partition metadata in the Hive metastore and in corresponding directories, or objects, on the file system. After discovering discrepancies, Hive performs synchronization.

The discover.partitions table property enables and disables synchronization of the file system with partitions. In external partitioned tables, this property is enabled (true) by default when you create the table. To a legacy external table (created using a version of Hive that does not support this feature), you need to add discover.partitions to the table properties to enable partition discovery.

By default, the discovery and synchronization of partitions occurs every 5 minutes. This is too often if you are upgrading and can result in the Hive DB being queried every few milliseconds, leading to performance degradation. During upgrading the high frequency of batch routines dictates running discovery and synchronization infrequently, perhaps hourly or even daily. You can configure the frequency as shown in this task.

  1. Assuming you have an external table created using a version of Hive that does not support partition discovery, enable partition discovery for the table.

    ALTER TABLE exttbl SET TBLPROPERTIES ('discover.partitions' = 'true');
    
  2. In Cloudera Manager, click Clusters > Hive > Configuration, search for Hive Server Advanced Configuration Snippet (Safety Valve) for hive-site.xml.

  3. Add the following property and value to hive-site.xml: Property: metastore.partition.management.task.frequency Value: 600.

    This action sets synchronization of partitions to occur every 10 minutes expressed in seconds. If you are upgrading, consider running discovery and synchronization once every 24 hours by setting the value to 86,400 seconds.

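In other words, on CDP external partitioned tables have automatic partition discovery enabled by default, and the task is triggered every 5 minutes by default. The value is given in seconds. The feature has been available since at least CDP 7.0.3.

Usage

If the defaults do not match expectations, they can be changed manually:

ALTER TABLE table_name SET TBLPROPERTIES ('discover.partitions'='true');
ALTER TABLE table_name SET TBLPROPERTIES ('metastore.partition.management.task.frequency' = '600');

These statements persist the values in the metadata table, which differs from the in-memory configuration object on the Apache side. Property values must be quoted string literals, hence '600'. Note also that the Apache source shown earlier reads the frequency from the metastore Configuration rather than from TABLE_PARAMS, so the second statement may have no effect on scheduling; the Cloudera steps quoted above set it in hive-site.xml instead.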

Still, consider getTimeVar:

/**
 * Get the variable as a long indicating a period of time
 * @param conf configuration to retrieve it from
 * @param var variable to retrieve
 * @param outUnit Timeout to return value in
 * @return value, or default value if value not in config file
 */
public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) {
  assert var.defaultVal.getClass() == TimeValue.class;
  String val = conf.get(var.varname);

  if (val == null) {
    // Look for it under the old Hive name
    val = conf.get(var.hiveName);
  }

  if (val != null) {
    return convertTimeStr(val, ((TimeValue)var.defaultVal).unit, outUnit);
  } else {
    return outUnit.convert(((TimeValue)var.defaultVal).val, ((TimeValue)var.defaultVal).unit);
  }
}

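The lookup first tries the metastore-style name, then the legacy Hive name, and finally falls back to the default value defined in code, which is reasonable.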
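The resolution order in getTimeVar can be isolated from Hadoop's Configuration class. The sketch below replaces Configuration with a plain Map and leaves out the time-unit conversion; the class name and the configuration keys are hypothetical and chosen only to show the fallback.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: metastore name first, then the legacy Hive name, then the coded default. */
class ConfLookupSketch {

    static String resolve(Map<String, String> conf, String varname, String hiveName, String defaultVal) {
        String val = conf.get(varname);
        if (val == null) {
            // Look for it under the old Hive name
            val = conf.get(hiveName);
        }
        return val != null ? val : defaultVal;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Hypothetical keys: only the legacy name is set, so it wins over the default.
        conf.put("hive.example.frequency", "600s");
        System.out.println(resolve(conf, "metastore.example.frequency", "hive.example.frequency", "300s"));
    }
}
```

A value stored only in TABLE_PARAMS never enters this lookup, because the method consults nothing but the Configuration object it is given.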

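Other issues

Automatic discovery runs every 5 minutes, so it is only near-real-time. If one job finishes and the next job starts immediately, a Hive on Tez job cannot see the new partition for those 5 minutes (or even longer). Spark or Flink reading the Parquet files directly is a different matter.

Hence: https://lizhiyong.blog.csdn.net/article/details/127680034

When the author was doing platform development, an extra msck or alter table add partition step was still needed. Only after that command has completed is eventual consistency guaranteed.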
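One way to close that gap is to register the partition explicitly at the end of the writing job, before the downstream job starts. The sketch below is a hedged illustration: the class and method names are hypothetical, the partition column dt follows the table created earlier in this post, and the inputs are assumed to be trusted because they are concatenated into the statement.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

/** Sketch: make a freshly written partition visible without waiting for discovery. */
class PartitionRegistrar {

    /** Statement for registering a single dt partition; does nothing if it already exists. */
    static String addPartitionSql(String qualifiedTable, String dt) {
        return "ALTER TABLE " + qualifiedTable
             + " ADD IF NOT EXISTS PARTITION (dt='" + dt + "')";
    }

    /** Heavier alternative: scan the table location and register every missing partition. */
    static String msckRepairSql(String qualifiedTable) {
        return "MSCK REPAIR TABLE " + qualifiedTable;
    }

    /** Runs the targeted statement over an existing HiveServer2 JDBC connection. */
    static void registerPartition(Connection hiveConn, String qualifiedTable, String dt)
            throws SQLException {
        try (Statement stmt = hiveConn.createStatement()) {
            stmt.execute(addPartitionSql(qualifiedTable, dt));
        }
    }
}
```

The targeted ALTER TABLE form touches one partition and is usually cheaper than a full MSCK scan, which matters when the table already has many partitions.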

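Summary

After all this digging, the author located the metadata stored in Hive's MetaStore and worked out the mechanism behind table-level configuration. Along the way, the feature lag of the free edition also became apparent. All in all, a worthwhile exercise.

Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/130468723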
