ClickHouse如何处理实时更新

这篇具有很好参考价值的文章主要介绍了ClickHouse如何处理实时更新。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文通过示例介绍如何处理ClickHouse实时更新。OLAP数据库并不欢迎数据变更操作,ClickHouse也不例外,和其他OLAP产品一样,刚开始ClickHouse甚至不支持更新,更新能力是后来才加上的,但是按照ClickHouse方式增加的。当前ClickHouse更新是异步的,使得在交互应用中难以使用。有很多场景中用户需要修改已存在的数据并期望立即看到,ClickHouse如何满足这样需求呢。

ClickHouse更新历史

早在2016年,当时ClickHouse不支持数据修改,ClickHouse团队发布文章“ClickHouse如何更新数据” ,仅使用特殊的插入结构用于模拟更新,数据最终有分区删除。

在GDPR(General Data Protection Regulation)的压力下,ClickHouse团队在2018年发布了更新和删除。该文章ClickHouse中的更新和删除,仍然是Altinity博客中阅读量最大的文章之一。这些异步、非原子更新使用通过ALTER TABLE UPDATE 语句实现的,并可能打乱大量数据,对于批量操作和少量更新且不急于看到最终结果场景是有用的。标准的SQL更新仍没有,尽管它们每年都出现在产品路线中。如果我们确实需要实时更新,则必须使用其他方法实现。下面基于实际应用场景对比ClickHouse不同实现方式。

使用场景

假设系统产生各类报警信息,用户和机器学习算法不断地查询数据库获取新的报警信息并确认,确认操作需要修改报警记录,一旦确认则报警记录不会再次出现在用户视图中,这看起来像是ClickHouse所不熟悉的OLTP操作。

因为不能使用更新,需要使用插入新记录代替。一旦数据库中有两个记录,我们需要有效方式获得最新的记录。下面尝试三种方式实现。

ReplacingMergeTree

首先创建表存储报警信息:

CREATE TABLE alerts(
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    String,
  acked         UInt8 DEFAULT 0,
  ack_time      DateTime DEFAULT toDateTime(0),
  ack_user      LowCardinality(String) DEFAULT ''
)
ENGINE = ReplacingMergeTree(ack_time)
PARTITION BY tuple()
ORDER BY (tenant_id, timestamp, alert_id);

为了简化,所有报警信息打包放在alert_data列中,实际可能包括几十个或几百个列信息。另外alert_id在示例中是随机字符串。

注意ReplacingMergeTree引擎,是基于order by 子句指定的字段判断重复,如果两条记录重复,则最新记录保留,最新记录有ack_time字段决定。去重操作在后端合并操作中执行,不会立刻发生,不会保证什么时间发生。所以需要关心一致性查询结果。ClickHouse有特殊语法实现,后面就要提及。

在运行查询之前,需要填充表一些数据,我们生成1000个租户的10M报警数据:

INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
SELECT
  toUInt32(rand(1)%1000+1) AS tenant_id,
  randomPrintableASCII(64) as alert_id,
  toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
  randomPrintableASCII(1024) as alert_data
FROM numbers(10000000);

接下来,让我们确认99%的警报,为’ ack_user ‘、’ ack_user ‘和’ ack_time '列提供新值。不是更新,而是插入新行。

INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
SELECT tenant_id, alert_id, timestamp, alert_data,  1 as acked, 
       concat('user', toString(rand()%1000)) as ack_user,  now() as ack_time
FROM alerts WHERE cityHash64(alert_id) % 99 != 0;

如果现在查询表,可以看到:

SELECT count() FROM alerts
┌──count()─┐
│ 19898060 │
└──────────┘
1 rows in set. Elapsed: 0.008 sec. 

显然已确认和未确认的行都在表中,替代还没有发生。为了看到最后数据,可以使用final关键字。

SELECT count() FROM alerts FINAL
┌──count()─┐
│ 10000000 │
└──────────┘
1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.)

数量是正确的,但看查询时间,使用final 关键字ClickHouse必须扫描所有行,并在查询时按排序键进行合并。虽然获得正确结果,但开销很大。现在看仅过滤尚未确认的行是否性能更好。

SELECT count() FROM alerts FINAL WHERE NOT acked
┌─count()─┐
│  101940 │
└─────────┘
1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.) 

