Apache Hudi初探(十)(与spark的结合)--hudi的Compaction操作

这篇具有很好参考价值的文章主要介绍了Apache Hudi初探(十)(与spark的结合)--hudi的Compaction操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

在之前的文章Apache Hudi初探(六)(与spark的结合)
中,我们没有过多的解释Spark中hudi Compaction的实现,在这里详细说一下
注意:在hudi中有同步,异步Compaction的概念,为了保证写入的及时性和数据读取的时效性,hudi在一步compaction的过程中会引入一个后台线程进行compaction,所以异步Compaction又分同步compaction计划生成和异步compaction计划生成和最终的Compaction计划的执行(而compaction计划的执行是在后台异步执行的)
对应的配置为:
hoodie.compact.inline 是否是同步进行Compaction
hoodie.compact.schedule.inline 是够是异步Compaction计划生成

详说杂谈

这里直接以同步Compaction为例,直接到BaseHoodieWriteClient runTableServicesInline方法

   if (config.inlineCompactionEnabled()) {
     runAnyPendingCompactions(table);
     metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
     inlineCompaction(extraMetadata);
   } else {
     metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
   }

假设hoodie.compact.inlinetrue,也就是config.inlineCompactionEnabled()true,那就直接执行Compaction流程:

  • runAnyPendingCompactions
    重新运行上次失败的Compaction计划

    • 过滤出没有完成的Compaction计划,并执行Compact操作,具体的为SparkRDDWriteClient.compact
      这里我们先略过,假设没有pending的Compaction操作,重点看inlineCompaction
  • metadata.addMetadata
    把属性hoodie.compact.inline记录到元数据中去

  • inlineCompaction执行当前Commit的compaction操作

      Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
       compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
         // inline compaction should auto commit as the user is never given control
         compact(compactInstantTime, true);
       });
    
    • inlineScheduleCompaction
      最终会调用到scheduleTableServiceInternal如下:
       ...
       case COMPACT:
         LOG.info("Scheduling compaction at instant time :" + instantTime);
         Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
             .scheduleCompaction(context, instantTime, extraMetadata);
         return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       ...
      
      这里会立马调度生成一个Compaction的计划,最终会调用ScheduleCompactionActionExecutor.execute方法
       HoodieCompactionPlan plan = scheduleCompaction();
       if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
         extraMetadata.ifPresent(plan::setExtraMetadata);
         HoodieInstant compactionInstant =
             new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
         try {
           table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
               TimelineMetadataUtils.serializeCompactionPlan(plan));
         } catch (IOException ioe) {
           throw new HoodieIOException("Exception scheduling compaction", ioe);
         }
         return Option.of(plan);
       }
      
      • scheduleCompaction
        该方法主要是生成一个调度Compaction的计划

        boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
        if (compactable) {
          LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
          try {
            SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
            Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
                .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
                .collect(Collectors.toSet());
            // exclude files in pending clustering from compaction.
            fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
            context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan: " + config.getTableName());
            return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
        
        • needCompact(config.getInlineCompactTriggerStrategy())
          hoodie.compact.inline.trigger.strategy默认是NUM_COMMITS策略,也就是按照delta commit的次数来的,次数也就是按照hoodie.compact.inline.max.delta.commits(默认是5),也就是5次delta commit的提交就会进行产生一个Compaction计划
        • fgInPendingCompactionAndClustering =
          获取一系列处于pending的FileGroupId,这在产生Compaction一系列的CompactionOperation的时候是会被排除掉的
        • compactor.generateCompactionPlan 产生一个Compaction的计划
          排除正在pending的FileGroupId后,生成由baseFile,partitionPath,logFiles,还有Compaction策略组成的CompactionOperation
          hoodie.compaction.strategy默认策略是LogFileSizeBasedCompactionStrategy,最后组装成HoodieCompactionOperation数据的数据结构,最后返回一个HoodieCompactionPlan
      • table.getActiveTimeline().saveToCompactionRequeste
        这步操作主要是把生成的Compaction plan序列化成字节,并保存在相应的文件中,并生成一个Compaction的Request文章来源地址https://www.toymoban.com/news/detail-609231.html

    • compact 真正的执行Compaction操作
      最终调用的是SparkRDDWriteClient的compact
         ...
        HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.compact(context, compactionInstantTime);
        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
        if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
          completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
        }
        return compactionMetadata;
      
      • table.compact(context, compactionInstantTime) 实际进行Compaction的操作
        最终调用的是RunCompactionActionExecutor.execute方法
           HoodieCompactionPlan compactionPlan =
             CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);
           ...
            HoodieData<WriteStatus> statuses = compactor.compact(
                   context, compactionPlan, table, configCopy, instantTime, compactionHandler);
           ...
           List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
           HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
           return compactionMetadata
        
        
        • compactionPlan获取上面inlineScheduleCompaction生成的Compaction 计划
        • compactor.compact 执行 Compact操作
          最终调用的是HoodieSparkCopyOnWriteTable的handleUpdate和handleInsert方法完成数据的更新和插入
          这里同时会把Request状态的compcation变成Inflight的状态
        • compactionMetadata返回一些元数据信息
      • completeTableService
        该方法主要是把compaction的元数据信息写入到元数据,并把Inflight状态的compcation变成Complete的状态

