流数据湖平台Apache Paimon(三)Flink进阶使用

这篇具有很好参考价值的文章主要介绍了流数据湖平台Apache Paimon(三)Flink进阶使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

2.9 进阶使用

2.9.1 写入性能

Paimon的写入性能与检查点密切相关,因此需要更大的写入吞吐量:

增加检查点间隔,或者仅使用批处理模式。

增加写入缓冲区大小。

启用写缓冲区溢出。

如果您使用固定存储桶模式,请重新调整存储桶数量。

2.9.1.1 并行度

建议sink的并行度小于等于bucket的数量,最好相等。

选项 必需的 默认 类型 描述
sink.parallelism No (none) Integer 定义sink的并行度。默认情况下,并行度由框架使用上游链式运算符的相同并行度来确定。

2.9.1.2 Compaction

当Sorted Run数量较少时,Paimon writer 将在单独的线程中异步执行压缩,因此记录可以连续写入表中。然而,为了避免Sorted Runs的无限增长,当Sorted Run的数量达到阈值时,writer将不得不暂停写入。下表属性确定阈值。

选项 必需的 默认 类型 描述
num-sorted-run.stop-trigger No (none) Integer 触发停止写入的Sorted Runs次数,默认值为 ‘num-sorted-run.compaction-trigger’ + 1。

当 num-sorted-run.stop-trigger 变大时,写入停顿将变得不那么频繁,从而提高写入性能。但是,如果该值变得太大,则查询表时将需要更多内存和 CPU 时间。如果您担心内存 OOM,请配置sort-spill-threshold。它的值取决于你的内存大小。

2.9.1.3 优先考虑写入吞吐量

如果希望某种模式具有最大写入吞吐量,则可以缓慢而不是匆忙地进行Compaction。可以对表使用以下策略

num-sorted-run.stop-trigger = 2147483647

sort-spill-threshold = 10

此配置将在写入高峰期生成更多文件,并在写入低谷期逐渐合并到最佳读取性能。

2.9.1.4 触发Compaction的Sorted Run数

Paimon使用LSM树,支持大量更新。 LSM 在多次Sorted Runs中组织文件。从 LSM 树查询记录时,必须组合所有Sorted Runs以生成所有记录的完整视图。

过多的Sorted Run会导致查询性能不佳。为了将Sorted Run的数量保持在合理的范围内,Paimon writers 将自动执行Compaction。下表属性确定触发Compaction的最小Sorted Run数。

选项 必需的 默认 类型 描述
num-sorted-run.compaction-trigger No 5 Integer 触发Compaction的Sorted Run数。包括 0 级文件(一个文件一级排序运行)和高级运行(一个一级排序运行)。

2.9.1.5 写入初始化

在write初始化时,bucket的writer需要读取所有历史文件。如果这里出现瓶颈(例如同时写入大量分区),可以使用write-manifest-cache缓存读取的manifest数据,以加速初始化。

2.9.1.6 内存

Paimon writer中主要占用内存的地方有3个:

Writer的内存缓冲区,由单个任务的所有Writer共享和抢占。该内存值可以通过 write-buffer-size 表属性进行调整。

合并多个Sorted Run以进行Compaction时会消耗内存。可以通过 num-sorted-run.compaction-trigger 选项进行调整,以更改要合并的Sorted Run的数量。

如果行非常大,在进行Compaction时一次读取太多行数据可能会消耗大量内存。减少 read.batch-size 选项可以减轻这种情况的影响。

写入列式(ORC、Parquet等)文件所消耗的内存,不可调。

2.9.2 读取性能

2.9.2.1 Full Compaction

配置“full-compaction.delta-commits”在Flink写入中定期执行full-compaction。并且可以确保在写入结束之前分区被完全Compaction。

注意:Paimon 默认处理小文件并提供良好的读取性能。请不要在没有任何要求的情况下配置此Full Compaction选项,因为它会对性能产生重大影响。

