Hudi
传送门
特性
-
可插拔索引机制支持快速的Upsert/Delete
-
支持增量拉取表变更以进行处理
-
支持事务提交及回滚,并发控制
-
支持spark、presto、hive、flink等引擎的sql读写
-
自动管理小文件,数据聚簇、压缩、清理
-
流式摄入,内置CDC
-
内置可扩展存储访问的元数据跟踪
-
向后兼容的方式实现表结构变更的支持
场景
近实时写入、近实时分析、增量的pipline、增量导出
核心概念
基本概念
时间轴
- 由每一个instants组成,每个instant由三个部分组成:
- Action
-
COMMITS
- 提交表示将一批记录原子写入表中。 -
CLEANS
- 后台活动,用于删除表中不再需要的旧版本文件。 -
DELTA_COMMIT
- 增量提交是指将一批记录原子写入 MergeOnRead 类型表,其中部分/全部数据可以只写入增量日志。 -
COMPACTION
- 协调Hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件移动到列格式。在内部,压缩表现为时间轴上的特殊提交 -
ROLLBACK
- 指示提交/增量提交不成功 & 已回滚,删除在此类写入期间生成的任何部分文件 -
SAVEPOINT
- 将某些文件组标记为“已保存”,以便清理程序不会删除它们。它有助于将表还原到时间轴上的某个点,以防发生灾难/数据恢复方案。
-
- Time,通常是时间戳,以操作的开始时间顺序单调增加
- State
-
REQUESTED
- 表示已计划操作,但尚未启动 -
INFLIGHT
- 表示当前正在执行操作 -
COMPLETED
- 表示完成时间轴上的操作
-
- Action
- 时间概念
- Arrival time:数据到达Hudi的时间,commit time
- Event time:record中记录的时间,也就是数据自带的时间
这里之所以不用担心迟到的数据消费不到,是因为此事件有事件自带的时间,自然也会隶属于所在的时间分区
文件布局
-
hudi表
- 元数据区(.hoodie)
- instant1、instant2
- 数据区(分区)
- 分区1:dayFile、logFile
- 分区2:dayFile、logFile
- 元数据区(.hoodie)
-
hudi分区中有文件组的概念,文件组存在唯一标识ID,每个文件组包含多个文件片,一个文件片包含:
- 一个基本文件(.parquet):在某个commit/compaction即时时间(instant time)生成的(MOR可能没有)
- 多个日志文件(.log.*),这些日志文件包含自生成基本文件以来对基本文件的插入/更新(COW没有)
-
hudi采用了多版本并发控制,也就是对应了多版本的数据------多文件片
- compaction操作:合并日志和基本文件以产生新的文件片
- clean操作:清除不使用的/旧的文件片以回收文件系统上的空间
-
Hudi的base file(parquet 文件)在 footer 的 meta 去记录了 record key 组成的 BloomFilter,用于在 file based index 的实现中实现高效率的 key contains 检测。只有不在 BloomFilter 的 key 才需要扫描整个文件消灭假阳------索引检测key是否存在
-
Hudi 的 log (avro 文件)是自己编码的,通过积攒数据 buffer 以 LogBlock 为单位写出,每个 LogBlock 包含 magic number、size、content、footer 等信息,用于数据读、校验和过滤。
索引
-
hudi提供了高效的upserts,具体是将hoodie key(record key+partition path)与文件id(文件组)建立唯一映射
-
数据第一次写入文件后保持不变,一个FileGroup包含了一批record的所有版本记录。index用于区分消息是insert还是update
-
此做法的意义在于,当更新的数据到了之后可以快速定位到对应的FileGroup,避免了不必要的更新,只需要在FileGroup内做合并
-
-
索引的类型
index | 原理 | 优点 | 缺点 |
---|---|---|---|
Bloom Index | 默认配置,使用布隆过滤器来判断记录存在与否,也可选使用record key的范围裁剪需要的文件 | 效率高,不依赖外部系统,数据和索引保持一致性 | 因假阳性问题,还需回溯原文件再查找一遍 |
Simple Index | 把update/delete操作的新数据和老数据进行join | 实现最简单,无需额外的资源 | 性能比较差 |
HBase Index | 把index存放在HBase里面。在插入 File Group定位阶段所有task向HBase发送 Batch Get 请求,获取 Record Key 的 Mapping 信息 | 对于小批次的keys,查询效率高 | 需要外部的系统,增加了运维压力 |
Flink State-based Index | HUDI 在 0.8.0 版本中实现的 Flink witer,采用了 Flink 的 state 作为底层的 index 存储,每个 records 在写入之前都会先计算目标 bucket ID。 | 不同于 BloomFilter Index,避免了每次重复的文件 index 查找 |
-
全局索引/非全局索引
-
全局索引:全局索引在全表的所有分区范围下强制要求键的唯一性,也就是确保对给定的键有且只有一个对应的记录。全局索引提供了更强的保证,但是随着表增大,update/delete 操作损失的性能越高,因此更适用于小表。
-
非全局索引:默认的索引实现,只能保证数据在分区的唯一性。非全局索引依靠写入器为同一个记录的update/delete提供一致的分区路径,同时大幅提高了效率,更适用于大表。
-
HBase索引本质上是一个全局索引,bloom和simple index都有全局选项:
hoodie.index.type=GLOBAL_BLOOM hoodie.index.type=GLOBAL_SIMPLE
-
-
索引的选择
- 大部分的更新会在较新的几个时间分区上,只有小部分会在旧的分区,这种可以使用布隆索引,如果record key是有序的,那就可以通过范围进一步筛选;如果更加高效的使用布隆过滤器进行比对,hudi缓存了输入记录并且使用了自定义的分区器和统计的规律来解决了数据的倾斜,如果假阳性率较高,查询会增加数据的打乱操作,也会根据数据量来调整大小从而达到设定的假阳性率
- 事件流数据,比如从kafka或者其他消息件发出的数据,插入和更新只存在于最新的几个分区中,重复事件较多,所以在入湖之前去重是一个常见的需求;虽然可以使用hbase索引进行去重,但索引存储的消耗还是会随着事件的增长而线性增长,所以有范围裁剪的布隆索引才是最佳的解决方案,可以使用事件时间戳+事件id组成的键作为去重条件
- 对维度表的随机更新,使用布隆裁剪就不合适,直接使用普通简单索引就合适,直接将所有的文件的所需字段连接;也可以采用HBase索引,其对这些表能提供更加优越的查询效率;当遇到分区内数据需要更新时,较为适合采用Merge-On-Read表
表类型
-
Copy On Write
- 只有数据文件/基本文件(.parquet),没有增量日志文件(.log.*)
- 对于每一个新批次的写入都将创建相应数据文件的版本(新的FileSlice),也就是第一次写入文件为fileslice1,第二次更新追加操作就是fileslice2
- data_file1 和 data_file2 都将创建更新的版本,data_file1 V2 是data_file1 V1 的内容与data_file1 中传入批次匹配记录的记录合并
- cow是在写入期间进行合并,因此会产生一些延时,但是它最大的特点在于简单性,不需要其他表的服务,也相对容易调试
-
Merge On Read
-
可能包含列存的基本文件(.parquet)和行存的增量日志文件(基于行的avro格式,.log文件)
-
所以对于初始的文件也是追加的avro文件,后续修改追加的文件是avro文件,而且只有在读的时候或者compaction才会合并成列文件
-
compaction可以选择内联或者异步方式,比如可以将压缩的最大增量日志配置为 4。这意味着在进行 4 次增量写入后,将对数据文件进行压缩并创建更新版本的数据文件
-
不同索引写文件会有差异,布隆索引插入还是写入parquet文件,只有更新才会写入avro文件,因为当parquet文件记录了要更新消息的FileGroupID;而对于Flink索引可以直接写入avro文件
-
-
COW与MOR的对比
CopyOnWrite | MergeOnRead | |
---|---|---|
数据延迟 | 高 | 低 |
查询延迟 | 低 | 高 |
Update(I/O) 更新成本 | 高(重写整个Parquet文件) | 低(追加到增量日志) |
Parquet文件大小 | 低(更新成本I/O高) | 较大(低更新成本) |
写放大 | 大 | 低(取决于压缩策略) |
所以,cow适合批次处理,mor适合批流一体但更适合流式计算
查询类型
-
快照查询 Snapshot Queries
-
全量最新数据:cow直接查最新的parquet文件,mor表需要做一个合并------最新全量数据
-
-
增量查询 Incremental Queries
- 可以查询给定commit/delta commit即时操作以来新写入的数据。有效的提供变更流来启用增量数据管道------最新增量数据
-
读优化查询 Read Optimized Queries
-
可查看给定的commit/compact即时操作的表的最新快照。仅将最新文件片的基本/列文件暴露给查询,并保证与非Hudi表相同的列查询性能------并不是全量最新,只是合并时文件
-
-
不同表支持的查询类型
数据写
upsert
- cow
- 先对数据按照record key去重
- 创建索引rocord key+分区路径+文件组id;并且通过索引区分哪些数据是更新,哪些数据是新增(key第一次写入)
- 对于更新的数据,会定位到key所在的最新的文件片的base文件(parquet文件),做合并写到新的文件片内
- 对于新增的数据
- 会先扫面当前分区下的小文件,存在就合并写到新的文件片
- 不存在就写新的FileGroup+FileSlice(文件片)
- mor
- 先对数据按照record key去重(可选)
- 创建索引rocord key+分区路径+文件组id;并且通过索引区分哪些数据是更新,哪些数据是新增(key第一次写入)
- 对于新增的数据
- 如果不可建索引,会尝试合并分区内最小的base file(也就是不包含log 文件的FileSlice),生成新的FileSlice,如果没有base File就新写一个FileGroup + FileSlice + base file
- 如果log file可以建索引,会尝试追加到小的log file内,如果没有log file就新写一个FileGroup + FileSlice + base file
- 对于更新的数据,写对应的file group+file slice,直接追加到最新的log file(如果当时是最小的小文件,会合并base file,生成新的file slice)
- log file的大小达到一定的阈值会滚动生成一个新的文件
insert
-
cow
- 先对 records 按照 record key 去重(可选)
- 不会创建index
- 如果有小的base file文件,就合并生成新的fileslice+base file,否则就写新的fileslice+base file
-
mor
- 先对 records 按照 record key 去重(可选)
- 不会创建index
- 如果log file可创建索引,并且有小的fileslice,尝试追加或写到最新log file,如果log file不可索引,就写一个新的fileslice+base file
insert overwrite
在同一分区中创建新的文件组集,现有的文件组被标记为“删除”,根据新记录的数量创建新的文件组
key的生成策略
record key可以用多个字段组合,parition path支持多个字段组合(多层分区)
删除策略
- 逻辑删除:value字段标记为null
- 物理删除:
- 通过 OPERATION_OPT_KEY 删除所有的输入记录
- 配置 PAYLOAD_CLASS_OPT_KEY =org.apache.hudi.EmptyHoodieRecordPayload 删除所有的输入记录
- 在输入记录添加字段:_hoodie_is_deleted
数据读
-
快照读:读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件
-
增量读:当前的 Spark data source 可以指定消费的起始和结束 commit 时间,读取 commit 增量的数据集。但是内部的实现不够高效:拉取每个 commit 的全部目标文件再按照系统字段 _ hoodie_commit_time_ apply 过滤条件
-
流读:0.8.0 版本的 HUDI Flink writer 支持实时的增量订阅,可用于同步 CDC 数据,日常的数据同步 ETL pipeline。Flink 的 streaming 读做到了真正的流式读取,source 定期监控新增的改动文件,将读取任务下派给读 task。
-
Compaction文章来源:https://www.toymoban.com/news/detail-495857.html
- 没有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并写 base file
- 有 base file:走 copy on write upsert 流程,先读 log file 建 index,再读 base file,最后读 log file 写新的 base file
Flink 和 Spark streaming 的 writer 都可以 apply 异步的 compaction 策略,按照间隔 commits 数或者时间来触发 compaction 任务,在独立的 pipeline 中执行。文章来源地址https://www.toymoban.com/news/detail-495857.html
到了这里,关于数据湖——Hudi基本概念的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!