浅谈 ByteHouse Projection 优化实践

这篇具有很好参考价值的文章主要介绍了浅谈 ByteHouse Projection 优化实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

预聚合是 OLAP 系统中常用的一种优化手段,在通过在加载数据时就进行部分聚合计算,生成聚合后的中间表或视图,从而在查询时直接使用这些预先计算好的聚合结果,提高查询性能,实现这种预聚合方法大多都使用物化视图来实现。

Clickhouse 社区实现的 Projection 功能类似于物化视图,原始的概念来源于 Vertica,在原始表数据加载时,根据聚合 SQL 定义的表达式,计算写入数据的聚合数据与原始数据同步写入存储。在数据查询的过程中,如果查询 SQL 通过匹配分析可以通过聚合数据计算得到,直接查询聚合数据减少计算开销,大幅提升查询性能。

Clickhouse Projection 是针对物化视图现有问题,在查询匹配,数据一致性上扩展了使用场景:

  • 支持 normal projection,按照不同列进行数据重排,对于不同条件快速过滤数据
  • 支持 aggregate projection, 使用聚合查询在源表上直接定义出预聚合模型
  • 查询分析能根据查询代价,自动选择最优 Projection 进行查询优化,无需改写查询
  • projeciton 数据存储于原始 part 目录下,在任一时刻针对任一数据变换操作均提供一致性保证
  • 维护简单,不需另外定义新表,在原始表添加 projection 属性

ByteHouse 是火山引擎基于 ClickHouse 研发的一款分析型数据库产品,是同时支持实时和离线导入的自助数据分析平台,能够对 PB 级海量数据进行高效分析。具备真实时分析、存储-计算分离、多级资源隔离、云上全托管服务四大特点,为了更好的兼容社区的 projection 功能,扩展 projection 使用场景,ByteHouse 对 Projection 进行了匹配场景和架构上进行了优化。在 ByteHouse 商业客户性能测试 projection 的性能测试,在 1.2 亿条的实际生产数据集中进行测试,查询并发能力提升 10~20 倍,下面从 projeciton 在优化器查询改写和基于 ByteHouse 框架改进两个方面谈一谈目前的优化工作。

Projection 使用

为了提高 ByteHouse 对社区有很好的兼容性,ByteHouse 保留了原有语法的支持,projection 操作分为创建,删除,物化,删除数据几个操作。为了便于理解后面的优化使用行为分析系统例子作为分析的对象。

语法

-- 新增projection定义
ALTER TABLE [db].table ADD PROJECTION name ( SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY] )

-- 删除projection定义并且删除projection数据 
ALTER TABLE [db].table DROP PROJECTION name 

-- 物化原表的某个partition数据
ALTER TABLE [db.]table MATERIALIZE PROJECTION name IN PARTITION partition_name

-- 删除projection数据但不删除projection定义
ALTER TABLE [db.]table CLEAR PROJECTION name IN PARTITION partition_name 

实例

CREATE DATABASE IF NOT EXISTS tea_data;

创建原始数据表
CREATE TABLE tea_data.events(
  app_id UInt32,
  user_id UInt64,
  event_type UInt64,
  cost UInt64,
  action_duration UInt64,
  display_time UInt64,
  event_date Date
) ENGINE = CnchMergeTree PARTITION BY toDate(event_date)
ORDER BY
  (app_id, user_id, event_type);

创建projection前写入2023-05-28分区测试数据
INSERT INTO tea_data.events
SELECT
    number / 100,
    number % 10,
    number % 3357,
    number % 166,
    number % 5,
    number % 40,
    '2023-05-28 05:11:55'
FROM system.numbers LIMIT 100000;

创建聚合projection
ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_1
(
SELECT
    app_id,
    user_id,
    event_date,
    sum(action_duration)
    GROUP BY app_id,
    user_id, event_date
);

创建projection后写入2023-05-29分区测试数据
INSERT INTO tea_data.events
SELECT
    number / 100,
    number % 10,
    number % 3357,
    number % 166,
    number % 5,
    number % 40,
    '2023-05-29 05:11:55'
FROM system.numbers LIMIT 100000;

Note:CnchMergeTree是Bytehouse特有的引擎

Query Optimizer 扩展 Projection 改写

Bytehouse 优化器