2.9.2.2 主键表

对于主键表来说,这是一种“MergeOnRead”技术。读取数据时,会合并多层LSM数据,并行数会受到桶数的限制。虽然Paimon的merge会高效,但是还是赶不上普通的AppendOnly表。

如果你想在某些场景下查询得足够快,但只能找到较旧的数据,你可以:

配置full-compaction.delta-commits,写入数据时(目前只有Flink)会定期进行full Compaction。

配置“scan.mode”为“compacted-full”,读取数据时,选择full-compaction的快照。读取性能良好。

2.9.2.3 仅追加表

小文件会降低读取速度并影响 DFS 稳定性。默认情况下,当单个存储桶中的小文件超过“compaction.max.file-num”(默认50个)时,就会触发compaction。但是当有多个桶时,就会产生很多小文件。

您可以使用full-compaction来减少小文件。full-compaction将消除大多数小文件。

2.9.2.4 格式

Paimon 对 parquet 读取进行了一些查询优化,因此 parquet 会比 orc 稍快一些。

2.9.3 多Writer并发写入

Paimon的快照管理支持向多个writer写入。

默认情况下,Paimon支持对不同分区的并发写入。推荐的方式是streaming job将记录写入Paimon的最新分区;同时批处理作业(覆盖)将记录写入历史分区。

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

如果需要多个Writer写到同一个分区,事情就会变得有点复杂。例如,不想使用 UNION ALL,那就需要有多个流作业来写入“partial-update”表。参考如下的“Dedicated Compaction Job”。

2.9.3.1 Dedicated Compaction Job

默认情况下,Paimon writer 在写入记录时会根据需要执行Compaction。这对于大多数用例来说已经足够了,但有两个缺点:

这可能会导致写入吞吐量不稳定,因为执行压缩时吞吐量可能会暂时下降。

Compaction会将某些数据文件标记为“已删除”(并未真正删除)。如果多个writer标记同一个文件,则在提交更改时会发生冲突。 Paimon 会自动解决冲突,但这可能会导致作业重新启动。

为了避免这些缺点,用户还可以选择在writer中跳过Compaction,并仅运行专门的作业来进行Compaction。由于Compaction仅由专用作业执行,因此writer可以连续写入记录而无需暂停,并且不会发生冲突。

选项 必需的 默认 类型 描述
write-only No false Boolean 如果设置为 true,将跳过Compaction和快照过期。此选项与独立Compaction一起使用。

Flink SQL目前不支持compaction相关的语句,所以我们必须通过flink run来提交compaction作业。

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

compact \

–warehouse \

–database \

–table \

[–partition ] \

[–catalog-conf [–catalog-conf …]] \

如果提交一个批处理作业(execution.runtime-mode:batch),当前所有的表文件都会被Compaction。如果您提交一个流作业(execution.runtime-mode: Streaming),该作业将持续监视表的新更改并根据需要执行Compaction。

2.9.4 表管理

2.9.4.1 管理快照

1)快照过期

Paimon Writer每次提交都会生成一个或两个快照。每个快照可能会添加一些新的数据文件或将一些旧的数据文件标记为已删除。然而,标记的数据文件并没有真正被删除,因为Paimon还支持时间旅行到更早的快照。它们仅在快照过期时被删除。

目前,Paimon Writer在提交新更改时会自动执行过期操作。通过使旧快照过期,可以删除不再使用的旧数据文件和元数据文件,以释放磁盘空间。

设置以下表属性:

选项 必需的 默认 类型 描述
snapshot.time-retained No 1 h Duration 已完成快照的最长时间保留。
snapshot.num-retained.min No 10 Integer 要保留的已完成快照的最小数量。
snapshot.num-retained.max No Integer.MAX_VALUE Integer 要保留的已完成快照的最大数量。

注意,保留时间太短或保留数量太少可能会导致如下问题:

