Hudi(四)集成Flink(2)

这篇具有很好参考价值的文章主要介绍了Hudi(四)集成Flink(2)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

6、读取方式

6.1、流读(Streaming Query)

        当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

1、WITH参数

名称

Required

默认值

说明

read.streaming.enabled

false

false

设置 true 开启流读模式

read.start-commit

false

最新 commit

指定 'yyyyMMddHHmmss' 格式的起始 commit(闭区间)

read.streaming.skip_compaction

false

false

流读时是否跳过 compaction 的 commits,跳过 compaction 有两个用途:

1)避免 upsert 语义下重复消费 (compaction 的 instant 为重复数据,如果不跳过,有小概率会重复消费)

2) changelog 模式下保证语义正确性

0.11开始,以上两个问题已经通过保留 compaction instant time 修复

clean.retain_commits

false

10

cleaner最多保留的历史commits数,大于此数量的历史commits会被清理掉,changelog模式下,这个参数可以控制changelog的保留时间,例如checkpoint周期为5分钟一次,默认最少保留50分钟的时间。

        注意:当参数read.streaming.skip_compaction打开并且streaming reader消费落后于clean.retain_commits数时,流读可能会丢失数据。从0.11开始,compaction不会再变更record的 instant time,因此理论上数据不会再重复消费,但是还是会重复读取并丢弃,因此额外的开销还是无法避免,对性能有要求的话还是可以开启此参数。 

CREATE TABLE t5(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t5',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'   -- 默认60s
);


insert into t5 select * from sourceT;

select * from t5;