ByteHouse 优化器为业界目前唯一的 ClickHouse 优化器方案。ByteHouse 优化器的能力简单总结如下:

  • RBO:支持:列裁剪、分区裁剪、表达式简化、子查询解关联、谓词下推、冗余算子消除、Outer-JOIN 转 INNER-JOIN、算子下推存储、分布式算子拆分等常见的启发式优化能力。
  • CBO:基于 Cascade 搜索框架,实现了高效的 Join 枚举算法,以及基于 Histogram 的代价估算,对 10 表全连接级别规模的 Join Reorder 问题,能够全量枚举并寻求最优解,同时针对大于 10 表规模的 Join Reorder 支持启发式枚举并寻求最优解。CBO 支持基于规则扩展搜索空间,除了常见的 Join Reorder 问题以外,还支持 Outer-Join/Join Reorder,Magic Set Placement 等相关优化能力。
  • 分布式计划优化:面向分布式 MPP 数据库,生成分布式查询计划,并且和 CBO 结合在一起。相对业界主流实现:分为两个阶段,首先寻求最优的单机版计划,然后将其分布式化。我们的方案则是将这两个阶段融合在一起,在整个 CBO 寻求最优解的过程中,会结合分布式计划的诉求,从代价的角度选择最优的分布式计划。对于 Join/Aggregate 的还支持 Partition 属性展开。
  • 高阶优化能力:实现了 Dynamic Filter pushdown、单表物化视图改写、基于代价的 CTE (公共表达式共享)。

借助 bytehouse 优化器强大的能力,针对 projection 原有实现的几点局限性做了优化,下面我们先来看一下社区在 projection 改写的具体实现。

社区 Projection

改写实现在非优化器执行模式下,对原始表的聚合查询可通过 aggregate projection 加速,即读取 projection 中的预聚合数据而不是原始数据。计算支持了 normal partition 和 projection partition 的混合查询,如果一个 partition 的 projection 还没物化,可以使用原始数据进行计算。
具体改写执行逻辑:

  • 计划阶段
  1. 将原查询计划和已有 projection 进行匹配筛选能满足查询要求的 projection candidates;
  2. 基于最小的 mark 读取数选择最优的 projection candidate;
  3. 对原查询计划中的 ActionDAG 进行改写和折叠,之后用于 projection part 数据的后续计算;
  4. 将当前数据处理阶段提升到 WithMergeableState;
  • 执行阶段
  1. MergeTreeDataSelectExecutor 会将 aggregate 之前的计算进行拆分:对于 normal part,使用原查询计划进行计算;对于 projection part,使用改写后 ActionDAG 构造 QueryPipeline;
  2. 将两份数据合并,用于 aggregate 之后的计算。

Bytehouse 优化器改写实现

优化器会将查询切分为不同的 plan segment 分发到 worker 节点并行执行,segment 之间通过 exchange 交换数据,在 plan segment 内部根据 query plan 构建 pipeline 执行,以下面简单聚合查询为例,说明优化器如何匹配 projection。

Q1: 
SELECT   
app_id,    
user_id,    
sum(action_duration)
FROM tea_data.eventsWHERE event_date = '2023-05-29'
GROUP BY    
app_id,    
user_id

在执行计划阶段优化器尽量的将 TableScan 上层的 Partial Aggregation Step,Projection 和 Filter 下推到 TableScan 中,在将 plan segment 发送到 worker 节点后,在根据查询代价选择合适 projection 进行匹配改写,从下面的执行计划上看,命中 projection 会在 table scan 中直接读取 AggregateFunction(sum, UInt64)的 state 数据,相比于没有命中 projection 的执行计划减少了 AggregaingNode 的聚合运算。

  • Q1 查询计划(optimizer_projection_support=0)
  • Q1 查询计划(optimizer_projection_support=1)
混合读取 Projection

Projection 在创建之后不支持更新 schema,只能创建新的 projection,但是在一些对于 projection schema 变更需求频繁业务场景下,需要同一个查询既能够读取旧 projection 也能读取新 projection,所以在匹配时需要从 partition 维度进行匹配而不是从 projection 定义的维度进行匹配,混合读取不同 projection 的数据,这样会使查询更加灵活,更好的适应业务场景,下面举个具体的实例:

创建新的projection
ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_2
(
SELECT
    app_id,
    sum(action_duration),
    sum(cost)
    GROUP BY app_id
);

写入2023-05-30的数据
INSERT INTO tea_data.events
SELECT
    number / 10,
    number % 100,
    number % 23,
    number % 3434,
    number % 23,
    number % 55,
    '2023-05-30 04:12:43'
FROM system.numbers LIMIT 100000;

执行查询
Q2:
SELECT
    app_id,
    sum(action_duration)