批量查询找不到该文件。例如,表比较大,批量查询需要10分钟才能读取,但是10分钟前的快照过期了,此时批量查询会读取到已删除的快照。

表文件上的流式读取作业(没有外部日志系统)无法重新启动。当作业重新启动时,它记录的快照可能已过期。 (可以使用Consumer Id来保护快照过期的小保留时间内的流式读取)。

2)回滚快照

<FLINK_HOME>/bin/flink run \

/path/to/paimon-flink-action-0.5-SNAPSHOT.jar \

rollback-to \

–warehouse \

–database \

–table \

–snapshot \

[–catalog-conf [–catalog-conf …]]

2.9.4.2 管理分区

创建分区表时可以设置partition.expiration-time。 Paimon会定期检查分区的状态,并根据时间删除过期的分区。

判断分区是否过期:将分区中提取的时间与当前时间进行比较,看生存时间是否超过partition.expiration-time。比如:

CREATE TABLE T (…) PARTITIONED BY (dt) WITH (

‘partition.expiration-time’ = ‘7 d’,

‘partition.expiration-check-interval’ = ‘1 d’,

‘partition.timestamp-formatter’ = ‘yyyyMMdd’

);

选项 默认 类型 描述
partition.expiration-check-interval 1 h Duration 分区过期的检查间隔。
partition.expiration-time (none) Duration 分区的过期时间间隔。如果分区的生命周期超过此值,则该分区将过期。分区时间是从分区值中提取的。
partition.timestamp-formatter (none) String 用于格式化字符串时间戳的格式化程序。它可以与“partition.timestamp-pattern”一起使用来创建使用指定值的格式化程序。> 默认格式化程序为“yyyy-MM-dd HH:mm:ss”和“yyyy-MM-dd”。> 支持多个分区字段,例如“ y e a r − year- yearmonth-$day $hour:00:00”。> 时间戳格式化程序与 Java 的 DateTimeFormatter 兼容。
partition.timestamp-pattern (none) String 可以指定一种模式来从分区获取时间戳。格式化程序模式由“partition.timestamp-formatter”定义。> 默认情况下,从第一个字段读取。> 如果分区中的时间戳是名为“dt”的单个字段,则可以使用“ d t ”。 > 如果它分布在年、月、日和小时的多个字段中,则可以使用“ dt”。> 如果它分布在年、月、日和小时的多个字段中,则可以使用“ dt>如果它分布在年、月、日和小时的多个字段中,则可以使用year- m o n t h − month- monthday h o u r : 00 : 00 ”。 > 如果时间戳位于 d t 和 h o u r 字段中,则可以使用“ hour:00:00”。> 如果时间戳位于 dt 和 hour 字段中,则可以使用“ hour:00:00”>如果时间戳位于dthour字段中,则可以使用dt $hour:00:00”。

2.9.4.3 管理小文件

小文件可能会导致:

稳定性问题:HDFS中小文件过多,NameNode会承受过大的压力。

成本问题:HDFS中的小文件会暂时使用最小1个Block的大小,例如128MB。

查询效率:小文件过多查询效率会受到影响。

1)Flink Checkpoint的影响

使用Flink Writer,每个checkpoint会生成 1-2 个快照,并且checkpoint会强制在 DFS 上生成文件,因此checkpoint间隔越小,会生成越多的小文件。

默认情况下,不仅checkpoint会导致文件生成,writer的内存(write-buffer-size)耗尽也会将数据flush到DFS并生成相应的文件。可以启用 write-buffer-spillable 在 writer 中生成溢出文件,从而在 DFS 中生成更大的文件。

所以,可以设置如下:

增大checkpoint间隔

增加 write-buffer-size 或启用 write-buffer-spillable

2)快照的影响

Paimon维护文件的多个版本,文件的Compaction和删除是逻辑上的,并没有真正删除文件。文件只有在 Snapshot 过期后才会被真正删除,因此减少文件的第一个方法就是减少 Snapshot 过期的时间。 Flink writer 会自动使快照过期。

