Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)

这篇具有很好参考价值的文章主要介绍了Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

在Apache Hudi初探(一)(与flink的结合)中,我们提到了Pipelines.hoodieStreamWrite 写hudi文件,这个操作真正写hudi是在Pipelines.hoodieStreamWrite方法下的transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory),具体分析一下写入的过程。

分析

对于transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)这个代码片段,我们主要看operatorFactory 这个对象(transform这个操作是Flink框架的操作):

public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {

  public StreamWriteOperator(Configuration conf) {
    super(new StreamWriteFunction<>(conf));
  }

  public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
    return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf));
  }
}

最主要的hudi算子为StreamWriteOperator,其中最主要的操作是由StreamWriteFunction来完成的:

// StreamWriteFunction
  

   @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
    this.metaClient = StreamerUtil.createMetaClient(this.config);
    this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext());
    this.writeStatuses = new ArrayList<>();
    this.writeMetadataState = context.getOperatorStateStore().getListState(
        new ListStateDescriptor<>(
            "write-metadata-state",
            TypeInformation.of(WriteMetadataEvent.class)
        ));

    this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath());
    this.currentInstant = lastPendingInstant();
    if (context.isRestored()) {
      restoreWriteMetadata();
    } else {
      sendBootstrapEvent();
    }
    // blocks flushing until the coordinator starts a new instant
    this.confirming = true;
  }

  @Override
  public void open(Configuration parameters) throws IOException {
    this.tracer = new TotalSizeTracer(this.config);
    initBuffer();
    initWriteFunction();
  }

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    if (inputEnded) {
      return;
    }
    snapshotState();
    // Reload the snapshot state as the current state.
    reloadWriteMetaState();
  }

  @Override
  public void snapshotState() {
    // Based on the fact that the coordinator starts the checkpoint first,
    // it would check the validity.
    // wait for the buffer data flush out and request a new instant
    flushRemaining(false);
  }

  @Override
  public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {
    bufferRecord((HoodieRecord<?>) value);
  }

  • initializeState操作,主要是做一些初始化的操作

    • this.taskID = getRuntimeContext().getIndexOfThisSubtask();
      获取当前的task的索引下标,用来向operator coordinator发送event给operator coordinator,之后 StreamWriteOperatorCoordinator(operator coordinator) 进行处理,后续会说到StreamWriteOperatorCoordinator

    • metaClient = StreamerUtil.createMetaClient(this.config)
      writeClient = FlinkWriteClients.createWriteClient
      初始化hudi的元数据客户端(这里是HoodieTableMetaClient)和写入客户端(这里是HoodieFlinkWriteClient)

    • writeStatuses = new ArrayList<>()
      记录后续的写入hudi文件的信息

    • writeMetadataState = context.getOperatorStateStore().getListState
      记录写入hudi的元数据事件,会在后续的操作中,会包装成event发送给operator coordinator(StreamWriteOperatorCoordinator)

    • ckpMetadata = CkpMetadata.getInstance
      Flink的checkpoint的元数据信息路径,默认的路径是/${hoodie.basePath}/.hoodie/.aux/ckp_meta

    • currentInstant = lastPendingInstant()
      获取上次还没有完成的commit

    • restoreWriteMetadata或者sendBootstrapEvent,根据是否是从checkpoint恢复过来的进行不同消息的发送,
      这里的operator coordinator(StreamWriteOperatorCoordinator)会进行统一的处理,并初始化一个commit

  • open操作
    写入hudi前的前置操作,比如说 初始化TotalSizeTracer记录maxBufferSize便于flush操作
    根据write.operation的值(默认是upsert)选择后续的操作是insert或upsert或overwrite,这里是upsert

  • processElement操作
    这里对传入的HoodieRecord进行缓存,主要是bufferRecord做的事情,

    • 首先会获取bucketID,之后再往对应的bucket中插入数据
    • 如果超出write.batch.size(默认是128MB),则会进行flushBucket操作,该操作主要是写入hudi操作 //TODO: 具体的写入hudi操作
      • 首先会获取新的需要提交的commit
      • 再进行写入的实际操作
      • 写入的文件元数据信息回传到operator coordinator进行统一处理
  • snapshotState 操作

    • 调用flushRemaining 写入剩下的数据到hudi存储中
    • 重新加载当前写入的hudi文件元数据信息到当前flink的state中

hudi StreamWriteOperatorCoordinator作用

