数据湖Iceberg-FlinkSQL集成(5)

这篇具有很好参考价值的文章主要介绍了数据湖Iceberg-FlinkSQL集成(5)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


数据湖Iceberg-简介(1)
数据湖Iceberg-存储结构(2)
数据湖Iceberg-Hive集成Iceberg(3)
数据湖Iceberg-SparkSQL集成(4)
数据湖Iceberg-FlinkSQL集成(5)
数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)
数据湖Iceberg-Flink DataFrame集成(7)

数据湖Iceberg-FlinkSQL集成

环境准备

Flink与Iceberg的版本对应关系如下

Flink 版本 Iceberg 版本
1.11 0.9.0 – 0.12.1
1.12 0.12.0 – 0.13.1
1.13 0.13.0 – 1.0.0
1.14 0.13.0 – 1.1.0
1.15 0.14.0 – 1.1.0
1.16 1.1.0 – 1.1.0

jar包下载地址

https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/

在里面选择自己的版本即可,这里我使用的是flink 1.14.3 iceberg1.1.0版本

具体下载地址:https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/1.1.0/iceberg-flink-runtime-1.14-1.1.0.jar

jar包上传到Flink lib目录下

[root@ lib]# pwd
/opt/flink/lib
[root@ lib]# ll
total 252612
-rw-r--r-- 1 flink flink     85584 Jan 11  2022 flink-csv-1.14.3.jar
-rw-r--r-- 1 flink flink 136054094 Jan 11  2022 flink-dist_2.12-1.14.3.jar
-rw-r--r-- 1 flink flink    153145 Jan 11  2022 flink-json-1.14.3.jar
-rw-rw-r-- 1 flink flink  43317025 Jan 13 13:54 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 flink flink   7709731 Aug 22  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 flink flink  39633410 Jan 11  2022 flink-table_2.12-1.14.3.jar
-rw-r--r-- 1 flink flink   29256108 Apr 21 09:21 iceberg-flink-runtime-1.14-1.1.0.jar
-rw-r--r-- 1 flink flink    112758 May  3  2013 javax.ws.rs-api-2.0.jar
-rw-r--r-- 1 flink flink    208006 Jan  9  2022 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 flink flink    301872 Jan  9  2022 log4j-api-2.17.1.jar
-rw-r--r-- 1 flink flink   1790452 Jan  9  2022 log4j-core-2.17.1.jar
-rw-r--r-- 1 flink flink     24279 Jan  9  2022 log4j-slf4j-impl-2.17.1.jar

修改flink-conf.yaml配置

修改或添加以下配置

# 禁用 ClassLoader 检查
classloader.check-leaked-classloader: false

# 每个 TaskManager 上任务槽数量,这里为 4
taskmanager.numberOfTaskSlots: 4

# 状态后端使用 RocksDB
state.backend: rocksdb

# 每隔 30000 毫秒进行一次检查点
execution.checkpointing.interval: 30000

# 指定检查点保存的目录,这里为 HDFS 上的目录
state.checkpoints.dir: hdfs://hadoop1:8020/ckps

# 启用增量式检查点,这将在一定程度上提高性能
state.backend.incremental: true

启动flink-sql

注意:

刚刚Flink修改完需要重启Flink

输入 ./sql-client.sh embedded或者./sql-client

sql-client.sh embedded 是启动 Flink SQL Client 时指定的一种模式,即嵌入式模式。

在嵌入式模式下,Flink SQL Client 会自动启动一个 Flink 集群,无需手动启动,直接在命令行中交互式地输入 SQL 命令进行查询和操作。同时,Flink SQL Client 会将所有的数据和表定义存储在本地内存中,这意味着不支持持久化数据和高可用性。

相反,如果您使用的是独立模式,Flink SQL Client 会连接到一个已经运行的 Flink 集群。在这种模式下,需要首先手动启动 Flink 集群,并将 Flink SQL Client 配置为连接到该集群。独立模式相对嵌入式模式更加灵活和可扩展,但启动和配置过程可能需要更多的时间和精力。

