Hudi(17):Hudi集成Flink之写入方式

这篇具有很好参考价值的文章主要介绍了Hudi(17):Hudi集成Flink之写入方式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

0. 相关文章链接

1. CDC 数据同步

1.1. 准备MySQL表

1.2. flink读取mysql binlog并写入kafka

1.3. flink读取kafka数据并写入hudi数据湖

1.4. 使用datafaker插入数据

1.5. 统计数据入Hudi情况

1.6. 实时查看数据入湖情况

2. 离线批量导入

2.1. 原理

2.2. WITH 参数

2.3. 案例

3. 全量接增量

3.1. WITH 参数

3.2. 使用流程

3.3. 说明


0. 相关文章链接

 Hudi文章汇总 

1. CDC 数据同步

CDC 数据保存了完整的数据库变更,当前可通过两种途径将数据导入 hudi:

Hudi(17):Hudi集成Flink之写入方式

  • 第一种:通过 cdc-connector 直接对接 DB 的 binlog 将数据导入 hudi,优点是不依赖消息队列,缺点是对 db server 造成压力。
  • 第二种:对接 cdc format 消费 kafka 数据导入 hudi,优点是可扩展性强,缺点是依赖 kafka。

注意:如果上游数据无法保证顺序,需要指定 write.precombine.field 字段。

1.1. 准备MySQL表

注意:需要提前开启MySQL的binlog

测试表建表语句如下所示:

create database test;
use test;
create table stu3 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;

1.2. flink读取mysql binlog并写入kafka

步骤一:创建MySQL表(使用flink-sql创建MySQL源的sink表)

create table stu3_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop1',
  'port' = '3306',
  'username' = 'root',
  'password' = 'aaaaaa',
  'database-name' = 'test',
  'table-name' = 'stu3'
);

步骤二:创建Kafka表(使用flink-sql创建MySQL源的sink表)

create table stu3_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'upsert-kafka'
  ,'topic' = 'cdc_mysql_stu3_sink'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'key.format' = 'json'
  ,'value.format' = 'json'
);

步骤三:将mysql binlog日志写入kafka(flink-sql内部操作)

insert into stu3_binlog_sink_kafka
select * from stu3_binlog;

1.3. flink读取kafka数据并写入hudi数据湖

步骤一:创建kafka源表(使用flink-sql创建以kafka为源端的表)

create table stu3_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_stu3_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
);

步骤二:创建hudi目标表(使用flink-sql创建以hudi为目标端的表)

create table stu3_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
partitioned by (`school`)
with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'insert',
  'write.precombine.field' = 'school'
);

步骤三:将kafka数据写入到hudi中(flink-sql内部操作)

insert into stu3_binlog_sink_hudi
select * from  stu3_binlog_source_kafka;

1.4. 使用datafaker插入数据

datafaker安装及说明:datafaker --- 测试数据生成工具-阿里云开发者社区

步骤一:新建meta.txt文件

id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]

步骤二:生成10000条数据并写入到mysql中的test.stu3表

datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu3 10000 --meta meta.txt

注意:如果要再次生成测试数据,则需要修改meta.txt将自增id中的1改为比10000大的数,不然会出现主键冲突情况。

1.5. 统计数据入Hudi情况

create table stu3_binlog_hudi_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
  );

select count(*) from stu3_binlog_hudi_view;  

1.6. 实时查看数据入湖情况

create table stu3_binlog_hudi_streaming_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school',
  'read.streaming.enabled' = 'true'
  );

 
select * from  stu3_binlog_hudi_streaming_view;

2. 离线批量导入

如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

2.1. 原理

  • 批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。
  • bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。
SET execution.runtime-mode = batch; 
SET execution.checkpointing.interval = 0;
  • bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。

2.2. WITH 参数

名称

Required

默认值

说明

write.operation

true

upsert

配置 bulk_insert 开启该功能

write.tasks

false

4

bulk_insert 写 task 的并发,最后的文件数 >=write.tasks

write.bulk_insert.shuffle_by_partition

write.bulk_insert.shuffle_input

(从 0.11 开始)

false

true

是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险

write.bulk_insert.sort_by_partition

write.bulk_insert.sort_input

(从 0.11 开始)

false

true

是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量

write.sort.memory

128

sort 算子的可用 managed memory(单位 MB)

2.3. 案例

步骤一:MySQL建表

create database test;
use test;
create table stu4 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  score decimal(4,2) not null comment '成绩',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;

步骤二:新建meta.txt文件

id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]

步骤三:使用datafaker生成10万条数据并写入到mysql中的test.stu4表

datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu4 100000 --meta meta.txt

注意:
    如果要再次生成测试数据,
    则需要将meta.txt中的自增id改为比100000大的数,
    不然会出现主键冲突情况。

步骤四:Flink SQL client 创建myql数据源

create table stu4(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop1:3306/test?serverTimezone=GMT%2B8',
  'username' = 'root',
  'password' = 'aaaaaa',
  'table-name' = 'stu4'
);

步骤五:Flink SQL client创建hudi表

 create table stu4_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert',
  'write.precombine.field' = 'school'
);

-- 注意:核心点是其中的 write.option 配置为 bulk_insert

步骤六:Flink SQL client执行mysql数据插入到hudi中

insert into stu4_sink_hudi select * from stu4;