分区和分桶的影响

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

表数据会被物理分片到不同的分区,里面有不同的桶,所以如果整体数据量太小,单个桶中至少有一个文件,建议你配置较少的桶数,否则会出现也有很多小文件。

3)主键表LSM的影响

LSM 树将文件组织成Sorted Runs的运行。Sorted Runs由一个或多个数据文件组成,并且每个数据文件恰好属于一个Sorted Runs。

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

默认情况下,Sorted Runs数取决于 num-sorted-run.compaction-trigger,这意味着一个桶中至少有 5 个文件。如果要减少此数量,可以保留更少的文件,但写入性能可能会受到影响。

4)仅追加表的文件的影响

默认情况下,Append-Only 还会进行自动Compaction以减少小文件的数量

对于分桶的 Append-only 表,为了排序会对bucket内的文件行Compaction,可能会保留更多的小文件。

5)Full-Compaction的影响

主键表是5个文件,但是Append-Only表(桶)可能单个桶里有50个小文件,这是很难接受的。更糟糕的是,不再活动的分区还保留了如此多的小文件。

建议配置Full-Compaction,在Flink写入时配置‘full-compaction.delta-commits’定期进行full-compaction。并且可以确保在写入结束之前分区被full-compaction。

2.9.5 缩放Bucket

1)说明

由于总桶数对性能影响很大,Paimon 允许用户通过 ALTER TABLE 命令调整桶数,并通过 INSERT OVERWRITE 重新组织数据布局,而无需重新创建表/分区。当执行覆盖作业时,框架会自动扫描旧桶号的数据,并根据当前桶号对记录进行哈希处理。

– rescale number of total buckets

ALTER TABLE table_identifier SET (‘bucket’ = ‘…’)

– reorganize data layout of table/partition

INSERT OVERWRITE table_identifier [PARTITION (part_spec)]

SELECT …

FROM table_identifier

[WHERE part_spec]

注意:

ALTER TABLE 仅修改表的元数据,不会重新组织或重新格式化现有数据。重新组织现有数据必须通过INSERT OVERWRITE来实现。

重新缩放桶数不会影响读取和正在运行的写入作业。

一旦存储桶编号更改,任何新安排的 INSERT INTO 作业写入未重新组织的现有表/分区将抛出 TableException ,并显示如下类似异常:

Try to write table/partition … with a new bucket num …,

but the previous bucket num is … Please switch to batch mode,

and perform INSERT OVERWRITE to rescale current data layout first.

对于分区表,不同的分区可以有不同的桶号。例如:

ALTER TABLE my_table SET (‘bucket’ = ‘4’);

INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-01’)

SELECT * FROM …;

ALTER TABLE my_table SET (‘bucket’ = ‘8’);

INSERT OVERWRITE my_table PARTITION (dt = ‘2022-01-02’)

SELECT * FROM …;

在覆盖期间,确保没有其他作业写入同一表/分区。

注意:对于启用日志系统的表(例如Kafka),请重新调整主题的分区以保持一致性。

重新缩放存储桶有助于处理吞吐量的突然峰值。假设有一个每日流式ETL任务来同步交易数据。该表的DDL和管道如下所示。

2)官方示例:

​ 如下是正在跑的一个作业:

– 建表

CREATE TABLE verified_orders (

trade_order_id BIGINT,

item_id BIGINT,

item_price DOUBLE,

dt STRING,

PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED

) PARTITIONED BY (dt)

WITH (

‘bucket’ = ‘16’

);

– kafka表

CREATE temporary TABLE raw_orders(

trade_order_id BIGINT,

item_id BIGINT,

item_price BIGINT,

gmt_create STRING,

order_status STRING

) WITH (

‘connector’ = ‘kafka’,

‘topic’ = ‘…’,

‘properties.bootstrap.servers’ = ‘…’,

‘format’ = ‘csv’

);

