Hudi(7):Hudi集成Spark之spark-sql方式

这篇具有很好参考价值的文章主要介绍了Hudi(7):Hudi集成Spark之spark-sql方式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

0. 相关文章链接

1. 创建表

1.1. 启动spark-sql

1.2. 建表参数

1.3. 创建非分区表

1.4. 创建分区表

1.5. 在已有的hudi表上创建新表

1.6. 通过CTAS (Create Table As Select)建表

2. 插入数据

2.1. 向非分区表插入数据

2.2. 向分区表动态分区插入数据

2.3. 向分区表静态分区插入数据

2.4. 使用bulk_insert插入数据

3. 查询数据

3.1. 查询

3.2. 时间旅行查询

4. 更新数据

4.1. update

4.2. MergeInto

5. 删除数据

6. 覆盖数据

7. 修改表结构(Alter Table)

8. 修改分区

9. 存储过程(Procedures)


0. 相关文章链接

 Hudi文章汇总 

1. 创建表

1.1. 启动spark-sql

# 启动spark-sql之前需要先启动Hive的Metastore
nohup hive --service metastore & 

#针对Spark 3.2
spark-sql \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# 如果没有配置hive环境变量,手动拷贝hive-site.xml到spark的conf下

1.2. 建表参数

参数名

默认值

说明

primaryKey

uuid

表的主键名,多个字段用逗号分隔。

同 hoodie.datasource.write.recordkey.field

preCombineField

表的预合并字段。

同 hoodie.datasource.write.precombine.field

type

cow

创建的表类型:

type = 'cow'

type = 'mor'

同hoodie.datasource.write.table.type

1.3. 创建非分区表

  • 创建一个cow表,默认primaryKey 'uuid',不提供preCombineField
create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
) using hudi;
  • 创建一个mor非分区表
create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);

1.4. 创建分区表

创建一个cow分区外部表,指定primaryKey和preCombineField

create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

1.5. 在已有的hudi表上创建新表

不需要指定模式和非分区列(如果存在)之外的任何属性,Hudi可以自动识别模式和配置。

  • 非分区表
create table hudi_existing_tbl0 
using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';
  • 分区表
create table hudi_existing_tbl1 
using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';

1.6. 通过CTAS (Create Table As Select)建表

为了提高向hudi表加载数据的性能,CTAS使用批量插入作为写操作。

  • 通过CTAS创建cow非分区表,不指定preCombineField 
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 
    1 as id
    , 'a1' as name
    , 10 as price
;
  • 通过CTAS创建cow分区表,指定preCombineField
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 
    1 as id
    , 'a1' as name
    , 10 as price
    , 1000 as ts
    , '2021-12-01' as dt
;
  • 通过CTAS从其他表加载数据
# 创建内部表
create table parquet_mngd 
using parquet 
location 'file:///tmp/parquet_dataset/*.parquet';

# 通过CTAS加载数据
create table hudi_ctas_cow_pt_tbl2 
using hudi 
location 'file:/tmp/hudi/hudi_tbl/' 
options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (datestr) 
as 
select * from parquet_mngd
;

2. 插入数据

默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。

2.1. 向非分区表插入数据

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

2.2. 向分区表动态分区插入数据

insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

2.3. 向分区表静态分区插入数据

insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;

2.4. 使用bulk_insert插入数据

hudi支持使用bulk_insert作为写操作的类型,只需要设置两个配置:

hoodie.sql.bulk.insert.enable 和 hoodie.sql.insert.mode

-- 向指定preCombineKey的表插入数据,则写操作为upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001

-- 向指定preCombineKey的表插入数据,指定写操作为bulk_insert 
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001
1   a1_2    20.0    1002

3. 查询数据

3.1. 查询

select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0

3.2. 时间旅行查询

Hudi从0.9.0开始就支持时间旅行查询。Spark SQL方式要求Spark版本 3.2及以上。

-- 关闭前面开启的bulk_insert
set hoodie.sql.bulk.insert.enable=false;

create table hudi_cow_pt_tbl1 (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';


-- 插入一条id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- 修改id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- 基于第一次提交时间进行时间旅行
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;

-- 其他时间格式的时间旅行写法
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;

select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;

4. 更新数据

4.1. update

更新操作需要指定preCombineField。

  • 语法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
  • 执行更新
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;

-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';

4.2. MergeInto

  • 语法
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]

<merge_condition> =A equal bool condition 
<matched_action>  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action>  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
  • 执行案例
-- 1、准备source表:非分区的hudi表,插入数据
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;


-- 2、准备source表:分区的parquet表,插入数据
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');

merge into hudi_cow_pt_tbl1 as target
using (
  select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
 update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
 insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;

5. 删除数据

  • 语法:
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
  • 案例:
delete from hudi_cow_nonpcf_tbl where uuid = 1;

delete from hudi_mor_tbl where id % 2 = 0;

-- 使用非主键字段删除
delete from hudi_cow_pt_tbl1 where name = 'a1_1';

6. 覆盖数据

  • 使用INSERT_OVERWRITE类型的写操作覆盖分区表
  • 使用INSERT_OVERWRITE_TABLE类型的写操作插入覆盖非分区表或分区表(动态分区)

1)insert overwrite 非分区表 

insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

2)通过动态分区insert overwrite table到分区表

insert overwrite table hudi_cow_pt_tbl1 select 10, 'a10', 1100, '2021-12-09', '11';

3)通过静态分区insert overwrite 分区表

insert overwrite hudi_cow_pt_tbl1 partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;

7. 修改表结构(Alter Table)

  • 语法:
-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)

-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
  • 案例:
--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);

--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid int;

--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

8. 修改分区

  • 语法:
-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )

-- Show Partitions
SHOW PARTITIONS tableIdentifier
  • 案例:
--show partition:
show partitions hudi_cow_pt_tbl1;

--drop partition:
alter table hudi_cow_pt_tbl1 drop partition (dt='2021-12-09', hh='10');
  • 注意:show partition结果是基于文件系统表路径的。删除整个分区数据或直接删除某个分区目录并不精确。

9. 存储过程(Procedures)

  • 语法:
--Call procedure by positional arguments
CALL system.procedure_name(arg_1, arg_2, ... arg_n)

--Call procedure by named arguments
CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ... arg_name_n => arg_n)
  • 案例(可用的存储过程:Procedures | Apache Hudi):
--show commit's info
call show_commits(table => 'hudi_cow_pt_tbl1', limit => 10);

注:其他Hudi相关文章链接由此进 ->  Hudi文章汇总 文章来源地址https://www.toymoban.com/news/detail-457069.html


到了这里,关于Hudi(7):Hudi集成Spark之spark-sql方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hudi0.14.0 集成 Spark3.2.3(IDEA编码方式)

    本次在IDEA下使用Scala语言进行开发,具体环境搭建查看文章 IDEA 下 Scala Maven 开发环境搭建。 1.1 添加maven依赖 创建Maven工程,pom文件:

    2024年01月24日
    浏览(44)
  • spark-sql

    [root@localhost bin]# ./spark-sql Error: Failed to load class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. You need to build Spark with -Phive and -Phive-thriftserver. 24/02/22 00:23:20 INFO ShutdownHookManager: Shutdown hook called 24/02/22 00:23:20 INFO Shutd

    2024年02月22日
    浏览(42)
  • Spark-SQL小结

    目录 一、RDD、DataFrame、DataSet的概念、区别联系、相互转换操作   1.RDD概念   2.DataFrame概念   3.DataSet概念   4.RDD、DataFrame、DataSet的区别联系   5.RDD、DataFrame、DataSet的相互转换操作    1 RDD-DataFrame、DataSet    2  DataFrame-RDD,DataSet    3 DataSet-RDD,DataFrame 二、Spark-SQL连接JDBC的方式

    2024年02月09日
    浏览(44)
  • Spark参数配置和调优,Spark-SQL、Config

    一、Hive-SQL / Spark-SQL参数配置和调优 二、shell脚本spark-submit参数配置 三、sparkSession中配置参数

    2024年02月13日
    浏览(47)
  • spark-sql字段血缘实现

    Apache Spark是一个开源的大数据处理框架,它提供了一种高效、易于使用的方式来处理大规模数据集。在Spark中,数据是通过DataFrame和Dataset的形式进行操作的,这些数据结构包含了一系列的字段(也称为列)。字段血缘是Spark中的一个关键概念,它帮助我们理解数据的来源和流

    2024年02月02日
    浏览(53)
  • spark集成hudi

    启动spark-shell 2 hudi内置数据生成器,生成10条json数据 3加载到DF,写入hudi,实现简单etl处理 4读取存储数据及注册临时表

    2024年02月07日
    浏览(34)
  • Spark-SQL连接Hive的五种方法

    若使用Spark内嵌的Hive,直接使用即可,什么都不需要做(在实际生产活动中,很少会使用这一模式) 步骤: 将Hive中conf/下的hive-site.xml拷贝到Spark的conf/目录下; 把Mysql的驱动copy到jars/目录下; 如果访问不到hdfs,则将core-site.xml和hdfs-site.xml拷贝到conf/目录下; 重启spark-shell;

    2024年02月16日
    浏览(43)
  • spark-sql: insert overwrite分区表问题

    用spark-sql,insert overwrite分区表时发现两个比较麻烦的问题: 从目标表select出来再insert overwrite目标表时报错:Error in query: Cannot overwrite a path that is also being read from. 从其他表select出来再insert overwrite目标表时,其他分区都被删除了. 印象中这两个问题也出现过,但凭经验和感觉,

    2024年02月11日
    浏览(48)
  • 在 spark-sql / spark-shell / hive / beeline 中粘贴 sql、程序脚本时的常见错误

    《大数据平台架构与原型实现:数据中台建设实战》一书由博主历时三年精心创作,现已通过知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描

    2024年02月14日
    浏览(36)
  • spark-sql处理json字符串的常用函数

    整理了spark-sql处理json字符串的几个函数: 1 get_json_object 解析不含数组的 json   2 from_json  解析json 3 schema_of_json 提供生成json格式的方法 4 explode   把JSONArray转为多行 get_json_object(string json_string, string path) :适合最外层为{}的json解析。  第一个参数是json对象变量,也就是含j

    2023年04月08日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包