目录
0. 相关文章链接
1. 创建表
1.1. 启动spark-sql
1.2. 建表参数
1.3. 创建非分区表
1.4. 创建分区表
1.5. 在已有的hudi表上创建新表
1.6. 通过CTAS (Create Table As Select)建表
2. 插入数据
2.1. 向非分区表插入数据
2.2. 向分区表动态分区插入数据
2.3. 向分区表静态分区插入数据
2.4. 使用bulk_insert插入数据
3. 查询数据
3.1. 查询
3.2. 时间旅行查询
4. 更新数据
4.1. update
4.2. MergeInto
5. 删除数据
6. 覆盖数据
7. 修改表结构(Alter Table)
8. 修改分区
9. 存储过程(Procedures)
0. 相关文章链接
Hudi文章汇总
1. 创建表
1.1. 启动spark-sql
# 启动spark-sql之前需要先启动Hive的Metastore
nohup hive --service metastore &
#针对Spark 3.2
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# 如果没有配置hive环境变量,手动拷贝hive-site.xml到spark的conf下
1.2. 建表参数
参数名 |
默认值 |
说明 |
primaryKey |
uuid |
表的主键名,多个字段用逗号分隔。 同 hoodie.datasource.write.recordkey.field |
preCombineField |
表的预合并字段。 同 hoodie.datasource.write.precombine.field |
|
type |
cow |
创建的表类型: type = 'cow' type = 'mor' 同hoodie.datasource.write.table.type |
1.3. 创建非分区表
- 创建一个cow表,默认primaryKey 'uuid',不提供preCombineField
create table hudi_cow_nonpcf_tbl (
uuid int,
name string,
price double
) using hudi;
- 创建一个mor非分区表
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
);
1.4. 创建分区表
创建一个cow分区外部表,指定primaryKey和preCombineField
create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
1.5. 在已有的hudi表上创建新表
不需要指定模式和非分区列(如果存在)之外的任何属性,Hudi可以自动识别模式和配置。
- 非分区表
create table hudi_existing_tbl0
using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';
- 分区表
create table hudi_existing_tbl1
using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';
1.6. 通过CTAS (Create Table As Select)建表
为了提高向hudi表加载数据的性能,CTAS使用批量插入作为写操作。
- 通过CTAS创建cow非分区表,不指定preCombineField
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select
1 as id
, 'a1' as name
, 10 as price
;
- 通过CTAS创建cow分区表,指定preCombineField
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select
1 as id
, 'a1' as name
, 10 as price
, 1000 as ts
, '2021-12-01' as dt
;
- 通过CTAS从其他表加载数据
# 创建内部表
create table parquet_mngd
using parquet
location 'file:///tmp/parquet_dataset/*.parquet';
# 通过CTAS加载数据
create table hudi_ctas_cow_pt_tbl2
using hudi
location 'file:/tmp/hudi/hudi_tbl/'
options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (datestr)
as
select * from parquet_mngd
;
2. 插入数据
默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。
2.1. 向非分区表插入数据
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
2.2. 向分区表动态分区插入数据
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;
2.3. 向分区表静态分区插入数据
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;
2.4. 使用bulk_insert插入数据
hudi支持使用bulk_insert作为写操作的类型,只需要设置两个配置:
hoodie.sql.bulk.insert.enable 和 hoodie.sql.insert.mode
-- 向指定preCombineKey的表插入数据,则写操作为upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
-- 向指定preCombineKey的表插入数据,指定写操作为bulk_insert
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;
insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
1 a1_2 20.0 1002
3. 查询数据
3.1. 查询
select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0
3.2. 时间旅行查询
Hudi从0.9.0开始就支持时间旅行查询。Spark SQL方式要求Spark版本 3.2及以上。
-- 关闭前面开启的bulk_insert
set hoodie.sql.bulk.insert.enable=false;
create table hudi_cow_pt_tbl1 (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';
-- 插入一条id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;
-- 修改id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;
-- 基于第一次提交时间进行时间旅行
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;
-- 其他时间格式的时间旅行写法
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;
4. 更新数据
4.1. update
更新操作需要指定preCombineField。
- 语法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
- 执行更新
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;
-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';
4.2. MergeInto
- 语法
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
<merge_condition> =A equal bool condition
<matched_action> =
DELETE |
UPDATE SET * |
UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> =
INSERT * |
INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
- 执行案例
-- 1、准备source表:非分区的hudi表,插入数据
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);
merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;
-- 2、准备source表:分区的parquet表,插入数据
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');
merge into hudi_cow_pt_tbl1 as target
using (
select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;
5. 删除数据
- 语法:
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
- 案例:
delete from hudi_cow_nonpcf_tbl where uuid = 1;
delete from hudi_mor_tbl where id % 2 = 0;
-- 使用非主键字段删除
delete from hudi_cow_pt_tbl1 where name = 'a1_1';
6. 覆盖数据
- 使用INSERT_OVERWRITE类型的写操作覆盖分区表
- 使用INSERT_OVERWRITE_TABLE类型的写操作插入覆盖非分区表或分区表(动态分区)
1)insert overwrite 非分区表
insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;
2)通过动态分区insert overwrite table到分区表
insert overwrite table hudi_cow_pt_tbl1 select 10, 'a10', 1100, '2021-12-09', '11';
3)通过静态分区insert overwrite 分区表文章来源:https://www.toymoban.com/news/detail-457069.html
insert overwrite hudi_cow_pt_tbl1 partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;
7. 修改表结构(Alter Table)
- 语法:
-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName
-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
- 案例:
--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid int;
--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
8. 修改分区
- 语法:
-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )
-- Show Partitions
SHOW PARTITIONS tableIdentifier
- 案例:
--show partition:
show partitions hudi_cow_pt_tbl1;
--drop partition:
alter table hudi_cow_pt_tbl1 drop partition (dt='2021-12-09', hh='10');
- 注意:show partition结果是基于文件系统表路径的。删除整个分区数据或直接删除某个分区目录并不精确。
9. 存储过程(Procedures)
- 语法:
--Call procedure by positional arguments
CALL system.procedure_name(arg_1, arg_2, ... arg_n)
--Call procedure by named arguments
CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ... arg_name_n => arg_n)
- 案例(可用的存储过程:Procedures | Apache Hudi):
--show commit's info
call show_commits(table => 'hudi_cow_pt_tbl1', limit => 10);
注:其他Hudi相关文章链接由此进 -> Hudi文章汇总 文章来源地址https://www.toymoban.com/news/detail-457069.html
到了这里,关于Hudi(7):Hudi集成Spark之spark-sql方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!