目录
0. 相关文章链接
1. CDC 数据同步
1.1. 准备MySQL表
1.2. flink读取mysql binlog并写入kafka
1.3. flink读取kafka数据并写入hudi数据湖
1.4. 使用datafaker插入数据
1.5. 统计数据入Hudi情况
1.6. 实时查看数据入湖情况
2. 离线批量导入
2.1. 原理
2.2. WITH 参数
2.3. 案例
3. 全量接增量
3.1. WITH 参数
3.2. 使用流程
3.3. 说明
0. 相关文章链接
Hudi文章汇总
1. CDC 数据同步
CDC 数据保存了完整的数据库变更,当前可通过两种途径将数据导入 hudi:
- 第一种:通过 cdc-connector 直接对接 DB 的 binlog 将数据导入 hudi,优点是不依赖消息队列,缺点是对 db server 造成压力。
- 第二种:对接 cdc format 消费 kafka 数据导入 hudi,优点是可扩展性强,缺点是依赖 kafka。
注意:如果上游数据无法保证顺序,需要指定 write.precombine.field 字段。
1.1. 准备MySQL表
注意:需要提前开启MySQL的binlog
测试表建表语句如下所示:
create database test;
use test;
create table stu3 (
id int unsigned auto_increment primary key COMMENT '自增id',
name varchar(20) not null comment '学生名字',
school varchar(20) not null comment '学校名字',
nickname varchar(20) not null comment '学生小名',
age int not null comment '学生年龄',
class_num int not null comment '班级人数',
phone bigint not null comment '电话号码',
email varchar(64) comment '家庭网络邮箱',
ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;
1.2. flink读取mysql binlog并写入kafka
步骤一:创建MySQL表(使用flink-sql创建MySQL源的sink表)
create table stu3_binlog(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
) with (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop1',
'port' = '3306',
'username' = 'root',
'password' = 'aaaaaa',
'database-name' = 'test',
'table-name' = 'stu3'
);
步骤二:创建Kafka表(使用flink-sql创建MySQL源的sink表)
create table stu3_binlog_sink_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
) with (
'connector' = 'upsert-kafka'
,'topic' = 'cdc_mysql_stu3_sink'
,'properties.zookeeper.connect' = 'hadoop1:2181'
,'properties.bootstrap.servers' = 'hadoop1:9092'
,'key.format' = 'json'
,'value.format' = 'json'
);
步骤三:将mysql binlog日志写入kafka(flink-sql内部操作)
insert into stu3_binlog_sink_kafka
select * from stu3_binlog;
1.3. flink读取kafka数据并写入hudi数据湖
步骤一:创建kafka源表(使用flink-sql创建以kafka为源端的表)
create table stu3_binlog_source_kafka(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string
) with (
'connector' = 'kafka',
'topic' = 'cdc_mysql_stu3_sink',
'properties.bootstrap.servers' = 'hadoop1:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup'
);
步骤二:创建hudi目标表(使用flink-sql创建以hudi为目标端的表)
create table stu3_binlog_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'insert',
'write.precombine.field' = 'school'
);
步骤三:将kafka数据写入到hudi中(flink-sql内部操作)
insert into stu3_binlog_sink_hudi
select * from stu3_binlog_source_kafka;
1.4. 使用datafaker插入数据
datafaker安装及说明:datafaker --- 测试数据生成工具-阿里云开发者社区
步骤一:新建meta.txt文件
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
步骤二:生成10000条数据并写入到mysql中的test.stu3表
datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu3 10000 --meta meta.txt
注意:如果要再次生成测试数据,则需要修改meta.txt将自增id中的1改为比10000大的数,不然会出现主键冲突情况。
1.5. 统计数据入Hudi情况
create table stu3_binlog_hudi_view(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.precombine.field' = 'school'
);
select count(*) from stu3_binlog_hudi_view;
1.6. 实时查看数据入湖情况
create table stu3_binlog_hudi_streaming_view(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.precombine.field' = 'school',
'read.streaming.enabled' = 'true'
);
select * from stu3_binlog_hudi_streaming_view;
2. 离线批量导入
如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。
2.1. 原理
- 批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。
- bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。
SET execution.runtime-mode = batch;
SET execution.checkpointing.interval = 0;
- bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。
2.2. WITH 参数
名称 |
Required |
默认值 |
说明 |
write.operation |
true |
upsert |
配置 bulk_insert 开启该功能 |
write.tasks |
false |
4 |
bulk_insert 写 task 的并发,最后的文件数 >=write.tasks |
write.bulk_insert.shuffle_input (从 0.11 开始) |
false |
true |
是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险 |
write.bulk_insert.sort_input (从 0.11 开始) |
false |
true |
是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量 |
write.sort.memory |
128 |
sort 算子的可用 managed memory(单位 MB) |
2.3. 案例
步骤一:MySQL建表
create database test;
use test;
create table stu4 (
id int unsigned auto_increment primary key COMMENT '自增id',
name varchar(20) not null comment '学生名字',
school varchar(20) not null comment '学校名字',
nickname varchar(20) not null comment '学生小名',
age int not null comment '学生年龄',
score decimal(4,2) not null comment '成绩',
class_num int not null comment '班级人数',
phone bigint not null comment '电话号码',
email varchar(64) comment '家庭网络邮箱',
ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;
步骤二:新建meta.txt文件
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
步骤三:使用datafaker生成10万条数据并写入到mysql中的test.stu4表
datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu4 100000 --meta meta.txt
注意:
如果要再次生成测试数据,
则需要将meta.txt中的自增id改为比100000大的数,
不然会出现主键冲突情况。
步骤四:Flink SQL client 创建myql数据源
create table stu4(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop1:3306/test?serverTimezone=GMT%2B8',
'username' = 'root',
'password' = 'aaaaaa',
'table-name' = 'stu4'
);
步骤五:Flink SQL client创建hudi表
create table stu4_sink_hudi(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'bulk_insert',
'write.precombine.field' = 'school'
);
-- 注意:核心点是其中的 write.option 配置为 bulk_insert
步骤六:Flink SQL client执行mysql数据插入到hudi中
insert into stu4_sink_hudi select * from stu4;
3. 全量接增量
- 如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。
- 如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。
3.1. WITH 参数
名称 |
Required |
默认值 |
说明 |
index.bootstrap.enabled |
true |
false |
开启索引加载,会将已存表的最新数据一次性加载到 state 中 |
index.partition.regex |
false |
* |
设置正则表达式进行分区筛选,默认为加载全部分区 |
3.2. 使用流程
- CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确
- 设置 index.bootstrap.enabled = true开启索引加载功能
flink conf 中设置 checkpoint 失败容忍 execution.checkpointing.tolerable-failed-checkpoints = n(取决于checkpoint 调度次数)等待第一次 checkpoint 成功,表示索引加载完成索引加载完成后可以退出并保存 savepoint (也可以直接用 externalized checkpoint)- 重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle
3.3. 说明
索引加载是阻塞式,所以在索引加载过程中 checkpoint 无法完成索引加载由数据流触发,需要确保每个 partition 都至少有1条数据,即上游 source 有数据进来- 索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索 finish loading the index under partition 和 Load records from file 日志来观察索引加载的进度
第一次checkpoint成功就表示索引已经加载完成,后续从 checkpoint 恢复时无需再次加载索引
注意:在当前的0.12版本,以上划横线的部分已经不再需要了。(0.9 cherry pick 分支之后)文章来源:https://www.toymoban.com/news/detail-450810.html
注:其他Hudi相关文章链接由此进 -> Hudi文章汇总 文章来源地址https://www.toymoban.com/news/detail-450810.html
到了这里,关于Hudi(17):Hudi集成Flink之写入方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!