6.2、增量读取(Incremental Query)

        从 0.10.0 开始支持。

        如果有增量读取 batch 数据的需求,增量读取包含三种场景。

        (1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;

        (2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit

        (3)TimeTravel:Batch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)

        WITH 参数

名称

Required

默认值

说明

read.start-commit

false

默认从最新 commit

支持 earliest 从最早消费

read.end-commit

false

默认到最新 commit

7、限流

        如果将全量数据(百亿数量级)和增量先同步到kafka,再通过flink流式消费的方式将库表数据直接导成hoodie表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的partition随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。
        WITH参数

名称

Required

默认值

说明

write.rate.limit

false

0

默认关闭限速

8、写入方式

8.1、CDC数据同步

        CDC数据保存了完整的数据库变更,当前可通过两种途径将数据导入hudi:

Hudi(四)集成Flink(2)

        第一种:通过cdc-connector直接对接DB的binlog将数据导入hudi,优点是不依赖消息队列,缺点是对db server造成压力。
        第二种:对接cdc format消费kafka数据导入hudi,优点是可扩展性强,缺点是依赖kafka。
        注意:如果上游数据无法保证顺序,需要指定write.precombine.field字段

8.2、离线批量导入

        如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

1、原理

  1. 批量导入省去了avro的序列化以及数据的merge过程,后续不会再有去重操作,数据的唯一性需要自己来保证。
  2. bulk_insert需要在Batch Execuiton Mode下执行更高效,Batch模式默认会按照partition path排序输入消息再写入Hoodie,避免file handle频繁切换导致性能下降。
    1. SET execution.runtime-mode=batch;
    2. SET execution.checkpointing.interval=0;
  3. bulk_insert write task的并发通过参数write.tasks指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task的并发数就是划分的bucket数,当然每个bucket在写到文件大小上限(parquet 120MB)的时候会rollover到新的文件句柄,所以最后:写文件数量>=bulk_insert write task数。

2、WITH参数

名称

Required

默认值

说明

write.operation

true

upsert

配置 bulk_insert 开启该功能

write.tasks

false

4

bulk_insert 写 task 的并发,最后的文件数 >=write.tasks

write.bulk_insert.shuffle_by_partition

write.bulk_insert.shuffle_input

(从 0.11 开始)

false

true

是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险

write.bulk_insert.sort_by_partition

write.bulk_insert.sort_input

(从 0.11 开始)

false

true

是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量

write.sort.memory

128

sort 算子的可用 managed memory(单位 MB)

8.3、全量接增量

        如果已经有全量的离线Hoodie表,需要接上实时写入,并且保证数据不重复,可以开启index bootstrap功能。
        如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

        WITH参数

名称

Required

默认值

说明

index.bootstrap.enabled

true

false

开启索引加载,会将已存表的最新数据一次性加载到 state 中

index.partition.regex

false

*

设置正则表达式进行分区筛选,默认为加载全部分区

9、写入模式

9.1、Changelog模式

        如果希望Hoodie保留消息的所有变更(I/-U/U/D),之后接上Flink引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie的MOR表通过行存原生支持保留消息的所有变更(format层面的集成),通过流读MOR表可以消费到所有的变更记录。

1、WITH参数

名称

Required

默认值

说明

changelog.enabled

false

false

默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更。

        批(快照)读仍然会合并所有的中间结果,不管format是否已存储中间状态。

        开启changelog.enabled参数后,中间的变更也只是Best Effort:异步的压缩任务会将中间变更合并成1条,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的buffer时间可以预留一定的时间buffer给reader,比如调整压缩的两个参数:

        compaction.delta_commits:5 

        compaction.delta_seconds: 3600

        说明:
        Changelog 模式开启流读的话,要在 sql-client 里面设置参数:
        set sql-client.execution.result-mode=tableau; 
        或者
        set sql-client.execution.result-mode=changelog;
        否则中间结果在读的时候会被直接合并。(参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#running-sql-queries)

2、流读 changelog

        仅在0.10.0支持,本feature为实验性。

        开启changelog模式后,hudi会保留一段时间的changelog供下游consumer消费,我们可以通过流读ODS层changelog接上ETL逻辑写入到DWD层,如下图的pipeline:

Hudi(四)集成Flink(2)

        流读的时候我们要注意changelog有可能会被compaction合并掉,中间记录会消除,可能会影响计算结果,需要关注sql-client的属性(result-mode)同上。

3、案例演示

-- 使用changelog
CREATE TABLE t6(
  id int PRIMARY KEY NOT ENFORCED,
  age INT
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4',
  'changelog.enabled' = 'true'
);


insert into t6 values(1,1);
insert into t6 values(1,2);

select * from t6;  可以获取最新的数据,一条
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; 可以获取2条


-- 不使用changelog
CREATE TABLE t6_v(
  id int PRIMARY KEY NOT ENFORCED,
  age INT
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'
);
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; 只会获取1条,读时合并

9.2、Append模式

        从0.10开始支持
        对于INSERT模式:

  • MOR默认会apply小文件策略:会追加写avro log文件
  • COW每次直接写新的parquet文件,没有小文件策略

        Hudi支持丰富的Clustering策略,优化INSERT模式下的小文件问题:

1、Inline Clustering

        只有Copy On Write表支持该模式

名称

Required

默认值

说明

write.insert.cluster

false

false

是否在写入时合并小文件,COW 表默认 insert 写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件(不会去重),吞吐会受影响

2、Async Clustering

        从 0.12 开始支持

(1)WITH参数

名称

Required

默认值

说明

clustering.schedule.enabled

false

false

是否在写入时定时异步调度 clustering plan,默认关闭

clustering.delta_commits

false

4

调度 clsutering plan 的间隔 commits,

clustering.schedule.enabled 为 true 时生效

clustering.async.enabled

false

false

是否异步执行 clustering plan,默认关闭

clustering.tasks

false

4

Clustering task 执行并发

clustering.plan.strategy.target.file.max.bytes

false

1024 * 1024 * 1024

Clustering 单文件目标大小,默认 1GB

clustering.plan.strategy.small.file.limit

false

600

小于该大小的文件才会参与 clustering,默认600MB

clustering.plan.strategy.sort.columns

false

N/A

支持指定特殊的排序字段

clustering.plan.partition.filter.mode

false

NONE

支持

NONE:不做限制

RECENT_DAYS:按时间(天)回溯

SELECTED_PARTITIONS:指定固定的 partition

clustering.plan.strategy.daybased.lookback.partitions

false

2

RECENT_DAYS 生效,默认 2 天

(2)Clustering Plan Strategy

        支持定制化的 clustering 策略。

名称

Required

默认值

说明

clustering.plan.partition.filter.mode

false

NONE

支持

  • NONE:不做限制
  • RECENT_DAYS:按时间(天)回溯
  • SELECTED_PARTITIONS:指定固定的 partition

clustering.plan.strategy.daybased.lookback.partitions

false

2

RECENT_DAYS 生效,默认 2 天

clustering.plan.strategy.cluster.begin.partition

false

N/A

SELECTED_PARTITIONS 生效,

指定开始 partition(inclusive)

clustering.plan.strategy.cluster.end.partition

false

N/A

SELECTED_PARTITIONS 生效,

指定结束 partition(incluseve)

clustering.plan.strategy.partition.regex.pattern

false

N/A

正则表达式过滤 partitions

clustering.plan.strategy.partition.selected

false

N/A

显示指定目标 partitions,支持逗号 , 分割多个 partition

10、Bucket索引

        从0.11开始支持

        默认的flink流式写入使用state存储索引信息:primarykey到fileId的映射关系。当数据量比较大的时候,state的存储开销可能成为瓶颈,bucket索引通过固定的hash策略,将相同key的数据分配到同一个fileGroup中,避免了索引的存储和查询开销。

1、WITH参数

名称

Required

默认值

说明

index.type

false

FLINK_STATE

设置BUCKET开启Bucket索引功能

hoodie.bucket.index.hash.field

false

主键

可以设置成主键的子集

hoodie.bucket.index.num.buckets

false

4

默认每个partition的bucket数,当前设置后则不可再变更。

2、和state索引的对比:
(1)bucket index没有state的存储计算开销,性能较好。
(2)bucket index无法扩buckets,state index则可以依据文件的大小动态扩容。
(3)bucket index不支持跨partition的变更(如果输入是cdc流则没有这个限制),state index没有限制。

11、Hudi Catalog

        将表的元数据持久化。从0.12.0开始支持,通过catalog可以管理flink创建的表,避免重复建表操作,另外hms模式的catalog支持自动补全hive同步参数。

        DFS模式Catalog SQL样例:

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的默认路径}',
    'mode'='dfs' 
  );

         Hms模式Catalog SQL样例:

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的默认路径}',
    'hive.conf.dir' = '${hive-site.xml 所在的目录}',
    'mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性
  );	