查询时间和处理的数据量没有明显差异,尽管计数要小得多。过滤无助于加快查询速度。随着表大小的增长,成本可能会更大。它无法扩展。

注意:为了可读性,所有的查询和查询时间都像在“clickhouse-client”中运行一样。事实上,我们多次尝试查询,以确保结果一致,并与“clickhouse-benchmark”实用程序进行确认。

查询全部表意义不大,那么在我们的示例中还能使用ReplacingMergeTree引擎吗?下面选择随机某个租户,即该租户下所有为确认的报警信息,想象该用户正在看展示屏幕。因为alert_data是随机数,这里仅计算下校验和,就是为了看下结果:

SELECT 
  count(), 
  sum(cityHash64(*)) AS data
FROM alerts FINAL
WHERE (tenant_id = 451) AND (NOT acked)
┌─count()─┬─────────────────data─┐
│      9018441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)

相当快,278毫秒查询所有未确认数据。为什么这次快?过滤条件不同,tenant_id是主键的一部分,所以ClickHouse能在final之前过滤数据,这时ReplacingMergeTree有效的。

下面尝试查询某用户已确认的数据。列的基数是相同的——我们有1000个用户,可以试试user451。

SELECT count() FROM alerts FINAL
WHERE (ack_user = 'user451') AND acked
┌─count()─┐
│    9725 │
└─────────┘

1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)

这个查询很慢,因为没有使用索引,ClickHouse扫描了19.04M行记录。我们不能增加ack_user作为索引,这样会破坏ReplacingMergeTree 语义。我们尝试采用prewhere:

SELECT count() FROM alerts FINAL
PREWHERE (ack_user = 'user451') AND acked

┌─count()─┐
│    9725 │
└─────────┘

1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)

PREWHERE是ClickHouse以不同方式应用过滤器的特殊方式。通常ClickHouse足够聪明,可以自动将条件移动到PREWHERE,所以用户不应该在意。但这个示例需要我们显示指定。

Aggregate Functions

ClickHouse提供了大量的聚集函数,最新版本超过100多个,结合12个聚集合并器,极大地满足了用户需求。我们的示例仅需要三个函数:‘argMax’, ‘max’ and ‘any’.

下面使用argMax聚集函数查询相同的租户:

SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp, 
         argMax(alert_data, ack_time) alert_data, 
         argMax(acked, ack_time) acked,
         max(ack_time) ack_time_,
         argMax(ack_user, ack_time) ack_user
  FROM alerts 
  GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│      9018441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)

结果相同,但性能提升了4倍。这是ClickHouse聚集能力。缺点是查询变得复杂,但我们可以简化。我们注意到,当确认报警信息时,仅需要更新三列:

  • acked: 0 => 1
  • ack_tiem: 0 => now()
  • ack_user: ‘’ => ‘user1’

三个值都在增加,所以可以使用max代替argMax。既然不改变alert_data, 就不需要任何实际聚集函数,ClickHouse提供any函数实现该功能,它返回任何一个值,减少额外开销。

SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp, 
    any(alert_data) alert_data, 
    max(acked) acked, 
    max(ack_time) ack_time,
    max(ack_user) ack_user
  FROM alerts
  GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│      9018441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)

查询变得简单了,速度也快了一点!原因是使用’ any ‘函数,ClickHouse不需要计算’ alert_data ‘列上的’ max ’ !

AggregatingMergeTree

AggregatingMergeTree 是ClickHouse最强特性之一。结合物化视图能够实现实时数据聚合。既然前面示例需要使用聚集函数,直接适应AggregatingMergeTree 会更好吗?实际提升不明显。因为每行仅更新一次,仅有两行需要聚合为一组。对于这种场景,AggregatingMergeTree 不是最佳选项。

但可以结合需求变点戏法。需求中报警信息首先插入非确认信息,然后变成确认信息,一旦用户确认了,三个字段需要修改。如果其他列不重复存储会节约空间、提升性能。

首先创建AggregatingMergeTree 表引擎,使用max聚合函数。代替使用max,可以any,但需要列为非空,any会选择非空值。