FROM tea_data.events
WHERE event_date >= '2023-05-28'
GROUP BY app_id
  • Q2 执行计划
  • 按照 partition 来匹配 projection
    查询过滤条件 WHERE event_date >= '2023-05-28' 会读取是三个分区的数据, 并且 agg_sum_proj_1, agg_sum_proj_2 都满足 Q2 的查询条件,所以 table scan 会读取 2023-05-28 的原始数据,2023-05-29 会读取 agg_sum_proj_1 的数据,2023-05-30 由于 agg_sum_proj_2 相对于 agg_sum_proj_1 的数据聚合度更高,读取代价较小,选择读取 agg_sum_proj_2 的数据,混合读取不同 projection 的数据。
原始表 Schema 更新

当对原始表添加新字段(维度或指标 ),对应 projection 不包含这些字段,这时候为了利用 projection 一般情况下需要删除 projection 重新做物化,比较浪费资源,如果优化器匹配算法能正确处理不存在缺省字段,并使用缺省值参与计算就可以解决这个问题。


ALTER TABLE tea_data.events ADD COLUMN device_id String after event_type;
ALTER TABLE tea_data.events ADD COLUMN stay_time UInt64 after device_id;

执行查询
Q3:
SELECT
    app_id,
    device_id,
    sum(action_duration),
    max(stay_time)
FROM tea_data.events
WHERE event_date >= '2023-05-28'
GROUP BY app_id,device_id
  • Q3 执行计划
  • 默认值参与计算
    从查询计划可以看出,即使 agg_sum_proj_1 和 agg_sum_proj_2 并不包含新增的维度字段 device_id,指标字段 stay_time, 仍然可以命中原始的 partiton 的 projection,并且使用默认值来参与计算,这样可以利用旧的 projection 进行查询加速。

Bytehouse Projection 实现

Projection 是按照 Bytehouse 的存算分离架构进行设计的,Projecton 数据由分布式存储统一进行管理,而针对 projection 的查询和计算则在无状态的计算节点上进行。相比于社区版,Bytehouse Projection 实现了以下优势:

  • 对于 Projection 数据的存储节点和计算节点可以独立扩展,即可以根据不同业务对于 Projection 的使用需求,增加存储或者计算节点。
  • 当进行 Projection 查询时,可以根据不同 Projection 的数据查询量来分配计算节点的资源,从而实现资源的隔离和优化,提高查询效率。
  • Projection 的元数据存储十分轻量,在业务数据急剧变化的时候,计算节点可以做到业务无感知扩缩容,无需额外的 Projection 数据迁移。

Projection 数据存储

在 Bytehouse 中,多个 projections 数据与 data 数据存储在一个共享存储文件中。文件的外部数据对 projections 内部的内容没有感知,相当于一个黑盒。当需要读取某个 projection 时,通过 checksums 里面存储的 projection 指针,定位到特定 projection 位置,完成 projection 数据解析与加载。

Write 操作

Projection 写入分为两部分,先在本地做数据写入,产生 part 文件存储在 worker 节点本地,然后通过 dumpAndCommitCnchParts 将数据 dump 到远程共享存储。

  • 写入本地  
    通过 writeTempPart()将 block 写入本地,当写完原始 part 后,循环通过方法 addProjectionPart()将每一个 projection 写入 part 文件夹,并添加到 new_part 中进行管理。
  • dump 到远程存储  
    dumpCnchParts()的时候,按照上述的存储格式,写入完原始 part 中的 bin 和 mark 数据后,循环将每一个 projection 文件夹中的数据写入到共享存储文件中,并记录位置和大小到 checksums,如下:
    写入 header
    写入 data
    写入 projections
    写入 Primary index
    写入 Checksums
    写入 Metainfo
    写入 Unique Key Index
    写入 data footger

Merge 操作

随着时间的推移,针对同一个 partition 会存在越来越多的 parts,而 parts 越多查询过滤时的代价就会越大。因此,Bytehouse 在后台进程中会 merge 同一个 partition 的 parts 组成更大的 part,从而减少 part 的数量提高查询的效率。

  • 对于每一个要 merge 的 part
    对于 part 中的每一列,缓存对应的 segments 到本地
    创建 MergeTreeReaderStreamWithSegmentCache,通过远程文件 buffer 或者本地 segments 的 buffer 初始化
  • 通过 MergingSortedTransform 或 AggregatingSortedTransform 等将 sources 融合成 PipelineExecutingBlockInputStream
  • 创建 MergedBlockOutputStream
  • 对于 projection,进行如下操作
  • 建立每一个 projection 的读取流,本地缓存 buffer 或者远程文件 buffer
  • 原始表 merge 过程,对 parts 中的 projections 进行 merge
  • 通过 dumper 将新的完整 part 存储到远端

Mutate 操作