– 流式插入16个分桶

INSERT INTO verified_orders

SELECT trade_order_id,

​ item_id,

​ item_price,

​ DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt

FROM raw_orders

WHERE order_status = ‘verified’;

过去几周运行良好。然而,最近数据量增长很快,作业的延迟不断增加。为了提高数据新鲜度,用户可以执行如下操作缩放分桶:

(1)使用保存点暂停流作业

$ ./bin/flink stop \

–savepointPath /tmp/flink-savepoints \

$JOB_ID

(2)增加桶数

ALTER TABLE verified_orders SET (‘bucket’ = ‘32’);

(3)切换到批处理模式并覆盖流作业正在写入的当前分区

SET ‘execution.runtime-mode’ = ‘batch’;

– 假设今天是2022-06-22

– 情况1:没有更新历史分区的延迟事件,因此覆盖今天的分区就足够了

INSERT OVERWRITE verified_orders PARTITION (dt = ‘2022-06-22’)

SELECT trade_order_id,

​ item_id,

​ item_price

FROM verified_orders

WHERE dt = ‘2022-06-22’;

- 情况2:有更新历史分区的延迟事件,但范围不超过3天

INSERT OVERWRITE verified_orders

SELECT trade_order_id,

​ item_id,

​ item_price,

​ dt

FROM verified_orders

WHERE dt IN (‘2022-06-20’, ‘2022-06-21’, ‘2022-06-22’);

(4)覆盖作业完成后,切换回流模式,从保存点恢复(可以增加并行度=新bucket数量)。

SET ‘execution.runtime-mode’ = ‘streaming’;

SET ‘execution.savepoint.path’ = ;

INSERT INTO verified_orders

SELECT trade_order_id,

item_id,

item_price,

DATE_FORMAT(gmt_create, ‘yyyy-MM-dd’) AS dt

FROM raw_orders

WHERE order_status = ‘verified’;

2.10 文件操作理解

2.10.1 插入数据

当我们执行INSERT INTO

CREATE CATALOG paimon WITH (

‘type’ = ‘paimon’,

‘warehouse’ = ‘file:///tmp/paimon’

);

USE CATALOG paimon;

CREATE TABLE T (

id BIGINT,

a INT,

b STRING,

dt STRING COMMENT ‘timestamp string in format yyyyMMdd’,

PRIMARY KEY(id, dt) NOT ENFORCED

) PARTITIONED BY (dt);

INSERT INTO T VALUES (1, 10001, ‘varchar00001’, ‘20230501’);

一旦Flink作业完成,记录就会通过成功提交写入Paimon表中。用户可以通过执行查询 SELECT * FROM T 来验证这些记录的可见性,该查询将返回单行。提交过程创建位于路径 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。 snapshot-1 处生成的文件布局如下所述:

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

snapshot-1 的内容包含快照的元数据,例如清单列表(manifest list)和schema ID:

{

“version” : 3,

“id” : 1,

“schemaId” : 0,

“baseManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0”,

“deltaManifestList” : “manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1”,

“changelogManifestList” : null,

“commitUser” : “7d758485-981d-4b1a-a0c6-d34c3eb254bf”,

“commitIdentifier” : 9223372036854775807,

“commitKind” : “APPEND”,

“timeMillis” : 1684155393354,

“logOffsets” : { },

“totalRecordCount” : 1,

“deltaRecordCount” : 1,

“changelogRecordCount” : 0,

“watermark” : -9223372036854775808

}

清单列表包含快照的所有更改,baseManifestList 是应用 deltaManifestList 中的更改的基础文件。第一次提交将生成 1 个清单文件(manifest file),并创建 2 个清单列表(manifest list):

./T/manifest:

–deltaManifestList:包含对数据文件执行操作的清单条目列表(上图中的 manifest-list-1-delta)

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1

–baseManifestList:空的(上图中的 manifest-list-1-base)

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0

