火山引擎在行为分析场景下的ClickHouse JOIN优化

这篇具有很好参考价值的文章主要介绍了火山引擎在行为分析场景下的ClickHouse JOIN优化。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群

背景

火山引擎增长分析DataFinder基于ClickHouse来进行行为日志的分析,ClickHouse的主要版本是基于社区版改进开发的字节内部版本。主要的表结构:

火山引擎在行为分析场景下的ClickHouse JOIN优化

 

事件表:存储用户行为数据,以用户ID分shard存储。

--列出了主要的字段信息
CREATE TABLE tob_apps_all
(
    `tea_app_id`                UInt32,  --应用ID
    `device_id`             String DEFAULT '', --设备ID
    `time`                  UInt64,--事件日志接受时间
    `event`                 String,--事件名称
    `user_unique_id`        String,--用户ID
    `event_date`            Date,--事件日志日期,由time转换而来
    `hash_uid`              UInt64 --用户ID hash过后的id,用来join降低内存消耗
)│

用户表:存储用户的属性数据,以用户ID分shard存储。

--列出了主要的字段信息
CREATE TABLE users_unique_all
(
    `tea_app_id`            UInt32,            --应用ID
    `user_unique_id`        String DEFAULT '', -- 用户ID
    `device_id`             String DEFAULT '', -- 用户最近的设备ID
    `hash_uid`              UInt64,--用户ID hash过后的id,用来join降低内存消耗
    `update_time`           UInt64,--最近一次更新时间
    `last_active_date`      Date   --用户最后活跃日期
)

设备表:存储设备相关的数据,以设备ID分shard存储。

--列出了主要的字段信息
CREATE TABLE devices_all
(
    `tea_app_id`            UInt32,            --应用ID
    `device_id`             String DEFAULT '', --设备ID    
    `update_time`           UInt64,            --最近一次更新时间
    `last_active_date`      Date               --用户最后活跃日期
)

业务对象表:存储业务对象相关的数据,每个shard存储全量的数据

--列出了主要的字段信息
CREATE TABLE rangers.items_all
(
    `tea_app_id`            UInt32,
    `hash_item_id`          Int64,
    `item_name`             String, --业务对象名称。比如商品
    `item_id`               String, --业务对象ID。比如商品id 1000001
    `last_active_date`      Date
) 

业务挑战

火山引擎在行为分析场景下的ClickHouse JOIN优化

随着接入应用以及应用的DAU日益增加,ClickHouse表的事件量增长迅速;并且基于行为数据需要分析的业务指标越来越复杂,需要JOIN的表增多;我们遇到有一些涉及到JOIN的复杂SQL执行效率低,内存和CPU资源占用高,导致分析接口响应时延和错误率增加。

关于Clickhouse的JOIN

在介绍优化之前,先介绍一下基本的ClickHouse JOIN的类型和实现方式

分布式JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')

基本执行过程:

  1. 一个Clickhouse节点作为Coordinator节点,给每个节点分发子查询,子查询sql(tob_apps_all替换成本地表,users_unique_all保持不变依然是分布式表)

  2. 每个节点执行Coordinator分发的sql时,发现users_unique_all是分布式表,就会去所有节点上去查询以下SQL(一共有N*N。N为shard数量)

    SELECT device_id, hash_uid FROM users_unique WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')

  3. 每个节点从其他N-1个节点拉取2中子查询的全部数据,全量存储(内存or文件),进行本地JOIN

  4. Coordinator节点从每个节点拉取3中的结果集,然后做处理返回给client

存在的问题:

  1. 子查询数量放大

  2. 每个节点都全量存储全量的数据

分布式Global JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')

基本执行过程:

  1. 一个Clickhouse节点作为Coordinator节点,分发查询。在每个节点上执行sql(tob_apps_all替换成本地表,右表子查询替换成别名ut)

  2. Coordinator节点去其他节点拉取users_unique_all的全部数据,然后分发到全部节点(作为1中别名表ut的数据)

  3. 每个节点都会存储全量的2中分发的数据(内存or文件),进行本地local join

  4. Coordinator节点从每个节点拉取3中的结果集,然后做处理返回给client