总的来说,StreamWriteOperatorCoordinator扮演的角色和在Spark中driver的角色一样,都是来最后来提交 元数据信息到huid中。
具体的作用还是得从具体的方法来看:

  @Override
  public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
    ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
        "The coordinator can only handle WriteMetaEvent");
    WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;

    if (event.isEndInput()) {
      // handle end input event synchronously
      // wrap handleEndInputEvent in executeSync to preserve the order of events
      executor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant);
    } else {
      executor.execute(
          () -> {
            if (event.isBootstrap()) {
              handleBootstrapEvent(event);
            } else {
              handleWriteMetaEvent(event);
            }
          }, "handle write metadata event for instant %s", this.instant
      );
    }
  }
  ...
  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    executor.execute(
        () -> {
          // The executor thread inherits the classloader of the #notifyCheckpointComplete
          // caller, which is a AppClassLoader.
          Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
          // for streaming mode, commits the ever received events anyway,
          // the stream write task snapshot and flush the data buffer synchronously in sequence,
          // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
          final boolean committed = commitInstant(this.instant, checkpointId);

          if (tableState.scheduleCompaction) {
            // if async compaction is on, schedule the compaction
            CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
          }

          if (tableState.scheduleClustering) {
            // if async clustering is on, schedule the clustering
            ClusteringUtil.scheduleClustering(conf, writeClient, committed);
          }

          if (committed) {
            // start new instant.
            startInstant();
            // sync Hive if is enabled
            syncHiveAsync();
          }
        }, "commits the instant %s", this.instant
    );
  }
  • handleEventFromOperator方法用来接受task发送的消息

    • 对于BootStrap类型的WriteMetadataEvent(在StreamWriteFunction方法initializeState中),相当于函数初始化也就会触发
      该类型的消息由handleBootstrapEvent来处理(我们这里假设每个任务operator都完成了初始化的操作),对应的数据流如下:

      initInstant
         ||
         \/
      reset => startInstant
      

      startInstant 这里就会初始化一个hudi写操作的commit信息

    • 对于一般的write的信息的event,(比如说在processElement的flushBucket函数中),由handleWriteMetaEvent来处理:

       if (this.eventBuffer[event.getTaskID()] != null) {
       this.eventBuffer[event.getTaskID()].mergeWith(event);
       } else {
         this.eventBuffer[event.getTaskID()] = event;
       }
      

      这里只是加到变量名为eventBuffer 的WriteMetadataEvent类型的数组中,后续中会进行处理

    • 对于isEndInputtrue的event,这种一般source是基于文件的这种,这里先不讨论

  • notifyCheckpointComplete 当对应的checkpointId完成以后,该方法会被调用

    • commitInstant 提交hudi元数据,如果如果有发生异常,则回滚当前hudi对应的commit
    • scheduleCompaction && scheduleClustering 进行hui的CompcationClustering
    • 如果成功的提交了,则会开启一个新的commit,如果开了hive同步(hive_sync.enabled默认为false),则会同步元数据信息到hive

总结

用一张图总结一下交互方式,如下:
Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作),flink,hudi,flink,大数据,hudi文章来源地址https://www.toymoban.com/news/detail-664014.html

到了这里,关于Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

    之前讨论的都是’hoodie.datasource.write.operation’:\\\'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件: 因为是 bulk insert 操作,所以没有去重的需要,所以直接采用spark原生的方式, 以下我们讨论非spark原生的方式, 继续Apache Hudi初探(八)(与spark的结合)–非

    2024年02月08日
    浏览(27)
  • 大数据Hadoop之——Apache Hudi 数据湖实战操作(FlinkCDC)

    Hudi(Hadoop Upserts Deletes and Incrementals) ,简称 Hudi ,是一个 流式数据湖平台 ,关于Hudi的更多介绍可以参考我以下几篇文章: 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi 大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合) 这里主要讲解Hive、Trino、Starr

    2023年04月20日
    浏览(32)
  • Hudi(17):Hudi集成Flink之写入方式

    目录 0. 相关文章链接 1. CDC 数据同步 1.1. 准备MySQL表 1.2. flink读取mysql binlog并写入kafka 1.3. flink读取kafka数据并写入hudi数据湖 1.4. 使用datafaker插入数据 1.5. 统计数据入Hudi情况 1.6. 实时查看数据入湖情况 2. 离线批量导入 2.1. 原理 2.2. WITH 参数 2.3. 案例 3. 全量接增量 3.1. 

    2024年02月05日
    浏览(28)
  • Hudi(16):Hudi集成Flink之读取方式

    目录 0. 相关文章链接 1. 流读(Streaming Query) 2. 增量读取(Incremental Query) 3. 限流  Hudi文章汇总          当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支

    2024年02月06日
    浏览(33)
  • Hudi(19):Hudi集成Flink之索引和Catalog

    目录 0. 相关文章链接 1. Bucket索引(从 0.11 开始支持) 1.1. WITH参数 1.2. 和 state 索引的对比 2. Hudi Catalog(从 0.12.0 开始支持) 2.1. 概述 2.2. WITH 参数 2.3. 使用dfs方式  Hudi文章汇总          默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当

    2024年02月05日
    浏览(29)
  • Apache hudi 核心功能点分析

    文中部分代码对应 0.14.0 版本 初始的需求是Uber公司会有很多记录级别的更新场景,Hudi 在Uber 内部主要的一个场景,就是乘客打车下单和司机接单的匹配,乘客和司机分别是两条数据流,通过 Hudi 的 Upsert 能力和增量读取功能,可以分钟级地将这两条数据流进行拼接,得到乘客

    2024年02月02日
    浏览(23)
  • Hudi集成Flink

    安装Maven 1)上传apache-maven-3.6.3-bin.tar.gz到/opt/software目录,并解压更名 tar -zxvf apache-maven-3.6. 3 -bin.tar.gz -C /opt/module/ mv   apache -maven-3.6. 3  maven 2)添加环境变量到/etc/profile中 sudo  vim /etc/profile #MAVEN_HOME export MAVEN_HOME=/opt/module/maven export PATH=$PATH:$MAVEN_HOME/bin 3)测试安装结果 sourc

    2023年04月13日
    浏览(22)
  • Apache Hudi Timeline Server介绍

    Hudi 有一个中央时间线服务器,在驱动程序节点中运行并作为 Rest 服务。它有多种好处,第一个用例是提供 FileSystemView api。 Hudi 的核心是维护一个 TableFileSystemView,它暴露 API 来获取给定数据集的文件状态,驱动程序和执行程序将在写入和表服务生命周期的不同时间点查询该状

    2024年02月12日
    浏览(23)
  • Hudi(四)集成Flink(2)

            当前表 默认是快照读取 ,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。 1、WITH参数 名称 Required 默认值 说明 read.streaming.enabled false false 设置

    2024年02月07日
    浏览(27)
  • 【数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式&写入模式&Bucket索引】

    当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。 1.with参数 名称 Required 默认值 说明 read.streaming.enabled false false 设置 true 开启流读模式

    2024年02月14日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包