DROP TABLE alerts_amt_max;
CREATE TABLE alerts_amt_max (
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    SimpleAggregateFunction(any, String),
  acked         SimpleAggregateFunction(max, UInt8),
  ack_time      SimpleAggregateFunction(max, DateTime),
  ack_user      SimpleAggregateFunction(max, LowCardinality(String))
)
Engine = AggregatingMergeTree()
ORDER BY (tenant_id, timestamp, alert_id);

既然原始数据是随机的,我们就使用已存在的表数据进行填充。像之前一样进行两次插入,分别为非确认报警信息和确认信息。

INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;
INSERT INTO alerts_amt_max 
SELECT tenant_id, alert_id, timestamp,
  '' as alert_data, 
  acked, ack_time, ack_user 
FROM alerts WHERE acked;

注意,对于alert_data字段插入空字符串,因为不需要存储两次。聚合函数会获取非空值,其他列保持缺省值不变。一旦有了数据,现在检查下数据大小:

SELECT 
    table, 
    sum(rows) AS r, 
    sum(data_compressed_bytes) AS c, 
    sum(data_uncompressed_bytes) AS uc, 
    uc / c AS ratio
FROM system.parts
WHERE active AND (database = 'last_state')
GROUP BY table

┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐
│ alerts         │ 1903943920926009562210493077101.0058921003373666 │
│ alerts_amt_max │ 1903943910723636061109020481781.0166372782501314 │
└────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘

我们几乎没有压缩,多亏了随机字符串。但是aggregate要小两倍,因为我们不需要存储两次’ alerts_data '。现在让我们尝试对聚合表进行查询:

SELECT count(), sum(cityHash64(*)) data FROM (
   SELECT tenant_id, alert_id, timestamp, 
          max(alert_data) alert_data, 
          max(acked) acked, 
          max(ack_time) ack_time,
          max(ack_user) ack_user
     FROM alerts_amt_max
   GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│      9018441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)

多亏了AggregatingMergeTree,我们处理的数据更少了(40MB比之前的82MB),而且现在效率更高了。

实现更新

ClickHouse将尽其所能在后台合并数据,删除重复行并执行聚合。但有时需要强制合并数据,为了释放磁盘空间。如使用OPTIMIZE FINAL 语句,但该语句是阻塞的、且昂贵的操作。因此不能频繁执行,让我们看看它是否对查询性能有任何影响。

OPTIMIZE TABLE alerts FINAL

Ok.
0 rows in set. Elapsed: 105.675 sec.

OPTIMIZE TABLE alerts_amt_max FINAL

Ok.
0 rows in set. Elapsed: 70.121 sec.

执行后两者数量相同:

┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐
│ alerts         │ 1000000010616223201108594903001.02291465565429 │
│ alerts_amt_max │ 1000000010616223201108594903001.02291465565429 │
└────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘

不同方法之间的性能差异变得不那么显著。以下是汇总表:

** After inserts** After OPTIMIZE FINAL
ReplacingMergeTree FINAL 0.278 0.037
argMax 0.059 0.034
any/max 0.055 0.029
AggregatingMergeTree 0.036 0.026

总结

ClickHouse提供丰富的工具集处理实时更新,如:ReplacingMergeTree, CollapsingMergeTree (本文未提及), AggregatingMergeTree 和aggregate 函数。所有这些方法都有三个共性:

  • 数据通过插入新版本进行修改,插入在ClickHouse中很快
  • 有多种有效方法实现类似OLTP中的更新语义
  • 实际修改不会立刻发生

具体选择哪种方法依赖具体应用场景。ReplacingMergeTree对用户来说是最直接、方便,但一般用于数据量为中小量级或数据仅通过主键查询场景。使用聚集函数更灵活,性能也不错,但需要写相对复杂的查询。AggregatingMergeTree可以节约空间,仅保留修改列。这些都是ClickHouse DB设计师的好工具,可以在需要的时候使用。

参考文档:https://dzone.com/articles/handling-real-time-updates-in-clickhouse文章来源地址https://www.toymoban.com/news/detail-554243.html