存在的问题:

  1. 每个节点都全量存储数据

  2. 如果右表较大,分发的数据较大,会占用网络带宽资源

本地JOIN

SQL里面只有本地表的JOIN,只会在当前节点执行

SELECT et.os_name,ut.device_id AS user_device_id
FROM tob_apps et any LEFT JOIN
    (SELECT device_id,
         hash_uid
    FROM rangers.users_unique
    WHERE tea_app_id = 268411
            AND last_active_date>='2022-08-06') ut
    ON et.hash_uid=ut.hash_uid
WHERE tea_app_id = 268411
        AND event='app_launch'
        AND event_date='2022-08-06' 

Hash join

  • 右表全部数据加载到内存,再在内存构建hash table。key为joinkey

  • 从左表分批读取数据,从右表hash table匹配数据

  • 优点是:速度快 缺点是:右表数据量大的情况下占用内存

Merge join

  • 对右表排序,内部 block 切分,超出内存部分 flush 到磁盘上,内存大小通过参数设定

  • 左表基于 block 排序,按照每个 block 依次与右表 merge

  • 优点是:能有效控制内存 缺点是:大数据情况下速度会慢

优先使用hash join当内存达到一定阈值后再使用merge join,优先满足性能要求

解决方案

火山引擎在行为分析场景下的ClickHouse JOIN优化

避免JOIN

数据预生成

数据预生成(由Spark/Flink或者Clickhouse物化视图产出数据),形成大宽表,基于单表的查询是ClickHouse最为擅长的场景

我们有个指标,实现的SQL比较复杂(如下),每次实时查询很耗时,我们单独建了一个表table,由Spark每日构建出这个指标,查询时直接基于table查询

SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......
FROM
    (SELECT event_date,hash_uid AS uc1,sum(et.float_params{'amount'}) AS value, count(1) AS cnt, value*cnt AS multiple
    FROM tob_apps_all et GLOBAL ANY LEFT JOIN
        (SELECT hash_uid AS join_key,int_profiles{'$ab_time_34'}*1000 AS first_time
        FROM users_unique_all
        WHERE app_id = 10000000 AND last_active_date >= '2022-07-19' AND first_time is NOT null) upt
            ON et.hash_uid=upt.join_key
        WHERE (查询条件)
        GROUP BY  uc1,event_date)
GROUP BY event_date;

数据量2300W,查询时间由7秒->0.008秒。当然这种方式,需要维护额外的数据构建任务。总的思路就是不要让ClickHouse实时去JOIN

火山引擎在行为分析场景下的ClickHouse JOIN优化

使用IN代替JOIN

JOIN需要基于内存构建hash table且需要存储右表全部的数据,然后再去匹配左表的数据。而IN查询会对右表的全部数据构建hash set,但是不需要匹配左表的数据,且不需要回写数据到block

比如

SELECT event_date, count()
FROM tob_apps_all et global any INNER JOIN
    (SELECT hash_uid AS join_key
    FROM users_unique_all
    WHERE app_id = 10000000
            AND last_active_date >= '2022-01-01') upt
    ON et.hash_uid = upt.join_key
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
GROUP BY  event_date 

可以改成如下形式:

SELECT event_date,
         count()
FROM tob_apps_all
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
        AND hash_uid global IN 
    (SELECT hash_uid
    FROM users_unique_all
    WHERE (tea_app_id = 10000000)
            AND (last_active_date >= '2022-01-01') )
 GROUP BY event_date

如果需要从右表提取出属性到外层进行计算,则不能使用IN来代替JOIN

相同的条件下,上面的测试SQL,由JOIN时的16秒优化到了IN查询时的11秒

火山引擎在行为分析场景下的ClickHouse JOIN优化

更快的JOIN

