Iceberg从入门到精通系列之十六:Flink Iceberg Connector

这篇具有很好参考价值的文章主要介绍了Iceberg从入门到精通系列之十六:Flink Iceberg Connector。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Flink Iceberg Connector

Apache Flink 支持直接创建 Iceberg 表,无需在 Flink SQL 中创建显式 Flink 目录。这意味着我们可以通过在 Flink SQL 中指定 ‘connector’=‘iceberg’ 表选项来创建一个 Iceberg 表,与 Flink 官方文档中的用法类似。

在 Flink 中,SQL CREATE TABLE 测试 (…)WITH (‘connector’=‘iceberg’, …) 会在当前 Flink 目录中创建一个 Flink 表(默认使用 GenericInMemoryCatalog),它只是映射到底层的 Iceberg表而不是直接在当​​前 Flink 目录中维护 Iceberg 表。

为了在 Flink SQL 中使用 SQL 语法 CREATE TABLE test (…)WITH (‘connector’=‘iceberg’, …) 创建表,Flink Iceberg Connector 提供了以下表属性:

  • connector:使用恒定的iceberg。
  • Catalog-name:用户指定的目录名称。这是必需的,因为连接器没有任何默认值。
  • Catalog-type:hive 或 hadoop 用于内置目录(默认为 hive),或者未设置以使用catalog-impl 自定义实现目录。
  • Catalog-impl:自定义目录实现的完全限定类名。如果未设置目录类型,则必须设置。
  • Catalog-database:手持catalog中的iceberg数据库名称,默认使用当前flink数据库名称。
  • Catalog-table:仓库目录中的冰山表名称默认。使用flink CREATE TABLE语句中的表名。

二、在 Hive 目录中管理的表

在执行以下SQL之前,请确保您已经按照快速入门文档正确配置了Flink SQL客户端。

以下SQL将在当前Flink目录中创建一个Flink表,该表映射到iceberg目录中管理的iceberg表default_database.flink_table。

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

如果您想创建一个Flink表映射到Hive目录中管理的不同iceberg表(例如Hive中的hive_db.hive_iceberg_table),那么您可以按如下方式创建Flink表:

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

向 Flink 表写入记录时,如果底层目录数据库(上例中的 hive_db)不存在,则会自动创建。

三、在自定义目录中管理表🔗

以下 SQL 将在当前 Flink 目录中创建一个 Flink 表,该表映射到 com.my.custom.CatalogImpl 类型的自定义目录中管理的iceberg表 default_database.flink_table。

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='custom_prod',
    'catalog-impl'='com.my.custom.CatalogImpl',
     -- More table properties for the customized catalog
    'my-additional-catalog-config'='my-value',
     ...
);

四、一个完整的例子

以Hive目录为例:文章来源地址https://www.toymoban.com/news/detail-571369.html

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='file:///path/to/warehouse'
);

INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC');

SET execution.result-mode=tableau;
SELECT * FROM flink_table;

+----+------+
| id | data |
+----+------+
|  1 |  AAA |
|  2 |  BBB |
|  3 |  CCC |
+----+------+
3 rows in set

到了这里,关于Iceberg从入门到精通系列之十六:Flink Iceberg Connector的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包