Apache Hudi初探(三)(与flink的结合)--flink写hudi的操作(真正的写数据)

这篇具有很好参考价值的文章主要介绍了Apache Hudi初探(三)(与flink的结合)--flink写hudi的操作(真正的写数据)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

在之前的文章中Apache Hudi初探(二)(与flink的结合)–flink写hudi的操作(JobManager端的提交操作) 有说到写hudi数据会涉及到写hudi真实数据以及写hudi元数据,这篇文章来说一下具体的实现

写hudi真实数据

这里的操作就是在HoodieFlinkWriteClient.upsert方法:

public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
        initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
    table.validateUpsertSchema();
    preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
    final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
        instantTime, table, records.listIterator());
    HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
    if (result.getIndexLookupDuration().isPresent()) {
      metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
    }
    return postWrite(result, instantTime, table);
  }
  • initTable
    初始化HoodieFlinkTable
  • preWrite
    在这里几乎没什么操作
  • getOrCreateWriteHandle
    创建一个写文件的handle(假如这里创建的是FlinkMergeAndReplaceHandle),这里会记录已有的文件路径,后续FlinkMergeHelper.runMerge会从这里读取数
    注意该构造函数中的init方法,会创建一个ExternalSpillableMap类型的map来存储即将插入的记录,这在后续upsert中会用到
  • HoodieFlinkTable.upsert
    这里进行真正的upsert操作,会调用FlinkUpsertDeltaCommitActionExecutor.execute,最终会调用到BaseFlinkCommitActionExecutor.execute,从而调用到FlinkMergeHelper.newInstance().runMerge
      public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,..) {
           final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
           HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
           if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
             readSchema = baseFileReader.getSchema();
             gWriter = new GenericDatumWriter<>(readSchema);
             gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
           } else {
             gReader = null;
             gWriter = null;
             readSchema = mergeHandle.getWriterSchemaWithMetaFields();
           }
           wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
            Option.of(new UpdateHandler(mergeHandle)), record -> {
          if (!externalSchemaTransformation) {
            return record;
          }
          return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
        });
        wrapper.execute();
    
        。。。
        mergeHandle.close();
    
      }
      
    
    • externalSchemaTransformation=
      这里有hoodie.avro.schema.external.transformation配置(默认是false)用来把在之前schame下的数据转换为新的schema下的数据
    • wrapper.execute()
      这里会最终调用到upsertHandle.write(record),也就是UpdateHandler.consumeOneRecord方法被调用的地方
       public void write(GenericRecord oldRecord) {
       ...
       if (keyToNewRecords.containsKey(key)) {
        if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
            copyOldRecord = true;
          } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
            copyOldRecord = false;
          }
          writtenRecordKeys.add(key); 
       }
       }
      
      如果keyToNewRecords包含了对应的记录,也就是说会有update的操作的话,就插入新的数据,
      writeUpdateRecord 这里进行数据的更新,并用writtenRecordKeys记录插入的记录
    • mergeHandle.close()
       public List<WriteStatus> close() {
             writeIncomingRecords();
        ...
       }
       ...
       protected void writeIncomingRecords() throws IOException {
         // write out any pending records (this can happen when inserts are turned into updates)
         Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
             ? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
         while (newRecordsItr.hasNext()) {
           HoodieRecord<T> hoodieRecord = newRecordsItr.next();
           if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
             writeInsertRecord(hoodieRecord);
           }
         }
       }
      
      这里的writeIncomingRecords会判断如果writtenRecordKeys没有包含该记录的话,就直接插入数据,而不是更新

总结一下upsert的关键点:

mergeHandle.close()才是真正的写数据(insert)的时候,在初始化handle的时候会把记录传导writtenRecordKeys中(在HoodieMergeHandle中的init方法)

mergeHandle的write() 方法会在写入数据的时候,如果发现有新的数据,则会写入新的数据(update)

写hudi元数据

这里的操作是StreamWriteOperatorCoordinator.notifyCheckpointComplete方法

public void notifyCheckpointComplete(long checkpointId) {
   ...
   final boolean committed = commitInstant(this.instant, checkpointId);
   ...
}
  ...
private boolean commitInstant(String instant, long checkpointId){
   ...
   doCommit(instant, writeResults);
   ...
}
  ...
private void doCommit(String instant, List<WriteStatus> writeResults) {
  // commit or rollback
  long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
  long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
  boolean hasErrors = totalErrorRecords > 0;

  if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
    HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
    if (hasErrors) {
      LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
          + totalErrorRecords + "/" + totalRecords);
    }

    final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
        ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
        : Collections.emptyMap();
    boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),
        tableState.commitAction, partitionToReplacedFileIds);
    if (success) {
      reset();
      this.ckpMetadata.commitInstant(instant);
      LOG.info("Commit instant [{}] success!", instant);
    } else {
      throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
    }
  } else {
    LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
    LOG.error("The first 100 error messages");
    writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
      LOG.error("Global error for partition path {} and fileID {}: {}",
          ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
      if (ws.getErrors().size() > 0) {
        ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));
      }
    });
    // Rolls back instant
    writeClient.rollback(instant);
    throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
  }
}


