Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

这篇具有很好参考价值的文章主要介绍了Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

之前讨论的都是’hoodie.datasource.write.operation’:'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件:

/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet

因为是bulk insert操作,所以没有去重的需要,所以直接采用spark原生的方式,
以下我们讨论非spark原生的方式,

闲说杂谈

继续Apache Hudi初探(八)(与spark的结合)–非bulk_insert模式
剩下的代码:

   val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
   val (writeSuccessful, compactionInstant, clusteringInstant) =
        commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
          writeResult, parameters, writeClient, tableConfig, jsc,
          TableInstantInfo(basePath, instantTime, commitActionType, operation))
  • doWriteOperation 最终调用的是SparkRDDWriteClient对应的方法,如bulkInsert/insert/upsert/insertOverwrite,这里我们以upsert为例:

     public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
      HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
          initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
      table.validateUpsertSchema();
      preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
      HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
      HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
      if (result.getIndexLookupDuration().isPresent()) {
        metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
      }
      return postWrite(resultRDD, instantTime, table);
    }
    
    • initTable 创建获取一个HoodieSparkMergeOnReadTable

    • validateSchema 校验Schema的兼容性

    • preWrite 写之前的操作,这个之前有说过,具体参考:Apache Hudi初探(五)(与spark的结合)

    • table.upsert 真正写入数据的操作
      最终调用的是 SparkInsertDeltaCommitActionExecutor<>().execute() 方法,最后最调用到HoodieWriteHelper.write

        public HoodieWriteMetadata<O> write(String instantTime,
                                        I inputRecords,
                                        HoodieEngineContext context,
                                        HoodieTable<T, I, K, O> table,
                                        boolean shouldCombine,
                                        int shuffleParallelism,
                                        BaseCommitActionExecutor<T, I, K, O, R> executor,
                                        WriteOperationType operationType) {
          try {
              // De-dupe/merge if needed
              I dedupedRecords =
                  combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
      
              Instant lookupBegin = Instant.now();
              I taggedRecords = dedupedRecords;
              if (table.getIndex().requiresTagging(operationType)) {
                // perform index loop up to get existing location of records
                context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
                taggedRecords = tag(dedupedRecords, context, table);
              }
              Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
      
              HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
              result.setIndexLookupDuration(indexLookupDuration);
              return result;
            } catch (Throwable e) {
              if (e instanceof HoodieUpsertException) {
                throw (HoodieUpsertException) e;
              }
              throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
            }
          }
      
      • combineOnCondition 数据去重
        最终是调用HoodieRecordPayload.preCombine(默认是OverwriteWithLatestAvroPayload.preCombine
      • taggedRecords = tag(dedupedRecords, context, table) 因为默认的indexHoodieSimpleIndex,所以这个时候会调用到打标记这个操作
        最终调用到的是HoodieSimpleIndextagLocationInternal,此时获得的是带有location的记录(如果没有索引到,则 record中的locationnull
      • executor.execute(taggedRecords) 该方法最终调用到BaseSparkCommitActionExecutor.execute方法:
        public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
           // Cache the tagged records, so we don't end up computing both
           // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
           JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
           if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
             inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
           } else {
             LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
           }
        
           WorkloadProfile workloadProfile = null;
           if (isWorkloadProfileNeeded()) {
             context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile: " + config.getTableName());
             workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
             LOG.info("Input workload profile :" + workloadProfile);
           }
        
           // partition using the insert partitioner
           final Partitioner partitioner = getPartitioner(workloadProfile);
           if (isWorkloadProfileNeeded()) {
             saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
           }
        
           // handle records update with clustering
           Set<HoodieFileGroupId> fileGroupsInPendingClustering =
               table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
           HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
        
           context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
           HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
           HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
           updateIndexAndCommitIfNeeded(writeStatuses, result);
           return result;
         }
        
        • inputRDD.persist持久化当前的RDD,因为该RDD会被使用多次,便于加速

        • workloadProfile = new WorkloadProfile(buildProfile(inputRecords)
          构建一个状态信息,主要是记录一下插入的记录数量和更新的记录数量 其中主要形成了以filedId为key, *Pair<instantTime,count>*为value的Map数据

        • final Partitioner partitioner = getPartitioner(workloadProfile) 这里针对于upsert操作会返回UpsertPartitioner(因为默认hoodie.storage.layout.typeDEFAULT),
          其中该UpsertPartitioner实例的构造方法中会进行一些额外的操作 assignUpdatesassignInserts(这里暂时忽略),主要是对数据进行分区处理,设计到小文件的处理

        • mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner)
          这里会根据hoodie.table.base.file.format的值(默认是parquet),如果是hfile,则会要求排序,如果没有则只是按照partitioner进行重分区,
          之后再进行数据insert或者update,具体的方法为handleUpsertPartition,会根据之前的partitoner信息进行插入或者更新(里面的细节有点复杂)

        • updateIndexAndCommitIfNeeded(writeStatuses, result)
          该操作会首先会更新索引信息,对于HoodieSimpleIndex来说,什么也不操作(因为该index每次都会从parquet文件中读取信息从而组装成index),
          其次如果hoodie.auto.committrue(默认是true)会进行元数据的commit操作

    • postWrite 这里的postCommit 进行自动clean和Archive操作

  • commitAndPerformPostOperations
    这里主要是异步Compcation和Clustering以及同步hive元数据,类似Apache Hudi初探(七)(与spark的结合)文章来源地址https://www.toymoban.com/news/detail-477671.html