到了这里,关于ClickHouse如何处理实时更新的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • CTF常用工具_实时更新

    近期在做一些ctf题,其中会涉及到许多工具,起初我会使用百度网盘在每一篇博客放置对应的工具,但因网盘上传有上限,所以现在我将练习中所用到所有的工具放置在这篇文章中。 需要下载的小伙伴可随时拿取,分享有效期为永久分享。 常用工具及常用网站为实时分享!

    2024年02月04日
    浏览(36)
  • 【clickhouse】ClickHouse与MySQL之间实时同步数据(MySQL引擎),将MySQL数据实时同步到clickhouse

    参考1:MySQL(通过该配置实现了实时同步) 参考2:experimental MaterializedMySQL 参考3:[experimental] MaterializedMySQL(包含设置 allow_experimental_database_materialized_mysql) MySQL引擎用于将远程的MySQL服务器中的表映射到ClickHouse中,并允许您对表进行INSERT和SELECT查询,以方便您在ClickHouse与MySQL之间进行

    2024年01月16日
    浏览(35)
  • Java面试基础|数据结构 -实时更新

    1.HashMap和ConcurrentHashMap介绍 核心是一个Node数组, 数据结构与hashMap相似 使用CAS操作来实现无锁的更新,提高了并发性。当更新节点时,它会使用CAS来替换节点的值或链接,如果CAS失败,表明有其他线程也在进行修改,当前线程可以重试或锁定节点 对于复杂的结构修改操作

    2024年01月17日
    浏览(37)
  • 实时更新天气微信小程序开发

    1.新建一个天气 weather项目 2.在app.json中创建一个路由页面  当我们点击保存的时候,微信小程序会自动的帮我们创建好页面 3.在weather页面上书写我们的骨架  4.此时我们的页面很怪,因为没有给它添加样式和值。此时我们给它一个样式。(样式写在wxss中) 5.给它值,使用插值

    2024年02月01日
    浏览(80)
  • vue父子组件传值不能实时更新

    最近做项目,遇到个大坑,这会爬出来了,写个总结,避免下次掉坑。 vue父子组件传值不能实时更新问题,父组件将值传给了子组件,但子组件显示的值还是原来的初始值,并没有实时更新,为什么会出现这种问题呢? 出现这个问题,可能有以下两个原因: 一、 父组件没有

    2024年02月16日
    浏览(41)
  • Rust采集天气预报信息并实时更新数据

    最近天气温度时高时低,虽说这是大自然的力量人无法抗拒,不能改变那么我们就做预防工作。今天我将用Rust写一个爬虫程序实现电脑桌面实时更新天气情况,这个是一个底层逻辑,需要多方面配合,不仅要有完善的代码还有爬虫IP试试更新才能保证数据最完整最新。 这是一

    2024年01月19日
    浏览(36)
  • cocos tilemap的setTileGIDAt方法不实时更新

    需要取消勾选 Enable Culling。同时代码添加:markForUpdateRenderData函数。 floor.setTileGIDAt(1024+27,newP.x,newP.y,0);    //中心 floor.markForUpdateRenderData(); 具体问题参考官网说明: Cocos Creator 3.2 手册 - 项目设置

    2024年02月07日
    浏览(34)
  • vue中组件动态传值,实现数据实时更新

    在一些项目需求中需要父组件向子组件动态传值,比如父组件动态通过axios获取数据,然后传给子组件,子组件根据拿到的数据进行展示。 props传值的时候,只会首次传递绑定的值,不会变化 方式1 利用watch监听props变化 方式2 利用ref获取子组件,调用子组件的方法将值传递过

    2024年02月16日
    浏览(36)
  • 小程序弹幕自动滚动实时更新数据功能

    需求 最近遇到的需求,写一个弹幕功能 大致就是实现这样的效果 弹幕轮播,上下两排,一共30个弹幕,30个轮播完毕之后获取新的弹幕数据 实现方法  目前我想到的实现方法是用css的动画来实现这个功能 布局层级 给barrageBox盒子一个相对定位 给barrageList绝对定位 先将list盒子

    2024年01月20日
    浏览(32)
  • 数据大屏--->前端实时更新数据的几种方式

    优点:最大的优点就是实现简单 缺点:(1)无用的请求多,客户端不知道服务端什么时候数据更新,只能不停的向服务端发送请求, (2)数据实时性差:客户端还是需要一段时间(3s)才能拿到最新的数据 优点:解决了短轮询每隔几秒向服务端频繁发送请求的问题; 缺点:(1)服务端资源大量消

    2024年04月17日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包