1、With参数

名称

Required

默认值

说明

catalog.path

true

--

默认的 catalog 根路径,用作表路径的自动推导,默认的表路径:${catalog.path}/${db_name}/${table_name}

default-database

false

default

默认的 database 名

hive.conf.dir

false

--

hive-site.xml 所在的目录,只在 hms 模式下生效

mode

false

dfs

支持 hms模式通过 hive 管理元数据

table.external

false

false

是否创建外部表,只在 hms 模式下生效

2、使用dfs方式 

1、创建sql-client初始化sql文件

vim /opt/module/flink-1.13.2/conf/sql-client-init.sql

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '/tmp/hudi_catalog',
    'mode'='dfs' 
  );

USE CATALOG hoodie_catalog;

2、 指定sql-client启动时加载sql文件

hadoop fs -mkdir /tmp/hudi_catalog

bin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session

需要先创建库 

Hudi(四)集成Flink(2)

3、建库建表插入

create database test;
use test;

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20),
primary key (uuid) not enforced
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t2',
  'table.type' = 'MERGE_ON_READ'
);

insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a');

4、退出sql-client,重新进入,表信息还在文章来源地址https://www.toymoban.com/news/detail-466294.html

use test;
show tables;
select * from t2;

到了这里,关于Hudi(四)集成Flink(2)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hudi(19):Hudi集成Flink之索引和Catalog

    目录 0. 相关文章链接 1. Bucket索引(从 0.11 开始支持) 1.1. WITH参数 1.2. 和 state 索引的对比 2. Hudi Catalog(从 0.12.0 开始支持) 2.1. 概述 2.2. WITH 参数 2.3. 使用dfs方式  Hudi文章汇总          默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当

    2024年02月05日
    浏览(40)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目录 0. 相关文章链接 1. 创建表 1.1. 启动spark-sql 1.2. 建表参数 1.3. 创建非分区表 1.4. 创建分区表 1.5. 在已有的hudi表上创建新表 1.6. 通过CTAS (Create Table As Select)建表 2. 插入数据 2.1. 向非分区表插入数据 2.2. 向分区表动态分区插入数据 2.3. 向分区表静态分区插入数据 2.4

    2024年02月06日
    浏览(44)
  • Hudi-集成Spark之spark-sql方式

    启动spark-sql 创建表 建表参数: 参数名 默认值 说明 primaryKey uuid 表的主键名,多个字段用逗号分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的预合并字段。同 hoodie.datasource.write.precombine.field type cow 创建的表类型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    浏览(47)
  • Hudi0.14.0 集成 Spark3.2.3(IDEA编码方式)

    本次在IDEA下使用Scala语言进行开发,具体环境搭建查看文章 IDEA 下 Scala Maven 开发环境搭建。 1.1 添加maven依赖 创建Maven工程,pom文件:

    2024年01月24日
    浏览(44)
  • Hudi0.14.0集成Spark3.2.3(Spark Shell方式)

    1.1 启动Spark Shell

    2024年01月24日
    浏览(38)
  • 阿里云 EMR 基于 Paimon 和 Hudi 构建 Streaming Lakehouse

    01 背景信息 数据湖与传统的数据仓库相比,可以更灵活地处理各种类型的数据,并支持高度可扩展的存储,通常被用于大数据分析。为了支持准实时乃至实时的数据处理,数据湖需要能够快速地接收和存储数据(数据入湖),同时提供低延迟的查询性能以满足分析需求。 A

    2024年01月20日
    浏览(41)
  • 基于数据湖的流批一体:flink1.15.3与Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    前言:为实现基于数据湖的流批一体,采用业内主流技术栈hudi、flink、CDH(hive、spark)。flink使用sql client与hive的catalog打通,可以与hive共享元数据,使用sql client可操作hive中的表,实现批流一体;flink与hudi集成可以实现数据实时入湖;hudi与hive集成可以实现湖仓一体,用flink实

    2024年02月12日
    浏览(57)
  • 第二章 Flink集成Iceberg的集成方式及基本SQL使用

    注意事项:一般都是用基于Flink的Hive Catalog,使用HMS存储表模型数据 1、集成方式 (1)下载jar包 下载地址 (2)启动FlinkSQL ①StandLone模式启动 ②Flink On Yarn模式启动 2、基本使用 2.1、创建catalog 核心:可创建hive、hadoop、自定义等目录,创建模板如下 type : 必须的 iceberg 。(必需

    2024年02月08日
    浏览(42)
  • 【Flink】 FlinkCDC读取Mysql( DataStream 方式)(带完整源码,直接可使用)

    简介:     FlinkCDC读取Mysql数据源,程序中使用了自定义反序列化器,完整的Flink结构,开箱即用。 本工程提供 1、项目源码及详细注释,简单修改即可用在实际生产代码 2、成功编译截图 3、自己编译过程中可能出现的问题 4、mysql建表语句及测试数据 5、修复FlinkCDC读取Mysql数

    2024年02月07日
    浏览(39)
  • Flink读取数据的5种方式(文件,Socket,Kafka,MySQL,自定义数据源)

    这是最简单的数据读取方式。当需要进行功能测试时,可以将数据保存在文件中,读取后验证流处理的逻辑是否符合预期。 程序代码: 输出结果 用于验证一些通过Socket传输数据的场景非常方便。 程序代码: 测试时,需要先在 172.16.3.6 的服务器上启动 nc ,然后再启动Flink读

    2024年02月16日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包