Bytehouse 采用 MVCC 的方式,针对 mutate 涉及的列,新增一个 delta part 版本存储此次 mutate 涉及到的列。相应地,我们在 mutate 的时候,构造 projection 的 mutate 操作的 inputstream,将 mutate 后的 projection 和原始表数据一起写到同一个 delta part 中。

  • 在 MutationsInterpreter 里面,通过 InterpreterSelectQuery(mutation_ast)获取 BlockInputStream
  • projection 通过 block 和 InterpreterSelectQuery(projection.ast)重新构建

Materialize 物化操作

如下图所示,根据 Bytehouse 的 part 管理方式,针对 mutate 操作或新增物化操作,我们为 part 生成新的 delta part,在下图 part 中,它所管理的三个 projections 由 base part 中的 proj2,delta part#1 中的 proj1',以及 delta part#2 中的 proj3 共同构成。当 parts 加载完成后,delta part#2 会存储 base part 中的 proj2 的指针和 delta part#1 中的 proj1'指针,以及自身的 proj3 指针,对上层提供统一的访问服务。

Worker 端磁盘缓存

目前,CNCH 中针对不同数据设计了不同的缓存类型

  • DiskCacheSegment:管理 bin 和 mark 数据
  • ChecksumsDiskCacheSegment:管理 checksums 数据
  • PrimaryIndexDiskCacheSegment:管理主键索引数据BitMapIndexDiskCacheSegment:管理 bitmap 索引数据
    针对 Projection 中的数据,分别通过上述的 DiskCache,ChecksumsDiskCache 和 PrimaryIndexDiskCache 对 bin,mark,checksums 以及索引进行缓存。

另外,为了加快 Projection 数据的加载过程,我们新增了 MetaInfoDiskCacheSegment 用于缓存 Projection 相关的元数据信息。

实际案例分析

某真实用户场景的数据集,我们利用它对 Projection 性能进行了测试。
该数据集约 1.2 亿条,包含 projection 约 240G 大小,测试机器 80CPU(s) / 376G Mem,配置如下:

  • SET allow_experimental_projection_optimization = 1
  • use_uncompressed_cache = true
  • max_threads = 1
  • log_level = error
  • 开启 Projection 查询并发度 80,关闭 Projection 查询并发度为 30

测试结果

开启 Projection 后,针对 1.2 亿条的数据集,查询性能提升 10~20 倍。

表结构

                                                 
CREATE TABLE user.trades(                                     
 `type` UInt8,
 `status` UInt64,
 `block_hash` String, 
 `sequence_number` UInt64, 
 `block_timestamp` DateTime, 
 `transaction_hash` String, 
 `transaction_index` UInt32, 
 `from_address` String, 
 `to_address` String,
 `value` String,
 `input` String,
 `nonce` UInt64, 
 `contract_address` String,
 `gas` UInt64,
 `gas_price` UInt64,
 `gas_used` UInt64, 
 `effective_gas_price` UInt64, 
 `cumulative_gas_used` UInt64, 
 `max_fee_per_gas` UInt64, 
 `max_priority_fee_per_gas` UInt64, 
 `r` String,
 `s` String,
 `v` UInt64,
 `logs_count` UInt32,
 PROJECTION tx_from_address_hit
  (                                         
    SELECT *                                                
    ORDER BY from_address
  ),                                            
 PROJECTION tx_to_address_hit (                                           
    SELECT *                                                 
    ORDER BY to_address 
 ),                                                  
 PROJECTION tx_sequence_number_hit (                                      
    SELECT *                                              
    ORDER BY sequence_number 
 ),                            
 PROJECTION tx_transaction_hash_hit (                                         
    SELECT *                                                  
    ORDER BY transaction_hash 
 )                         
)
ENGINE=CnchMergeTree()
PRIMARY KEY (transaction_hash, from_address, to_address) 
ORDER BY (transaction_hash, from_address, to_address) 
PARTITION BY toDate(toStartOfMonth(`block_timestamp`));                            

开启Projection

Q1

WITH tx AS ( SELECT * FROM user.trades WHERE from_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC,transaction_index DESC UNION ALL SELECT * FROM user.trades WHERE to_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC, transaction_index DESC ) SELECT * FROM tx LIMIT 100;


Q2

with tx as (select sequence_number, transaction_index, transaction_hash, input from user.trades where from_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc,transaction_index desc UNION ALL select sequence_number, transaction_index, transaction_hash, input from user.trades where to_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc, transaction_index desc) select sequence_number, transaction_hash, substring(input,1,8) as func_sign from tx order by sequence_number desc, transaction_index desc limit 100 settings max_threads = 1, allow_experimental_projection_optimization = 1, use_uncompressed_cache = true;

关闭 Projection

Q1