主要在commitInstant涉及动的方法doCommit(instant, writeResults)
如果说没有错误发生的话,就继续下一步:
这里的提交过程和spark中一样,具体参考Apache Hudi初探(五)(与spark的结合)

其他

在flink和spark中新写入的文件是在哪里分配对一个的fieldId:文章来源地址https://www.toymoban.com/news/detail-687877.html

//Flink中
BucketAssignFunction 中processRecord getNewRecordLocation 分配新的 fieldId

//Spark中
BaseSparkCommitActionExecutor 中execute方法 中 handleUpsertPartition 涉及到的UpsertPartitioner getBucketInfo方法
其中UpsertPartitioner构造函数中 assignInserts 方法涉及到分配新的 fieldId

到了这里,关于Apache Hudi初探(三)(与flink的结合)--flink写hudi的操作(真正的写数据)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

    之前讨论的都是’hoodie.datasource.write.operation’:\\\'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件: 因为是 bulk insert 操作,所以没有去重的需要,所以直接采用spark原生的方式, 以下我们讨论非spark原生的方式, 继续Apache Hudi初探(八)(与spark的结合)–非

    2024年02月08日
    浏览(25)
  • 大数据Hadoop之——Apache Hudi 数据湖实战操作(FlinkCDC)

    Hudi(Hadoop Upserts Deletes and Incrementals) ,简称 Hudi ,是一个 流式数据湖平台 ,关于Hudi的更多介绍可以参考我以下几篇文章: 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi 大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合) 这里主要讲解Hive、Trino、Starr

    2023年04月20日
    浏览(28)
  • Hudi(16):Hudi集成Flink之读取方式

    目录 0. 相关文章链接 1. 流读(Streaming Query) 2. 增量读取(Incremental Query) 3. 限流  Hudi文章汇总          当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支

    2024年02月06日
    浏览(32)
  • Hudi(17):Hudi集成Flink之写入方式

    目录 0. 相关文章链接 1. CDC 数据同步 1.1. 准备MySQL表 1.2. flink读取mysql binlog并写入kafka 1.3. flink读取kafka数据并写入hudi数据湖 1.4. 使用datafaker插入数据 1.5. 统计数据入Hudi情况 1.6. 实时查看数据入湖情况 2. 离线批量导入 2.1. 原理 2.2. WITH 参数 2.3. 案例 3. 全量接增量 3.1. 

    2024年02月05日
    浏览(26)
  • Hudi(19):Hudi集成Flink之索引和Catalog

    目录 0. 相关文章链接 1. Bucket索引(从 0.11 开始支持) 1.1. WITH参数 1.2. 和 state 索引的对比 2. Hudi Catalog(从 0.12.0 开始支持) 2.1. 概述 2.2. WITH 参数 2.3. 使用dfs方式  Hudi文章汇总          默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当

    2024年02月05日
    浏览(28)
  • Apache hudi 核心功能点分析

    文中部分代码对应 0.14.0 版本 初始的需求是Uber公司会有很多记录级别的更新场景,Hudi 在Uber 内部主要的一个场景,就是乘客打车下单和司机接单的匹配,乘客和司机分别是两条数据流,通过 Hudi 的 Upsert 能力和增量读取功能,可以分钟级地将这两条数据流进行拼接,得到乘客

    2024年02月02日
    浏览(22)
  • Hudi集成Flink

    安装Maven 1)上传apache-maven-3.6.3-bin.tar.gz到/opt/software目录,并解压更名 tar -zxvf apache-maven-3.6. 3 -bin.tar.gz -C /opt/module/ mv   apache -maven-3.6. 3  maven 2)添加环境变量到/etc/profile中 sudo  vim /etc/profile #MAVEN_HOME export MAVEN_HOME=/opt/module/maven export PATH=$PATH:$MAVEN_HOME/bin 3)测试安装结果 sourc

    2023年04月13日
    浏览(20)
  • Apache Hudi Timeline Server介绍

    Hudi 有一个中央时间线服务器,在驱动程序节点中运行并作为 Rest 服务。它有多种好处,第一个用例是提供 FileSystemView api。 Hudi 的核心是维护一个 TableFileSystemView,它暴露 API 来获取给定数据集的文件状态,驱动程序和执行程序将在写入和表服务生命周期的不同时间点查询该状

    2024年02月12日
    浏览(22)
  • Hudi(四)集成Flink(2)

            当前表 默认是快照读取 ,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。 1、WITH参数 名称 Required 默认值 说明 read.streaming.enabled false false 设置

    2024年02月07日
    浏览(25)
  • 【数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式&写入模式&Bucket索引】

    当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。 1.with参数 名称 Required 默认值 说明 read.streaming.enabled false false 设置 true 开启流读模式

    2024年02月14日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包