iceberg的java api使用

这篇具有很好参考价值的文章主要介绍了iceberg的java api使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【前言】

了解一个组件的最好方式是先使用该组件,今天我们就来聊聊如何通过java api对iceberg进行操作。

为什么是选择api进行介绍,而不是更通用的flink、spark、hive等。一方面是觉得flink、spark使用iceberg的介绍网上已经有很多,官网的介绍也比较清晰,而java api的介绍则相对少些;另一方面,不管是flink,spark最终都还是调用这些基本的api完成相关的操作的,因此先从api入手,后续对flink,spark,trino等组件对iceberg的操作原理理解起来也会更容易些。所以就有了本文的内容。

【catalog的创建】

在创建数据库,表之前需要先创建catalog,这里主要介绍hive类型的catalog。

import org.apache.iceberg.hive.HiveCatalog;

HiveCatalog catalog = new HiveCatalog();
catalog.setConf(conf);

Map <String, String> properties = new HashMap<String, String>();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/user/hive/warehouse")
properties.put(CatalogProperties.URI, "thrift://172.16.55.21:9083");
properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.hive.HiveCatalog");

// 初始化catalog
catalog.initialize("hive",properties);

对于hive类型的catalog,主要指定数据库存储位置,以及hive metastore server的URI。

【创建表】

对于iceberg表,可以理解由四部分组成,表结构定义(schema)、分区定义(partitionSpec)、表的属性(properties),以及表的唯一识别信息(identity)即表所属的数据库与表名。创建表时只需要分别制定这些内容即可。

// 定义表结构schema
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.IntegerType.get()),
    Types.NestedField.required(2, "name", Types.StringType.get()),
    Types.NestedField.required(3, "birth", Types.StringType.get()));

// 分区定义(以birth字段按月进行分区)
PartitionSpec spec = PartitionSpec.builderFor(schema).month("birth").build();

// 数据库名,表名
TableIdentifier name = TableIdentifier.of("iceberg_db", "developer");
// 表的属性
Map<String, String> properties = new HashMap<>();
properties.put("engine.hive.enabled", "true");
// 建表
Table table = catalog.createTable(name, schema, spec, properties);

这里需要注意的是:分区定义中的字段必须是schema中已有的字段,如果在schema中找不到对应的字段,会报错抛异常。

iceberg的java api使用

但是,通过sql方式建表时,分区字段会隐式地加入到表字段定义中,即不用强制写到schema的字段定义。例如通过如下hivesql语句建表:

create table developer(
id int,
name string
)
partitioned by (birth Date)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
tblproperties('engine.hive.enabled'='true');

建表后的情况如下所示:

iceberg的java api使用

【插入数据】

插入数据可以分为3个步骤,首先根据表格式构造对应的数据记录,然后将记录写入到指定格式(parquet、orc等)的文件中,最后将文件列表写入到表中。

// 1. 构建记录
GenericRecord record = GenericRecord.create(schema);
ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
builder.add(ImmutableMap.of("id", 1, "name", "chen", "birth", "2020-03-08"));
builder.add(ImmutableMap.of("id", 2, "name", "yuan", "birth", "2021-03-09"));
builder.add(ImmutableMap.of("id", 3, "name", "jie", "birth", "2023-03-10"));
builder.add(ImmutableMap.of("id", 4, "name", "ma", "birth", "2023-03-11"));
ImmutableList<GenericRecord> records = builder.build();

// 2. 将记录写入parquet文件
String filepath = table.location() + "/" + UUID.randomUUID().toString();
OutputFile file = table.io().newOutputFile(filepath);
DataWriter<GenericRecord> dataWriter =
    Parquet.writeData(file)
    .schema(schema)
    .createWriterFunc(GenericParquetWriter::buildWriter)
    .overwrite()
    .withSpec(PartitionSpec.unpartitioned())
    .build();