–清单文件:存储快照中数据文件的信息(上图中的manifest-1-0)

manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0

跨不同分区插入一批记录:

INSERT INTO T VALUES

(2, 10002, ‘varchar00002’, ‘20230502’),

(3, 10003, ‘varchar00003’, ‘20230503’),

(4, 10004, ‘varchar00004’, ‘20230504’),

(5, 10005, ‘varchar00005’, ‘20230505’),

(6, 10006, ‘varchar00006’, ‘20230506’),

(7, 10007, ‘varchar00007’, ‘20230507’),

(8, 10008, ‘varchar00008’, ‘20230508’),

(9, 10009, ‘varchar00009’, ‘20230509’),

(10, 10010, ‘varchar00010’, ‘20230510’);

第二次提交发生,执行 SELECT * FROM T 将返回 10 行。创建一个新快照,即 snapshot-2,并为我们提供以下物理文件布局:

% ls -atR .

./T:

dt=20230501

dt=20230502

dt=20230503

dt=20230504

dt=20230505

dt=20230506

dt=20230507

dt=20230508

dt=20230509

dt=20230510

snapshot

schema

manifest

./T/snapshot:

LATEST

snapshot-2

EARLIEST

snapshot-1

./T/manifest:

manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-1 # delta manifest list for snapshot-2

manifest-list-9ac2-5e79-4978-a3bc-86c25f1a303f-0 # base manifest list for snapshot-2

manifest-f1267033-e246-4470-a54c-5c27fdbdd074-0 # manifest file for snapshot-2

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-1 # delta manifest list for snapshot-1

manifest-list-4ccc-c07f-4090-958c-cfe3ce3889e5-0 # base manifest list for snapshot-1

manifest-2b833ea4-d7dc-4de0-ae0d-ad76eced75cc-0 # manifest file for snapshot-1

./T/dt=20230501/bucket-0:

data-b75b7381-7c8b-430f-b7e5-a204cb65843c-0.orc

# each partition has the data written to bucket-0

./T/schema:

schema-0

截至 snapshot-2 的新文件布局如下所示:

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

2.10.2 删除数据

执行如下删除:

DELETE FROM T WHERE dt >= ‘20230503’;

第三次提交发生,它为我们提供了 snapshot-3。现在,列出表下的文件,您会发现没有分区被删除。相反,会为分区 20230503 到 20230510 创建一个新的数据文件:

./T/dt=20230510/bucket-0:

data-b93f468c-b56f-4a93-adc4-b250b3aa3462-0.orc # newer data file created by the delete statement

data-0fcacc70-a0cb-4976-8c88-73e92769a762-0.orc # older data file created by the insert statement

因为我们在第二次提交中插入一条记录(由 +I[10, 10010, ‘varchar00010’, ‘20230510’] 表示),然后在第三次提交中删除该记录。执行 SELECT * FROM T 将返回 2 行,即:

+I[1, 10001, ‘varchar00001’, ‘20230501’]

+I[2, 10002, ‘varchar00002’, ‘20230502’]

截至 snapshot-3 的新文件布局如下所示

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

manifest-3-0包含8个ADD操作类型的manifest条目,对应8个新写入的数据文件。

2.10.3 Compaction

小文件的数量会随着连续快照的增加而增加,这可能会导致读取性能下降。因此,需要进行full compaction以减少小文件的数量。

现在触发full-compaction:

./bin/flink run \

./lib/paimon-flink-action-0.5-SNAPSHOT.jar \

compact \

–path file:///tmp/paimon/default.db/T

所有当前表文件将被压缩,并创建一个新快照,即 snapshot-4,并包含以下信息:

