Hudi的7种索引

这篇具有很好参考价值的文章主要介绍了Hudi的7种索引。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、Bloom Index

Bloom Index (default) 使用根据记录键构建的bloom过滤器,也可以使用记录键范围修剪候选文件.原理为计算RecordKey的hash值然后将其存储到bitmap中,为避免hash冲突一般选择计算3次

  • HoodieKey 主键信息:主要包含recordKey 和patitionPath 。recordkey 是由hoodie.datasource.write.recordkey.field 配置项根据列名从记录中获取的主键值。patitionPath 是分区路径。Hudi 会根据hoodie.datasource.write.partitionpath.field 配置项的列名从记录中获取的值作为分区路径。

  • https://llimllib.github.io/bloomfilter-tutorial/zh_CN/

原理:计算RecordKey的hash值然后将其存储到bitmap中去,key值做hash可能出现hash 碰撞的问题,为了较少hash 值的碰撞使用多个hash算法进行计算后将hash值存入BitMap,一般三次hash最佳

查找步骤:

1、提取所有的分区路径和主键值,然后计算每个分区路径中需要根据主键查找的索引的数量。

2、有了需要加载的分区后,调用LoadInvolvedFiles 方法加载分区下所有的parquet 文件。在加载paquet文件只是加载文件中的页脚信息,页脚存放的有布隆过滤器、记录最小值、记录最大值。对于布隆过滤器其实是存放的是bitmap序列化的对象。

3、加载好parquet 的页脚信息后会根据最大值和最小值构造线段树。

4、据Rdd 中RecordKey 进行数据匹配查找数据属于那个parqeut 文件中,对于RecordKey查找只有符合最大值和最小值范围才会去查找布隆过滤器中的bitmap ,RecordKey小于最小值找左子树,RecordKey大于最大值的key找右子树。递归查询后如果查找到节点为空说明RecordKey在当前分区中不存在,当前Recordkey是新增数据。查找索引时spark会自定义分区避免大量数据在一个分区查找导致分区数据倾斜。查找到RecordKey位置信息后会构造<HoodieKey,HoodieRecordLocation> Rdd 对象。
hudi 索引,Hudi,spark,哈希算法,大数据,hudi
查找步骤,以Spark举例:
tagLocation:

  • 从Spark Rdd中提取partitionPath以及recordKey,构建partitionRecordKeyPairRDD对象

    • 调用lookupIndex方法,获取文件位置,返回值为:JavaPairRDD<HoodieKey, HoodieRecordLocation>

      • //1、将从Rdd中提取的一批值根据partition,进行分组
        //key:partition  value:数据数量
        Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
        //2、根据recordsPerPartition,将对应partition下的全部parquet文件加载上来
        //tuple2.t1:partitionPath,tuple2.t2:BloomIndexFileInfo->fileId、parquet footer max min
        List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
                loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
        //3、根据partitionPath,对parquet元数据进行分组
        final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
        	fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
        //4、record与parquet文件进行匹配 fileid对应有HoodieKey的数据 -- 根据配置决定构建线段树还是暴力查找
        //返回值 Tuple2 t1:fileId t2:partitionPath and recordKey 形成对应关系
        JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
          explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD);
        //5、计算对每个文件组执行的布隆过滤器比较的估计数量
        //返回值:key:fileId value:数据数量   
        Map<String, Long> comparisonsPerFileGroup =
          computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context);
        //6、将partition数量与配置项 config.getBloomIndexParallelism()对比
        //索引查找并行度
        int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
        //7、根据key值获取位置信息
        //返回值:JavaPairRDD<HoodieKey, HoodieRecordLocation>
        //String instantTime; String fileId;
        findMatchingFilesForRecordKeys(fileComparisonsRDD, joinParallelism, hoodieTable,
                comparisonsPerFileGroup);
        
      • 加载好parquet 的页脚信息后会根据最大值和最小值构造线段树

      • 根据Rdd 中RecordKey 进行数据匹配查找数据属于那个parqeut 文件中,对于RecordKey查找只有符合最大值和最小值范围才会去查找布隆过滤器中的bitmap ,RecordKey小于最小值找左子树,RecordKey大于最大值的key找右子树。递归查询后如果查找到节点为空说明RecordKey在当前分区中不存在,当前Recordkey是新增数据。查找索引时spark会自定义分区避免大量数据在一个分区查找导致分区数据倾斜。查找到RecordKey位置信息后会构造<HoodieKey,HoodieRecordLocation> Rdd 对象。

      • hudi 索引,Hudi,spark,哈希算法,大数据,hudi
        加载paquet文件只是加载文件中的页脚信息,页脚存放的有布隆过滤器、记录最小值、记录最大值。对于布隆过滤器其实是存放的是bitmap序列化的对象。递归查询后如果查找到节点为空说明RecordKey在当前分区中不存在,当前Recordkey是新增数据

