Apache Hudi初探(一)(与flink的结合)

这篇具有很好参考价值的文章主要介绍了Apache Hudi初探(一)(与flink的结合)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

Spark的使用方式不同,flink结合hudi的方式,是以SPI的方式,所以不需要像使用Spark的方式一样,Spark的方式如下:

spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog

(这里不包括org.apache.spark.sql.sources.DataSourceRegister
Flink结合Hudi的方式,只需要引入了对应的jar包即可,以SPI的方式:

META-INF/services/org.apache.flink.table.factories.Factory
org.apache.hudi.table.HoodieTableFactory
org.apache.hudi.table.catalog.HoodieCatalogFactory

其中 HoodieTableFactory 是读写Hudi数据的地方,
HoodieCatalogFactory是操作Hudi用到的Catalog

先说杂谈

直接先解释一下Hudi的写数据:

HoodieTableFactory

  @Override
  public DynamicTableSink createDynamicTableSink(Context context) {
    Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
    checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
        "Option [path] should not be empty.");
    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
    sanityCheck(conf, schema);
    setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
    return new HoodieTableSink(conf, schema);
  }

创建的HoodieTableSink是真正Hudi写入数据的类:文章来源地址https://www.toymoban.com/news/detail-595991.html

public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
  ...
  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return (DataStreamSinkProviderAdapter) dataStream -> {

      // setup configuration
      long ckpTimeout = dataStream.getExecutionEnvironment()
          .getCheckpointConfig().getCheckpointTimeout();
      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
      // set up default parallelism
      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());

      RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

      // bulk_insert mode
      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
        return Pipelines.bulkInsert(conf, rowType, dataStream);
      }

      // Append mode
      if (OptionsResolver.isAppendMode(conf)) {
        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
        if (OptionsResolver.needsAsyncClustering(conf)) {
          return Pipelines.cluster(conf, rowType, pipeline);
        } else {
          return Pipelines.dummySink(pipeline);
        }
      }

      DataStream<Object> pipeline;
      // bootstrap
      final DataStream<HoodieRecord> hoodieRecordDataStream =
          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
      // write pipeline
      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
      // compaction
      if (OptionsResolver.needsAsyncCompaction(conf)) {
        // use synchronous compaction for bounded source.
        if (context.isBounded()) {
          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        }
        return Pipelines.compact(conf, pipeline);
      } else {
        return Pipelines.clean(conf, pipeline);
      }
    };
  }
  ...
}
  • long ckpTimeout = * 获取到flink的checkpoint的超时时间,并设置为write.commit.ack.timeout的值
    spark*的批处理的不同,flink会有checkpoint的存在(便于flink内部状态的保存和恢复)。
  • OptionsInference.setupSinkTasks
    设置write.tasks/write.bucket_assign.tasks/compaction.tasks/clustering.tasks 的值
  • 根据设置的write.operation值(默认是upsert),选择不同的代码路径
  • Pipelines.bootstrap bootstrap操作会加载存在的索引,并传递到下游的operator算子中
    • 如果index.global.enabledtrue(默认为true),则首先构建RowDataToHoodieFunction 算子,把记录转换为HoodieAvroRecord
      其中,payload.class默认为EventTimeAvroPayload
      write.precombinefalse,这是为了更好的写性能,这一点和Spark的默认行为一样
    • 如果index.bootstrap.enabledtrue默认是false,还会构建BootstrapOperator算子,从已有的hoodie表中加载索引,并把索引记录传播到下游(注意每个task只会触发一次)
  • pipeline = Pipelines.hoodieStreamWrite 写hudi文件
    • 正如代码中的注释一样,先按照record key做shuffle,再按照fileId做shuffle,最后才是写操作:
       | input1 | ===\     /=== | bucket assigner | ===\     /=== | task1 |
                      shuffle(by PK)                    shuffle(by bucket ID)
       | input2 | ===/     \=== | bucket assigner | ===/     \=== | task2 |
      
      
    • 如果index.type不是BUCKET(默认是FLINK_STATE,使用flink state backend作为存储),我们这里主要说明一下非bucket索引:
       WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
        return dataStream
            // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
            .keyBy(HoodieRecord::getRecordKey)
            .transform(
                "bucket_assigner",
                TypeInformation.of(HoodieRecord.class),
                new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
            .uid(opUID("bucket_assigner", conf))
            .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
            // shuffle by fileId(bucket id)
            .keyBy(record -> record.getCurrentLocation().getFileId())
            .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
            .uid(opUID("stream_write", conf))
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
      
      • 按照record key进行分组后,主要的操作就是在BucketAssignFunction 这个函数中,
        该函数的主要作用就是:
        • 先根据上一个operator下发的索引记录,更新内存中保存的索引,如果索引存在则更新,否则新增(这里插入对应的I或者U标识)
      • 再按照FileId进行分组后,主要的操作 在StreamWriteFunction这个函数中,这里涉及的知识点比较多,后续详细剖析
        主要涉及了StreamWriteOperatorCoordinator以及hudi exactly once的实现等
  • 如果compaction.async.enabledTrue(默认是True)则进行异步Compaction,否则进行Clean操作

到了这里,关于Apache Hudi初探(一)(与flink的结合)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

    之前讨论的都是’hoodie.datasource.write.operation’:\\\'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件: 因为是 bulk insert 操作,所以没有去重的需要,所以直接采用spark原生的方式, 以下我们讨论非spark原生的方式, 继续Apache Hudi初探(八)(与spark的结合)–非

    2024年02月08日
    浏览(34)
  • 大数据Hadoop之——Apache Hudi 数据湖实战操作(FlinkCDC)

    Hudi(Hadoop Upserts Deletes and Incrementals) ,简称 Hudi ,是一个 流式数据湖平台 ,关于Hudi的更多介绍可以参考我以下几篇文章: 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi 大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合) 这里主要讲解Hive、Trino、Starr

    2023年04月20日
    浏览(39)
  • Apache Hudi 在袋鼠云数据湖平台的设计与实践

    在大数据处理中,实时数据分析是一个重要的需求。随着数据量的不断增长,对于实时分析的挑战也在不断加大,传统的批处理方式已经不能满足实时数据处理的需求,需要一种更加高效的技术来解决这个问题。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是这样一种

    2024年02月06日
    浏览(41)
  • Apache Hudi DeltaStreamer 接入CDC数据时如何完成 Kafka 的身份认证?

    题目有些拗口,简短截说,我们对于Apache Hudi DeltaStreamer在接入CDC数据时,对于其如何通过 Kafka 的身份认证,做了一系列测试和研究,有如下明确结论: . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 

    2024年02月16日
    浏览(38)
  • 探索在Apache SeaTunnel上使用Hudi连接器,高效管理大数据的技术

    Apache Hudi是一个数据湖处理框架,通过提供简单的方式来进行数据的插入、更新和删除操作,Hudi能够帮助数据工程师和科学家更高效地处理大数据,并支持实时查询。 Spark Flink SeaTunnel Zeta 批处理 流处理 精确一次性 列投影 并行处理 支持用户自定义切分 Hudi Source 连接器专为从

    2024年04月28日
    浏览(48)
  • Apache hudi 核心功能点分析

    文中部分代码对应 0.14.0 版本 初始的需求是Uber公司会有很多记录级别的更新场景,Hudi 在Uber 内部主要的一个场景,就是乘客打车下单和司机接单的匹配,乘客和司机分别是两条数据流,通过 Hudi 的 Upsert 能力和增量读取功能,可以分钟级地将这两条数据流进行拼接,得到乘客

    2024年02月02日
    浏览(32)
  • Apache Hudi Timeline Server介绍

    Hudi 有一个中央时间线服务器,在驱动程序节点中运行并作为 Rest 服务。它有多种好处,第一个用例是提供 FileSystemView api。 Hudi 的核心是维护一个 TableFileSystemView,它暴露 API 来获取给定数据集的文件状态,驱动程序和执行程序将在写入和表服务生命周期的不同时间点查询该状

    2024年02月12日
    浏览(28)
  • 问题:Spark SQL 读不到 Flink 写入 Hudi 表的新数据,打开新 Session 才可见

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年02月22日
    浏览(52)
  • 提升 Apache Hudi Upsert 性能的三个建议

    Apache Hudi 社区一直在快速发展,各公司正在寻找方法来利用其强大的功能来有效地摄取和管理大规模数据集。 每周社区都会收到一些常见问题,最常见的问题与 Hudi 如何执行更新插入有关,以确保以低延迟访问最新数据。 快速更新插入的主要考虑因素之一是选择正确的存储

    2024年02月05日
    浏览(42)
  • Apache Hudi 1.x 版本重磅功能展望与讨论

    Apache Hudi 社区正在对Apache Hudi 1.x版本功能进行讨论,欢迎感兴趣同学参与讨论,PR链接:https://github.com/apache/hudi/pull/8679/files 此 RFC 提议对 Hudi 中的事务数据库层进行令人兴奋和强大的重构,以推动未来几年整个社区的持续创新。 在过去的几年里,社区成长(https://git-contributo

    2024年02月07日
    浏览(70)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包