总之,嵌入式模式适用于快速原型设计和小规模数据探索,而独立模式适用于生产环境和大规模数据处理。

数据湖Iceberg-FlinkSQL集成(5)

创建和使用Catalog

创建语法说明

CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  `<config_key>`=`<config_value>`
); 
  • type: 必须是iceberg。(必须)
  • catalog-type: 内置了hive和hadoop两种catalog,也可以使用catalog-impl来自定义catalog。(可选)
  • catalog-impl: 自定义catalog实现的全限定类名。如果未设置catalog-type,则必须设置。(可选)
  • property-version: 描述属性版本的版本号。此属性可用于向后兼容,以防属性格式更改。当前属性版本为1。(可选)
  • cache-enabled: 是否启用目录缓存,默认值为true。(可选)
  • cache.expiration-interval-ms: 本地缓存catalog条目的时间(以毫秒为单位);负值,如-1表示没有时间限制,不允许设为0。默认值为-1。(可选)

Hive Catalog

  • 上传hive connector到flink的lib中

下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.3/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar

  • 重启Flink集群,进入sql-client

如果不上传jar包重启服务,后续可能会遇到这种错误

[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hive.metastore.api.MetaException: java.lang.reflect.UndeclaredThrowableException
  • 创建Hive Catalog
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://bigdata-24-195:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hive'
);

注意:这里指定的warehouse目录hive用户需要有权限访问,否则后续创建库或者表会失败

Flink SQL> CREATE DATABASE iceberg_db;
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hive.metastore.api.MetaException: java.lang.reflect.UndeclaredThrowableException

详细异常日志可以在hive日志中查看

  • uri: Hive metastore的thrift uri。(必选)

  • clients:Hive metastore客户端池大小,默认为2。(可选)

  • warehouse: 数仓目录。

  • hive-conf-dir:包含hive-site.xml配置文件的目录路径,hive-site.xml中hive.metastore.warehouse.dir 的值会被warehouse覆盖。

  • hadoop-conf-dir:包含core-site.xml和hdfs-site.xml配置文件的目录路径。

  • 查看catalogs

Flink SQL> show CATALOGS ;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|    hive_catalog |
+-----------------+
2 rows in set
  • 进入catalogs
use catalog hive_catalog;

Hadoop Catalog

Iceberg还支持HDFS中基于目录的catalog,可以使用’catalog-type’='hadoop’配置。

  • 创建catalog
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hadoop',
  'property-version'='1'
);

use catalog hadoop_catalog;
create database iceberg_db;

在目录中可以看到我们创建的库和catalog

数据湖Iceberg-FlinkSQL集成(5)

配置sql-client初始化文件

配置初始化文件后,后续启动,不用每次重新启动都创建catalog

$FLINK_HOME/conf目录下创建sql-client-init.sql文件

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hadoop',
  'property-version'='1'
);

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://bigdata-24-195:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hive'
);

USE CATALOG hive_catalog;

后续启动sql-client时,加上 -i sql文件路径 即可完成catalog的初始化,并进入hive_catalog

bin/sql-client.sh embedded -i conf/sql-client-init.sql

DDL语句

创建数据库

CREATE DATABASE iceberg_db;
USE iceberg_db;

创建表

CREATE TABLE `hive_catalog`.`iceberg_db`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

建表命令现在支持最常用的flink建表语法,包括:

  • PARTITION BY (column1, column2, …):配置分区,apache flink还不支持隐藏分区。
  • COMMENT ‘table document’:指定表的备注
  • WITH (‘key’=‘value’, …):设置表属性
    目前,不支持计算列、watermark(支持主键)。
创建分区表
CREATE TABLE `hive_catalog`.`iceberg_db`.`sample1` (
    id BIGINT COMMENT 'unique id',
    data STRING
) PARTITIONED BY (data);

Apache Iceberg支持隐藏分区,但Apache flink不支持在列上通过函数进行分区,现在无法在flink DDL中支持隐藏分区。