2、 Global Bloom Index

​ 全局布隆索引,与布隆索引的差异点在查找每个RecordKey 属于那个parquet 文件中,会加载所有parquet文件的页脚信息构造线段树,然后在去查询索引。因为Hudi需要加载所有的parquet文件和线段树节点变多对于查找性能会比普通的布隆索引要差

但是对于分区字段的值发生了修改,如果还是使用普通的布隆索引会导致在当前分区查询不到当成新增数据写入Hudi表。这样我们的数据就重复了,在很多业务场景是不被允许的。所以在选择那个字段做分区列时,尽量选择列值永远不会发生变更的,这样我们使用普通布隆索引就可以了。
hudi 索引,Hudi,spark,哈希算法,大数据,hudi

3、Simple Index

简易索引与布隆索引的不同是直接加载分区中所有的parquet数据然后在与当前的数据比较是否存在,实现比较简单。

  • 1、提取所有的分区路径和主键值
  • 2、根据分区路径加载所有涉及的分区路径内的parquet文件数据,主要加载HooieKey和fileID两列数据
  • 3、同布隆索引一样,将原始数据与包含位置信息的查询数据进行做关联,提取位置信息赋值至原始数据上
  • hudi 索引,Hudi,spark,哈希算法,大数据,hudi

4、Global Simple Index

简易全局索引同布隆全局索引一样,需要加载所有分区的parquet 文件数据,构造<HoodieKey,HoodieRecordLocation>Rdd然后进行关联。在简易索引中hoodie.simple.index.update.partition.path配置项也是可以选择是否允许分区数据变更。数据文件比较多数据量很大,这个过程会很耗时。

5、HBase Index

将索引映射存储在外部hbase表中,为全局索引。在HBase索引中,文件和索引是分开在特定的情况下可能有一致性问题

HBase索引实现步骤如下:

  • 1、连接HBase数据库

  • 2、批量请求Hbase数据库

  • 3、检查get获取数据是否为有效索引,这时Hudi会连接元数据检查commit时间是否有效,若无效currentLocation将不会被赋值。检查是否为有效索引的目的是当索引更新一半,导致Hbase宕机导致任务失败,保证不会加载过期的索引。避免Hbase索引和数据不一致导致数据进入错误的分区。

  • 检查是否开启允许分区变更

  • hudi 索引,Hudi,spark,哈希算法,大数据,hudi
    另一个值得理解的关键方面是全局索引和非全局索引之间的区别。布隆和简单索引都有全局选项 - hoodie.index.type=GLOBAL_BLOOM 和 hoodie.index.type=GLOBAL_SIMPLE。 HBase 索引本质上是一个全局索引。

  • 全局索引:全局索引强制跨表的所有分区的键的唯一性,即保证表中对于给定的记录键恰好存在一条记录。 全局索引提供了更强的保证,但更新/删除成本随着表 O(表大小)的大小而增长,这对于较小的表可能仍然是可以接受的。

  • 非全局索引:另一方面,默认索引实现仅在特定分区内强制执行此约束。 可以想象,非全局索引依赖于编写器在更新/删除期间为给定的记录键提供相同的一致分区路径,但可以提供更好的性能,因为索引查找操作变为 O(更新/删除的记录数) 并且可以很好地扩展写入量。