try {
    for (GenericRecord record : builder.build()) {
        dataWriter.write(record);
    }
} finally {
    dataWriter.close();
}

// 3. 将文件写入table中
DataFile dataFile = dataWriter.toDataFile();
table.newAppend().appendFile(dataFile).commit();

这里,对于数据文件的存储位置是有一定规范的,如果没有在指定路径下存放,那么对于其他组件来说(比如表同步到hive后),会出现数据不完整或者查不到的情况。

【行级别的查询数据】

查询是通过构造ScanBuilder,并配合IcebergGenerics.read来完成的。ScanBuilder还可以进行select选择列,以及通过where指定查询条件。

Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "developer"));
IcebergGenerics.ScanBuilder scanBuilder = IcebergGenerics.read(table);
// 查询全部
CloseableIterable<Record> records = scanBuilder.build();
for (Record record : records) {
}
// 指定select列与where条件的查询
//CloseableIterable<Record> records = scanBuilder.select("id", "name").where(Expressions.lessThan("id", Integer.valueOf(10))).build();

【表结构变更】

iceberg所具备的一项特点就是可以对表结构进行变更,例如新增,删除已有字段,字段名或类型的变更,新增分区等。

1)新增列字段

Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "developer"));
UpdateSchema newSchema = table.updateSchema();
// 字段名, 字段类型
newSchema.addColumn("skill", Type.StringType.get());
updateSchema.commit();

对于已经写入的记录数据,其新增字段的值为NULL。

iceberg的java api使用

当然还可以UpdateSchema进行删除字段、重命名字段、更新字段(类型),调整字段位置等操作。

2)新增分区

通过UpdatePartitionSpec可以进行分区的相关操作。

Table table = catalog.loadTable(TableIdentifier.of(dbName, tblName));
UpdatePartitionSpec updatePartitionSpec = table.updateSpec();
updatePartitionSpec.addField("skill");
updatePartitionSpec.commit();

【snapshot的操作】

完成表的加载后,可以得到表的所有snapshot信息,也可以删除指定的snapshot,或指定时间之前的snapshot。

Table table = catalog.loadTable(TableIdentifier.of(dbName, tblName));
for (Snapshot snapshot : table.snapshots()) {
    System.out.println(snapshot.sequenceNumber() + " " + snapshot.snapshotId() + " " + snapshot.parentId() + " " + snapshot.timestampMillis());
}

ExpireSnapshots expireSnapshot = table.expireSnapshots();
expireSnapshot.expireOlderThan(table.currentSnapshot().timestampMillis());
expireSnapshot.commit();

【删除表】

删除表的操作则很简单,通过catalog对表进行删除。

TableIdentifier name = TableIdentifier.of("iceberg_db", "developer");
catalog.dropTable(name, true);

【总结】

本文主要介绍iceberg api的一些基本操作,这里未涉及数据的更新与删除,因为这是一个比较大的知识点。另外,分区的新增,添加新的列这些操作的背后逻辑和iceberg的文件存储格式都有一定的关系,我们后续会逐一介绍。

好了,这就是本文的全部内容,如果觉得本文对您有帮助,请点赞+转发,如果觉得有不正确的地方,也可以拍砖指点,最后,欢迎加我微信交流~文章来源地址https://www.toymoban.com/news/detail-496167.html