使用LIKE语法建表

LIKE语法用于创建一个与另一个表具有相同schema、分区和属性的表。

CREATE TABLE `hive_catalog`.`iceberg_db`.`sample2` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

CREATE TABLE  `hive_catalog`.`iceberg_db`.`sample_like` LIKE `hive_catalog`.`iceberg_db`.`sample2`;
查看表结构

默认在FlinkSQL中无法查看到表结构

Flink SQL> desc formatted sample_like;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "sample_like" at line 1, column 16.
Was expecting one of:
    <EOF> 
    "." ...
    

我们可以在hive中查看iceberg的表结构

0: jdbc:hive2://bigdata-24-194:2181,bigdata-2> desc formatted sample_like;
INFO  : Compiling command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da): desc formatted sample_like
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da); Time taken: 0.077 seconds
INFO  : Executing command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da): desc formatted sample_like
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da); Time taken: 0.108 seconds
INFO  : OK
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
|           col_name            |                     data_type                      |                      comment                       |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name                    | data_type                                          | comment                                            |
| id                            | bigint                                             |                                                    |
| data                          | string                                             |                                                    |
|                               | NULL                                               | NULL                                               |
| # Detailed Table Information  | NULL                                               | NULL                                               |
| Database:                     | iceberg_db                                         | NULL                                               |
| OwnerType:                    | USER                                               | NULL                                               |
| Owner:                        | hdfs                                               | NULL                                               |
| CreateTime:                   | Fri Apr 22 14:16:17 CST 2023                       | NULL                                               |
| LastAccessTime:               | Sun Dec 14 04:03:18 CST 1969                       | NULL                                               |
| Retention:                    | 2147483647                                         | NULL                                               |
| Location:                     | hdfs://bigdata-24-194:8020/iceberg/iceberg-hive/iceberg_db.db/sample_like | NULL                                               |
| Table Type:                   | EXTERNAL_TABLE                                     | NULL                                               |
| Table Parameters:             | NULL                                               | NULL                                               |
|                               | EXTERNAL                                           | TRUE                                               |
|                               | current-schema                                     | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"long\"},{\"id\":2,\"name\":\"data\",\"required\":false,\"type\":\"string\"}]} |
|                               | metadata_location                                  | hdfs://bigdata-24-194:8020/iceberg/iceberg-hive/iceberg_db.db/sample_like/metadata/00000-0750a26c-8b26-4417-9f27-7786a5775026.metadata.json |
|                               | numFiles                                           | 1                                                  |
|                               | snapshot-count                                     | 0                                                  |
|                               | table_type                                         | ICEBERG                                            |
|                               | totalSize                                          | 1225                                               |
|                               | transient_lastDdlTime                              | 1682057777                                         |
|                               | uuid                                               | 4e0be931-e962-4a94-a176-3969012647c1               |
|                               | NULL                                               | NULL                                               |
| # Storage Information         | NULL                                               | NULL                                               |
| SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL                                               |
| InputFormat:                  | org.apache.hadoop.mapred.FileInputFormat           | NULL                                               |
| OutputFormat:                 | org.apache.hadoop.mapred.FileOutputFormat          | NULL                                               |
| Compressed:                   | No                                                 | NULL                                               |
| Num Buckets:                  | 0                                                  | NULL                                               |
| Bucket Columns:               | []                                                 | NULL                                               |
| Sort Columns:                 | []                                                 | NULL                                               |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
32 rows selected (0.289 seconds)

修改表

Flink SQL 目前只支持修改表属性和表名,其他的暂不支持,需要使用API才可以修改

修改表属性
ALTER TABLE `hive_catalog`.`iceberg_db`.`sample` SET ('write.format.default'='avro');
修改表名
ALTER TABLE `hive_catalog`.`iceberg_db`.`sample` RENAME TO `hive_catalog`.`iceberg_db`.`new_sample`;

删除表

DROP TABLE `hive_catalog`.`iceberg_db`.`sample`;

