一、Flink Iceberg Connector
Apache Flink 支持直接创建 Iceberg 表,无需在 Flink SQL 中创建显式 Flink 目录。这意味着我们可以通过在 Flink SQL 中指定 ‘connector’=‘iceberg’ 表选项来创建一个 Iceberg 表,与 Flink 官方文档中的用法类似。
在 Flink 中,SQL CREATE TABLE 测试 (…)WITH (‘connector’=‘iceberg’, …) 会在当前 Flink 目录中创建一个 Flink 表(默认使用 GenericInMemoryCatalog),它只是映射到底层的 Iceberg表而不是直接在当前 Flink 目录中维护 Iceberg 表。
为了在 Flink SQL 中使用 SQL 语法 CREATE TABLE test (…)WITH (‘connector’=‘iceberg’, …) 创建表,Flink Iceberg Connector 提供了以下表属性:
- connector:使用恒定的iceberg。
- Catalog-name:用户指定的目录名称。这是必需的,因为连接器没有任何默认值。
- Catalog-type:hive 或 hadoop 用于内置目录(默认为 hive),或者未设置以使用catalog-impl 自定义实现目录。
- Catalog-impl:自定义目录实现的完全限定类名。如果未设置目录类型,则必须设置。
- Catalog-database:手持catalog中的iceberg数据库名称,默认使用当前flink数据库名称。
- Catalog-table:仓库目录中的冰山表名称默认。使用flink CREATE TABLE语句中的表名。
二、在 Hive 目录中管理的表
在执行以下SQL之前,请确保您已经按照快速入门文档正确配置了Flink SQL客户端。
以下SQL将在当前Flink目录中创建一个Flink表,该表映射到iceberg目录中管理的iceberg表default_database.flink_table。
CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='hive_prod',
'uri'='thrift://localhost:9083',
'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
如果您想创建一个Flink表映射到Hive目录中管理的不同iceberg表(例如Hive中的hive_db.hive_iceberg_table),那么您可以按如下方式创建Flink表:
CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='hive_prod',
'catalog-database'='hive_db',
'catalog-table'='hive_iceberg_table',
'uri'='thrift://localhost:9083',
'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
向 Flink 表写入记录时,如果底层目录数据库(上例中的 hive_db)不存在,则会自动创建。
三、在自定义目录中管理表🔗
以下 SQL 将在当前 Flink 目录中创建一个 Flink 表,该表映射到 com.my.custom.CatalogImpl 类型的自定义目录中管理的iceberg表 default_database.flink_table。文章来源:https://www.toymoban.com/news/detail-571369.html
CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='custom_prod',
'catalog-impl'='com.my.custom.CatalogImpl',
-- More table properties for the customized catalog
'my-additional-catalog-config'='my-value',
...
);
四、一个完整的例子
以Hive目录为例:文章来源地址https://www.toymoban.com/news/detail-571369.html
CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='hive_prod',
'uri'='thrift://localhost:9083',
'warehouse'='file:///path/to/warehouse'
);
INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC');
SET execution.result-mode=tableau;
SELECT * FROM flink_table;
+----+------+
| id | data |
+----+------+
| 1 | AAA |
| 2 | BBB |
| 3 | CCC |
+----+------+
3 rows in set
到了这里,关于Iceberg从入门到精通系列之十六:Flink Iceberg Connector的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!