优先本地JOIN

数据预先相同规则分区

也就是Colocate JOIN。优先将需要关联的表按照相同的规则进行分布,查询时就不需要分布式的JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

比如事件表tob_apps_all和用户表users_unique_all都是按照用户ID来分shard存储的,相同的用户的两个表的数据都在同一个shard上,因此这两个表的JOIN就不需要分布式JOIN了

distributed_perfect_shard这个settings key是字节内部ClickHouse支持的,设置过这个参数,指定执行计划时就不会再执行分布式JOIN了

基本执行过程:

  1. 一个ClickHouse节点作为Coordinator节点,分发查询。在每个节点上执行sql(tob_apps_all、users_unique_all替换成本地表)

  2. 每个节点都执行1中分发的本地表join的SQL(这一步不再分发右表全量的数据)

  3. 数据再回传到coordinator节点,然后返回给client

数据冗余存储

如果一个表的数据量比较小,可以不分shard存储,每个shard都存储全量的数据,例如我们的业务对象表。查询时,不需要分布式JOIN,直接在本地进行JOIN即可

SELECT count()
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT item_id
    FROM items_all 
    WHERE (tea_app_id = 268411)
) AS it ON et.item_id = it.item_id
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

例如这个SQL,items_all表每个shard都存储同样的数据,这样也可以避免分布式JOIN带来的查询放大和全表数据分发问题

更少的数据

不论是分布式JOIN还是本地JOIN,都需要尽量让少的数据参与JOIN,既能提升查询速度也能减少资源消耗

SQL下推

ClickHouse对SQL的下推做的不太好,有些复杂的SQL下推会失效。因此,我们手动对SQL做了下推,目前正在测试基于查询优化器来帮助实现下推优化,以便让SQL更加简洁

下推的SQL:

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06'
        AND 用户属性条件1  OR 用户属性条件2)
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1

对应的不下推的SQL:

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM rangers.users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
AND (ut.用户属性条件1  OR ut.用户属性条件2)
settings distributed_perfect_shard=1

可以看到,不下推的SQL更加简洁,直接基于JOIN过后的宽表进行过滤。但是ClickHouse可能会将不满足条件的users_unique_all数据也进行JOIN

我们使用中有一个复杂的case,用户表过滤条件不下推有1千万+,SQL执行了3000秒依然执行超时,而做了下推之后60秒内就执行成功了

火山引擎在行为分析场景下的ClickHouse JOIN优化

Clickhouse引擎层优化

一个SQL实际在Clickhouse如何执行,对SQL的执行时间和资源消耗至关重要。社区版的Clickhouse在执行模型和SQL优化器上还要改进的空间,尤其是复杂SQL以及多JOIN的场景下

执行模型优化

社区版的Clickhouse目前还是一个两阶段执行的执行模型。第一阶段,Coordinator在收到查询后,将请求发送给对应的Worker节点。第二阶段,Worker节点完成计算,Coordinator在收到各Worker节点的数据后进行汇聚和处理,并将处理后的结果返回。

火山引擎在行为分析场景下的ClickHouse JOIN优化

 

有以下几个问题:

  1. 第二阶段的计算比较复杂时,Coordinator的节点计算压力大,容易成为瓶颈

  2. 不支持shuffle join,hash join时右表为大表时构建慢,容易OOM

  3. 对复杂查询的支持不友好

字节跳动ClickHouse团队为了解决上述问题,改进了执行模型,参考其他的分布式数据库引擎(例如Presto等),将一个复杂的Query按数据交换情况切分成多个 Stage,各Stage之间则通过Exchange完成数据交换。根据Stage依赖关系定义拓扑结构,产生DAG图,并根据DAG图调度Stage。例如两表Join,会先调度左右表读取Stage,之后再调度Join这个Stage,Join的Stage依赖于左右表的Stage。

火山引擎在行为分析场景下的ClickHouse JOIN优化

 