3. 全量接增量

  • 如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。
  • 如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

3.1. WITH 参数

名称

Required

默认值

说明

index.bootstrap.enabled

true

false

开启索引加载,会将已存表的最新数据一次性加载到 state 中

index.partition.regex

false

*

设置正则表达式进行分区筛选,默认为加载全部分区

3.2. 使用流程

  1. CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确
  2. 设置 index.bootstrap.enabled = true开启索引加载功能
  3. flink conf 中设置 checkpoint 失败容忍 execution.checkpointing.tolerable-failed-checkpoints = n(取决于checkpoint 调度次数)
  4. 等待第一次 checkpoint 成功,表示索引加载完成
  5. 索引加载完成后可以退出并保存 savepoint (也可以直接用 externalized checkpoint)
  6. 重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle

3.3. 说明

  1. 索引加载是阻塞式,所以在索引加载过程中 checkpoint 无法完成
  2. 索引加载由数据流触发,需要确保每个 partition 都至少有1条数据,即上游 source 有数据进来
  3. 索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索 finish loading the index under partition 和 Load records from file 日志来观察索引加载的进度
  4. 第一次checkpoint成功就表示索引已经加载完成,后续从 checkpoint 恢复时无需再次加载索引

注意:在当前的0.12版本,以上划横线的部分已经不再需要了。(0.9 cherry pick 分支之后)


注:其他Hudi相关文章链接由此进 ->  Hudi文章汇总 文章来源地址https://www.toymoban.com/news/detail-450810.html


到了这里,关于Hudi(17):Hudi集成Flink之写入方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hudi(19):Hudi集成Flink之索引和Catalog

    目录 0. 相关文章链接 1. Bucket索引(从 0.11 开始支持) 1.1. WITH参数 1.2. 和 state 索引的对比 2. Hudi Catalog(从 0.12.0 开始支持) 2.1. 概述 2.2. WITH 参数 2.3. 使用dfs方式  Hudi文章汇总          默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当

    2024年02月05日
    浏览(29)
  • 问题:Spark SQL 读不到 Flink 写入 Hudi 表的新数据,打开新 Session 才可见

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年02月22日
    浏览(43)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目录 0. 相关文章链接 1. 创建表 1.1. 启动spark-sql 1.2. 建表参数 1.3. 创建非分区表 1.4. 创建分区表 1.5. 在已有的hudi表上创建新表 1.6. 通过CTAS (Create Table As Select)建表 2. 插入数据 2.1. 向非分区表插入数据 2.2. 向分区表动态分区插入数据 2.3. 向分区表静态分区插入数据 2.4

    2024年02月06日
    浏览(35)
  • Hudi-集成Spark之spark-sql方式

    启动spark-sql 创建表 建表参数: 参数名 默认值 说明 primaryKey uuid 表的主键名,多个字段用逗号分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的预合并字段。同 hoodie.datasource.write.precombine.field type cow 创建的表类型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    浏览(33)
  • Hudi0.14.0 集成 Spark3.2.3(IDEA编码方式)

    本次在IDEA下使用Scala语言进行开发,具体环境搭建查看文章 IDEA 下 Scala Maven 开发环境搭建。 1.1 添加maven依赖 创建Maven工程,pom文件:

    2024年01月24日
    浏览(31)
  • Hudi0.14.0集成Spark3.2.3(Spark Shell方式)

    1.1 启动Spark Shell

    2024年01月24日
    浏览(25)
  • flink1.17.0 集成kafka,并且计算

    flink是实时计算的重要集成组件,这里演示如何集成,并且使用一个小例子。例子是kafka输入消息,用逗号隔开,统计每个相同单词出现的次数,这么一个功能。 这里我使用的kafka版本是3.2.0,部署的方法可以参考, kafka部署 启动后查看java进程是否存在,存在后执行下一步。

    2024年02月09日
    浏览(31)
  • hadoop3.2.4集成flink 1.17.0

    flink安装部署有三种方式 local:单机模式,尽量不使用 standalone: flink自带集群,资源管理由flink集群管理,开发环境测试使用,不需要hadoop集群 flink on yarn: 把资源管理交给yarn实现,计算机资源统一由Haoop YARN管理,生产环境测试,需要先启动hadoop集群。(这里分为可以继续细分

    2024年02月17日
    浏览(37)
  • 基于数据湖的流批一体:flink1.15.3与Hudi0.12.1集成,并配置基于CDH6.3.2的hive catalog

    前言:为实现基于数据湖的流批一体,采用业内主流技术栈hudi、flink、CDH(hive、spark)。flink使用sql client与hive的catalog打通,可以与hive共享元数据,使用sql client可操作hive中的表,实现批流一体;flink与hudi集成可以实现数据实时入湖;hudi与hive集成可以实现湖仓一体,用flink实

    2024年02月12日
    浏览(44)
  • CDH6.3.2 集成 Flink 1.17.0 失败过程

    目录 一:下载Flink,并制作parcel包 1.相关资源下载 2. 修改配置 准备工作一: 准备工作二: 3. 开始build 二:开始在CDH页面分发激活  三:CDH添加Flink-yarn 服务  四:启动不起来的问题解决 五:CDH6.3.2集群集成zookeeper3.6.3 六:重新适配Flink服务 环境说明: cdh版本:cdh6.3.2 组件版本信

    2024年01月17日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包