到了这里,关于Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)

    在Apache Hudi初探(一)(与flink的结合)中,我们提到了 Pipelines.hoodieStreamWrite 写hudi文件 ,这个操作真正写hudi是在 Pipelines.hoodieStreamWrite 方法下的 transform(opName(\\\"stream_write\\\", conf), TypeInformation.of(Object.class), operatorFactory) ,具体分析一下写入的过程。 对于 transform(opName(\\\"stream_write\\\", conf), Ty

    2024年02月12日
    浏览(27)
  • Apache Doris 数据导入:Insert Into语句;Binlog Load;Broker Load;HDFS Load;Spark Load;例行导入(Routine Load)

    Doris 提供多种数据导入方案,可以针对不同的数据源进行选择不同的数据导入方式。Doris支持各种各样的数据导入方式:Insert Into、json格式数据导入、Binlog Load、Broker Load、Routine Load、Spark Load、Stream Load、S3 Load,下面分别进行介绍。 注意: Doris 中的所有导入操作都有原子性保

    2024年02月21日
    浏览(44)
  • 使用kettle同步全量数据到Elasticsearch(es)--elasticsearch-bulk-insert-plugin应用

    为了前端更快地进行数据检索,需要将数据存储到es中是一个很不错的选择。由于公司etl主要工具是kettle,这里介绍如何基于kettle的elasticsearch-bulk-insert-plugin插件将数据导入es。在实施过程中会遇到一些坑,这里记录解决方案。 可能会遇到的报错: 1、No elasticSearch nodes found 2、

    2024年02月01日
    浏览(62)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目录 0. 相关文章链接 1. 创建表 1.1. 启动spark-sql 1.2. 建表参数 1.3. 创建非分区表 1.4. 创建分区表 1.5. 在已有的hudi表上创建新表 1.6. 通过CTAS (Create Table As Select)建表 2. 插入数据 2.1. 向非分区表插入数据 2.2. 向分区表动态分区插入数据 2.3. 向分区表静态分区插入数据 2.4

    2024年02月06日
    浏览(35)
  • 数据湖架构Hudi(二)Hudi版本0.12源码编译、Hudi集成spark、使用IDEA与spark对hudi表增删改查

    Hadoop 3.1.3 Hive 3.1.2 Flink 1.13.6,scala-2.12 Spark 3.2.2,scala-2.12 2.1.1 环境准备 2.1.2 下载源码包 2.1.3 在pom文件中新增repository加速依赖下载 在pom文件中修改依赖的组件版本: 2.1.4 修改源码兼容hadoop3并添加kafka依赖 Hudi默认依赖的hadoop2,要兼容hadoop3,除了修改版本,还需要修改如下代

    2024年02月06日
    浏览(43)
  • spark集成hudi

    启动spark-shell 2 hudi内置数据生成器,生成10条json数据 3加载到DF,写入hudi,实现简单etl处理 4读取存储数据及注册临时表

    2024年02月07日
    浏览(25)
  • Hudi-集成Spark之spark-sql方式

    启动spark-sql 创建表 建表参数: 参数名 默认值 说明 primaryKey uuid 表的主键名,多个字段用逗号分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的预合并字段。同 hoodie.datasource.write.precombine.field type cow 创建的表类型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    浏览(33)
  • Apache hudi 核心功能点分析

    文中部分代码对应 0.14.0 版本 初始的需求是Uber公司会有很多记录级别的更新场景,Hudi 在Uber 内部主要的一个场景,就是乘客打车下单和司机接单的匹配,乘客和司机分别是两条数据流,通过 Hudi 的 Upsert 能力和增量读取功能,可以分钟级地将这两条数据流进行拼接,得到乘客

    2024年02月02日
    浏览(23)
  • Apache Hudi Timeline Server介绍

    Hudi 有一个中央时间线服务器,在驱动程序节点中运行并作为 Rest 服务。它有多种好处,第一个用例是提供 FileSystemView api。 Hudi 的核心是维护一个 TableFileSystemView,它暴露 API 来获取给定数据集的文件状态,驱动程序和执行程序将在写入和表服务生命周期的不同时间点查询该状

    2024年02月12日
    浏览(23)
  • 04_Hudi 集成 Spark、保存数据至Hudi、集成Hive查询、MergeInto 语句

    本文来自\\\"黑马程序员\\\"hudi课程 4.第四章 Hudi 集成 Spark 4.1 环境准备 4.1.1 安装MySQL 5.7.31 4.1.2 安装Hive 2.1 4.1.3 安装Zookeeper 3.4.6 4.1.4 安装Kafka 2.4.1 4.2 滴滴运营分析 4.2.1 需求说明 4.2.2 环境准备 4.2.2.1 工具类SparkUtils 4.2.2.2 日期转换星期 4.2.3 数据ETL保存 4.2.3.1 开发步骤 4.2.3.2 加载CS

    2024年02月13日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包