本文通过示例介绍如何处理ClickHouse实时更新。OLAP数据库并不欢迎数据变更操作,ClickHouse也不例外,和其他OLAP产品一样,刚开始ClickHouse甚至不支持更新,更新能力是后来才加上的,但是按照ClickHouse方式增加的。当前ClickHouse更新是异步的,使得在交互应用中难以使用。有很多场景中用户需要修改已存在的数据并期望立即看到,ClickHouse如何满足这样需求呢。
ClickHouse更新历史
早在2016年,当时ClickHouse不支持数据修改,ClickHouse团队发布文章“ClickHouse如何更新数据” ,仅使用特殊的插入结构用于模拟更新,数据最终有分区删除。
在GDPR(General Data Protection Regulation)的压力下,ClickHouse团队在2018年发布了更新和删除。该文章ClickHouse中的更新和删除,仍然是Altinity博客中阅读量最大的文章之一。这些异步、非原子更新使用通过ALTER TABLE UPDATE 语句实现的,并可能打乱大量数据,对于批量操作和少量更新且不急于看到最终结果场景是有用的。标准的SQL更新仍没有,尽管它们每年都出现在产品路线中。如果我们确实需要实时更新,则必须使用其他方法实现。下面基于实际应用场景对比ClickHouse不同实现方式。
使用场景
假设系统产生各类报警信息,用户和机器学习算法不断地查询数据库获取新的报警信息并确认,确认操作需要修改报警记录,一旦确认则报警记录不会再次出现在用户视图中,这看起来像是ClickHouse所不熟悉的OLTP操作。
因为不能使用更新,需要使用插入新记录代替。一旦数据库中有两个记录,我们需要有效方式获得最新的记录。下面尝试三种方式实现。
ReplacingMergeTree
首先创建表存储报警信息:
CREATE TABLE alerts(
tenant_id UInt32,
alert_id String,
timestamp DateTime Codec(Delta, LZ4),
alert_data String,
acked UInt8 DEFAULT 0,
ack_time DateTime DEFAULT toDateTime(0),
ack_user LowCardinality(String) DEFAULT ''
)
ENGINE = ReplacingMergeTree(ack_time)
PARTITION BY tuple()
ORDER BY (tenant_id, timestamp, alert_id);
为了简化,所有报警信息打包放在alert_data列中,实际可能包括几十个或几百个列信息。另外alert_id在示例中是随机字符串。
注意ReplacingMergeTree引擎,是基于order by 子句指定的字段判断重复,如果两条记录重复,则最新记录保留,最新记录有ack_time字段决定。去重操作在后端合并操作中执行,不会立刻发生,不会保证什么时间发生。所以需要关心一致性查询结果。ClickHouse有特殊语法实现,后面就要提及。
在运行查询之前,需要填充表一些数据,我们生成1000个租户的10M报警数据:
INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
SELECT
toUInt32(rand(1)%1000+1) AS tenant_id,
randomPrintableASCII(64) as alert_id,
toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
randomPrintableASCII(1024) as alert_data
FROM numbers(10000000);
接下来,让我们确认99%的警报,为’ ack_user ‘、’ ack_user ‘和’ ack_time '列提供新值。不是更新,而是插入新行。
INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
SELECT tenant_id, alert_id, timestamp, alert_data, 1 as acked,
concat('user', toString(rand()%1000)) as ack_user, now() as ack_time
FROM alerts WHERE cityHash64(alert_id) % 99 != 0;
如果现在查询表,可以看到:
SELECT count() FROM alerts
┌──count()─┐
│ 19898060 │
└──────────┘
1 rows in set. Elapsed: 0.008 sec.
显然已确认和未确认的行都在表中,替代还没有发生。为了看到最后数据,可以使用final关键字。
SELECT count() FROM alerts FINAL
┌──count()─┐
│ 10000000 │
└──────────┘
1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.)
数量是正确的,但看查询时间,使用final 关键字ClickHouse必须扫描所有行,并在查询时按排序键进行合并。虽然获得正确结果,但开销很大。现在看仅过滤尚未确认的行是否性能更好。
SELECT count() FROM alerts FINAL WHERE NOT acked
┌─count()─┐
│ 101940 │
└─────────┘
1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.)
查询时间和处理的数据量没有明显差异,尽管计数要小得多。过滤无助于加快查询速度。随着表大小的增长,成本可能会更大。它无法扩展。
注意:为了可读性,所有的查询和查询时间都像在“clickhouse-client”中运行一样。事实上,我们多次尝试查询,以确保结果一致,并与“clickhouse-benchmark”实用程序进行确认。
查询全部表意义不大,那么在我们的示例中还能使用ReplacingMergeTree引擎吗?下面选择随机某个租户,即该租户下所有为确认的报警信息,想象该用户正在看展示屏幕。因为alert_data是随机数,这里仅计算下校验和,就是为了看下结果:
SELECT
count(),
sum(cityHash64(*)) AS data
FROM alerts FINAL
WHERE (tenant_id = 451) AND (NOT acked)
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)
相当快,278毫秒查询所有未确认数据。为什么这次快?过滤条件不同,tenant_id是主键的一部分,所以ClickHouse能在final之前过滤数据,这时ReplacingMergeTree有效的。
下面尝试查询某用户已确认的数据。列的基数是相同的——我们有1000个用户,可以试试user451。
SELECT count() FROM alerts FINAL
WHERE (ack_user = 'user451') AND acked
┌─count()─┐
│ 9725 │
└─────────┘
1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)
这个查询很慢,因为没有使用索引,ClickHouse扫描了19.04M行记录。我们不能增加ack_user作为索引,这样会破坏ReplacingMergeTree 语义。我们尝试采用prewhere:
SELECT count() FROM alerts FINAL
PREWHERE (ack_user = 'user451') AND acked
┌─count()─┐
│ 9725 │
└─────────┘
1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)
PREWHERE是ClickHouse以不同方式应用过滤器的特殊方式。通常ClickHouse足够聪明,可以自动将条件移动到PREWHERE,所以用户不应该在意。但这个示例需要我们显示指定。
Aggregate Functions
ClickHouse提供了大量的聚集函数,最新版本超过100多个,结合12个聚集合并器,极大地满足了用户需求。我们的示例仅需要三个函数:‘argMax’, ‘max’ and ‘any’.
下面使用argMax聚集函数查询相同的租户:
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
argMax(alert_data, ack_time) alert_data,
argMax(acked, ack_time) acked,
max(ack_time) ack_time_,
argMax(ack_user, ack_time) ack_user
FROM alerts
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)
结果相同,但性能提升了4倍。这是ClickHouse聚集能力。缺点是查询变得复杂,但我们可以简化。我们注意到,当确认报警信息时,仅需要更新三列:
- acked: 0 => 1
- ack_tiem: 0 => now()
- ack_user: ‘’ => ‘user1’
三个值都在增加,所以可以使用max代替argMax。既然不改变alert_data, 就不需要任何实际聚集函数,ClickHouse提供any函数实现该功能,它返回任何一个值,减少额外开销。
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
any(alert_data) alert_data,
max(acked) acked,
max(ack_time) ack_time,
max(ack_user) ack_user
FROM alerts
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)
查询变得简单了,速度也快了一点!原因是使用’ any ‘函数,ClickHouse不需要计算’ alert_data ‘列上的’ max ’ !
AggregatingMergeTree
AggregatingMergeTree 是ClickHouse最强特性之一。结合物化视图能够实现实时数据聚合。既然前面示例需要使用聚集函数,直接适应AggregatingMergeTree 会更好吗?实际提升不明显。因为每行仅更新一次,仅有两行需要聚合为一组。对于这种场景,AggregatingMergeTree 不是最佳选项。
但可以结合需求变点戏法。需求中报警信息首先插入非确认信息,然后变成确认信息,一旦用户确认了,三个字段需要修改。如果其他列不重复存储会节约空间、提升性能。
首先创建AggregatingMergeTree 表引擎,使用max聚合函数。代替使用max,可以any,但需要列为非空,any会选择非空值。
DROP TABLE alerts_amt_max;
CREATE TABLE alerts_amt_max (
tenant_id UInt32,
alert_id String,
timestamp DateTime Codec(Delta, LZ4),
alert_data SimpleAggregateFunction(any, String),
acked SimpleAggregateFunction(max, UInt8),
ack_time SimpleAggregateFunction(max, DateTime),
ack_user SimpleAggregateFunction(max, LowCardinality(String))
)
Engine = AggregatingMergeTree()
ORDER BY (tenant_id, timestamp, alert_id);
既然原始数据是随机的,我们就使用已存在的表数据进行填充。像之前一样进行两次插入,分别为非确认报警信息和确认信息。
INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;
INSERT INTO alerts_amt_max
SELECT tenant_id, alert_id, timestamp,
'' as alert_data,
acked, ack_time, ack_user
FROM alerts WHERE acked;
注意,对于alert_data字段插入空字符串,因为不需要存储两次。聚合函数会获取非空值,其他列保持缺省值不变。一旦有了数据,现在检查下数据大小:
SELECT
table,
sum(rows) AS r,
sum(data_compressed_bytes) AS c,
sum(data_uncompressed_bytes) AS uc,
uc / c AS ratio
FROM system.parts
WHERE active AND (database = 'last_state')
GROUP BY table
┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐
│ alerts │ 19039439 │ 20926009562 │ 21049307710 │ 1.0058921003373666 │
│ alerts_amt_max │ 19039439 │ 10723636061 │ 10902048178 │ 1.0166372782501314 │
└────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘
我们几乎没有压缩,多亏了随机字符串。但是aggregate要小两倍,因为我们不需要存储两次’ alerts_data '。现在让我们尝试对聚合表进行查询:
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
max(alert_data) alert_data,
max(acked) acked,
max(ack_time) ack_time,
max(ack_user) ack_user
FROM alerts_amt_max
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)
多亏了AggregatingMergeTree,我们处理的数据更少了(40MB比之前的82MB),而且现在效率更高了。
实现更新
ClickHouse将尽其所能在后台合并数据,删除重复行并执行聚合。但有时需要强制合并数据,为了释放磁盘空间。如使用OPTIMIZE FINAL 语句,但该语句是阻塞的、且昂贵的操作。因此不能频繁执行,让我们看看它是否对查询性能有任何影响。
OPTIMIZE TABLE alerts FINAL
Ok.
0 rows in set. Elapsed: 105.675 sec.
OPTIMIZE TABLE alerts_amt_max FINAL
Ok.
0 rows in set. Elapsed: 70.121 sec.
执行后两者数量相同:
┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐
│ alerts │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
│ alerts_amt_max │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
└────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘
不同方法之间的性能差异变得不那么显著。以下是汇总表:
** After inserts** | After OPTIMIZE FINAL | |
---|---|---|
ReplacingMergeTree FINAL | 0.278 | 0.037 |
argMax | 0.059 | 0.034 |
any/max | 0.055 | 0.029 |
AggregatingMergeTree | 0.036 | 0.026 |
总结
ClickHouse提供丰富的工具集处理实时更新,如:ReplacingMergeTree, CollapsingMergeTree (本文未提及), AggregatingMergeTree 和aggregate 函数。所有这些方法都有三个共性:
- 数据通过插入新版本进行修改,插入在ClickHouse中很快
- 有多种有效方法实现类似OLTP中的更新语义
- 实际修改不会立刻发生
具体选择哪种方法依赖具体应用场景。ReplacingMergeTree对用户来说是最直接、方便,但一般用于数据量为中小量级或数据仅通过主键查询场景。使用聚集函数更灵活,性能也不错,但需要写相对复杂的查询。AggregatingMergeTree可以节约空间,仅保留修改列。这些都是ClickHouse DB设计师的好工具,可以在需要的时候使用。文章来源:https://www.toymoban.com/news/detail-554243.html
参考文档:https://dzone.com/articles/handling-real-time-updates-in-clickhouse文章来源地址https://www.toymoban.com/news/detail-554243.html
到了这里,关于ClickHouse如何处理实时更新的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!