举个例子

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id, 
    dt.hash_did AS device_hashid
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_did
    FROM devices_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS dt ON et.device_id = dt.device_id
WHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
LIMIT 10

Stage执行模型基本过程(可能的):

  1. 读取tob_apps_all数据,按照join key(hash_uid)进行shuffle,数据分发到每个节点。这是一个Stage

  2. 读取users_unique_all数据,按照join key(hash_uid)进行shuffle,数据分发到每个节点。这是一个Stage

  3. 上述两个表的数据,在每个节点上的数据进行本地join,然后再按照join key(device_id)进行shuffle。这是一个Stage

  4. 读取devices_all数据,按照join key(device_id)进行shuffle,这是一个Stage

  5. 第3步、第4步的数据,相同join key(device_id)的数据都在同一个节点上,然后进行本地JOIN,这是一个Stage

  6. 汇总数据,返回limit 10的数据。这是一个Stage

统计效果如下:

火山引擎在行为分析场景下的ClickHouse JOIN优化

 

查询优化器

有了上面的stage的执行模型,可以灵活调整SQL的执行顺序,字节跳动Clickhouse团队自研了查询优化器,根据优化规则(基于规则和代价预估)对SQL的执行计划进行转换,一个执行计划经过优化规则后会变成另外一个执行计划,能够准确的选择出一条效率最高的执行路径,然后构建Stage的DAG图,大幅度降低查询时间

下图描述了整个查询的执行流程,从 SQL parse 到执行期间所有内容全部进行了重新实现(其中紫色模块),构建了一套完整的且规范的查询优化器。

火山引擎在行为分析场景下的ClickHouse JOIN优化

 

还是上面的三表JOIN的例子,可能的一个执行过程是:

  1. 查询优化器发现users_unique_all表与tob_apps_all表的分shard规则一样(基于用户ID),所以就不会先对表按 join key 进行 shuffle,users_unique与tob_apps直接基于本地表JOIN,然后再按照join key(device_id)进行shuffle。这是一个Stage

  2. 查询优化器根据规则或者代价预估决定设备表devices_all是需要broadcast join还是shuffle join

    如果broadcast join:在一个节点查到全部的device数据,然后分发到其他节点。这是一个Stage

    如果shuffle join:在每个节点对device数据按照join key(device_id)进行shuffle。这是一个Stage

  3. 汇总数据,返回limit 10的数据。这是一个Stage

效果:

可以看到,查询优化器能优化典型的复杂的SQL的执行效率,缩短执行时间

火山引擎在行为分析场景下的ClickHouse JOIN优化

 

总结

ClickHouse最为擅长的领域是一个大宽表来进行查询,多表JOIN时Clickhouse性能表现不佳。作为业内领先的用户分析与运营平台,火山引擎增长分析DataFinder基于海量数据做到了复杂指标能够秒级查询。本文介绍了我们是如何优化Clickhouse JOIN查询的。

主要有以下几个方面:

  1. 减少参与JOIN的表以及数据量

  2. 优先使用本地JOIN,避免分布式JOIN带来的性能损耗

  3. 优化本地JOIN,优先使用内存进行JOIN

  4. 优化分布式JOIN的执行逻辑,依托于字节跳动对ClickHouse的深度定制化

立即跳转火山引擎DataFinder官网了解详情文章来源地址https://www.toymoban.com/news/detail-426089.html