6、InMemoryHashIndex(仅适用测试环境)

​ 内存索引目前Spark的实现只是构造一个ConcurrentMap在内存中,不会加载parquet文件中的索引,当调用tagLocation方法会在map中判断key值是否存在

7、Bucket Index (0.11.0版本引入)

​ 字节跳动引入,引入原因:初始使用bloom过滤器,数据量达到30TB,约5千亿条记录分布在40000个file Group中,bloom Filter Index假阳性很频繁。Hudi 需要确定该 Record Key 是否真的存在这个操作需要读取文件里的实际数据一条一条做对比,而实际数据量规模很大,这会导致查询 Record Key 跟 File ID 的映射关系代价非常大,因此造成了索引的性能下滑。

为什么没有使用Hbase替代?

​ 业务方不希望引入 HBase 这一额外依赖,且担心运维 Hbase 过程中存在新的问题,认为 Hbase Index 整体不够轻量,因此在整个业务场景中也无法作为 Bloom Filter 索引的替代。

设计原理

​ Bucket Index 是一种基于哈希的索引,借鉴了数据库里的 Hash Index。给定 n 个桶, 用 Hash 函数决定某个记录属于哪个桶。最终所有分区被分成 N 个桶,每个桶对应一个 File Group。

相比较 Bloom Filter Index 来说,Hash Index 在逻辑层面提供了 Record Key 跟 File Group 的映射关系,不存在假阳性问题。相同 key 的数据一定是落在同一个桶里面。最终一分区内的结构如下,目前一个 Partition 里面 Bucket 和 File Group 是一一对应的关系。
hudi 索引,Hudi,spark,哈希算法,大数据,hudi
数据写入原理

Bucket Index 的实际写入流程可以参考下面的过程示意图。以下面的实时插入场景为例,某业务批次新增了 5 条记录,并且需要 Upsert 到已有的分区 partition=20220203 中,对已有数据根据主键 Record 做一个更新,保留最新的数据。整个过程可以用下面的示意图表示:
hudi 索引,Hudi,spark,哈希算法,大数据,hudi

  1. 在建表时先预估表的单个分区数据存储大小,设置一个分桶数 numBuckets。
  2. 在数据插入前,首先生成 n 个 File ID, 将 File ID 的前 8 位替换成 bucketId 的数字
  • 00000000-e929-4327-8b0c-7d0d66091321
  • 00000001-e3cd-4756-b311-863803a6cdaf
  • 00000002-c4ed-4418-90d4-6e348f380636
  • 00000003-c7bd-4916-78c5-6g787g090636

在插入过程中,最重要的一步就是标记每条新插入的记录属于哪个文件 File Group,然后找到对应的 File Group 去更新或者合并。在目前的设计中, 分桶数跟 File Group 是一一对应的映射关系,因此找到每条 Record 对应的桶 ID ,即可确定 Record Key 跟 File Group 的映射关系。

在具体实现中,我们会对更新数据的索引键计算哈希,再对分桶数取模快速定位到每个 Record 对应的桶,整个过程如下面的 Hash 函数所示:

hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets

其中 hashKeyFields 可以由用户指定,是 Record Key 的一个子集,当默认不指定时,会以 Record Key 本身作为 hash 键。在计算好后,每条记录即可知道即将写入的桶。

经过索引层之后,每条数据都会带有一个 File ID,引擎会根据 File ID 进行一次 Shuffle,将相同 File ID 的数据导入到同一个子任务中。对于 COW 表而言,更新 Update 部分需要和已有的 BaseFile 合并生成新的 BaseFile。而 MOR 表将 Update 的数据直接写入对应 File Group 的 delta log,Insert 部分生成新的 BaseFile,最终完成该批次数据的 Upsert。