不会删除具体数据,HDFS中文件还继续存在

查询数据

Flink On Yarn的问题

这里遇到个问题,先在这直接说了,刚刚只是操作元数据所以没遇到这个问题,在插入和查询都会遇到这个问题

我使用的是Ambari集成的Flink,运行模式为Flink on Yarn,直接用查询、插入语句去操作q数据会提示连接拒绝。

这里几个问题点需要注意:

  • 1.当我们使用Flink on Yarn模式提交时需要指定-s yarn-session去运行,如下
bin/sql-client.sh -s yarn-session  -i conf/sql-client-init.sql 

指定 -s yarn-session 后,Flink会在当前服务器/tmp/.yarn-properties-flink文件找到运行的yarn-session任务,去提交。

内容如下:

# cat /tmp/.yarn-properties-flink   
#Generated YARN properties file
#Fri Apr 21 11:11:44 CST 2023
dynamicPropertiesString=
applicationID=application_1675237371712_0532
  • 2.Ambari 默认Flink on Yarn 提交是使用的Flink用户,我们提交任务是使用HDFS用户,还是会导致提交失败

解决方法:kill之前使用flink用户提交的任务,使用HDFS启动Flink on Yarn任务

这里偷懒了,直接手动kill,手动启动了,有时间可以改下Ambari启动Flink方法,这样才一劳永逸

Flink on yarn 启动命令如下

export HADOOP_CONF_DIR=/etc/hadoop/conf; export HADOOP_CLASSPATH=/usr/hdp/3.1.5.0-152/hadoop/conf:/usr/hdp/3.1.5.0-152/hadoop/lib/*:/usr/hdp/3.1.5.0-152/hadoop/.//*:/usr/hdp/3.1.5.0-152/hadoop-hdfs/./:/usr/hdp/3.1.5.0-152/hadoop-hdfs/lib/*:/usr/hdp/3.1.5.0-152/hadoop-hdfs/.//*:/usr/hdp/3.1.5.0-152/hadoop-mapreduce/lib/*:/usr/hdp/3.1.5.0-152/hadoop-mapreduce/.//*:/usr/hdp/3.1.5.0-152/hadoop-yarn/./:/usr/hdp/3.1.5.0-152/hadoop-yarn/lib/*:/usr/hdp/3.1.5.0-152/hadoop-yarn/.//*:/usr/hdp/3.1.5.0-152/tez/*:/usr/hdp/3.1.5.0-152/tez/lib/*:/usr/hdp/3.1.5.0-152/tez/conf:/usr/hdp/3.1.5.0-152/tez/conf_llap:/usr/hdp/3.1.5.0-152/tez/doc:/usr/hdp/3.1.5.0-152/tez/hadoop-shim-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/hadoop-shim-2.8-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib:/usr/hdp/3.1.5.0-152/tez/man:/usr/hdp/3.1.5.0-152/tez/tez-api-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-common-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-dag-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-examples-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-history-parser-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-javadoc-tools-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-job-analyzer-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-mapreduce-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-protobuf-history-plugin-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-runtime-internals-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-runtime-library-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-tests-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-cache-plugin-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-history-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-history-with-acls-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-history-with-fs-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/ui:/usr/hdp/3.1.5.0-152/tez/lib/async-http-client-1.9.40.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-cli-1.2.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-codec-1.4.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-collections-3.2.2.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-collections4-4.1.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-io-2.4.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-lang-2.6.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-math3-3.1.1.jar:/usr/hdp/3.1.5.0-152/tez/lib/gcs-connector-hadoop3-1.9.17.3.1.5.0-152-shaded.jar:/usr/hdp/3.1.5.0-152/tez/lib/guava-28.0-jre.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-aws-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-azure-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-azure-datalake-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-hdfs-client-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-mapreduce-client-common-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-mapreduce-client-core-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-yarn-server-timeline-pluginstorage-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/jersey-client-1.19.jar:/usr/hdp/3.1.5.0-152/tez/lib/jersey-json-1.19.jar:/usr/hdp/3.1.5.0-152/tez/lib/jettison-1.3.4.jar:/usr/hdp/3.1.5.0-152/tez/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/3.1.5.0-152/tez/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/3.1.5.0-152/tez/lib/jsr305-3.0.0.jar:/usr/hdp/3.1.5.0-152/tez/lib/metrics-core-3.1.0.jar:/usr/hdp/3.1.5.0-152/tez/lib/protobuf-java-2.5.0.jar:/usr/hdp/3.1.5.0-152/tez/lib/RoaringBitmap-0.4.9.jar:/usr/hdp/3.1.5.0-152/tez/lib/servlet-api-2.5.jar:/usr/hdp/3.1.5.0-152/tez/lib/slf4j-api-1.7.10.jar:/usr/hdp/3.1.5.0-152/tez/lib/tez.tar.gz; 

/opt/flink/bin/yarn-session.sh -d -nm flinkapp-from-ambari -n 1 -s 1 -jm 768 -tm 1024 -qu default >> /var/log/flink/flink-setup.log &

Batch模式
SET execution.runtime-mode = batch;
select * from sample;
Streaming模式
-- 设置 Flink 的运行模式为流式计算
SET execution.runtime-mode = streaming;

-- 启用动态表选项
SET table.dynamic-table-options.enabled=true;

-- 设置 Flink SQL 执行结果的输出格式为 Tableau
SET sql-client.execution.result-mode=tableau;

-- 查询 Hive Catalog 中的 Iceberg 表 sample 中的所有数据
select * from hive_catalog.iceberg_db.sample;

SET table.dynamic-table-options.enabled=true;

从 1.11 开始,用户可以通过动态参数的形式灵活地设置表的属性参数,覆盖或者追加原表的 WITH (…) 语句内定义的 table options。

基本语法为:

table_path /*+ OPTIONS(key=val [, key=val]*) */
动态参数的使用没有语境限制,只要是引用表的地方都可以追加定义。在指定的表后面追加的动态参数会自动追加到原表定义中