到了这里,关于Apache Hudi初探(十)(与spark的结合)--hudi的Compaction操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Hudi初探(一)(与flink的结合)

    和 Spark 的使用方式不同, flink 结合 hudi 的方式,是以 SPI 的方式,所以不需要像使用 Spark 的方式一样, Spark 的方式如下: (这里不包括 org.apache.spark.sql.sources.DataSourceRegister ) Flink 结合 Hudi 的方式,只需要引入了对应的jar包即可,以 SPI 的方式: 其中 HoodieTableFactory 是读写 H

    2024年02月16日
    浏览(32)
  • 大数据Hadoop之——Apache Hudi 数据湖实战操作(FlinkCDC)

    Hudi(Hadoop Upserts Deletes and Incrementals) ,简称 Hudi ,是一个 流式数据湖平台 ,关于Hudi的更多介绍可以参考我以下几篇文章: 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi 大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合) 这里主要讲解Hive、Trino、Starr

    2023年04月20日
    浏览(39)
  • Apache hudi 核心功能点分析

    文中部分代码对应 0.14.0 版本 初始的需求是Uber公司会有很多记录级别的更新场景,Hudi 在Uber 内部主要的一个场景,就是乘客打车下单和司机接单的匹配,乘客和司机分别是两条数据流,通过 Hudi 的 Upsert 能力和增量读取功能,可以分钟级地将这两条数据流进行拼接,得到乘客

    2024年02月02日
    浏览(32)
  • Apache Hudi Timeline Server介绍

    Hudi 有一个中央时间线服务器,在驱动程序节点中运行并作为 Rest 服务。它有多种好处,第一个用例是提供 FileSystemView api。 Hudi 的核心是维护一个 TableFileSystemView,它暴露 API 来获取给定数据集的文件状态,驱动程序和执行程序将在写入和表服务生命周期的不同时间点查询该状

    2024年02月12日
    浏览(28)
  • 提升 Apache Hudi Upsert 性能的三个建议

    Apache Hudi 社区一直在快速发展,各公司正在寻找方法来利用其强大的功能来有效地摄取和管理大规模数据集。 每周社区都会收到一些常见问题,最常见的问题与 Hudi 如何执行更新插入有关,以确保以低延迟访问最新数据。 快速更新插入的主要考虑因素之一是选择正确的存储

    2024年02月05日
    浏览(42)
  • Apache Hudi 1.x 版本重磅功能展望与讨论

    Apache Hudi 社区正在对Apache Hudi 1.x版本功能进行讨论,欢迎感兴趣同学参与讨论,PR链接:https://github.com/apache/hudi/pull/8679/files 此 RFC 提议对 Hudi 中的事务数据库层进行令人兴奋和强大的重构,以推动未来几年整个社区的持续创新。 在过去的几年里,社区成长(https://git-contributo

    2024年02月07日
    浏览(70)
  • Hudi集成Hive时的异常解决方法 java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat

    使用 Hive CLI 连接 Hive 3.1.2 并查询对应的 Hudi 映射的 Hive 表,发现如下异常: 根据报错信息 Caused by: java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat 推断时缺少相应的 Jar 包所导致的异常。 翻看 Hudi 0.10.0 集成 Hive 的文档,文档链接,可以看到需要将 hudi-hadoop-m

    2024年02月01日
    浏览(57)
  • Apache Hudi 在袋鼠云数据湖平台的设计与实践

    在大数据处理中,实时数据分析是一个重要的需求。随着数据量的不断增长,对于实时分析的挑战也在不断加大,传统的批处理方式已经不能满足实时数据处理的需求,需要一种更加高效的技术来解决这个问题。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是这样一种

    2024年02月06日
    浏览(41)
  • 通过源代码修改使 Apache Hudi 支持 Kerberos 访问 Hive 的功能

    本文档主要用于阐释如何基于 Hudi 0.10.0 添加支持 Kerberos 认证权限的功能。 主要贡献: 针对正在使用的 Hudi 源代码进行 Kerberos-support 功能扩展,总修改规模囊括了 12 个文件约 20 处代码共计 约 200 行代码; 对 Hudi 0.10.0 的源代码进行了在保持所有自定义特性的基础上,支持了

    2024年02月14日
    浏览(40)
  • 性能提升30%!袋鼠云数栈基于 Apache Hudi 的性能优化实战解析

    Apache Hudi 是一款开源的数据湖解决方案,它能够帮助企业更好地管理和分析海量数据,支持高效的数据更新和查询。并提供多种数据压缩和存储格式以及索引功能,从而为企业数据仓库实践提供更加灵活和高效的数据处理方式。 在金融领域,企业可以使用 Hudi 来处理大量需要

    2024年02月09日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包