Q2
文章来源地址https://www.toymoban.com/news/detail-473094.html

到了这里,关于浅谈 ByteHouse Projection 优化实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ系统监控、问题排查和性能优化实践

    一、系统监控:RabbitMQ的各项性能指标及监控 Message Rates:消息率包含了publish,deliver/get,ack等方面的数据,反映了消息在系统中流转的情况。 Queue Length:队列长度反映了系统当前的负载情况。如果队列中的消息过多,可能需要增加消费者来处理消息,或者检查消费者是否出

    2024年04月11日
    浏览(56)
  • JVM系统优化实践(23):GC生产环境案例(6)

    您好, 这里是 「 码农镖局 」 CSDN博客,欢迎您来,欢迎您再来~ 在互联网大厂中,对每天亿级流量的日志进行清洗、整理是非常常见的工作。在某个系统中,需要对用户的访问日志做脱敏处理,也就是清洗掉姓名、身份证号、手机号等个人隐私信息后在保存到数据库中或者

    2024年02月15日
    浏览(38)
  • JVM系统优化实践(22):GC生产环境案例(五)

    您好, 这里是 「 码农镖局 」 CSDN博客,欢迎您来,欢迎您再来~ 除了Tomcat、Jetty,另一个常见的可能出现OOM的地方就是微服务架构下的一次RPC调用过程中。笔者曾经经历过的一次OOM就是基于Thrift框架封装出来的一个RPC框架导致的宕机。   也就是当服务A更新后,服务B宕机了

    2024年02月15日
    浏览(49)
  • JVM系统优化实践(20):GC生产环境案例(三)

    您好, 这里是 「 码农镖局 」 CSDN博客,欢迎您来,欢迎您再来~ 某新手开发工程师接到了一个保存Elasticsearch日志的任务,以供后续分析之用。但写代码的时候,误将保存日志的代码段弄成了无限循环,程序启动后,没用多久就崩溃了。 另一名工程师在动态创建类时,没有

    2024年02月16日
    浏览(51)
  • Elasticsearch聚合优化 | 聚合速度提升5倍!

    大多数时候对单个字段的聚合查询还是非常快的, 但是当需要同时聚合多个字段时,就可能会产生大量的分组,最终结果就是占用 Elasticsearch大量内存,从而导致 OOM 的情况发生。 实践应用发现,以下情况都会比较慢: 1)待聚合文档数比较多(千万、亿、十亿甚至更多);

    2024年02月01日
    浏览(50)
  • elasticsearch聚合查询实践

    概念 聚合分类 聚合语法 聚合作用范围及排序 聚合原理及 terms 精准度 聚合实验 桶聚合 指标聚合 Pipeline 聚合 实践一:多商户数据权限聚合分页 实践二:多维度嵌套聚合 实践三:删除 ES 索引重复数据 附:实验环境 用于聚合的字段必须是 exact value ,即 doc_value=true 。分词字

    2024年02月03日
    浏览(47)
  • es 聚合性能优化

    适用场景 :高基数聚合 。高基数聚合场景中的高基数含义:一个字段包含很大比例的唯一值。 本质上就是通过预先加载全局字典到内存中来减少磁盘I/O操作,从而提高查询速度。以空间换时间。 global ordinals 中文翻译成全局序号,是一种数据结构,应用场景如下: 基于 ke

    2024年04月25日
    浏览(30)
  • 敏捷实践 | 浅谈测试金字塔

    之前做测试培训的时候经常会被问到几个问题——我们项目没有自动化测试,老板想让我做,我搞了几个星期 selenium 怎么不行呢?我应该先做 API 测试还是 UI 测试,他们之前关系如何?很多初学者甚至认为自动化测试=UI 自动化=selenium。在学习具体自动化测试技术之前,我们

    2024年02月06日
    浏览(42)
  • 浅谈SQL优化小技巧

    (1)客户端发送一条查询语句到服务器; (2)服务器先查询缓存,如果命中缓存,则立即返回存储在缓存中的数据; (3)未命中缓存后,MySQL通过将SQL语句进行解析,并生成一颗对应的解析树,MySQL解析器将使用MySQL语法进行验证和解析。 ​ 例如,验证是否使用了错

    2024年02月05日
    浏览(43)
  • RDMA性能优化经验浅谈

    首先我们介绍一下RDMA的一些核心概念,当然了,我并不打算写他的API以及调用方式,我们更多关注这些基础概念背后的硬件执行方式和原理,对于这些原理的理解是能够写出高性能RDMA程序的关键。 Memory Region RDMA的网卡(下文以RNIC指代)通过DMA来读写系统内存,由于DMA只能根

    2024年02月10日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包