从当前快照读取所有记录,然后从该快照读取增量数据
SELECT * FROM sample5 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

返回格式(会根据新增数据持续滚动)

Flink SQL> SELECT * FROM hive_catalog.iceberg_db.sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
2023-04-21 15:43:33,819 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at bigdata-24-194/172.16.24.194:8050
2023-04-21 15:43:33,821 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-04-21 15:43:33,822 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2023-04-21 15:43:33,830 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface bigdata-24-196:6588 of application 'application_1675237371712_0532'.
+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| +I |                   34 |                          aeefb |
| +I |                   34 |                             ae |
| +I |                    1 |                              a |
| +I |                    1 |                              a |
| +I |                   34 |                            aee |
| +I |                   34 |                    aeeefdsfafb |

读取指定快照id(不包含)后的增量数据
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
  • monitor-interval: 连续监控新提交数据文件的时间间隔(默认为10s)。

  • start-snapshot-id: 流作业开始的快照id。

注意:如果是无界数据流式upsert进iceberg表(读kafka,upsert进iceberg表),那么再去流读iceberg表会存在读不出数据的问题。如果无界数据流式append进iceberg表(读kafka,append进iceberg表),那么流读该iceberg表可以正常看到结果。

插入数据

INSERT INTO
INSERT INTO `hive_catalog`.`iceberg_db`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`iceberg_db`.`sample` SELECT id, data from sample2;
INSERT OVERWRITE

仅支持Flink的Batch模式

SET execution.runtime-mode = batch;

INSERT OVERWRITE sample VALUES (1, 'a');

INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
UPSERT

当将数据写入v2表格式时,Iceberg支持基于主键的UPSERT。有两种方法可以启用upsert。

建表时指定

CREATE TABLE `hive_catalog`.`iceberg_db`.`sample5` (
  `id`  INT UNIQUE COMMENT 'unique id',
  `data` STRING NOT NULL,
 PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2', 
'write.upsert.enabled'='true'
);

插入时指定

