Hudi(16):Hudi集成Flink之读取方式

这篇具有很好参考价值的文章主要介绍了Hudi(16):Hudi集成Flink之读取方式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

0. 相关文章链接

1. 流读(Streaming Query)

2. 增量读取(Incremental Query)

3. 限流


0. 相关文章链接

 Hudi文章汇总 

1. 流读(Streaming Query)

        当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

WITH参数:

名称

Required

默认值

说明

read.streaming.enabled

false

false

设置 true 开启流读模式

read.start-commit

false

最新 commit

指定 'yyyyMMddHHmmss' 格式的起始 commit(闭区间)

read.streaming.skip_compaction

false

false

流读时是否跳过 compaction commits,跳过 compaction 有两个用途:

1)避免 upsert 语义下重复消费 (compaction instant 为重复数据,如果不跳过,有小概率会重复消费)

2) changelog 模式下保证语义正确性

0.11 开始,以上两个问题已经通过保留 compaction instant time 修复

clean.retain_commits

false

10

cleaner 最多保留的历史 commits 数,大于此数量的历史 commits 会被清理掉,changelog 模式下,这个参数可以控制 changelog 的保留时间,例如 checkpoint 周期为 5 分钟一次,默认最少保留 50 分钟的时间。

注意:当参数 read.streaming.skip_compaction 打开并且 streaming reader 消费落后于clean.retain_commits 数时,流读可能会丢失数据。从 0.11 开始,compaction 不会再变更 record 的 instant time,因此理论上数据不会再重复消费,但是还是会重复读取并丢弃,因此额外的开销还是无法避免,对性能有要求的话还是可以开启此参数。

CREATE TABLE t5(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t5',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'   -- 默认60s

);


insert into t5 select * from sourceT;

select * from t5;

2. 增量读取(Incremental Query)

Hudi从 0.10.0 开始支持增量读取。如果有增量读取 batch 数据的需求,增量读取包含三种场景

  • Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;
  • Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit;
  • TimeTravel:Batch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明);

WITH 参数:

名称

Required

默认值

说明

read.start-commit

false

默认从最新 commit

支持 earliest 从最早消费

read.end-commit

false

默认到最新 commit

3. 限流

        如果将全量数据(百亿数量级) 和增量先同步到 kafka,再通过 flink 流式消费的方式将库表数据直接导成 hoodie 表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的 partition 随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。

WITH 参数:

名称

Required

默认值

说明

write.rate.limit

false

0

默认关闭限速


注:其他Hudi相关文章链接由此进 ->  Hudi文章汇总 文章来源地址https://www.toymoban.com/news/detail-457576.html


到了这里,关于Hudi(16):Hudi集成Flink之读取方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hudi(19):Hudi集成Flink之索引和Catalog

    目录 0. 相关文章链接 1. Bucket索引(从 0.11 开始支持) 1.1. WITH参数 1.2. 和 state 索引的对比 2. Hudi Catalog(从 0.12.0 开始支持) 2.1. 概述 2.2. WITH 参数 2.3. 使用dfs方式  Hudi文章汇总          默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当

    2024年02月05日
    浏览(38)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目录 0. 相关文章链接 1. 创建表 1.1. 启动spark-sql 1.2. 建表参数 1.3. 创建非分区表 1.4. 创建分区表 1.5. 在已有的hudi表上创建新表 1.6. 通过CTAS (Create Table As Select)建表 2. 插入数据 2.1. 向非分区表插入数据 2.2. 向分区表动态分区插入数据 2.3. 向分区表静态分区插入数据 2.4

    2024年02月06日
    浏览(40)
  • Hudi-集成Spark之spark-sql方式

    启动spark-sql 创建表 建表参数: 参数名 默认值 说明 primaryKey uuid 表的主键名,多个字段用逗号分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的预合并字段。同 hoodie.datasource.write.precombine.field type cow 创建的表类型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    浏览(44)
  • Hudi0.14.0 集成 Spark3.2.3(IDEA编码方式)

    本次在IDEA下使用Scala语言进行开发,具体环境搭建查看文章 IDEA 下 Scala Maven 开发环境搭建。 1.1 添加maven依赖 创建Maven工程,pom文件:

    2024年01月24日
    浏览(41)
  • Hudi0.14.0集成Spark3.2.3(Spark Shell方式)

    1.1 启动Spark Shell

    2024年01月24日
    浏览(36)
  • 基于数据湖的流批一体:flink1.15.3与Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    前言:为实现基于数据湖的流批一体,采用业内主流技术栈hudi、flink、CDH(hive、spark)。flink使用sql client与hive的catalog打通,可以与hive共享元数据,使用sql client可操作hive中的表,实现批流一体;flink与hudi集成可以实现数据实时入湖;hudi与hive集成可以实现湖仓一体,用flink实

    2024年02月12日
    浏览(56)
  • 第二章 Flink集成Iceberg的集成方式及基本SQL使用

    注意事项:一般都是用基于Flink的Hive Catalog,使用HMS存储表模型数据 1、集成方式 (1)下载jar包 下载地址 (2)启动FlinkSQL ①StandLone模式启动 ②Flink On Yarn模式启动 2、基本使用 2.1、创建catalog 核心:可创建hive、hadoop、自定义等目录,创建模板如下 type : 必须的 iceberg 。(必需

    2024年02月08日
    浏览(40)
  • 陀螺仪LSM6DSV16X与AI集成(3)----读取融合算法输出的四元数

    LSM6DSV16X 特性涉及到的是一种低功耗的传感器融合算法(Sensor Fusion Low Power, SFLP). 低功耗传感器融合(SFLP)算法: 该算法旨在以节能的方式结合加速度计和陀螺仪的数据。传感器融合算法通过结合不同传感器的优势,提供更准确、可靠的数据。 6轴游戏旋转向量: SFLP算法能

    2024年02月03日
    浏览(40)
  • 【Flink】 FlinkCDC读取Mysql( DataStream 方式)(带完整源码,直接可使用)

    简介:     FlinkCDC读取Mysql数据源,程序中使用了自定义反序列化器,完整的Flink结构,开箱即用。 本工程提供 1、项目源码及详细注释,简单修改即可用在实际生产代码 2、成功编译截图 3、自己编译过程中可能出现的问题 4、mysql建表语句及测试数据 5、修复FlinkCDC读取Mysql数

    2024年02月07日
    浏览(36)
  • springboot集成Elasticsearch7.16,使用https方式连接并忽略SSL证书

    千万万苦利用科学上网找到了,记录一下

    2024年02月09日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包