到了这里,关于iceberg的java api使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入了解:Java中获取 List中最后一个元素

    在Java编程中,我们经常会使用List来存储一组元素。有时候,我们需要获取List中的最后一个元素。本文将介绍几种获取List中最后一个元素的方法。 List接口提供了一个get()方法,可以通过索引来获取List中的元素。由于List的索引是从0开始的,所以最后一个元素的索引是List的大

    2024年02月07日
    浏览(53)
  • 网站和API支持HTTPS,最好在Nginx上配置

    随着我们网站用户的增多,我们会逐渐意识到HTTPS加密的重要性。在不修改现有代码的情况下,要从HTTP升级到HTTPS,让Nginx支持HTTPS是个很好的选择。今天我们来讲下如何从Nginx入手,从HTTP升级到HTTPS,同时支持静态网站和SpringBoot应用,希望对大家有所帮助! 生成SSL自签名证书

    2024年02月11日
    浏览(36)
  • 一文搞懂!最好用的七大顶级 API 接口测试工具

    现在 API 接口已经成为软件开发重要的组成部分,由于 API 并没有 GUI 图形界面,无法直观的对接口进行测试,所以对于前后端开发来说,找到一套趁手的工具对 API 接口进行测试,了解开发的程序是否符合预期十分重要。 面对批量的 API,手动测试变得非常低效,自动化 API 接

    2024年02月15日
    浏览(48)
  • 微信小程序--》你真的了解小程序组件的使用吗?

    🏍️作者简介:大家好,我是亦世凡华、渴望知识储备自己的一名在校大学生 🛵个人主页:亦世凡华、 🛺系列专栏:微信小程序 🚲座右铭:人生亦可燃烧,亦可腐败,我愿燃烧,耗尽所有光芒。 👀引言        ⚓经过web前端开发的学习,相信大家对于前端开发有了一

    2024年02月01日
    浏览(33)
  • 最好的Vue组件库之Vuetify的入坑指南(持续更新中)

    目录      安装Vuetify      文档结构         快速入门         特性         样式和动画          首先先声明,个人不是什么很牛逼的大佬,只是想向那些想入坑Vuetify的前端新手或者嫌文档太长不知如何入手的人提供一些浅显的建议而已,能让你们稍微少走一些弯

    2024年01月23日
    浏览(37)
  • Java设计模式-前言

     馆长准备了很多学习资料,其中包含 java方面,jvm调优,spring / spring boot /spring cloud ,微服务,分布式,前端,js书籍资料,视频资料,以及各类常用软件工具,破解工具  等资源。请关注“IT技术馆”公众号,进行关注,馆长会每天更新资源和更新技术文章等。请大家多多关

    2024年01月21日
    浏览(42)
  • 每天一个知识点 - 了解和使用super关键字

    super是一个,super和this很类似 可以使用 super() 函数访问父类的构造函数,从而委托父类完成一些初始化的工作 如果子类重写了父类的中某个方法的实现,可以通过使用 super 来引用父类的方法实现        super( )函数是用于调用父类的一个方法,指向了当前对象自

    2024年02月21日
    浏览(50)
  • 微信小程序的启动和渲染过程(加组件分类和组件的基本使用以及API分类)

    关于微信小程序知识点一共做了六个博客,涵盖大部分内容,有想学习的可以按照以下顺序查看 1.微信小程序的启动和渲染过程(加组件分类和组件的基本使用以及API分类) 2.微信小程序wxml的数据和事件的绑定,以及条件和列表的渲染 3.微信小程序wxss相关介绍、全局配置和tabbar知识

    2024年02月11日
    浏览(65)
  • 从头开始用JAVA创建一个自己的简单API并实现第三方调用

            相信大家对这个词汇并不陌生,通俗来说API就是程序之间的接口,在学习和工作中经常会调用别人的API,那么如果我们要做一个自己的API,要如何下手呢。本文将用Spring+JAVA编写一个简单的API,过程可供初学者参考。         为了顾及完全没有经验的小白(比如我

    2024年02月10日
    浏览(55)
  • 数据湖Iceberg介绍和使用(集成Hive、SparkSQL、FlinkSQL)

    概述 为了解决数据存储和计算引擎之间的适配的问题,Netflix开发了Iceberg,2018年11月16日进入Apache孵化器,2020 年5月19日从孵化器毕业,成为Apache的顶级项目。 Iceberg是一个面向海量数据分析场景的 开放表格式(Table Format) 。表格式(Table Format)可以理解为 元数据以及数据文

    2024年02月10日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包