{

“version” : 3,

“id” : 4,

“schemaId” : 0,

“baseManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-0”,

“deltaManifestList” : “manifest-list-9be16-82e7-4941-8b0a-7ce1c1d0fa6d-1”,

“changelogManifestList” : null,

“commitUser” : “a3d951d5-aa0e-4071-a5d4-4c72a4233d48”,

“commitIdentifier” : 9223372036854775807,

“commitKind” : “COMPACT”,

“timeMillis” : 1684163217960,

“logOffsets” : { },

“totalRecordCount” : 38,

“deltaRecordCount” : 20,

“changelogRecordCount” : 0,

“watermark” : -9223372036854775808

}

截至 snapshot-4 的新文件布局如下所示

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

manifest-4-0 包含 20 个清单条目(18 个 DELETE 操作和 2 个 ADD 操作):

对于分区20230503到20230510,对两个数据文件进行两次DELETE操作

对于分区20230501到20230502,对同一个数据文件进行1次DELETE操作和1次ADD操作。

2.10.4 修改表

执行以下语句来配置full-compaction:

ALTER TABLE T SET (‘full-compaction.delta-commits’ = ‘1’);

它将为 Paimon 表创建一个新schema,即 schema-1,但在下一次提交之前还没有快照实际使用该schema。

2.10.5 过期快照

在快照过期的过程中,首先确定快照的范围,然后将这些快照内的数据文件标记为删除。仅当存在引用特定数据文件的类型为 DELETE 的清单条目时,数据文件才会被标记为删除。此标记可确保该文件不会被后续快照使用并可以安全删除。

假设上图中的所有 4 个快照都即将过期。过期流程如下:

它首先删除所有标记的数据文件,并记录任何更改的存储桶。

然后它会删除所有更改日志文件和关联的清单。

最后,它删除快照本身并写入最早的提示文件。

如果删除过程后有任何目录留空,它们也将被删除。

假设创建了另一个快照 snapshot-5 并触发了快照过期。 snapshot-1 到 snapshot-4 被删除。为简单起见,我们将只关注以前快照中的文件,快照过期后的最终布局如下所示:

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

结果,分区20230503至20230510被物理删除。

2.10.6 Flink 流式写入

用 CDC 摄取的示例来说明 Flink Stream Write。本节将讨论更改数据的捕获和写入 Paimon,以及异步Compaction和快照提交和过期背后的机制。

CDC 摄取工作流程以及所涉及的每个组件所扮演的独特角色:

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

(1)MySQL CDC Source统一读取快照和增量数据,分别由SnapshotReader读取快照数据和BinlogReader读取增量数据。

(2)Paimon Sink将数据写入桶级别的Paimon表中。其中的CompactManager将异步触发Compaction。

(3)Committer Operator 是一个单例,负责提交和过期快照。

端到端数据流:

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

MySQL Cdc Source读取快照和增量数据,并在规范化后将它们发送到下游:

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

Paimon Sink 首先将新记录缓冲在基于堆的 LSM 树中,并在内存缓冲区满时将它们刷新到磁盘。请注意,写入的每个数据文件都是Sorted Run。此时,还没有创建清单文件和快照。在 Flink 检查点发生之前,Paimon Sink 将刷新所有缓冲记录并向下游发送可提交消息,该消息在检查点期间由 Committer Operator 读取并提交:

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

在检查点期间,Committer Operator 将创建一个新快照并将其与清单列表关联起来,以便该快照包含有关表中所有数据文件的信息:

流数据湖平台Apache Paimon(三)Flink进阶使用,# Paimon,apache,原力计划

稍后可能会发生异步Compaction,CompactManager 生成的提交表包含有关先前文件和合并文件的信息,以便 Committer Operator 可以构造相应的清单条目。在这种情况下,Committer Operator 可能会在 Flink 检查点期间生成两个快照:

一个用于写入数据(Append 类型的快照),

另一个用于compact(Compact 类型的快照)。

如果在检查点间隔期间没有写入数据文件,则只会创建 Compact 类型的快照。 Committer Operator 将检查快照是否过期并执行标记数据文件的物理删除。文章来源地址https://www.toymoban.com/news/detail-708905.html