INSERT INTO `hive_catalog`.`iceberg_db`.`sample5` /*+ OPTIONS('upsert-enabled'='true') */ values(1,'a'),(2,'b'),(3,'c');

结果:

+----+-------------+--------------------------------+
| op |          id |                           data |
+----+-------------+--------------------------------+
| +I |           1 |                              a |
| +I |           2 |                              b |
| +I |           3 |                              c |
+----+-------------+--------------------------------+

插入

INSERT INTO `hive_catalog`.`iceberg_db`.`sample5` /*+ OPTIONS('upsert-enabled'='true') */ values(1,'abc')); 

结果

+----+-------------+--------------------------------+
| op |          id |                           data |
+----+-------------+--------------------------------+
| +I |           1 |                            abc |
| +I |           2 |                              b |
| +I |           3 |                              c |
+----+-------------+--------------------------------+

插入的表,format-version需要为2。

OVERWRITE和UPSERT不能同时设置。在UPSERT模式下,如果对表进行分区,则分区字段必须也是主键。

读取Kafka流,upsert插入到iceberg表中

下载:https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.3/flink-sql-connector-kafka_2.12-1.14.3.jar

将jar包放到$FLINK_HOME/lib目录下,重启Flink On Yarn

这里先说一个大坑,Iceberg现阶段的一个Bug

Kafka表必须要在default_catalog.default_database下,即catalog名为default_catalog,数据库(命名空间)为default_database下,否则kafka类型的表读取不到数据。

如果都在我们自己创建的catalog下创建,则执行INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;后,在Flink任务中看不到一个持续执行的Flink Job,而正常执行该命令Flink会执行一个持续执行的任务,去消费kafka数据写入Iceberg,正常情况如下图:

数据湖Iceberg-FlinkSQL集成(5)

所以这里我们kafka表在default_catalog.default_database下,写入数据的表在我们自己创建的hadoop_catalog.iceberg_db

create table default_catalog.default_database.kafka1(
  id int,
  data string
) with (
  'connector' = 'kafka'
  ,'topic' = 'ttt'
  ,'properties.zookeeper.connect' = '172.16.24.194:2181'
  ,'properties.bootstrap.servers' = '172.16.24.194:9092'
  ,'format' = 'json'
  ,'properties.group.id'='iceberg1'
  ,'scan.startup.mode'='earliest-offset'
);

CREATE TABLE `hadoop_catalog`.`iceberg_db`.`sample6` (
  `id`  INT UNIQUE COMMENT 'unique id',
  `data` STRING NOT NULL,
 PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2', 
'write.upsert.enabled'='true'
);


INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;

此时我们往Kafka发送数据:

{"id":123,"data":"llalalala"}
{"id":1123,"data":"asdfasfds"}

查看表中数据可以看到写入成功

select * from hadoop_catalog.iceberg_db.sample6;

数据湖Iceberg-FlinkSQL集成(5)

再次发送数据

{"id":123,"data":"JastData"}

查看表中数据,发现修改成功

数据湖Iceberg-FlinkSQL集成(5)文章来源地址https://www.toymoban.com/news/detail-425526.html

与Flink集成的不足

支持的特性 Flink 备注
SQL create catalog
SQL create database
SQL create table
SQL create table like
SQL alter table 只支持修改表属性,不支持更改列和分区
SQL drop_table
SQL select 支持流式和批处理模式
SQL insert into 支持流式和批处理模式
SQL insert overwrite
DataStream read
DataStream append
DataStream overwrite
Metadata tables 支持Java API,不支持Flink SQL
Rewrite files action
  • 不支持创建隐藏分区的Iceberg表。
  • 不支持创建带有计算列的Iceberg表。
  • 不支持创建带watermark的Iceberg表。
  • 不支持添加列,删除列,重命名列,更改列。
  • Iceberg目前不支持Flink SQL 查询表的元数据信息,需要使用Java API 实现。

