Background
In a previous article, Apache Hudi初探(六)(与spark的结合), we did not go into how Hudi Compaction is implemented on Spark. This post explains it in detail.
Note: Hudi distinguishes synchronous (inline) and asynchronous Compaction. To keep writes fast while keeping reads fresh, asynchronous Compaction hands the actual compaction work to a background thread. Asynchronous Compaction therefore breaks down into plan generation, which can itself happen synchronously or asynchronously, and the execution of the generated plan, which always runs asynchronously in the background.
The corresponding configuration options are:
hoodie.compact.inline — whether Compaction runs synchronously (inline) as part of the write
hoodie.compact.schedule.inline — whether the Compaction plan is generated inline, with execution left to an asynchronous job
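As a quick illustration, here is a minimal sketch (Spark DataSource API in Java) of how these two knobs are set on a merge-on-read write; the input Dataset, the table name, and the base path are made-up placeholders, not values from this article:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

Dataset<Row> df = ...;  // hypothetical input batch
df.write().format("hudi")
    .option("hoodie.table.name", "demo_mor")                        // placeholder table name
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
    // run compaction synchronously inside the write (the path this article walks through):
    .option("hoodie.compact.inline", "true")
    // or instead only generate the plan inline and leave execution to a background job:
    .option("hoodie.compact.schedule.inline", "false")
    .mode(SaveMode.Append)
    .save("/tmp/hudi/demo_mor");                                    // placeholder base path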
Detailed Walkthrough
Let's take synchronous Compaction as the example and jump straight into BaseHoodieWriteClient's runTableServicesInline method:
if (config.inlineCompactionEnabled()) {
  runAnyPendingCompactions(table);
  metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
  inlineCompaction(extraMetadata);
} else {
  metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
}
Assuming hoodie.compact.inline is true, so that config.inlineCompactionEnabled() returns true, the Compaction flow runs directly:
- runAnyPendingCompactions: re-runs Compaction plans that previously failed. It filters out the Compaction plans that have not completed and executes them, concretely via SparkRDDWriteClient.compact (a sketch of inspecting such pending plans follows below). We skip this step here, assuming there are no pending Compaction operations, and focus on inlineCompaction.
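Before moving on, a hedged sketch of how such pending Compaction plans can be listed from the timeline; the meta client construction, the Hadoop configuration, and the base path are assumptions for illustration, not code from the article:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;

Configuration hadoopConf = new Configuration();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf)
    .setBasePath("/tmp/hudi/demo_mor")               // placeholder base path
    .build();
metaClient.getActiveTimeline()
    .filterPendingCompactionTimeline()               // compaction instants not yet completed
    .getInstants()
    .forEach(i -> System.out.println("pending compaction: " + i.getTimestamp()));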
- metadata.addMetadata: records the hoodie.compact.inline property into the commit metadata.
- inlineCompaction: runs the Compaction for the current commit:

  Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
  compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
    // inline compaction should auto commit as the user is never given control
    compact(compactInstantTime, true);
  });
- inlineScheduleCompaction: ends up in scheduleTableServiceInternal, which immediately schedules the generation of a Compaction plan:

  ...
  case COMPACT:
    LOG.info("Scheduling compaction at instant time :" + instantTime);
    Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
        .scheduleCompaction(context, instantTime, extraMetadata);
    return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
  ...

  This ultimately invokes ScheduleCompactionActionExecutor.execute:

  HoodieCompactionPlan plan = scheduleCompaction();
  if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
    extraMetadata.ifPresent(plan::setExtraMetadata);
    HoodieInstant compactionInstant =
        new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
    try {
      table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
          TimelineMetadataUtils.serializeCompactionPlan(plan));
    } catch (IOException ioe) {
      throw new HoodieIOException("Exception scheduling compaction", ioe);
    }
    return Option.of(plan);
  }
- scheduleCompaction: this method generates the Compaction plan to be scheduled:

  boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
  if (compactable) {
    LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
    try {
      SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
      Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
          .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
          .collect(Collectors.toSet());
      // exclude files in pending clustering from compaction.
      fgInPendingCompactionAndClustering.addAll(
          fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
      context.setJobStatus(this.getClass().getSimpleName(),
          "Compaction: generating compaction plan: " + config.getTableName());
      return compactor.generateCompactionPlan(context, table, config, instantTime,
          fgInPendingCompactionAndClustering);
- needCompact(config.getInlineCompactTriggerStrategy()): hoodie.compact.inline.trigger.strategy defaults to the NUM_COMMITS strategy, which triggers on the number of delta commits, governed by hoodie.compact.inline.max.delta.commits (default 5); that is, once 5 delta commits have accumulated, a Compaction plan is generated, as sketched below.
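Illustrative only — this is not Hudi's actual code. Under NUM_COMMITS the trigger check reduces to comparing two counts:

class CompactionTriggerSketch {
  // maxDeltaCommits corresponds to hoodie.compact.inline.max.delta.commits (default 5);
  // the delta-commit count would in reality be derived from the active timeline.
  static boolean shouldScheduleCompaction(int deltaCommitsSinceLastCompaction, int maxDeltaCommits) {
    return deltaCommitsSinceLastCompaction >= maxDeltaCommits;
  }
}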
- fgInPendingCompactionAndClustering: collects the FileGroupIds that are in a pending compaction or clustering state; these are excluded when the plan's CompactionOperations are generated.
- compactor.generateCompactionPlan: generates the Compaction plan. After excluding the pending FileGroupIds, it builds CompactionOperations out of the base file, partition path, log files, and the compaction strategy; hoodie.compaction.strategy defaults to LogFileSizeBasedCompactionStrategy (a configuration sketch follows below). The operations are assembled into HoodieCompactionOperation structures, and finally a HoodieCompactionPlan is returned.
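For completeness, a hedged sketch of wiring the strategy and threshold into a write config programmatically; builder method names are per 0.13-era Hudi (verify against your version), and the path and table name are placeholders:

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi/demo_mor")                               // placeholder base path
    .forTable("demo_mor")                                         // placeholder table name
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withInlineCompaction(true)                               // hoodie.compact.inline
        .withMaxNumDeltaCommitsBeforeCompaction(5)                // hoodie.compact.inline.max.delta.commits
        .withCompactionStrategy(new LogFileSizeBasedCompactionStrategy())
        .build())
    .build();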
- table.getActiveTimeline().saveToCompactionRequested: serializes the generated Compaction plan into bytes, saves it to the corresponding timeline file, and records a Compaction instant in the Requested state, as illustrated below.
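The persisted plan can be read back with CompactionUtils, mirroring what the run phase does later; `metaClient` is the one built in the earlier sketch, and the instant time is a made-up example:

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.util.CompactionUtils;

HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, "20230701120000123");
plan.getOperations().forEach(op ->
    System.out.println(op.getPartitionPath()
        + " base=" + op.getDataFilePath()
        + " logs=" + op.getDeltaFilePaths()));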
- compact: actually executes the Compaction, ultimately through SparkRDDWriteClient's compact:

  ...
  HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.compact(context, compactionInstantTime);
  HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
      writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
  if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
    completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table,
        compactionInstantTime);
  }
  return compactionMetadata;
- table.compact(context, compactionInstantTime): performs the actual Compaction, ultimately through RunCompactionActionExecutor.execute:

  HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);
  ...
  HoodieData<WriteStatus> statuses = compactor.compact(
      context, compactionPlan, table, configCopy, instantTime, compactionHandler);
  ...
  List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
  HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
  return compactionMetadata;
- compactionPlan: fetches the Compaction plan generated earlier by inlineScheduleCompaction.
- compactor.compact: executes the Compact operation, ultimately calling HoodieSparkCopyOnWriteTable's handleUpdate and handleInsert to carry out the data updates and inserts. Along the way the compaction instant is transitioned from the Requested to the Inflight state (see the timeline sketch below).
- compactionMetadata: returns the resulting metadata.
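To make the timeline mechanics concrete, a hedged illustration of that Requested-to-Inflight transition (the executor does this internally; the instant time is again a placeholder, and `metaClient` is from the earlier sketch):

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED,
    HoodieTimeline.COMPACTION_ACTION, "20230701120000123");
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requested);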
- completeTableService: writes the compaction metadata into the table's metadata and transitions the compaction instant from the Inflight to the Completed state.
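Putting scheduling and execution together, here is a hedged end-to-end sketch using the Spark write client directly; the write client construction is assumed, and method names are per 0.13-era Hudi (verify against your version):

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;

SparkRDDWriteClient<?> writeClient = ...;  // hypothetical, built from an engine context + write config
Option<String> instant = writeClient.scheduleCompaction(Option.empty());
if (instant.isPresent()) {
  HoodieWriteMetadata<JavaRDD<WriteStatus>> result = writeClient.compact(instant.get());
  // commitCompaction drives the completeTableService step described above
  writeClient.commitCompaction(instant.get(), result.getCommitMetadata().get(), Option.empty());
}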
That wraps up this look at Hudi's Compaction operation in Apache Hudi初探(十)(与spark的结合).