到了这里,关于流数据湖平台Apache Paimon(三)Flink进阶使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 新一代数据湖存储技术Apache Paimon入门Demo

    目录 前言 1. 什么是 Apache Paimon 一、本地环境快速上手 1、本地Flink伪集群 2、IDEA中跑Paimon Demo 2.1 代码 2.2 IDEA中成功运行 3、IDEA中Stream读写 3.1 流写 3.2 流读(toChangeLogStream) 二、进阶:本地(IDEA)多流拼接测试 要解决的问题: note: 1、\\\'changelog-producer\\\' = \\\'full-compaction\\\' (1)m

    2024年02月08日
    浏览(51)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(81)
  • Apache Paimon 文件管理

    管理小文件 许多用户关注小文件问题,可能导致以下情况: 稳定性问题:HDFS 中如果存在太多小文件的话会导致 NameNode 压力过大 成本问题:在 HDFS 中,每个小文件都会占用至少一个数据块的大小,例如 128 MB 查询效率:查询过多小文件会影响查询效率 理解 Checkpoint 假设你正

    2024年02月21日
    浏览(42)
  • 怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

    Bitmap是一种经典的数据结构,用于高效地对大量的二进制数据进行压缩存储和快速查询。Doris支持bitmap数据类型,在Flink计算场景中,可以结合Flink doris Connector对bitmap数据做计算。 社区里很多小伙伴在是Doris Flink Connector的时候,不知道怎么写Bitmap类型的数据,本文将介绍如何

    2024年02月07日
    浏览(58)
  • 使用 Flink CDC 实现 MySQL 数据,表结构实时入 Apache Doris

    现有数据库:mysql 数据:库表较多,每个企业用户一个分库,每个企业下的表均不同,无法做到聚合,且表可以被用户随意改动,增删改列等,增加表 分析:用户自定义分析,通过拖拽定义图卡,要求实时,点击确认即出现相应结果,其中有无法预判的过滤 问题:随业务增长

    2023年04月08日
    浏览(56)
  • 如何使用Docker部署Apache+Superset数据平台并远程访问?

    Superset是一款由中国知名科技公司开源的“现代化的企业级BI(商业智能)Web应用程序”,其通过创建和分享dashboard,为数据分析提供了轻量级的数据查询和可视化方案。Superset在数据处理和可视化方面具有强大的功能,能够满足企业级的数据分析需求,并为用户提供直观、灵

    2024年02月04日
    浏览(39)
  • 数据架构的实时分析:Apache Flink 和 Apache Storm 的比较

    实时数据处理在大数据领域具有重要意义,它可以帮助企业更快地获取和分析数据,从而更快地做出决策。随着数据量的增加,传统的批处理方法已经不能满足企业的需求,因此需要使用实时数据处理技术。 Apache Flink 和 Apache Storm 是两个流行的实时数据处理框架,它们都可以

    2024年01月23日
    浏览(57)
  • 使用 Apache Flink 开发实时 ETL

    Apache Flink 是大数据领域又一新兴框架。它与 Spark 的不同之处在于,它是使用流式处理来模拟批量处理的,因此能够提供亚秒级的、符合 Exactly-once 语义的实时处理能力。Flink 的使用场景之一是构建实时的数据通道,在不同的存储之间搬运和转换数据。本文将介绍如何使用 F

    2024年02月05日
    浏览(41)
  • 【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流 是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流 是一个

    2024年02月06日
    浏览(39)
  • Apache Ranger入门与进阶使用

    ranger是hadoop生态中的权限管理和用户审计插件,ranger丰富的插件数量让它的使用非常广泛,但是苦于官方文档非常少,学习起来就非常麻烦。本篇博客是取各文档之精华所做的,相信你完成的看完后对于ranger会有更多的理解 ranger没有二进制包提供,需要自己手动编译下,请先

    2024年02月14日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包