由此可见,整个过程中 Bucket Index 不需要对现有的数据进行扫描组成类似 Bloom Filter 一样的过滤器,因此可以省去整个定位 File Group 的查询时间,定位 File Group 的时间也不会随着已有 Record 条数的增加而导致性能下降。同时分桶操作会在每个桶内对分桶列排序,排序后的数据一般能获得更高的压缩率,也能节省存储。文章来源地址https://www.toymoban.com/news/detail-632640.html

到了这里,关于Hudi的7种索引的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hudi(7):Hudi集成Spark之spark-sql方式

    目录 0. 相关文章链接 1. 创建表 1.1. 启动spark-sql 1.2. 建表参数 1.3. 创建非分区表 1.4. 创建分区表 1.5. 在已有的hudi表上创建新表 1.6. 通过CTAS (Create Table As Select)建表 2. 插入数据 2.1. 向非分区表插入数据 2.2. 向分区表动态分区插入数据 2.3. 向分区表静态分区插入数据 2.4

    2024年02月06日
    浏览(44)
  • Hudi(19):Hudi集成Flink之索引和Catalog

    目录 0. 相关文章链接 1. Bucket索引(从 0.11 开始支持) 1.1. WITH参数 1.2. 和 state 索引的对比 2. Hudi Catalog(从 0.12.0 开始支持) 2.1. 概述 2.2. WITH 参数 2.3. 使用dfs方式  Hudi文章汇总          默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当

    2024年02月05日
    浏览(40)
  • 问题:Spark SQL 读不到 Flink 写入 Hudi 表的新数据,打开新 Session 才可见

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年02月22日
    浏览(55)
  • Apache Hudi初探(十)(与spark的结合)--hudi的Compaction操作

    在之前的文章Apache Hudi初探(六)(与spark的结合) 中,我们没有过多的解释Spark中 hudi Compaction 的实现,在这里详细说一下 注意:在hudi中有同步,异步Compaction的概念,为了保证写入的及时性和数据读取的时效性,hudi在一步compaction的过程中会引入一个后台线程进行compaction,所以异

    2024年02月15日
    浏览(35)
  • spark集成hudi

    启动spark-shell 2 hudi内置数据生成器,生成10条json数据 3加载到DF,写入hudi,实现简单etl处理 4读取存储数据及注册临时表

    2024年02月07日
    浏览(34)
  • Hudi的7种索引

    Bloom Index (default) 使用根据记录键构建的bloom过滤器,也可以使用记录键范围修剪候选文件.原理为计算RecordKey的hash值然后将其存储到bitmap中,为避免hash冲突一般选择计算3次 HoodieKey 主键信息 :主要包含recordKey 和patitionPath 。recordkey 是由hoodie.datasource.write.recordkey.field 配置项根

    2024年02月14日
    浏览(35)
  • Hudi的核心概念 —— 索引(Index)

    Hudi 通过索引机制提供高效的 upserts,具体是将给定的 hoodie key(record key(记录键) + partition path)与文件 id(文件组)建立唯一映射。这种映射关系,数据第一次写入文件后保持不变, 所以,一个 FileGroup 包含了一批 record 的所有版本记录。Index 用于区分消息是 INSERT 还是 UPDAT

    2024年02月14日
    浏览(38)
  • Hudi-集成Spark之spark-sql方式

    启动spark-sql 创建表 建表参数: 参数名 默认值 说明 primaryKey uuid 表的主键名,多个字段用逗号分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的预合并字段。同 hoodie.datasource.write.precombine.field type cow 创建的表类型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    浏览(47)
  • Hudi0.14.0集成Spark3.2.3(Spark Shell方式)

    1.1 启动Spark Shell

    2024年01月24日
    浏览(38)
  • Hudi0.14.0 集成 Spark3.2.3(IDEA编码方式)

    本次在IDEA下使用Scala语言进行开发,具体环境搭建查看文章 IDEA 下 Scala Maven 开发环境搭建。 1.1 添加maven依赖 创建Maven工程,pom文件:

    2024年01月24日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包