到了这里,关于火山引擎在行为分析场景下的ClickHouse JOIN优化的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • AI行为分析预警系统 opencv

    AI行为分析预警系统通过python+opencv网络模型Ai视觉智能分析技术,AI行为分析预警系统可以对实际场景下如车间、电力场景、化工场景、工业生产场景下的人员作业操作行为规范进行有针对性的定制开发,根据每个项目的不同的识别预警需求。OpenCV可以在不同的系统平台上使用

    2024年02月02日
    浏览(33)
  • 矿井人员视频行为分析算法 opencv

    矿井人员视频行为分析算法通过opencv+python网络模型技术,矿井人员视频行为分析算法实时监测人员的作业行为,并与安全标准进行比对,可以及时发现不符合安全要求的行为,预防事故的发生。OpenCV的全称是Open Source Computer Vision Library,是一个跨平台的计算机视觉处理开源软

    2024年02月16日
    浏览(27)
  • AI图像行为分析算法 opencv

    AI图像行为分析算法通过python+opencv深度学习框架对现场操作行为进行全程实时分析,AI图像行为分析算法通过人工智能视觉能够准确判断出现场人员的作业行为是否符合SOP流程规定,并对违规操作行为进行自动抓拍告警。OpenCV是一个基于Apache2.0许可(开源)发行的跨平台计算

    2024年02月10日
    浏览(23)
  • Spark项目实战—电商用户行为分析

    我们看看在实际的工作中如何使用这些 API 实现具体的需求。这些需求是电商网站的真实需求,所以在实现功能前,咱们必须先将数据准备好。 上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要 包含用户的 4 种行为:搜索,点击,下单,

    2024年02月08日
    浏览(30)
  • 数仓用户行为数据分析

    分层优点:复杂的东西可以简单化、解耦(屏蔽层作用)、提高复用、方便管理 SA 贴源  数据组织结构与源系统保持一致 shm 历史层  针对不同特征的数据做不同算法,目的都是为了得到一份完整的数据 PDM 明细层 做最细粒度的数据明细,最高扩展性和灵活性,企业级的数据

    2024年02月08日
    浏览(27)
  • java校园行为分析预警管理系统

    目  录 摘  要    II ABSTRACT    III 第一章 绪论    1 1.1研究背景    1 1.2选题目的    1 1.3本文研究内容    2 第二章  开发技术介绍    3 2.1开发工具介绍    3 2.2 JAVA技术介绍    3 2.3 MYSQL数据库介绍    4 第三章  系统需求分析    6 3.1可行性分析    6 3.1.1技术可

    2023年04月08日
    浏览(38)
  • 火山引擎 ByteHouse:只需 2 个方法,增强 ClickHouse 数据导入能力

    更多技术交流、求职机会,欢迎关注 字节跳动数据平台微信公众号,回复【1】进入官方交流群   作为企业数字化建设的必备要素,易用的数据引擎能帮助企业提升数据使用效率,更好提升数据应用价值,夯实数字化建设基础。 数据导入是衡量 OLAP 引擎性能及易用性的重要标

    2024年02月08日
    浏览(29)
  • 大数据课程综合实验案例:网站用户行为分析

    大数据课程实验案例:网站用户行为分析,由厦门大学数据库实验室团队开发,旨在满足全国高校大数据教学对实验案例的迫切需求。本案例涉及数据预处理、存储、查询和可视化分析等数据处理全流程所涉及的各种典型操作,涵盖Linux、MySQL、Hadoop、HBase、Hive、Sqoop、R、Ec

    2024年02月06日
    浏览(80)
  • GSEQ行为序列分析软件学习汇总

    1、GSEQ软件是做什么的? 2、GSEQ软件如何使用? 2、GSEQ软件前期需要在“记事本”中编写“程式码”需要将所有行为数据编码之后,将编码行为序列粘贴到GSEQ软件中去,如果数据量很大,这个过程就非常繁杂了,是否可以简化这个数据处理流程? GSEQ是一款交互序列分析软件

    2024年01月25日
    浏览(27)
  • 大数据实战 --- 淘宝用户行为数据分析

    目录 开发环境  数据描述 功能需求 数据准备 数据清洗 用户行为分析 找出有价值的用户 Hadoop+Hive+Spark+HBase 启动Hadoop :start-all.sh 启动zookeeper :zkServer.sh start 启动Hive : nohup hiveserver2 1/dev/null 21 beeline -u jdbc:hive2://192.168.152.192:10000 启动Hbase : start-hbase.sh hbase shell 启动Spark :s

    2023年04月22日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包