到了这里,关于数据湖Iceberg-FlinkSQL集成(5)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 第二章 Flink集成Iceberg的集成方式及基本SQL使用

    注意事项:一般都是用基于Flink的Hive Catalog,使用HMS存储表模型数据 1、集成方式 (1)下载jar包 下载地址 (2)启动FlinkSQL ①StandLone模式启动 ②Flink On Yarn模式启动 2、基本使用 2.1、创建catalog 核心:可创建hive、hadoop、自定义等目录,创建模板如下 type : 必须的 iceberg 。(必需

    2024年02月08日
    浏览(40)
  • ETL简介:数据集成与应用

    在当今大数据时代,组织和企业需要处理和分析庞大的数据量。ETL(Extract, Transform, Load)是一种重要的数据集成和处理方法,它在数据管理和决策支持中起着关键作用。本文将介绍ETL的基本概念、作用和关键组成部分,以帮助读者了解ETL的重要性和应用领域。 ETL是指数据提取

    2024年02月12日
    浏览(35)
  • Iceberg从入门到精通系列之三:创建Iceberg表、修改表结构、插入数据、删除表

    Hive语法创建分区表,不会在元数据创建分区,而是将分区数据转换为Iceberg标识分区。 这种情况下不能使用Iceberg的分区转换,例如:days(timestamp),如果想要使用Iceberg格式表的分区转换标识分区,需要使用Spark或者Flink引擎创建表。 只支持HiveCatalog表修改表属性,Iceberg表属性和

    2024年02月11日
    浏览(84)
  • Iceberg从入门到精通系列之十:flink sql往Iceberg表插入数据,Batch模式和Streaming模式查询数据

    仅支持Flink的Batch模式 当将数据写入v2表格时,Iceberg支持基于主键的UPSERT。有两种方法可以启用upsert。 建表时指定 UPSERT模式下,如果对表进行分区,则分区字段必须是主键。 Batch模式: Streaming模式: 从当前快照读取所有记录,然后从该快照读取增量数据 读取指定快照id(不包

    2024年02月12日
    浏览(47)
  • FlinkSql 如何实现数据去重?

    很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql

    2024年02月10日
    浏览(34)
  • 火山引擎 Iceberg 数据湖的应用与实践

    在云原生计算时代,云存储使得海量数据能以低成本进行存储,但是这也给如何访问、管理和使用这些云上的数据提出了挑战。而 Iceberg 作为一种云原生的表格式,可以很好地应对这些挑战。本文将介绍火山引擎在云原生计算产品上使用 Iceberg 的实践,和大家分享高效查询、

    2024年02月09日
    浏览(33)
  • 数据湖08:Apache Iceberg原理和功能介绍

     系列专题:数据湖系列文章         在使用不同的引擎进行大数据计算时,需要将数据根据计算引擎进行适配。这是一个相当棘手的问题,为此出现了一种新的解决方案: 介于上层计算引擎和底层存储格式之间的一个中间层 。这个中间层不是数据存储的方式,只是定义

    2023年04月09日
    浏览(52)
  • [实战-10]FlinkSql 如何实现数据去重?

    很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql

    2024年02月03日
    浏览(38)
  • 【大数据】Apache Iceberg 概述和源代码的构建

    我们在使用不同的引擎进行大数据计算时,需要将数据根据计算引擎进行适配。这是一个相当棘手的问题,为此出现了一种新的解决方案:介于上层计算引擎和底层存储格式之间的一个中间层。这个中间层不是数据存储的方式,只是定义了数据的元数据组织方式,并向计算引

    2024年02月09日
    浏览(42)
  • Flink + Iceberg打造流批一体的数据湖架构

    一、背景 1、数据仓库架构         从Hive表 出仓 到外部系统(ClickHouse、Presto、ES等)带来的复杂性和存储开发等额外代价,尽量减少这种场景出仓的必要性。 痛点:传统 T+1 任务 海量的TB级 T+ 1 任务延迟导致下游数据产出时间不稳定。 任务遇到故障重试恢复代价昂贵 数

    2024年02月04日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包